Job Classes
All job types inherit from BaseCall and provide specialized configurations for different inference tasks.
LLMComplete
Single-turn text generation using a prompt.
LLMComplete
dataclass
Bases: BaseCall
LLM Completion job - for text completion, generation, and single-turn tasks.
This class is designed for generation endpoints that accept a single prompt
without system context. It's optimized for:
- Text completion
- Code generation
- Creative writing
- Single-turn responses
- Q&A without conversation context
Note: This does NOT support system messages. Use LLMChat for conversational
interactions with system/user/assistant message flows.
Example
job = LLMComplete(model="llama3.3")
job.set_prompt("Write a haiku about Python")
job.temperature = 0.9
Source code in microdc/jobs/llm_complete.py
@dataclass
class LLMComplete(BaseCall):
    """
    LLM Completion job - for text completion, generation, and single-turn tasks.

    This class is designed for generation endpoints that accept a single prompt
    without system context. It's optimized for:
    - Text completion
    - Code generation
    - Creative writing
    - Single-turn responses
    - Q&A without conversation context

    Note: This does NOT support system messages. Use LLMChat for conversational
    interactions with system/user/assistant message flows.

    Example:
        >>> job = LLMComplete(model="llama3.3")
        >>> job.set_prompt("Write a haiku about Python")
        >>> job.temperature = 0.9
    """

    model: str = ""
    prompt: str = ""
    file_tokens: List[str] = field(default_factory=list)
    input_modalities: List[str] = field(default_factory=lambda: ["text"])
    output_modalities: List[str] = field(default_factory=lambda: ["text"])
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    top_p: float = 1.0
    top_k: Optional[int] = None
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop: Optional[List[str]] = None
    stream: bool = False

    def __post_init__(self) -> None:
        # Fixed job-type discriminator expected by the JobRequest schema.
        self.type = "llm"

    def set_prompt(self, prompt: str) -> None:
        """Set the generation prompt."""
        self.prompt = prompt

    def add_file(self, file_token: str) -> None:
        """
        Add a file token to the completion job.

        Args:
            file_token: File ID from client.upload_file() response
        """
        self.file_tokens.append(file_token)

    def add_files(self, file_tokens: List[str]) -> None:
        """
        Add multiple file tokens to the completion job.

        Args:
            file_tokens: List of file IDs from client.upload_file() responses
        """
        self.file_tokens.extend(file_tokens)

    def to_api_payload(self) -> Dict[str, Any]:
        """Convert to API request format matching JobRequest schema."""
        # Build inner payload with prompt and sampling parameters; optional
        # knobs are only included when explicitly set.
        inner_payload: Dict[str, Any] = {
            "prompt": self.prompt,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "stream": self.stream,
        }
        if self.max_tokens is not None:
            inner_payload["max_tokens"] = self.max_tokens
        if self.top_k is not None:
            inner_payload["top_k"] = self.top_k
        if self.stop is not None:
            inner_payload["stop"] = self.stop
        # Build outer payload matching JobRequest schema
        payload: Dict[str, Any] = {
            "type": "llm",
            "model": self.model,
            "llm_interaction_type": "generation",
            "input_modalities": self.input_modalities,
            "output_modalities": self.output_modalities,
            "payload": inner_payload,
            "priority": self.priority,
            "estimated_cost": 0.0,  # Placeholder for future feature
        }
        if self.file_tokens:
            payload["file_ids"] = self.file_tokens
        # Fix: forward the job-level options inherited from BaseCall, which
        # were previously dropped silently for LLM jobs; this matches
        # DocumentCall.to_api_payload().
        if self.timeout is not None:
            payload["timeout"] = self.timeout
        if self.callback_url is not None:
            payload["callback_url"] = self.callback_url
        if self.metadata:
            payload["metadata"] = self.metadata
        return payload

    def validate(self) -> None:
        """Validate LLM completion job configuration.

        Raises:
            ValidationError: If any required field is missing or a sampling
                parameter is out of range.
        """
        if not self.model:
            raise ValidationError("Model must be specified")
        if not self.prompt:
            raise ValidationError("Prompt is required for LLMComplete")
        if not 0.0 <= self.temperature <= 2.0:
            raise ValidationError("Temperature must be between 0.0 and 2.0")
        if not 0.0 <= self.top_p <= 1.0:
            raise ValidationError("top_p must be between 0.0 and 1.0")
        if self.max_tokens is not None and self.max_tokens <= 0:
            raise ValidationError("max_tokens must be positive")
        if self.top_k is not None and self.top_k <= 0:
            raise ValidationError("top_k must be positive")
|
set_prompt(prompt)
Set the generation prompt.
Source code in microdc/jobs/llm_complete.py
def set_prompt(self, prompt: str) -> None:
    """Store *prompt* as the text the model will complete."""
    self.prompt = prompt
|
add_file(file_token)
Add a file token to the completion job.
Parameters:
| Name |
Type |
Description |
Default |
file_token
|
str
|
File ID from client.upload_file() response
|
required
|
Source code in microdc/jobs/llm_complete.py
def add_file(self, file_token: str) -> None:
    """
    Append one uploaded-file reference to this job.

    Args:
        file_token: File ID from client.upload_file() response
    """
    # += on a list attribute extends in place (same list object).
    self.file_tokens += [file_token]
|
add_files(file_tokens)
Add multiple file tokens to the completion job.
Parameters:
| Name |
Type |
Description |
Default |
file_tokens
|
List[str]
|
List of file IDs from client.upload_file() responses
|
required
|
Source code in microdc/jobs/llm_complete.py
def add_files(self, file_tokens: List[str]) -> None:
    """
    Append several uploaded-file references to this job at once.

    Args:
        file_tokens: List of file IDs from client.upload_file() responses
    """
    for token in file_tokens:
        self.file_tokens.append(token)
|
to_api_payload()
Convert to API request format matching JobRequest schema.
Source code in microdc/jobs/llm_complete.py
def to_api_payload(self) -> Dict[str, Any]:
    """Convert to API request format matching JobRequest schema."""
    # Build inner payload with prompt and sampling parameters; optional
    # knobs are only included when explicitly set.
    inner_payload: Dict[str, Any] = {
        "prompt": self.prompt,
        "temperature": self.temperature,
        "top_p": self.top_p,
        "frequency_penalty": self.frequency_penalty,
        "presence_penalty": self.presence_penalty,
        "stream": self.stream,
    }
    if self.max_tokens is not None:
        inner_payload["max_tokens"] = self.max_tokens
    if self.top_k is not None:
        inner_payload["top_k"] = self.top_k
    if self.stop is not None:
        inner_payload["stop"] = self.stop
    # Build outer payload matching JobRequest schema
    payload: Dict[str, Any] = {
        "type": "llm",
        "model": self.model,
        "llm_interaction_type": "generation",
        "input_modalities": self.input_modalities,
        "output_modalities": self.output_modalities,
        "payload": inner_payload,
        "priority": self.priority,
        "estimated_cost": 0.0,  # Placeholder for future feature
    }
    if self.file_tokens:
        payload["file_ids"] = self.file_tokens
    # Fix: forward the job-level options inherited from BaseCall, which were
    # previously dropped silently; this matches DocumentCall.to_api_payload().
    if self.timeout is not None:
        payload["timeout"] = self.timeout
    if self.callback_url is not None:
        payload["callback_url"] = self.callback_url
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload
|
validate()
Validate LLM completion job configuration.
Source code in microdc/jobs/llm_complete.py
def validate(self) -> None:
    """Validate LLM completion job configuration."""
    # Each entry is (failed-condition, error message); checked in the same
    # order as before so the first failing rule wins.
    checks = (
        (not self.model, "Model must be specified"),
        (not self.prompt, "Prompt is required for LLMComplete"),
        (not 0.0 <= self.temperature <= 2.0,
         "Temperature must be between 0.0 and 2.0"),
        (not 0.0 <= self.top_p <= 1.0, "top_p must be between 0.0 and 1.0"),
        (self.max_tokens is not None and self.max_tokens <= 0,
         "max_tokens must be positive"),
        (self.top_k is not None and self.top_k <= 0,
         "top_k must be positive"),
    )
    for failed, message in checks:
        if failed:
            raise ValidationError(message)
|
LLMChat
Multi-turn conversational AI with system/user/assistant messages.
LLMChat
dataclass
Bases: BaseCall
LLM Chat job - for conversational and multi-turn interactions.
This class supports full message-based interactions with system, user,
and assistant roles. It's designed for:
- Conversational AI
- Multi-turn dialogue
- Question answering with context
- Assistant-style interactions
- Context-aware responses
Example
chat = LLMChat(model="gpt-4")
chat.set_system("You are a helpful assistant")
chat.add_user_message("What is Python?")
chat.add_assistant_message("Python is a programming language")
chat.add_user_message("Tell me more")
Source code in microdc/jobs/llm_chat.py
@dataclass
class LLMChat(BaseCall):
    """
    LLM Chat job - for conversational and multi-turn interactions.

    This class supports full message-based interactions with system, user,
    and assistant roles. It's designed for:
    - Conversational AI
    - Multi-turn dialogue
    - Question answering with context
    - Assistant-style interactions
    - Context-aware responses

    Example:
        >>> chat = LLMChat(model="gpt-4")
        >>> chat.set_system("You are a helpful assistant")
        >>> chat.add_user_message("What is Python?")
        >>> chat.add_assistant_message("Python is a programming language")
        >>> chat.add_user_message("Tell me more")
    """

    model: str = ""
    system: str = ""
    messages: List[Dict[str, str]] = field(default_factory=list)
    input_modalities: List[str] = field(default_factory=lambda: ["text"])
    output_modalities: List[str] = field(default_factory=lambda: ["text"])
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    top_p: float = 1.0
    top_k: Optional[int] = None
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop: Optional[List[str]] = None
    stream: bool = False

    def __post_init__(self) -> None:
        # Fixed job-type discriminator expected by the JobRequest schema.
        self.type = "llm"

    def set_system(self, content: str) -> None:
        """Set the system message for the conversation context."""
        self.system = content

    def add_user_message(self, content: str) -> None:
        """Add a user message to the conversation."""
        self.messages.append({"role": "user", "content": content})

    def add_assistant_message(self, content: str) -> None:
        """Add an assistant message to the conversation."""
        self.messages.append({"role": "assistant", "content": content})

    def to_api_payload(self) -> Dict[str, Any]:
        """Convert to API request format matching JobRequest schema."""
        # Build inner payload with system, messages and sampling parameters;
        # optional knobs are only included when explicitly set.
        inner_payload: Dict[str, Any] = {
            "system": self.system,
            "messages": self.messages,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "stream": self.stream,
        }
        if self.max_tokens is not None:
            inner_payload["max_tokens"] = self.max_tokens
        if self.top_k is not None:
            inner_payload["top_k"] = self.top_k
        if self.stop is not None:
            inner_payload["stop"] = self.stop
        # Build outer payload matching JobRequest schema
        payload: Dict[str, Any] = {
            "type": "llm",
            "model": self.model,
            "llm_interaction_type": "chat",
            "input_modalities": self.input_modalities,
            "output_modalities": self.output_modalities,
            "payload": inner_payload,
            "priority": self.priority,
            "estimated_cost": 0.0,  # Placeholder for future feature
        }
        # Fix: forward the job-level options inherited from BaseCall, which
        # were previously dropped silently for chat jobs; this matches
        # DocumentCall.to_api_payload().
        if self.timeout is not None:
            payload["timeout"] = self.timeout
        if self.callback_url is not None:
            payload["callback_url"] = self.callback_url
        if self.metadata:
            payload["metadata"] = self.metadata
        return payload

    def validate(self) -> None:
        """Validate LLM chat job configuration.

        Raises:
            ValidationError: If any required field is missing or a sampling
                parameter is out of range.
        """
        if not self.model:
            raise ValidationError("Model must be specified")
        if not self.messages:
            raise ValidationError("At least one message is required for LLMChat")
        if not 0.0 <= self.temperature <= 2.0:
            raise ValidationError("Temperature must be between 0.0 and 2.0")
        if not 0.0 <= self.top_p <= 1.0:
            raise ValidationError("top_p must be between 0.0 and 1.0")
        if self.max_tokens is not None and self.max_tokens <= 0:
            raise ValidationError("max_tokens must be positive")
        if self.top_k is not None and self.top_k <= 0:
            raise ValidationError("top_k must be positive")
|
set_system(content)
Set the system message for the conversation context.
Source code in microdc/jobs/llm_chat.py
def set_system(self, content: str) -> None:
    """Store *content* as the system message framing the conversation."""
    self.system = content
|
add_user_message(content)
Add a user message to the conversation.
Source code in microdc/jobs/llm_chat.py
def add_user_message(self, content: str) -> None:
    """Append a user-role turn to the conversation history."""
    message = {"role": "user", "content": content}
    self.messages.append(message)
|
add_assistant_message(content)
Add an assistant message to the conversation.
Source code in microdc/jobs/llm_chat.py
def add_assistant_message(self, content: str) -> None:
    """Append an assistant-role turn to the conversation history."""
    message = {"role": "assistant", "content": content}
    self.messages.append(message)
|
to_api_payload()
Convert to API request format matching JobRequest schema.
Source code in microdc/jobs/llm_chat.py
def to_api_payload(self) -> Dict[str, Any]:
    """Convert to API request format matching JobRequest schema."""
    # Build inner payload with system, messages and sampling parameters;
    # optional knobs are only included when explicitly set.
    inner_payload: Dict[str, Any] = {
        "system": self.system,
        "messages": self.messages,
        "temperature": self.temperature,
        "top_p": self.top_p,
        "frequency_penalty": self.frequency_penalty,
        "presence_penalty": self.presence_penalty,
        "stream": self.stream,
    }
    if self.max_tokens is not None:
        inner_payload["max_tokens"] = self.max_tokens
    if self.top_k is not None:
        inner_payload["top_k"] = self.top_k
    if self.stop is not None:
        inner_payload["stop"] = self.stop
    # Build outer payload matching JobRequest schema
    payload: Dict[str, Any] = {
        "type": "llm",
        "model": self.model,
        "llm_interaction_type": "chat",
        "input_modalities": self.input_modalities,
        "output_modalities": self.output_modalities,
        "payload": inner_payload,
        "priority": self.priority,
        "estimated_cost": 0.0,  # Placeholder for future feature
    }
    # Fix: forward the job-level options inherited from BaseCall, which were
    # previously dropped silently; this matches DocumentCall.to_api_payload().
    if self.timeout is not None:
        payload["timeout"] = self.timeout
    if self.callback_url is not None:
        payload["callback_url"] = self.callback_url
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload
|
validate()
Validate LLM chat job configuration.
Source code in microdc/jobs/llm_chat.py
def validate(self) -> None:
    """Validate LLM chat job configuration."""
    # (failed-condition, error message) pairs, checked in the original order
    # so the first failing rule raises.
    checks = (
        (not self.model, "Model must be specified"),
        (not self.messages, "At least one message is required for LLMChat"),
        (not 0.0 <= self.temperature <= 2.0,
         "Temperature must be between 0.0 and 2.0"),
        (not 0.0 <= self.top_p <= 1.0, "top_p must be between 0.0 and 1.0"),
        (self.max_tokens is not None and self.max_tokens <= 0,
         "max_tokens must be positive"),
        (self.top_k is not None and self.top_k <= 0,
         "top_k must be positive"),
    )
    for failed, message in checks:
        if failed:
            raise ValidationError(message)
|
LLMEmbed
Text embedding generation for semantic search and RAG.
LLMEmbed
dataclass
Bases: BaseCall
Embedding generation job configuration.
Use this for generating text embeddings for semantic search, similarity,
clustering, and other vector-based operations.
Example
job = LLMEmbed(model="text-embedding-ada-002")
job.add_texts(["Hello world", "Goodbye world"])
job.normalize = True
Source code in microdc/jobs/embed_call.py
@dataclass
class LLMEmbed(BaseCall):
    """
    Embedding generation job configuration.

    Use this for generating text embeddings for semantic search, similarity,
    clustering, and other vector-based operations.

    Example:
        >>> job = LLMEmbed(model="text-embedding-ada-002")
        >>> job.add_texts(["Hello world", "Goodbye world"])
        >>> job.normalize = True
    """

    model: str = ""
    input_texts: List[str] = field(default_factory=list)
    dimensions: Optional[int] = None
    normalize: bool = True
    encoding_format: str = "float"  # float or base64

    def __post_init__(self) -> None:
        # Fixed job-type discriminator expected by the JobRequest schema.
        self.type = "embed"

    def add_text(self, text: str) -> None:
        """Add a text to the embedding batch."""
        self.input_texts.append(text)

    def add_texts(self, texts: List[str]) -> None:
        """Add multiple texts to the embedding batch."""
        self.input_texts.extend(texts)

    def to_api_payload(self) -> Dict[str, Any]:
        """Convert to API request format matching JobRequest schema."""
        # Build inner payload with texts and parameters
        inner_payload: Dict[str, Any] = {
            "texts": self.input_texts,  # API expects 'texts' field
            "normalize": self.normalize,
            "encoding_format": self.encoding_format,
        }
        if self.dimensions is not None:
            inner_payload["dimensions"] = self.dimensions
        # Build outer payload matching JobRequest schema
        payload: Dict[str, Any] = {
            "type": "embed",
            "model": self.model,
            "payload": inner_payload,
            "priority": self.priority,
            "estimated_cost": 0.0,  # Placeholder for future feature
        }
        # Fix: forward the job-level options inherited from BaseCall, which
        # were previously dropped silently for embed jobs; this matches
        # DocumentCall.to_api_payload().
        if self.timeout is not None:
            payload["timeout"] = self.timeout
        if self.callback_url is not None:
            payload["callback_url"] = self.callback_url
        if self.metadata:
            payload["metadata"] = self.metadata
        return payload

    def validate(self) -> None:
        """Validate embedding job configuration.

        Raises:
            ValidationError: If any required field is missing or invalid.
        """
        if not self.model:
            raise ValidationError("Model must be specified")
        if not self.input_texts:
            raise ValidationError("At least one input text is required")
        if any(not text.strip() for text in self.input_texts):
            raise ValidationError("All input texts must be non-empty")
        if self.encoding_format not in ("float", "base64"):
            raise ValidationError("encoding_format must be 'float' or 'base64'")
        if self.dimensions is not None and self.dimensions <= 0:
            raise ValidationError("dimensions must be positive")
|
add_text(text)
Add a text to the embedding batch.
Source code in microdc/jobs/embed_call.py
def add_text(self, text: str) -> None:
    """Append one string to the batch of texts to embed."""
    self.input_texts += [text]
|
add_texts(texts)
Add multiple texts to the embedding batch.
Source code in microdc/jobs/embed_call.py
def add_texts(self, texts: List[str]) -> None:
    """Append several strings to the batch of texts to embed."""
    for text in texts:
        self.input_texts.append(text)
|
to_api_payload()
Convert to API request format matching JobRequest schema.
Source code in microdc/jobs/embed_call.py
def to_api_payload(self) -> Dict[str, Any]:
    """Convert to API request format matching JobRequest schema."""
    # Build inner payload with texts and parameters
    inner_payload: Dict[str, Any] = {
        "texts": self.input_texts,  # API expects 'texts' field
        "normalize": self.normalize,
        "encoding_format": self.encoding_format,
    }
    if self.dimensions is not None:
        inner_payload["dimensions"] = self.dimensions
    # Build outer payload matching JobRequest schema
    payload: Dict[str, Any] = {
        "type": "embed",
        "model": self.model,
        "payload": inner_payload,
        "priority": self.priority,
        "estimated_cost": 0.0,  # Placeholder for future feature
    }
    # Fix: forward the job-level options inherited from BaseCall, which were
    # previously dropped silently; this matches DocumentCall.to_api_payload().
    if self.timeout is not None:
        payload["timeout"] = self.timeout
    if self.callback_url is not None:
        payload["callback_url"] = self.callback_url
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload
|
validate()
Validate embedding job configuration.
Source code in microdc/jobs/embed_call.py
def validate(self) -> None:
    """Validate embedding job configuration."""
    # (failed-condition, error message) pairs, checked in the original order
    # so the first failing rule raises.
    checks = (
        (not self.model, "Model must be specified"),
        (not self.input_texts, "At least one input text is required"),
        (any(not text.strip() for text in self.input_texts),
         "All input texts must be non-empty"),
        (self.encoding_format not in ("float", "base64"),
         "encoding_format must be 'float' or 'base64'"),
        (self.dimensions is not None and self.dimensions <= 0,
         "dimensions must be positive"),
    )
    for failed, message in checks:
        if failed:
            raise ValidationError(message)
|
DocumentCall
Document processing for file-based analysis workflows.
DocumentCall
dataclass
Bases: BaseCall
Document processing job configuration for file-only processing models.
Document processing models like docling analyze files without requiring text prompts.
Files MUST be uploaded before creating the document processing job.
Workflow
- Upload file(s) using client.upload_file()
- Get file token(s) from upload response
- Create DocumentCall with model and file tokens
- Submit job using client.send_job()
Example
Step 1: Upload file
upload_result = client.upload_file("document.pdf")
file_token = upload_result['id']
Step 2: Create document processing job
job = DocumentCall(model="docling")
job.add_file(file_token)
Step 3: Submit job
job_id = client.send_job(job)
Source code in microdc/jobs/document_call.py
@dataclass
class DocumentCall(BaseCall):
    """
    Document processing job configuration for file-only processing models.

    Document processing models like docling analyze files without requiring
    text prompts. Files MUST be uploaded before creating the document
    processing job.

    Workflow:
        1. Upload file(s) using client.upload_file()
        2. Get file token(s) from upload response
        3. Create DocumentCall with model and file tokens
        4. Submit job using client.send_job()

    Example:
        >>> # Step 1: Upload file
        >>> upload_result = client.upload_file("document.pdf")
        >>> file_token = upload_result['id']
        >>>
        >>> # Step 2: Create document processing job
        >>> job = DocumentCall(model="docling")
        >>> job.add_file(file_token)
        >>>
        >>> # Step 3: Submit job
        >>> job_id = client.send_job(job)
    """

    model: str = ""
    file_tokens: List[str] = field(default_factory=list)
    max_tokens: Optional[int] = None
    temperature: float = 0.7

    def __post_init__(self) -> None:
        # Fixed job-type discriminator expected by the JobRequest schema.
        self.type = "document"

    def add_file(self, file_token: str) -> None:
        """
        Add a file token to the document processing job.

        Args:
            file_token: Token from file upload response

        Note:
            Files must be uploaded using client.upload_file() before
            adding their tokens to the job.
        """
        self.file_tokens.append(file_token)

    def add_files(self, file_tokens: List[str]) -> None:
        """
        Add multiple file tokens to the document processing job.

        Args:
            file_tokens: List of tokens from file upload responses
        """
        self.file_tokens.extend(file_tokens)

    def to_api_payload(self) -> Dict[str, Any]:
        """
        Convert to API request format.

        Note: Document processing jobs do NOT include 'input' field in payload.
        They only process uploaded files referenced by file_ids at the top level.
        The API expects file_ids (not file_tokens) at the top level of the request.
        """
        inner_payload: Dict[str, Any] = {"temperature": self.temperature}
        if self.max_tokens is not None:
            inner_payload["max_tokens"] = self.max_tokens
        payload: Dict[str, Any] = {
            "type": "document",
            "model": self.model,
            "payload": inner_payload,
            "priority": self.priority,
            "file_ids": self.file_tokens,  # API expects 'file_ids' at top level
            "estimated_cost": 0.0,  # Placeholder for future feature
        }
        # Job-level options are only serialized when present.
        for key in ("timeout", "callback_url"):
            value = getattr(self, key)
            if value is not None:
                payload[key] = value
        if self.metadata:
            payload["metadata"] = self.metadata
        return payload

    def validate(self) -> None:
        """Validate document processing job configuration."""
        # (failed-condition, error message) pairs, checked in order so the
        # first failing rule raises.
        checks = (
            (not self.model, "Model must be specified"),
            (not self.file_tokens,
             "At least one file token is required. Upload files first using client.upload_file()"),
            (self.max_tokens is not None and self.max_tokens < 1,
             "max_tokens must be positive"),
            (not 0.0 <= self.temperature <= 2.0,
             "Temperature must be between 0.0 and 2.0"),
        )
        for failed, message in checks:
            if failed:
                raise ValidationError(message)
|
add_file(file_token)
Add a file token to the document processing job.
Parameters:
| Name |
Type |
Description |
Default |
file_token
|
str
|
Token from file upload response
|
required
|
Note
Files must be uploaded using client.upload_file() before
adding their tokens to the job.
Source code in microdc/jobs/document_call.py
def add_file(self, file_token: str) -> None:
    """
    Register one uploaded file with this document processing job.

    Args:
        file_token: Token from file upload response

    Note:
        Files must be uploaded using client.upload_file() before
        adding their tokens to the job.
    """
    self.file_tokens += [file_token]
|
add_files(file_tokens)
Add multiple file tokens to the document processing job.
Parameters:
| Name |
Type |
Description |
Default |
file_tokens
|
List[str]
|
List of tokens from file upload responses
|
required
|
Source code in microdc/jobs/document_call.py
def add_files(self, file_tokens: List[str]) -> None:
    """
    Register several uploaded files with this document processing job.

    Args:
        file_tokens: List of tokens from file upload responses
    """
    for token in file_tokens:
        self.file_tokens.append(token)
|
to_api_payload()
Convert to API request format.
Note: Document processing jobs do NOT include 'input' field in payload.
They only process uploaded files referenced by file_ids at the top level.
The API expects file_ids (not file_tokens) at the top level of the request.
Source code in microdc/jobs/document_call.py
def to_api_payload(self) -> Dict[str, Any]:
    """
    Convert to API request format.

    Note: Document processing jobs do NOT include 'input' field in payload.
    They only process uploaded files referenced by file_ids at the top level.
    The API expects file_ids (not file_tokens) at the top level of the request.
    """
    inner_payload: Dict[str, Any] = {"temperature": self.temperature}
    if self.max_tokens is not None:
        inner_payload["max_tokens"] = self.max_tokens
    payload: Dict[str, Any] = {
        "type": "document",
        "model": self.model,
        "payload": inner_payload,
        "priority": self.priority,
        "file_ids": self.file_tokens,  # API expects 'file_ids' at top level
        "estimated_cost": 0.0,  # Placeholder for future feature
    }
    # Job-level options are only serialized when present.
    for key in ("timeout", "callback_url"):
        value = getattr(self, key)
        if value is not None:
            payload[key] = value
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload
|
validate()
Validate document processing job configuration.
Source code in microdc/jobs/document_call.py
def validate(self) -> None:
    """Validate document processing job configuration."""
    # (failed-condition, error message) pairs, checked in the original order
    # so the first failing rule raises.
    checks = (
        (not self.model, "Model must be specified"),
        (not self.file_tokens,
         "At least one file token is required. Upload files first using client.upload_file()"),
        (self.max_tokens is not None and self.max_tokens < 1,
         "max_tokens must be positive"),
        (not 0.0 <= self.temperature <= 2.0,
         "Temperature must be between 0.0 and 2.0"),
    )
    for failed, message in checks:
        if failed:
            raise ValidationError(message)
|
BaseCall
Abstract base class for all job types.
BaseCall
dataclass
Bases: ABC
Base class for all MicroDC job types.
Attributes:
| Name |
Type |
Description |
type |
str
|
Job type identifier (set by subclasses)
|
metadata |
Dict[str, Any]
|
User-defined metadata for tracking
|
priority |
str
|
Job priority (standard, high, low)
|
timeout |
Optional[int]
|
Maximum execution time in seconds
|
callback_url |
Optional[str]
|
Optional webhook URL for notifications
|
Source code in microdc/jobs/base.py
@dataclass
class BaseCall(ABC):
    """
    Base class for all MicroDC job types.

    Attributes:
        type: Job type identifier (set by subclasses)
        metadata: User-defined metadata for tracking
        priority: Job priority (standard, high, low)
        timeout: Maximum execution time in seconds
        callback_url: Optional webhook URL for notifications
    """
    # init=False: subclasses assign `type` in __post_init__, not callers.
    type: str = field(init=False)
    metadata: Dict[str, Any] = field(default_factory=dict)
    priority: str = "standard"
    timeout: Optional[int] = None
    callback_url: Optional[str] = None
    # Internal tracking (set by client)
    _job_id: Optional[str] = field(default=None, init=False, repr=False)
    _submitted_at: Optional[datetime] = field(default=None, init=False, repr=False)
    @abstractmethod
    def to_api_payload(self) -> Dict[str, Any]:
        """
        Convert job to API request payload.

        Returns:
            Dict containing API-compatible job specification
        """
        pass
    @abstractmethod
    def validate(self) -> None:
        """
        Validate job configuration before submission.

        Raises:
            ValidationError: If configuration is invalid
        """
        pass
|
to_api_payload()
abstractmethod
Convert job to API request payload.
Returns:
| Type |
Description |
Dict[str, Any]
|
Dict containing API-compatible job specification
|
Source code in microdc/jobs/base.py
@abstractmethod
def to_api_payload(self) -> Dict[str, Any]:
    """
    Convert job to API request payload.

    Returns:
        Dict containing API-compatible job specification
    """
|
validate()
abstractmethod
Validate job configuration before submission.
Raises:
ValidationError: If configuration is invalid
Source code in microdc/jobs/base.py
@abstractmethod
def validate(self) -> None:
    """
    Validate job configuration before submission.

    Raises:
        ValidationError: If configuration is invalid
    """
|
JobDetails
Data class containing job status and results.
JobDetails
dataclass
Complete job information including status and results.
Maps to JobResponse schema from the API.
Attributes:
| Name |
Type |
Description |
job_id |
str
|
Unique job identifier (UUID)
|
type |
str
|
Job type (LLM, EMBED, etc.)
|
status |
str
|
Current status (queued, processing, completed, failed, cancelled)
|
model |
str
|
Model used for inference
|
created_at |
datetime
|
Job creation timestamp
|
started_at |
Optional[datetime]
|
Job start timestamp (if started)
|
completed_at |
Optional[datetime]
|
Job completion timestamp (if completed)
|
estimated_cost |
Optional[float]
|
Estimated cost in credits
|
actual_cost |
Optional[float]
|
Actual cost in credits (if completed)
|
result |
Optional[Any]
|
Job results (if completed)
|
error_message |
Optional[str]
|
Error message (if failed)
|
metadata |
Optional[Dict[str, Any]]
|
User-defined metadata
|
priority |
str
|
Job priority
|
retry_count |
int
|
Number of times job has been retried
|
user_id |
Optional[str]
|
User ID who submitted the job
|
Source code in microdc/jobs/job_details.py
@dataclass
class JobDetails:
    """
    Complete job information including status and results.

    Maps to JobResponse schema from the API.

    Attributes:
        job_id: Unique job identifier (UUID)
        type: Job type (LLM, EMBED, etc.)
        status: Current status (queued, processing, completed, failed, cancelled)
        model: Model used for inference
        created_at: Job creation timestamp
        started_at: Job start timestamp (if started)
        completed_at: Job completion timestamp (if completed)
        estimated_cost: Estimated cost in credits
        actual_cost: Actual cost in credits (if completed)
        result: Job results (if completed)
        error_message: Error message (if failed)
        metadata: User-defined metadata
        priority: Job priority
        retry_count: Number of times job has been retried
        user_id: User ID who submitted the job
    """

    job_id: str
    type: str
    status: str
    model: str
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    estimated_cost: Optional[float] = None
    actual_cost: Optional[float] = None
    result: Optional[Any] = None
    error_message: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    priority: str = "standard"
    retry_count: int = 0
    user_id: Optional[str] = None

    def is_completed(self) -> bool:
        """Return True once the job has reached any terminal state."""
        return self.status.lower() in {"completed", "failed", "cancelled"}

    def is_successful(self) -> bool:
        """Return True if the job finished successfully."""
        return self.status.lower() == "completed"

    def is_failed(self) -> bool:
        """Return True if the job ended in failure."""
        return self.status.lower() == "failed"

    def duration_seconds(self) -> Optional[float]:
        """Return wall-clock run time in seconds, or None if not finished."""
        if self.started_at is None or self.completed_at is None:
            return None
        elapsed = self.completed_at - self.started_at
        return elapsed.total_seconds()
|
is_completed()
Check if job is completed (success or failure).
Source code in microdc/jobs/job_details.py
def is_completed(self) -> bool:
    """Return True once the job has reached any terminal state."""
    terminal = {"completed", "failed", "cancelled"}
    return self.status.lower() in terminal
|
is_successful()
Check if job completed successfully.
Source code in microdc/jobs/job_details.py
def is_successful(self) -> bool:
    """Return True if the job finished successfully."""
    return "completed" == self.status.lower()
|
is_failed()
Check if job failed.
Source code in microdc/jobs/job_details.py
def is_failed(self) -> bool:
    """Return True if the job ended in failure."""
    return "failed" == self.status.lower()
|
duration_seconds()
Calculate job duration in seconds.
Source code in microdc/jobs/job_details.py
def duration_seconds(self) -> Optional[float]:
    """Return wall-clock run time in seconds, or None if not finished."""
    if not (self.started_at and self.completed_at):
        return None
    elapsed = self.completed_at - self.started_at
    return elapsed.total_seconds()
|