Batch Processing with MicroDC¶
This notebook demonstrates how to efficiently process large batches of jobs using MicroDC's distributed computing platform. Perfect for:
- Processing multiple documents
- Generating embeddings for large datasets
- Running inference on many inputs
- Parallel data processing pipelines
Prerequisites¶
# Install MicroDC client from GitLab (v1 branch)
pip install git+https://gitlab.com/microdc/python-client.git@v1
# Install additional dependencies
pip install pandas tqdm
Setup¶
In [ ]:
Copied!
# Core imports: stdlib timing, typing helpers, and the MicroDC client API.
import time
from typing import Any, Dict, List

from microDC import Client, EmbedCall, LLMCall

# For progress tracking — tqdm is installed on the fly if the kernel lacks it
# (the bang-prefixed `!pip` lines are IPython shell magics, notebook-only syntax).
try:
    from tqdm import tqdm
except ImportError:
    print("Installing tqdm for progress bars...")
    !pip install tqdm
    from tqdm import tqdm

# For data handling — pandas DataFrames are used below to display batch results.
try:
    import pandas as pd
except ImportError:
    print("Installing pandas...")
    !pip install pandas
    import pandas as pd

print("Setup complete!")
# Core imports: stdlib timing, typing helpers, and the MicroDC client API.
import time
from typing import Any, Dict, List

from microDC import Client, EmbedCall, LLMCall

# For progress tracking — tqdm is installed on the fly if the kernel lacks it
# (the bang-prefixed `!pip` lines are IPython shell magics, notebook-only syntax).
try:
    from tqdm import tqdm
except ImportError:
    print("Installing tqdm for progress bars...")
    !pip install tqdm
    from tqdm import tqdm

# For data handling — pandas DataFrames are used below to display batch results.
try:
    import pandas as pd
except ImportError:
    print("Installing pandas...")
    !pip install pandas
    import pandas as pd

print("Setup complete!")
Example 1: Batch Text Classification¶
In [ ]:
Copied!
# Sample dataset: ten short customer reviews mixing positive, negative,
# and neutral tones — used as inputs for the sentiment-classification batch.
reviews = [
    "This product is amazing! Works exactly as described.",
    "Terrible quality. Broke after one day.",
    "Pretty good, but could be better. The price is fair.",
    "Best purchase I've made this year! Highly recommend.",
    "Not worth the money. Very disappointed.",
    "Decent product. Does the job.",
    "Exceeded my expectations! Will buy again.",
    "Poor customer service and low quality product.",
    "It's okay, nothing special but works fine.",
    "Absolutely love it! Five stars!",
]
def classify_sentiment_batch(texts: List[str], client: Client) -> List[Dict[str, Any]]:
    """Run sentiment classification over a batch of texts.

    Each text becomes its own LLM job; a completion callback gathers
    results asynchronously, and wait_for_all() blocks until the whole
    batch has finished.

    Args:
        texts: Reviews (or any short texts) to classify.
        client: Connected MicroDC client.

    Returns:
        One dict per input with keys "text" and "sentiment"
        (lower-cased model answer, or "error" for failed jobs).
    """
    completed: Dict[str, Any] = {}
    submitted: Dict[str, str] = {}  # job id -> original text

    def _on_done(client: Client, job_id: str):
        # Completion callback: keep only successful results.
        details = client.get_job_details(job_id)
        if details.is_successful():
            completed[job_id] = details.result

    client.set_callback(_on_done)

    # Fire off every job before waiting on any of them.
    print("Submitting jobs...")
    for text in tqdm(texts):
        job = LLMCall(model="llama3.3", temperature=0.1, max_tokens=50)
        prompt = f"""Classify the sentiment of this review as 'positive', 'negative', or 'neutral'.
Respond with just the single word.
Review: {text}
Sentiment:"""
        job.add_user_message(prompt)
        submitted[client.send_job(job)] = text

    # Block until every outstanding job has completed.
    print("\nProcessing jobs...")
    client.wait_for_all()

    # Pair each original text with its normalized model answer.
    return [
        {"text": text, "sentiment": completed.get(job_id, "error").strip().lower()}
        for job_id, text in submitted.items()
    ]
# Run batch classification on the sample reviews.
client = Client()
labeled = classify_sentiment_batch(reviews, client)

# Show per-review labels, then the overall label distribution.
results_df = pd.DataFrame(labeled)
print("\nResults:")
print(results_df)
print("\nSentiment Distribution:")
print(results_df["sentiment"].value_counts())
# Sample dataset: ten short customer reviews mixing positive, negative,
# and neutral tones — used as inputs for the sentiment-classification batch.
reviews = [
    "This product is amazing! Works exactly as described.",
    "Terrible quality. Broke after one day.",
    "Pretty good, but could be better. The price is fair.",
    "Best purchase I've made this year! Highly recommend.",
    "Not worth the money. Very disappointed.",
    "Decent product. Does the job.",
    "Exceeded my expectations! Will buy again.",
    "Poor customer service and low quality product.",
    "It's okay, nothing special but works fine.",
    "Absolutely love it! Five stars!",
]
def classify_sentiment_batch(texts: List[str], client: Client) -> List[Dict[str, Any]]:
    """Run sentiment classification over a batch of texts.

    Each text becomes its own LLM job; a completion callback gathers
    results asynchronously, and wait_for_all() blocks until the whole
    batch has finished.

    Args:
        texts: Reviews (or any short texts) to classify.
        client: Connected MicroDC client.

    Returns:
        One dict per input with keys "text" and "sentiment"
        (lower-cased model answer, or "error" for failed jobs).
    """
    completed: Dict[str, Any] = {}
    submitted: Dict[str, str] = {}  # job id -> original text

    def _on_done(client: Client, job_id: str):
        # Completion callback: keep only successful results.
        details = client.get_job_details(job_id)
        if details.is_successful():
            completed[job_id] = details.result

    client.set_callback(_on_done)

    # Fire off every job before waiting on any of them.
    print("Submitting jobs...")
    for text in tqdm(texts):
        job = LLMCall(model="llama3.3", temperature=0.1, max_tokens=50)
        prompt = f"""Classify the sentiment of this review as 'positive', 'negative', or 'neutral'.
Respond with just the single word.
Review: {text}
Sentiment:"""
        job.add_user_message(prompt)
        submitted[client.send_job(job)] = text

    # Block until every outstanding job has completed.
    print("\nProcessing jobs...")
    client.wait_for_all()

    # Pair each original text with its normalized model answer.
    return [
        {"text": text, "sentiment": completed.get(job_id, "error").strip().lower()}
        for job_id, text in submitted.items()
    ]
# Run batch classification on the sample reviews.
client = Client()
labeled = classify_sentiment_batch(reviews, client)

# Show per-review labels, then the overall label distribution.
results_df = pd.DataFrame(labeled)
print("\nResults:")
print(results_df)
print("\nSentiment Distribution:")
print(results_df["sentiment"].value_counts())
Example 2: Batch Embedding Generation for Large Datasets¶
In [ ]:
Copied!
def generate_embeddings_batch(texts: List[str], client: Client, batch_size: int = 100):
    """Embed a large list of texts by packing them into batched jobs.

    Args:
        texts: Texts to embed.
        client: Connected MicroDC client.
        batch_size: Number of texts packed into a single EmbedCall job.

    Returns:
        Flat list of embedding vectors in job-submission order; batches
        whose job failed are reported and skipped.
    """
    collected = []
    pending = []

    # Chunk the input so each job carries at most batch_size texts.
    chunks = [texts[start : start + batch_size] for start in range(0, len(texts), batch_size)]
    print(f"Processing {len(texts)} texts in {len(chunks)} batches...")

    # Submit every chunk up front — don't wait between submissions.
    for chunk in tqdm(chunks, desc="Submitting batches"):
        job = EmbedCall(model="text-embedding-3-small")
        for item in chunk:
            job.add_text(item)
        pending.append(client.send_job(job))

    print("\nWaiting for jobs to complete...")
    client.wait_for_all()

    # Pull results back in submission order.
    print("Collecting results...")
    for job_id in tqdm(pending, desc="Retrieving embeddings"):
        details = client.get_job_details(job_id)
        if details.is_successful():
            collected.extend(details.result)
        else:
            print(f"Error in job {job_id}: {details.error_message}")
    return collected
# Example: embed a synthetic catalog of product descriptions.
catalog_texts = [
    f"Product {i}: High quality item with excellent features." for i in range(50)
]
vectors = generate_embeddings_batch(catalog_texts, client, batch_size=10)
print(f"\nGenerated {len(vectors)} embeddings")
print(f"Embedding dimension: {len(vectors[0])}")
def generate_embeddings_batch(texts: List[str], client: Client, batch_size: int = 100):
    """Embed a large list of texts by packing them into batched jobs.

    Args:
        texts: Texts to embed.
        client: Connected MicroDC client.
        batch_size: Number of texts packed into a single EmbedCall job.

    Returns:
        Flat list of embedding vectors in job-submission order; batches
        whose job failed are reported and skipped.
    """
    collected = []
    pending = []

    # Chunk the input so each job carries at most batch_size texts.
    chunks = [texts[start : start + batch_size] for start in range(0, len(texts), batch_size)]
    print(f"Processing {len(texts)} texts in {len(chunks)} batches...")

    # Submit every chunk up front — don't wait between submissions.
    for chunk in tqdm(chunks, desc="Submitting batches"):
        job = EmbedCall(model="text-embedding-3-small")
        for item in chunk:
            job.add_text(item)
        pending.append(client.send_job(job))

    print("\nWaiting for jobs to complete...")
    client.wait_for_all()

    # Pull results back in submission order.
    print("Collecting results...")
    for job_id in tqdm(pending, desc="Retrieving embeddings"):
        details = client.get_job_details(job_id)
        if details.is_successful():
            collected.extend(details.result)
        else:
            print(f"Error in job {job_id}: {details.error_message}")
    return collected
# Example: embed a synthetic catalog of product descriptions.
catalog_texts = [
    f"Product {i}: High quality item with excellent features." for i in range(50)
]
vectors = generate_embeddings_batch(catalog_texts, client, batch_size=10)
print(f"\nGenerated {len(vectors)} embeddings")
print(f"Embedding dimension: {len(vectors[0])}")
Example 3: Multi-Document Summarization¶
In [ ]:
Copied!
# Sample documents: five short multi-sentence texts on distinct topics,
# keyed by a filename-style name — inputs for the summarization batch.
documents = {
    "doc1.txt": """Artificial Intelligence has revolutionized many industries in recent years.
Machine learning algorithms can now perform tasks that were once thought to require human intelligence.
From image recognition to natural language processing, AI systems continue to improve.""",
    "doc2.txt": """Climate change remains one of the most pressing challenges of our time.
Rising global temperatures are causing sea levels to rise and weather patterns to shift.
Immediate action is needed to reduce carbon emissions and transition to renewable energy.""",
    "doc3.txt": """The quantum computing field has made significant breakthroughs recently.
Quantum computers leverage quantum mechanics to solve certain problems exponentially faster than classical computers.
This could revolutionize cryptography, drug discovery, and optimization problems.""",
    "doc4.txt": """Remote work has become increasingly common in the post-pandemic world.
Companies are adapting to hybrid work models that combine office and remote work.
This shift has implications for real estate, urban planning, and work-life balance.""",
    "doc5.txt": """Renewable energy sources like solar and wind power are becoming more cost-effective.
Battery technology improvements are making energy storage more viable.
Many countries are setting ambitious targets for transitioning to clean energy.""",
}
def summarize_documents_batch(docs: Dict[str, str], client: Client) -> pd.DataFrame:
    """Summarize every document in *docs* concurrently.

    Args:
        docs: Mapping of document name -> full text.
        client: Connected MicroDC client.

    Returns:
        DataFrame with columns document / original_length / summary
        ("Error" when a job did not complete successfully).
    """
    done = {}
    pending = {}  # job id -> document name

    def _gather(client: Client, job_id: str):
        # Completion callback: store only successful summaries.
        details = client.get_job_details(job_id)
        if details.is_successful():
            done[job_id] = details.result

    client.set_callback(_gather)

    # One summarization job per document, all submitted up front.
    print(f"Submitting {len(docs)} documents for summarization...")
    for doc_name, content in docs.items():
        job = LLMCall(model="llama3.3", temperature=0.3, max_tokens=100)
        job.add_user_message(f"Summarize the following text in one sentence:\n\n{content}")
        pending[client.send_job(job)] = doc_name

    print("Processing...")
    client.wait_for_all()

    # One row per document, preserving submission order.
    rows = [
        {
            "document": doc_name,
            "original_length": len(docs[doc_name]),
            "summary": done.get(job_id, "Error"),
        }
        for job_id, doc_name in pending.items()
    ]
    return pd.DataFrame(rows)
# Run batch summarization and print each summary with a separator.
doc_summary_df = summarize_documents_batch(documents, client)
print("\nDocument Summaries:")
print("=" * 80)
for _, record in doc_summary_df.iterrows():
    print(f"\n{record['document']} ({record['original_length']} chars):")
    print(f"{record['summary']}")
    print("\n" + "=" * 80)
# Sample documents: five short multi-sentence texts on distinct topics,
# keyed by a filename-style name — inputs for the summarization batch.
documents = {
    "doc1.txt": """Artificial Intelligence has revolutionized many industries in recent years.
Machine learning algorithms can now perform tasks that were once thought to require human intelligence.
From image recognition to natural language processing, AI systems continue to improve.""",
    "doc2.txt": """Climate change remains one of the most pressing challenges of our time.
Rising global temperatures are causing sea levels to rise and weather patterns to shift.
Immediate action is needed to reduce carbon emissions and transition to renewable energy.""",
    "doc3.txt": """The quantum computing field has made significant breakthroughs recently.
Quantum computers leverage quantum mechanics to solve certain problems exponentially faster than classical computers.
This could revolutionize cryptography, drug discovery, and optimization problems.""",
    "doc4.txt": """Remote work has become increasingly common in the post-pandemic world.
Companies are adapting to hybrid work models that combine office and remote work.
This shift has implications for real estate, urban planning, and work-life balance.""",
    "doc5.txt": """Renewable energy sources like solar and wind power are becoming more cost-effective.
Battery technology improvements are making energy storage more viable.
Many countries are setting ambitious targets for transitioning to clean energy.""",
}
def summarize_documents_batch(docs: Dict[str, str], client: Client) -> pd.DataFrame:
    """Summarize every document in *docs* concurrently.

    Args:
        docs: Mapping of document name -> full text.
        client: Connected MicroDC client.

    Returns:
        DataFrame with columns document / original_length / summary
        ("Error" when a job did not complete successfully).
    """
    done = {}
    pending = {}  # job id -> document name

    def _gather(client: Client, job_id: str):
        # Completion callback: store only successful summaries.
        details = client.get_job_details(job_id)
        if details.is_successful():
            done[job_id] = details.result

    client.set_callback(_gather)

    # One summarization job per document, all submitted up front.
    print(f"Submitting {len(docs)} documents for summarization...")
    for doc_name, content in docs.items():
        job = LLMCall(model="llama3.3", temperature=0.3, max_tokens=100)
        job.add_user_message(f"Summarize the following text in one sentence:\n\n{content}")
        pending[client.send_job(job)] = doc_name

    print("Processing...")
    client.wait_for_all()

    # One row per document, preserving submission order.
    rows = [
        {
            "document": doc_name,
            "original_length": len(docs[doc_name]),
            "summary": done.get(job_id, "Error"),
        }
        for job_id, doc_name in pending.items()
    ]
    return pd.DataFrame(rows)
# Run batch summarization and print each summary with a separator.
doc_summary_df = summarize_documents_batch(documents, client)
print("\nDocument Summaries:")
print("=" * 80)
for _, record in doc_summary_df.iterrows():
    print(f"\n{record['document']} ({record['original_length']} chars):")
    print(f"{record['summary']}")
    print("\n" + "=" * 80)
Example 4: Progress Tracking with Custom Metadata¶
In [ ]:
Copied!
class BatchJobTracker:
    """Track a batch of MicroDC jobs and collect timing/success metrics.

    Registers itself as the client's completion callback. Submit jobs via
    submit_job() (optionally attaching metadata), call wait_all() once all
    jobs are in, then read get_metrics() / get_results_df().
    """

    def __init__(self, client: Client):
        self.client = client
        self.jobs = {}          # job id -> user-supplied metadata
        self.results = {}       # job id -> job details, filled by the callback
        self.start_time = None  # set on first submission
        self.end_time = None    # set by wait_all()
        # Route every job completion through our progress handler.
        self.client.set_callback(self._on_complete)

    def _on_complete(self, client: Client, job_id: str):
        """Completion callback: record job details and print progress."""
        details = client.get_job_details(job_id)
        self.results[job_id] = details
        completed = len(self.results)
        # max(..., 1) guards the division in case the callback fires before
        # submit_job() has recorded the id (possible with a threaded client);
        # previously that raised ZeroDivisionError.
        total = max(len(self.jobs), 1)
        print(f"Progress: {completed}/{total} jobs completed ({completed / total * 100:.1f}%)")

    def submit_job(self, job, metadata: Dict[str, Any] = None):
        """Submit *job*, attach optional *metadata*, and return its job id."""
        if self.start_time is None:
            self.start_time = time.time()
        job_id = self.client.send_job(job)
        self.jobs[job_id] = metadata or {}
        return job_id

    def wait_all(self):
        """Block until every submitted job has completed, then stop the clock."""
        self.client.wait_for_all()
        self.end_time = time.time()

    def get_metrics(self) -> Dict[str, Any]:
        """Return aggregate batch metrics (counts, wall time, throughput)."""
        # Guard BOTH endpoints: the old `if self.end_time` check raised
        # TypeError when wait_all() ran without any job ever submitted
        # (end_time set while start_time was still None).
        if self.start_time is not None and self.end_time is not None:
            total_time = self.end_time - self.start_time
        else:
            total_time = 0
        successful = sum(1 for r in self.results.values() if r.is_successful())
        failed = len(self.results) - successful
        return {
            "total_jobs": len(self.jobs),
            "successful": successful,
            "failed": failed,
            "total_time_seconds": total_time,
            "avg_time_per_job": total_time / len(self.jobs) if self.jobs else 0,
            "jobs_per_second": len(self.jobs) / total_time if total_time > 0 else 0,
        }

    def get_results_df(self) -> pd.DataFrame:
        """Return a per-job DataFrame: truncated id, status, and metadata."""
        data = []
        for job_id, metadata in self.jobs.items():
            result = self.results.get(job_id)
            data.append(
                {
                    "job_id": job_id[:8] + "...",
                    "status": "success" if result and result.is_successful() else "failed",
                    **metadata,
                }
            )
        return pd.DataFrame(data)
# Example: fan out phrase x language translation jobs through the tracker.
tracker = BatchJobTracker(Client())

phrases = [
    "Hello, how are you?",
    "Thank you very much",
    "Good morning",
    "Have a nice day",
    "See you later",
]
languages = ["Spanish", "French", "German", "Italian", "Japanese"]

print("Submitting translation jobs...\n")
for source_phrase in phrases:
    for target_lang in languages:
        job = LLMCall(model="llama3.3", temperature=0.1, max_tokens=50)
        job.add_user_message(f"Translate to {target_lang}: {source_phrase}")
        tracker.submit_job(job, metadata={"phrase": source_phrase, "language": target_lang})

print(f"\nSubmitted {len(tracker.jobs)} jobs\n")
tracker.wait_all()

# Print the aggregate metrics, formatting floats to two decimals.
print("\n" + "=" * 60)
print("BATCH PROCESSING METRICS")
print("=" * 60)
batch_metrics = tracker.get_metrics()
for metric_name, metric_value in batch_metrics.items():
    if isinstance(metric_value, float):
        print(f"{metric_name}: {metric_value:.2f}")
    else:
        print(f"{metric_name}: {metric_value}")

# Then the per-job results table.
print("\n" + "=" * 60)
print("JOB RESULTS")
print("=" * 60)
print(tracker.get_results_df())
class BatchJobTracker:
    """Track a batch of MicroDC jobs and collect timing/success metrics.

    Registers itself as the client's completion callback. Submit jobs via
    submit_job() (optionally attaching metadata), call wait_all() once all
    jobs are in, then read get_metrics() / get_results_df().
    """

    def __init__(self, client: Client):
        self.client = client
        self.jobs = {}          # job id -> user-supplied metadata
        self.results = {}       # job id -> job details, filled by the callback
        self.start_time = None  # set on first submission
        self.end_time = None    # set by wait_all()
        # Route every job completion through our progress handler.
        self.client.set_callback(self._on_complete)

    def _on_complete(self, client: Client, job_id: str):
        """Completion callback: record job details and print progress."""
        details = client.get_job_details(job_id)
        self.results[job_id] = details
        completed = len(self.results)
        # max(..., 1) guards the division in case the callback fires before
        # submit_job() has recorded the id (possible with a threaded client);
        # previously that raised ZeroDivisionError.
        total = max(len(self.jobs), 1)
        print(f"Progress: {completed}/{total} jobs completed ({completed / total * 100:.1f}%)")

    def submit_job(self, job, metadata: Dict[str, Any] = None):
        """Submit *job*, attach optional *metadata*, and return its job id."""
        if self.start_time is None:
            self.start_time = time.time()
        job_id = self.client.send_job(job)
        self.jobs[job_id] = metadata or {}
        return job_id

    def wait_all(self):
        """Block until every submitted job has completed, then stop the clock."""
        self.client.wait_for_all()
        self.end_time = time.time()

    def get_metrics(self) -> Dict[str, Any]:
        """Return aggregate batch metrics (counts, wall time, throughput)."""
        # Guard BOTH endpoints: the old `if self.end_time` check raised
        # TypeError when wait_all() ran without any job ever submitted
        # (end_time set while start_time was still None).
        if self.start_time is not None and self.end_time is not None:
            total_time = self.end_time - self.start_time
        else:
            total_time = 0
        successful = sum(1 for r in self.results.values() if r.is_successful())
        failed = len(self.results) - successful
        return {
            "total_jobs": len(self.jobs),
            "successful": successful,
            "failed": failed,
            "total_time_seconds": total_time,
            "avg_time_per_job": total_time / len(self.jobs) if self.jobs else 0,
            "jobs_per_second": len(self.jobs) / total_time if total_time > 0 else 0,
        }

    def get_results_df(self) -> pd.DataFrame:
        """Return a per-job DataFrame: truncated id, status, and metadata."""
        data = []
        for job_id, metadata in self.jobs.items():
            result = self.results.get(job_id)
            data.append(
                {
                    "job_id": job_id[:8] + "...",
                    "status": "success" if result and result.is_successful() else "failed",
                    **metadata,
                }
            )
        return pd.DataFrame(data)
# Example: fan out phrase x language translation jobs through the tracker.
tracker = BatchJobTracker(Client())

phrases = [
    "Hello, how are you?",
    "Thank you very much",
    "Good morning",
    "Have a nice day",
    "See you later",
]
languages = ["Spanish", "French", "German", "Italian", "Japanese"]

print("Submitting translation jobs...\n")
for source_phrase in phrases:
    for target_lang in languages:
        job = LLMCall(model="llama3.3", temperature=0.1, max_tokens=50)
        job.add_user_message(f"Translate to {target_lang}: {source_phrase}")
        tracker.submit_job(job, metadata={"phrase": source_phrase, "language": target_lang})

print(f"\nSubmitted {len(tracker.jobs)} jobs\n")
tracker.wait_all()

# Print the aggregate metrics, formatting floats to two decimals.
print("\n" + "=" * 60)
print("BATCH PROCESSING METRICS")
print("=" * 60)
batch_metrics = tracker.get_metrics()
for metric_name, metric_value in batch_metrics.items():
    if isinstance(metric_value, float):
        print(f"{metric_name}: {metric_value:.2f}")
    else:
        print(f"{metric_name}: {metric_value}")

# Then the per-job results table.
print("\n" + "=" * 60)
print("JOB RESULTS")
print("=" * 60)
print(tracker.get_results_df())
Example 5: Error Handling and Retry Logic¶
In [ ]:
Copied!
def robust_batch_processing(texts: List[str], client: Client, max_retries: int = 3):
    """Summarize a batch of texts, automatically resubmitting failed jobs.

    Args:
        texts: Inputs to summarize.
        client: Connected MicroDC client.
        max_retries: Maximum number of retry rounds for failed jobs.

    Returns:
        Dict of job id -> result for every job that eventually succeeded.
    """
    results = {}
    failed_jobs = []

    def handle_completion(client: Client, job_id: str):
        # Route each finished job into the success map or the retry queue.
        details = client.get_job_details(job_id)
        if details.is_successful():
            results[job_id] = details.result
        else:
            failed_jobs.append((job_id, details.error_message))

    client.set_callback(handle_completion)

    # First pass: one summarization job per text, all submitted up front.
    job_to_text = {}
    for text in texts:
        job = LLMCall(model="llama3.3", max_tokens=100)
        job.add_user_message(f"Summarize: {text}")
        job_to_text[client.send_job(job)] = text
    client.wait_for_all()

    # Retry rounds: resubmit whatever failed, up to max_retries times.
    retry_count = 0
    while failed_jobs and retry_count < max_retries:
        retry_count += 1
        print(f"\nRetry attempt {retry_count}/{max_retries} for {len(failed_jobs)} failed jobs...")
        retry_batch = failed_jobs.copy()
        # Rebinding (not mutating) gives the callback a fresh list to fill,
        # so only this round's failures survive into the next iteration.
        failed_jobs = []
        for failed_id, _error in retry_batch:
            original_text = job_to_text[failed_id]
            job = LLMCall(model="llama3.3", max_tokens=100)
            job.add_user_message(f"Summarize: {original_text}")
            job_to_text[client.send_job(job)] = original_text
        client.wait_for_all()

    # Final report: success count plus anything that never succeeded.
    print(f"\nCompleted: {len(results)} successful, {len(failed_jobs)} failed")
    if failed_jobs:
        print("\nFailed jobs:")
        for job_id, error in failed_jobs:
            print(f" {job_id[:8]}...: {error}")
    return results
# Exercise the retry pipeline on a handful of synthetic inputs.
sample_inputs = [f"Sample text {i} for processing." for i in range(5)]
summaries = robust_batch_processing(sample_inputs, client)
print(f"\nProcessed {len(summaries)} texts successfully")
def robust_batch_processing(texts: List[str], client: Client, max_retries: int = 3):
    """Summarize a batch of texts, automatically resubmitting failed jobs.

    Args:
        texts: Inputs to summarize.
        client: Connected MicroDC client.
        max_retries: Maximum number of retry rounds for failed jobs.

    Returns:
        Dict of job id -> result for every job that eventually succeeded.
    """
    results = {}
    failed_jobs = []

    def handle_completion(client: Client, job_id: str):
        # Route each finished job into the success map or the retry queue.
        details = client.get_job_details(job_id)
        if details.is_successful():
            results[job_id] = details.result
        else:
            failed_jobs.append((job_id, details.error_message))

    client.set_callback(handle_completion)

    # First pass: one summarization job per text, all submitted up front.
    job_to_text = {}
    for text in texts:
        job = LLMCall(model="llama3.3", max_tokens=100)
        job.add_user_message(f"Summarize: {text}")
        job_to_text[client.send_job(job)] = text
    client.wait_for_all()

    # Retry rounds: resubmit whatever failed, up to max_retries times.
    retry_count = 0
    while failed_jobs and retry_count < max_retries:
        retry_count += 1
        print(f"\nRetry attempt {retry_count}/{max_retries} for {len(failed_jobs)} failed jobs...")
        retry_batch = failed_jobs.copy()
        # Rebinding (not mutating) gives the callback a fresh list to fill,
        # so only this round's failures survive into the next iteration.
        failed_jobs = []
        for failed_id, _error in retry_batch:
            original_text = job_to_text[failed_id]
            job = LLMCall(model="llama3.3", max_tokens=100)
            job.add_user_message(f"Summarize: {original_text}")
            job_to_text[client.send_job(job)] = original_text
        client.wait_for_all()

    # Final report: success count plus anything that never succeeded.
    print(f"\nCompleted: {len(results)} successful, {len(failed_jobs)} failed")
    if failed_jobs:
        print("\nFailed jobs:")
        for job_id, error in failed_jobs:
            print(f" {job_id[:8]}...: {error}")
    return results
# Exercise the retry pipeline on a handful of synthetic inputs.
sample_inputs = [f"Sample text {i} for processing." for i in range(5)]
summaries = robust_batch_processing(sample_inputs, client)
print(f"\nProcessed {len(summaries)} texts successfully")
Best Practices for Batch Processing¶
Optimal Batch Sizes:
- For embeddings: 50-100 texts per job
- For LLM calls: Submit individually but in parallel
Use Callbacks: Leverage callbacks for true async processing instead of blocking waits
Error Handling: Always implement retry logic for failed jobs
Progress Tracking: Use tqdm or custom trackers for visibility
Metadata: Track job metadata to correlate results with inputs
Rate Limiting: MicroDC handles rate limiting automatically, but be mindful of your quota
Resource Cleanup: Use context managers or ensure proper client cleanup
Performance Tips¶
- Parallel Submission: Submit all jobs first, then wait (don't submit-wait-submit-wait)
- Connection Reuse: Use a single Client instance for all jobs
- Batch Embeddings: Group multiple texts in one EmbedCall when possible
- Monitor Metrics: Track timing to identify bottlenecks
Next Steps¶
- Scale up to thousands of documents
- Integrate with data pipelines (Airflow, Prefect)
- Build monitoring dashboards
- Implement checkpointing for long-running jobs