Simple RAG (Retrieval-Augmented Generation) with MicroDC¶
This notebook demonstrates how to build a simple RAG system that:
- Accepts a PDF file as input
- Extracts and chunks the text
- Generates embeddings using MicroDC (offloading compute to the cloud)
- Stores embeddings in a vector database
- Answers questions by retrieving relevant context and using MicroDC for generation
Benefits: Run on a low-power computer by offloading all heavy computation to MicroDC!
Prerequisites¶
# Install MicroDC client from GitLab (v1 branch)
pip install git+https://gitlab.com/microdc/python-client.git@v1
# Install additional dependencies
pip install pypdf numpy scikit-learn
Setup and Imports¶
In [ ]:
from typing import List, Tuple

import numpy as np

from microDC import Client, EmbedCall, LLMCall

# For PDF processing
try:
    from pypdf import PdfReader
except ImportError:
    print("Installing pypdf...")
    !pip install pypdf
    from pypdf import PdfReader

# For similarity search
from sklearn.metrics.pairwise import cosine_similarity

print("All dependencies loaded!")
Step 1: PDF Text Extraction and Chunking¶
In [ ]:
def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Extract all text from a PDF file.
    """
    reader = PdfReader(pdf_path)
    text = ""
    for page in reader.pages:
        text += page.extract_text() + "\n"
    return text


def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """
    Split text into overlapping chunks.

    Args:
        text: Input text to chunk
        chunk_size: Maximum characters per chunk
        overlap: Number of characters to overlap between chunks
    """
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        # Try to break at a sentence boundary
        if end < len(text):
            last_period = chunk.rfind(".")
            if last_period > chunk_size // 2:  # Only if we're past halfway
                chunk = chunk[: last_period + 1]
                end = start + last_period + 1
        chunks.append(chunk.strip())
        start = end - overlap
    return [c for c in chunks if len(c) > 50]  # Filter out very short chunks


# Test with a sample PDF (replace with your PDF path)
# For the demo, we'll use sample text instead
sample_text = """
Artificial Intelligence and Machine Learning.

Machine learning is a subset of artificial intelligence that focuses on enabling
computers to learn from data without being explicitly programmed. The field has
grown tremendously in recent years due to increases in computing power and data availability.

Deep Learning is a subset of machine learning that uses neural networks with multiple
layers. These networks can learn hierarchical representations of data, making them
particularly effective for tasks like image recognition and natural language processing.

Natural Language Processing (NLP) is another important area of AI that deals with
the interaction between computers and human language. Modern NLP systems use
transformer architectures and large language models to achieve human-like understanding.

Distributed Computing allows us to process large amounts of data and run complex
models by distributing the workload across multiple machines. This is essential for
training large AI models and serving them at scale.
"""

# Chunk the text
chunks = chunk_text(sample_text, chunk_size=300, overlap=50)
print(f"Created {len(chunks)} chunks from the document\n")
for i, chunk in enumerate(chunks):
    print(f"Chunk {i + 1}: {chunk[:100]}...")
    print()
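The chunker above splits on character counts and can cut through the middle of a sentence. As a hedged alternative (the "sentence-based splitting" idea from Next Steps), here is a minimal sentence-aware splitter using only the standard library. The regex is a deliberate simplification and will mis-split on abbreviations like "e.g.":

```python
import re
from typing import List


def chunk_by_sentences(text: str, max_chars: int = 300) -> List[str]:
    """Greedily pack whole sentences into chunks of at most max_chars."""
    # Naive sentence split: break after . ! or ? followed by whitespace.
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    chunks, current = [], ""
    for sentence in sentences:
        # Start a new chunk if adding this sentence would overflow the limit.
        if current and len(current) + 1 + len(sentence) > max_chars:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}".strip()
    if current:
        chunks.append(current)
    return chunks
```

Because each chunk ends at a sentence boundary, there is no need for the overlap heuristic, at the cost of occasionally uneven chunk sizes.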
Step 2: Generate Embeddings using MicroDC¶
In [ ]:
def generate_embeddings(chunks: List[str], client: Client) -> np.ndarray:
    """
    Generate embeddings for all chunks using MicroDC.

    All computation happens in the cloud - perfect for low-power devices!
    """
    print(f"Generating embeddings for {len(chunks)} chunks...")

    # Create embedding job
    job = EmbedCall(model="text-embedding-3-small")
    for chunk in chunks:
        job.add_text(chunk)

    # Submit to MicroDC
    job_id = client.send_job(job)
    print(f"Job submitted: {job_id[:8]}...")

    # Wait for completion
    client.wait_for_all()

    # Get results
    result = client.get_job_details(job_id)
    if result.is_successful():
        embeddings = np.array(result.result)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings
    else:
        raise RuntimeError(f"Embedding generation failed: {result.error_message}")


# Initialize MicroDC client
client = Client()

# Generate embeddings
chunk_embeddings = generate_embeddings(chunks, client)
print(f"\nEmbedding dimension: {chunk_embeddings.shape[1]}")
Step 3: Create Vector Store and Retrieval Function¶
In [ ]:
class SimpleVectorStore:
    """
    A simple in-memory vector store for similarity search.

    For production, consider using Pinecone, Weaviate, or Chroma.
    """

    def __init__(self, chunks: List[str], embeddings: np.ndarray):
        self.chunks = chunks
        self.embeddings = embeddings

    def search(self, query_embedding: np.ndarray, top_k: int = 3) -> List[Tuple[str, float]]:
        """
        Find the top_k most similar chunks to the query.

        Returns:
            List of (chunk_text, similarity_score) tuples
        """
        # Calculate cosine similarity against every stored chunk
        similarities = cosine_similarity([query_embedding], self.embeddings)[0]
        # Get the indices of the top_k highest scores, best first
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        # Return the chunks with their scores
        return [(self.chunks[i], similarities[i]) for i in top_indices]


# Create vector store
vector_store = SimpleVectorStore(chunks, chunk_embeddings)
print("Vector store created!")
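If you'd rather not pull in scikit-learn just for this one call, the same search can be reproduced with plain NumPy. This is a sketch that mirrors `SimpleVectorStore.search`, under the assumption that no embedding is the zero vector:

```python
import numpy as np


def cosine_top_k(query: np.ndarray, matrix: np.ndarray, top_k: int = 3) -> list:
    """Return (row_index, similarity) pairs for the top_k rows of matrix."""
    # After normalizing to unit length, a dot product is cosine similarity.
    q = query / np.linalg.norm(query)
    m = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    sims = m @ q
    # argsort is ascending, so take the last top_k indices and reverse them.
    top = np.argsort(sims)[-top_k:][::-1]
    return [(int(i), float(sims[i])) for i in top]
```

For large stores, `np.argpartition(sims, -top_k)` avoids fully sorting all scores, but for a few hundred chunks the difference is negligible.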
Step 4: Implement RAG Query Function¶
In [ ]:
def query_rag(
    question: str, vector_store: SimpleVectorStore, client: Client, top_k: int = 3
) -> str:
    """
    Answer a question using RAG:

    1. Generate an embedding for the question (via MicroDC)
    2. Retrieve relevant chunks from the vector store
    3. Generate an answer using an LLM with that context (via MicroDC)
    """
    print(f"\nQuestion: {question}")
    print("=" * 60)

    # Step 1: Embed the question
    print("1. Generating query embedding...")
    embed_job = EmbedCall(model="text-embedding-3-small")
    embed_job.add_text(question)
    job_id = client.send_job(embed_job)
    client.wait_for_all()
    result = client.get_job_details(job_id)
    if not result.is_successful():
        return f"Error generating embedding: {result.error_message}"
    query_embedding = np.array(result.result[0])

    # Step 2: Retrieve relevant chunks
    print("2. Retrieving relevant context...")
    relevant_chunks = vector_store.search(query_embedding, top_k=top_k)
    print(f"   Found {len(relevant_chunks)} relevant chunks:")
    for i, (chunk, score) in enumerate(relevant_chunks):
        print(f"   - Chunk {i + 1} (similarity: {score:.3f}): {chunk[:80]}...")

    # Step 3: Generate the answer using an LLM
    print("3. Generating answer...")

    # Build context from the retrieved chunks
    context = "\n\n".join(chunk for chunk, _ in relevant_chunks)

    # Create the prompt
    prompt = f"""Answer the following question based on the provided context.
If the answer cannot be found in the context, say so.

Context:
{context}

Question: {question}

Answer:"""

    # Submit the LLM job
    llm_job = LLMCall(model="llama3.3", temperature=0.3, max_tokens=300)
    llm_job.add_user_message(prompt)
    job_id = client.send_job(llm_job)
    client.wait_for_all()
    result = client.get_job_details(job_id)
    if result.is_successful():
        print("\n" + "=" * 60)
        print("Answer:")
        print(result.result)
        return result.result
    else:
        return f"Error generating answer: {result.error_message}"


# Test the RAG system
print("RAG System Ready! Testing with sample questions...\n")
Step 5: Test the RAG System¶
In [ ]:
# Question 1
answer1 = query_rag("What is deep learning?", vector_store, client)
In [ ]:
# Question 2
answer2 = query_rag("How does distributed computing help with AI?", vector_store, client)
In [ ]:
# Question 3
answer3 = query_rag("What are transformer architectures?", vector_store, client)
Step 6: Process Your Own PDF File¶
In [ ]:
def process_pdf_and_create_rag(pdf_path: str, client: Client) -> SimpleVectorStore:
    """
    Complete pipeline: PDF -> Chunks -> Embeddings -> Vector Store
    """
    print(f"Processing PDF: {pdf_path}")
    print("=" * 60)

    # Extract text
    print("1. Extracting text from PDF...")
    text = extract_text_from_pdf(pdf_path)
    print(f"   Extracted {len(text)} characters")

    # Chunk text
    print("2. Chunking text...")
    chunks = chunk_text(text, chunk_size=500, overlap=50)
    print(f"   Created {len(chunks)} chunks")

    # Generate embeddings
    print("3. Generating embeddings via MicroDC...")
    embeddings = generate_embeddings(chunks, client)

    # Create vector store
    print("4. Creating vector store...")
    vector_store = SimpleVectorStore(chunks, embeddings)

    print("\n" + "=" * 60)
    print("RAG system ready! You can now ask questions about your PDF.")
    print("=" * 60)
    return vector_store


# Example usage:
# pdf_path = "path/to/your/document.pdf"
# my_vector_store = process_pdf_and_create_rag(pdf_path, client)
# query_rag("Your question here?", my_vector_store, client)

print("To use with your PDF:")
print("1. Upload your PDF file to this notebook directory")
print("2. Run: my_vector_store = process_pdf_and_create_rag('your_file.pdf', client)")
print("3. Ask questions: query_rag('Your question?', my_vector_store, client)")
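If you later index several PDFs into one store, it helps to record which file each chunk came from so answers can cite their source. Here is a minimal sketch of such metadata tagging; the `Chunk` class is an illustration, not part of MicroDC:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    text: str
    source: str  # e.g. the originating file name
    index: int   # position of the chunk within that document


def tag_chunks(chunks: List[str], source: str) -> List[Chunk]:
    """Attach provenance metadata to plain text chunks."""
    return [Chunk(text=c, source=source, index=i) for i, c in enumerate(chunks)]
```

A multi-document store could then keep `List[Chunk]` instead of `List[str]` and report `chunk.source` alongside each retrieved passage.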
Step 7: Interactive Q&A Session¶
In [ ]:
def interactive_qa(vector_store: SimpleVectorStore, client: Client):
    """
    Interactive Q&A session - keep asking questions until 'quit'
    """
    print("\nInteractive Q&A Mode")
    print("Type your questions (or 'quit' to exit)\n")
    while True:
        question = input("\nYou: ").strip()
        if question.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        if not question:
            continue
        try:
            query_rag(question, vector_store, client)
        except Exception as e:
            print(f"Error: {e}")


# Uncomment to start interactive mode:
# interactive_qa(vector_store, client)
Performance Benefits of Using MicroDC¶
By using MicroDC for this RAG pipeline, you can:
- Run on Low-Power Hardware: No GPU is required - all embedding generation and LLM inference happens in the cloud.
- Scale Easily: Process large PDFs without worrying about local memory constraints.
- Cost-Effective: Pay only for what you use, with no expensive GPU infrastructure to maintain.
- Fast Processing: Leverage distributed computing power for quick results.
Next Steps¶
To improve this RAG system:
- Better Chunking: Use semantic chunking or sentence-based splitting
- Persistent Storage: Save embeddings to disk to avoid regenerating
- Better Vector DB: Use Pinecone, Weaviate, or ChromaDB for production
- Multi-PDF Support: Extend to handle multiple documents
- Metadata Filtering: Add metadata (page numbers, sections) for better retrieval
- Reranking: Add a reranking step after initial retrieval
- Caching: Cache frequent queries to reduce API calls
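For the Persistent Storage item above, a minimal sketch: save the chunks and embeddings once, then reload them on later runs instead of re-embedding the document. File names and the `path_prefix` convention are illustrative:

```python
import json
from typing import List, Tuple

import numpy as np


def save_store(path_prefix: str, chunks: List[str], embeddings: np.ndarray) -> None:
    """Persist chunks as JSON and embeddings as a .npy file."""
    with open(f"{path_prefix}_chunks.json", "w") as f:
        json.dump(chunks, f)
    np.save(f"{path_prefix}_embeddings.npy", embeddings)


def load_store(path_prefix: str) -> Tuple[List[str], np.ndarray]:
    """Reload a previously saved (chunks, embeddings) pair."""
    with open(f"{path_prefix}_chunks.json") as f:
        chunks = json.load(f)
    embeddings = np.load(f"{path_prefix}_embeddings.npy")
    return chunks, embeddings
```

On a later run, `SimpleVectorStore(*load_store("my_doc"))` would rebuild the index with no MicroDC embedding calls at all.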
Additional Resources¶
- MicroDC Documentation
- RAG Best Practices
- See `03_batch_processing.ipynb` for handling large-scale document processing