Chapter 4: Orchestration Frameworks - LangGraph and LlamaIndex
Introduction
Building production-grade multi-agent systems requires robust orchestration frameworks that can manage complex workflows, state persistence, and intelligent document retrieval. In this chapter, we'll explore two foundational frameworks that power ResearcherAI:
- LangGraph: A stateful workflow orchestration framework for building sophisticated agent systems
- LlamaIndex: An advanced RAG (Retrieval-Augmented Generation) framework for intelligent document indexing and retrieval
These frameworks work together to create a powerful, production-ready system that can handle complex research queries across multiple data sources.
Think of LangGraph as a state machine orchestrator (like Redux for backend workflows) and LlamaIndex as a smart search engine with built-in AI capabilities (like Elasticsearch + GPT combined).
Why These Frameworks Matter
The Challenges They Solve
1. State Management Complexity
- Multi-step workflows need to maintain state across operations
- Conversations require context preservation
- Error recovery needs state rollback capabilities
2. Document Retrieval Accuracy
- Simple keyword search isn't enough for research queries
- Need semantic understanding of document content
- Require intelligent ranking and filtering
3. Scalability and Performance
- Large document collections need efficient indexing
- Complex workflows need parallel execution
- Production systems need fault tolerance
What Makes Them Production-Ready
LangGraph:
- Built-in state persistence with checkpointing
- Conditional routing for adaptive workflows
- Stream processing for real-time feedback
- Error handling and retry mechanisms
LlamaIndex:
- Vector database abstraction layer
- Advanced query optimization
- Metadata-aware retrieval
- Response synthesis and post-processing
LangGraph: Stateful Workflow Orchestration
Core Concepts
LangGraph treats your multi-agent system as a directed graph where:
- Nodes represent operations (agent actions)
- Edges represent transitions between operations
- State flows through the graph and is updated at each node
- Checkpointing enables persistence and recovery
If you've used React Router or state machines, LangGraph is similar but for backend agent workflows. Each node is like a route handler, and edges are like navigation rules.
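To make this concrete, here is a minimal, self-contained sketch (a toy example, not ResearcherAI code) of a two-node graph: state flows from a draft node to a polish node, and each node returns an updated state.

from typing import TypedDict
from langgraph.graph import StateGraph, END

class HelloState(TypedDict):
    query: str
    answer: str

def draft(state: HelloState) -> HelloState:
    # First node: produce an initial answer
    state["answer"] = f"Draft for: {state['query']}"
    return state

def polish(state: HelloState) -> HelloState:
    # Second node: refine the answer from the first node
    state["answer"] += " (polished)"
    return state

workflow = StateGraph(HelloState)
workflow.add_node("draft", draft)
workflow.add_node("polish", polish)
workflow.set_entry_point("draft")
workflow.add_edge("draft", "polish")
workflow.add_edge("polish", END)

app = workflow.compile()
print(app.invoke({"query": "multi-agent RAG", "answer": ""}))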
Architecture Overview
At a high level, the workflow is a directed graph: each agent is a node, edges define the execution order, a shared AgentState dict flows through every node, and a checkpoint is saved after each step.
Implementation Details
State Definition
The first step is defining your workflow state - the data structure that flows through your graph:
from typing import TypedDict, List, Dict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver  # Checkpointing, used below
class AgentState(TypedDict):
"""State shared across all agents in the workflow"""
# User input
    query: str
    max_per_source: int  # Papers to fetch per source (read by the collection node)
# Data collected from sources
papers: List[Dict]
# Processing results
graph_data: Dict # Knowledge graph representation
vector_data: Dict # Vector embeddings
llamaindex_data: Dict # LlamaIndex RAG data
# Reasoning outputs
reasoning_result: Dict
critic_feedback: Dict
reflection_feedback: Dict
# Workflow metadata
messages: List[str] # Execution log
current_step: str # Current node name
error: str | None # Error tracking
retry_count: int # Retry attempts
stage_outputs: Dict # Per-stage results
This is like defining a TypeScript interface for your Redux state. Each field represents a piece of data that different "reducers" (nodes) can read and update.
Building the Workflow Graph
class LangGraphOrchestrator:
"""Production-grade LangGraph workflow orchestrator"""
def __init__(self):
# Initialize agent dependencies
self.data_collector = DataCollectorAgent()
self.graph_agent = KnowledgeGraphAgent()
self.vector_agent = VectorAgent()
self.llamaindex = LlamaIndexRAG()
self.reasoning_agent = ReasoningAgent()
self.evaluator = EvaluatorAgent()
        # State persistence (created first: the workflow is compiled with it)
        self.memory = MemorySaver()
        # Create workflow
        self.workflow = self._build_workflow()
    def _build_workflow(self):
"""Build and compile the LangGraph workflow"""
# Create the graph
workflow = StateGraph(AgentState)
# Add nodes (operations)
workflow.add_node("data_collection", self.data_collection_node)
workflow.add_node("graph_processing", self.graph_processing_node)
workflow.add_node("vector_processing", self.vector_processing_node)
workflow.add_node("llamaindex_indexing", self.llamaindex_indexing_node)
workflow.add_node("reasoning", self.reasoning_node)
workflow.add_node("self_reflection", self.self_reflection_node)
workflow.add_node("critic_review", self.critic_review_node)
workflow.add_node("correction", self.correction_node)
# Define edges (transitions)
workflow.set_entry_point("data_collection")
# Linear flow for data processing
workflow.add_edge("data_collection", "graph_processing")
workflow.add_edge("graph_processing", "vector_processing")
workflow.add_edge("vector_processing", "llamaindex_indexing")
workflow.add_edge("llamaindex_indexing", "reasoning")
# Self-reflection and quality control
workflow.add_edge("reasoning", "self_reflection")
workflow.add_edge("self_reflection", "critic_review")
# Conditional routing based on quality
workflow.add_conditional_edges(
"critic_review",
self._should_continue,
{
"continue": "correction",
"end": END
}
)
workflow.add_edge("correction", "reasoning")
# Compile with state persistence
return workflow.compile(checkpointer=self.memory)
- MemorySaver: Enables conversation persistence across sessions
- Conditional Edges: Dynamic routing based on runtime conditions
- Checkpointing: Automatic state snapshots at each node
Implementing Workflow Nodes
Each node is a function that receives state, performs an operation, and updates state:
1. Data Collection Node
def data_collection_node(self, state: AgentState) -> AgentState:
"""Collect research papers from multiple sources"""
query = state["query"]
max_per_source = state.get("max_per_source", 3)
logger.info(f"DATA COLLECTION: Starting for query: {query}")
# Collect from 7 sources
papers = self.data_collector.collect_papers(
query=query,
max_per_source=max_per_source
)
# Update state
state["papers"] = papers
state["messages"].append(f"Collected {len(papers)} papers")
state["stage_outputs"]["data_collection"] = {
"papers_collected": len(papers),
"sources": ["arxiv", "semantic_scholar", "pubmed", "zenodo",
"web", "huggingface", "kaggle"]
}
return state
Each node is like an Express.js middleware function - it receives the current state (like req), performs operations, and passes updated state to the next node (like calling next()).
2. LlamaIndex Indexing Node
def llamaindex_indexing_node(self, state: AgentState) -> AgentState:
"""Index documents in LlamaIndex for RAG retrieval"""
papers = state["papers"]
logger.info(f"LLAMAINDEX INDEXING: Indexing {len(papers)} papers")
try:
# Use LlamaIndex to create vector index
result = self.llamaindex.index_documents(papers)
state["llamaindex_data"] = result
state["stage_outputs"]["llamaindex_indexing"] = {
"documents_indexed": result.get("documents_indexed", 0),
"vector_store": result.get("vector_store", "Unknown"),
"collection": result.get("collection_name", "N/A"),
"embedding_dim": result.get("embedding_dim", 384)
}
state["messages"].append(
f"Indexed {result.get('documents_indexed', 0)} docs in LlamaIndex"
)
except Exception as e:
logger.error(f"LlamaIndex indexing failed: {e}")
state["error"] = str(e)
state["llamaindex_data"] = {}
return state
3. Reasoning Node with RAG Enhancement
def reasoning_node(self, state: AgentState) -> AgentState:
"""Generate answer using multi-source context"""
query = state["query"]
# Gather context from all sources
graph_context = self._extract_graph_context(state["graph_data"])
vector_context = self._extract_vector_context(state["vector_data"])
    # Get additional context from LlamaIndex RAG
    rag_result = {}  # Default so the sources_used tally below works even if RAG is skipped
    llamaindex_context = ""
if state.get("llamaindex_data", {}).get("documents_indexed", 0) > 0:
try:
rag_result = self.llamaindex.query(query, top_k=3)
# Format RAG results
llamaindex_context = "\n\nLlamaIndex RAG Context:\n"
llamaindex_context += f"Answer: {rag_result.get('answer', '')}\n"
# Include sources with scores
for i, source in enumerate(rag_result.get('sources', []), 1):
llamaindex_context += f"\nSource {i} (score: {source['score']:.3f}):\n"
llamaindex_context += f"{source['text']}\n"
except Exception as e:
logger.warning(f"LlamaIndex query failed: {e}")
# Generate comprehensive answer
prompt = f"""
Based on the research question and multiple sources of context, provide a comprehensive answer.
Question: {query}
Knowledge Graph Context:
{graph_context}
Vector Search Context:
{vector_context}
{llamaindex_context}
Provide a detailed, well-cited answer:"""
    # self.model is the LLM client (for example, a Gemini GenerativeModel) configured at startup
    response = self.model.generate_content(prompt)
answer = response.text.strip()
# Store results
state["reasoning_result"] = {
"answer": answer,
"sources_used": {
"graph": len(state["graph_data"].get("nodes", [])),
"vector": len(state["vector_data"].get("documents", [])),
"llamaindex": len(rag_result.get('sources', []))
}
}
return state
This demonstrates how LangGraph orchestrates multiple RAG sources (knowledge graph, vector search, LlamaIndex) to provide comprehensive answers with diverse perspectives.
4. Self-Reflection Node
def self_reflection_node(self, state: AgentState) -> AgentState:
"""Self-critical evaluation of generated answer"""
answer = state["reasoning_result"].get("answer", "")
query = state["query"]
reflection_prompt = f"""
Evaluate the quality of this answer critically.
Question: {query}
Answer: {answer}
Provide a JSON evaluation with:
{{
"quality_score": <0-1>,
"completeness": <0-1>,
"accuracy_confidence": <0-1>,
"suggestions": ["improvement 1", "improvement 2"],
"needs_improvement": <true/false>
}}
"""
response = self.model.generate_content(reflection_prompt)
try:
reflection = json.loads(response.text)
state["reflection_feedback"] = reflection
logger.info(f"Self-reflection score: {reflection.get('quality_score', 0)}")
except json.JSONDecodeError:
# Fallback if JSON parsing fails
state["reflection_feedback"] = {
"quality_score": 0.7,
"needs_improvement": False
}
return state
5. Conditional Routing Function
def _should_continue(self, state: AgentState) -> str:
"""Decide whether to continue improvement or end"""
# Check quality thresholds
quality_score = state.get("reflection_feedback", {}).get("quality_score", 0.8)
retry_count = state.get("retry_count", 0)
# Maximum 2 improvement iterations
if retry_count >= 2:
logger.info("Maximum retries reached, ending workflow")
return "end"
# Quality threshold
if quality_score >= 0.75:
logger.info(f"Quality acceptable ({quality_score:.2f}), ending workflow")
return "end"
else:
logger.info(f"Quality low ({quality_score:.2f}), attempting correction")
state["retry_count"] = retry_count + 1
return "continue"
Conditional routing enables adaptive workflows that adjust to quality metrics, preventing infinite loops while still pushing for high-quality outputs. One caveat: routing functions are best kept pure. Depending on your LangGraph version, the retry_count mutation above may not be written back to the checkpointed state, so incrementing the counter inside the correction node is the more reliable pattern.
Executing the Workflow
def process_query(
self,
query: str,
thread_id: str = "default",
max_per_source: int = 3
) -> Dict:
"""Execute the workflow for a research query"""
# Initial state
initial_state: AgentState = {
"query": query,
"papers": [],
"graph_data": {},
"vector_data": {},
"llamaindex_data": {},
"reasoning_result": {},
"critic_feedback": {},
"reflection_feedback": {},
"messages": [],
"current_step": "",
"error": None,
"retry_count": 0,
"stage_outputs": {}
}
# Configuration for state persistence
    config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": 15
    }
# Execute workflow with streaming
logger.info(f"\n{'='*60}")
logger.info(f"Starting LangGraph workflow for: {query}")
logger.info(f"Thread ID: {thread_id}")
logger.info(f"{'='*60}\n")
final_state = None
try:
# Stream execution step-by-step
for step_state in self.workflow.stream(initial_state, config):
# Log progress
for node_name, node_output in step_state.items():
logger.info(f"✓ Completed node: {node_name}")
# Store final state
final_state = node_output
# Extract results
if final_state:
return {
"answer": final_state.get("reasoning_result", {}).get("answer", ""),
"sources": final_state.get("papers", []),
"stage_outputs": final_state.get("stage_outputs", {}),
"messages": final_state.get("messages", []),
"quality_score": final_state.get("reflection_feedback", {}).get("quality_score", 0),
"iterations": final_state.get("retry_count", 0)
            }
        return {"error": "Workflow produced no final state", "answer": ""}
except Exception as e:
logger.error(f"Workflow execution failed: {e}")
return {
"error": str(e),
"answer": "Workflow execution encountered an error"
}
The workflow.stream() method enables real-time progress monitoring, useful for UIs that need to show incremental progress during long-running operations.
State Persistence and Memory
One of LangGraph's most powerful features is built-in state persistence:
from langgraph.checkpoint.memory import MemorySaver
# Initialize with memory
self.memory = MemorySaver()
workflow = workflow.compile(checkpointer=self.memory)
# Execute with thread ID for persistence
config = {"configurable": {"thread_id": "user_123_session_5"}}
result = workflow.invoke(initial_state, config)
# Later, continue the same conversation
config = {"configurable": {"thread_id": "user_123_session_5"}}
result2 = workflow.invoke(new_state, config)
Benefits:
- Conversation context preservation across requests
- Ability to resume interrupted workflows
- Multi-turn dialogues with memory
- Session management for multiple users
This is similar to session storage in web applications - each thread_id is like a session ID that maintains state across multiple HTTP requests.
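You can also inspect what has been checkpointed for a thread. A small sketch using LangGraph's get_state API (variable names follow the persistence example above):

# Retrieve the latest checkpointed snapshot for a thread
config = {"configurable": {"thread_id": "user_123_session_5"}}
snapshot = workflow.get_state(config)
print(snapshot.values.get("messages", []))  # Execution log accumulated so far
print(snapshot.next)                        # Nodes that would run next if resumed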
Advanced Features
1. Error Handling and Graceful Degradation
def vector_processing_node(self, state: AgentState) -> AgentState:
"""Process papers with error handling"""
try:
result = self.vector_agent.process_papers(state["papers"])
state["vector_data"] = result
except Exception as e:
logger.error(f"Vector processing failed: {e}")
# Graceful degradation - continue with empty vector data
state["vector_data"] = {
"documents": [],
"vectors": [],
"error": str(e)
}
state["messages"].append(f"Vector processing failed (non-critical): {e}")
return state
2. Parallel Node Execution
While the current implementation uses sequential execution, LangGraph supports parallel execution:
# Fork workflow for parallel processing
workflow.add_edge("data_collection", "graph_processing")
workflow.add_edge("data_collection", "vector_processing")
workflow.add_edge("data_collection", "llamaindex_indexing")
# Join after parallel execution
workflow.add_edge(["graph_processing", "vector_processing", "llamaindex_indexing"],
"reasoning")
3. Dynamic Graph Modification
def _build_dynamic_workflow(self, config: Dict):
"""Build workflow based on configuration"""
workflow = StateGraph(AgentState)
# Add nodes conditionally
if config.get("use_graph", True):
workflow.add_node("graph_processing", self.graph_processing_node)
if config.get("use_vectors", True):
workflow.add_node("vector_processing", self.vector_processing_node)
if config.get("use_llamaindex", True):
workflow.add_node("llamaindex_indexing", self.llamaindex_indexing_node)
# Dynamic edge connections
# ...
return workflow.compile(checkpointer=self.memory)
LlamaIndex: Advanced RAG Framework
Core Concepts
LlamaIndex provides a comprehensive framework for building RAG (Retrieval-Augmented Generation) systems with:
- Document Loaders: Ingest from 100+ data sources
- Node Parsers: Intelligent document chunking
- Vector Stores: Abstraction over vector databases
- Retrievers: Sophisticated retrieval strategies
- Query Engines: End-to-end question answering
- Response Synthesizers: Generate answers from retrieved context
LlamaIndex is like a full-stack ORM for documents - similar to how Prisma abstracts database operations, LlamaIndex abstracts document indexing, retrieval, and querying.
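Before the production setup, here is a minimal sketch of that pipeline end-to-end (it assumes default models are available, e.g., an OpenAI API key, or the Settings overrides shown in the next section):

from llama_index.core import Document, VectorStoreIndex

# One-document corpus, in-memory vector store, default chunking
docs = [Document(text="LlamaIndex builds RAG pipelines over your documents.")]
index = VectorStoreIndex.from_documents(docs)  # parse -> chunk -> embed -> store
query_engine = index.as_query_engine()         # retriever + response synthesizer
print(query_engine.query("What does LlamaIndex do?"))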
Architecture Overview
At a high level, documents flow through a pipeline: loaders ingest raw content, node parsers chunk it, an embedding model turns chunks into vectors held in a vector store, and retrievers plus a response synthesizer turn queries into grounded answers.
Implementation Details
Complete RAG System Setup
from llama_index.core import (
VectorStoreIndex,
Document,
StorageContext,
Settings,
get_response_synthesizer
)
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from qdrant_client import QdrantClient
class LlamaIndexRAG:
"""Production-grade RAG system using LlamaIndex"""
def __init__(
self,
collection_name: str = "research_papers_llamaindex",
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
use_qdrant: bool = True,
qdrant_url: str = None,
qdrant_api_key: str = None
):
self.collection_name = collection_name
# Configure global settings
Settings.embed_model = HuggingFaceEmbedding(
model_name=embedding_model
)
Settings.chunk_size = 512
Settings.chunk_overlap = 50
# Setup vector store
self.storage_context = self._setup_vector_store(
use_qdrant, qdrant_url, qdrant_api_key
)
self.index = None
self.query_engine = None
- chunk_size: 512 tokens balances context preservation and retrieval precision
- chunk_overlap: 50 tokens prevents loss of context at chunk boundaries
- embedding_model: all-MiniLM-L6-v2 provides good quality at 384 dimensions
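Note that Settings applies process-wide. To scope chunking to a single index instead, one option is an explicit node parser passed as a transformation; a sketch with the same sizes:

from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
index = VectorStoreIndex.from_documents(
    documents,                    # documents built as in index_documents below
    transformations=[splitter],   # overrides global chunking for this index only
)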
Vector Store Configuration
def _setup_vector_store(
self,
use_qdrant: bool,
qdrant_url: str,
qdrant_api_key: str
) -> StorageContext:
"""Setup vector store with Qdrant or in-memory fallback"""
if use_qdrant:
try:
# Connect to Qdrant
if qdrant_url:
client = QdrantClient(
url=qdrant_url,
api_key=qdrant_api_key
)
else:
# Local Qdrant
client = QdrantClient(host="localhost", port=6333)
# Create Qdrant vector store
vector_store = QdrantVectorStore(
client=client,
collection_name=self.collection_name
)
logger.info(f"✓ Connected to Qdrant: {self.collection_name}")
return StorageContext.from_defaults(
vector_store=vector_store
)
except Exception as e:
logger.warning(f"Qdrant connection failed: {e}, using in-memory")
# Fallback to in-memory vector store
logger.info("Using in-memory vector store")
return StorageContext.from_defaults()
- Production: Use Qdrant for persistent, scalable vector storage
- Development: Use in-memory for quick testing without infrastructure
Document Indexing
def index_documents(self, papers: List[Dict]) -> Dict:
"""Index research papers into LlamaIndex"""
# Convert papers to LlamaIndex Documents
documents = []
for paper in papers:
# Create rich document text
text = f"Title: {paper.get('title', '')}\n\n"
text += f"Abstract: {paper.get('abstract', '')}\n\n"
text += f"Authors: {', '.join(paper.get('authors', []))}\n"
text += f"Year: {paper.get('year', 'Unknown')}\n"
# Full text if available
if paper.get('full_text'):
text += f"\n\nFull Text:\n{paper['full_text']}\n"
# Metadata for filtering and ranking
metadata = {
"title": paper.get("title", ""),
"authors": paper.get("authors", []),
"year": paper.get("year", ""),
"source": paper.get("source", ""),
"url": paper.get("url", ""),
"paper_id": paper.get("id", ""),
"citations": paper.get("citations", 0)
}
# Create Document with metadata
doc = Document(
text=text,
metadata=metadata,
excluded_embed_metadata_keys=["url", "paper_id"] # Don't embed URLs
)
documents.append(doc)
logger.info(f"Indexing {len(documents)} documents...")
# Create vector index
self.index = VectorStoreIndex.from_documents(
documents,
storage_context=self.storage_context,
show_progress=True
)
# Setup query engine
self._setup_query_engine()
return {
"documents_indexed": len(documents),
"collection_name": self.collection_name,
"vector_store": "Qdrant" if self.storage_context.vector_store else "In-Memory",
"embedding_dim": 384 # all-MiniLM-L6-v2 dimension
}
- Include searchable metadata (title, authors, year) for filtering
- Exclude non-semantic metadata (URLs, IDs) from embeddings to reduce noise
- Use citations count for ranking boost
Query Engine Configuration
def _setup_query_engine(self, top_k: int = 5) -> None:
"""Configure advanced query engine"""
# Retriever with similarity-based top-k
retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=top_k
)
# Response synthesis strategy
response_synthesizer = get_response_synthesizer(
response_mode="compact" # Options: compact, refine, tree_summarize
)
# Query engine with post-processing
self.query_engine = RetrieverQueryEngine(
retriever=retriever,
response_synthesizer=response_synthesizer,
node_postprocessors=[
SimilarityPostprocessor(similarity_cutoff=0.7) # Filter low-relevance
]
)
logger.info(f"Query engine configured with top_k={top_k}")
Response Modes Explained:
| Mode | Description | Best For |
|---|---|---|
| compact | Concatenates chunks and generates a single response | Most queries, fast |
| refine | Iteratively refines the answer with each chunk | Complex questions |
| tree_summarize | Builds a tree of summaries | Long documents |
Response modes are like different SQL query strategies - compact is like a simple JOIN, refine is like iterative aggregation, and tree_summarize is like hierarchical rollups.
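Switching modes is a constructor argument; for example, a sketch of a long-document engine built directly from the index (keyword arguments are forwarded to the underlying query engine):

# Hierarchical rollup suits "summarize everything" style questions
summary_engine = self.index.as_query_engine(
    response_mode="tree_summarize",
    similarity_top_k=10,
)
response = summary_engine.query("Summarize the main findings across these papers")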
Advanced Querying
def query(
self,
question: str,
top_k: int = 5,
filters: Dict = None
) -> Dict:
"""Query indexed documents with advanced features"""
if not self.query_engine:
return {
"error": "No documents indexed yet",
"answer": "",
"sources": []
}
logger.info(f"Querying: {question} (top_k={top_k})")
try:
# Execute query
response = self.query_engine.query(question)
# Extract sources with metadata and scores
sources = []
for node in response.source_nodes:
sources.append({
"text": node.text[:200] + "..." if len(node.text) > 200 else node.text,
"score": node.score,
"metadata": node.metadata,
"node_id": node.node_id
})
# Sort by relevance score
sources.sort(key=lambda x: x["score"], reverse=True)
result = {
"answer": str(response),
"sources": sources,
"num_sources": len(sources),
"query": question
}
logger.info(f"✓ Retrieved {len(sources)} relevant sources")
return result
except Exception as e:
logger.error(f"Query failed: {e}")
return {
"error": str(e),
"answer": "Query execution failed",
"sources": []
}
Metadata Filtering
def query_with_filters(
self,
question: str,
year_min: int = None,
year_max: int = None,
authors: List[str] = None,
min_citations: int = None
) -> Dict:
"""Query with metadata filters"""
    from llama_index.core.vector_stores import MetadataFilters, MetadataFilter, FilterOperator
    # Build filter list (MetadataFilter supports comparison operators;
    # ExactMatchFilter is limited to equality checks)
    filters_list = []
    if year_min is not None:
        filters_list.append(
            MetadataFilter(key="year", value=year_min, operator=FilterOperator.GTE)
        )
    if year_max is not None:
        filters_list.append(
            MetadataFilter(key="year", value=year_max, operator=FilterOperator.LTE)
        )
    if authors:
        for author in authors:
            filters_list.append(
                MetadataFilter(key="authors", value=author, operator=FilterOperator.CONTAINS)
            )
    if min_citations is not None:
        filters_list.append(
            MetadataFilter(key="citations", value=min_citations, operator=FilterOperator.GTE)
        )
# Create metadata filters
metadata_filters = MetadataFilters(filters=filters_list)
# Create filtered retriever
filtered_retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=5,
filters=metadata_filters
)
# Execute query with filters
query_engine = RetrieverQueryEngine(
retriever=filtered_retriever,
        response_synthesizer=get_response_synthesizer(response_mode="compact")  # Avoid reaching into private attrs
)
response = query_engine.query(question)
# Format response
return self._format_response(response, question)
Metadata filtering enables precise queries like "papers by X published after 2020 with >100 citations" - critical for research applications.
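A hypothetical call to the method above (rag names a LlamaIndexRAG instance; the query text is illustrative):

result = rag.query_with_filters(
    "transformer efficiency techniques",
    year_min=2020,
    min_citations=100,
)
print(result["answer"])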
Advanced RAG Patterns
1. Hybrid Search (Vector + Keyword)
from llama_index.core.retrievers import BaseRetriever
from llama_index.retrievers.bm25 import BM25Retriever
def create_hybrid_retriever(self) -> BaseRetriever:
"""Combine vector similarity and BM25 keyword search"""
# Vector retriever
vector_retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=10
)
# BM25 keyword retriever
bm25_retriever = BM25Retriever.from_defaults(
index=self.index,
similarity_top_k=10
)
# Combine with reciprocal rank fusion
from llama_index.core.retrievers import QueryFusionRetriever
hybrid_retriever = QueryFusionRetriever(
[vector_retriever, bm25_retriever],
similarity_top_k=5,
num_queries=1, # No query generation
mode="reciprocal_rerank", # Fusion strategy
use_async=False
)
return hybrid_retriever
Benefits:
- Vector search: Captures semantic similarity
- BM25: Captures exact keyword matches
- Fusion: Combines best of both approaches
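To put the hybrid retriever to work, wrap it in a query engine, as in this short sketch:

# RetrieverQueryEngine.from_args builds a default synthesizer around any retriever
hybrid_retriever = self.create_hybrid_retriever()
hybrid_engine = RetrieverQueryEngine.from_args(hybrid_retriever)
response = hybrid_engine.query("exact method name plus the semantic intent behind it")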
2. Hierarchical Document Structure
def create_hierarchical_index(self, papers: List[Dict]):
"""Create parent-child document structure"""
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.core.node_parser import get_leaf_nodes
# Parse documents into hierarchy
node_parser = HierarchicalNodeParser.from_defaults(
chunk_sizes=[2048, 512, 128] # Parent -> Child -> Leaf
)
# Create documents
documents = [Document(text=paper['full_text']) for paper in papers]
# Parse into hierarchical nodes
nodes = node_parser.get_nodes_from_documents(documents)
# Get leaf nodes for indexing
leaf_nodes = get_leaf_nodes(nodes)
# Create index from leaf nodes
self.index = VectorStoreIndex(
leaf_nodes,
storage_context=self.storage_context
)
Hierarchical indexing enables zooming from document summaries (parent) to specific details (leaf) - useful for long research papers.
3. Query Transformations
def query_with_transformations(self, question: str) -> Dict:
"""Apply query transformations before retrieval"""
from llama_index.core.indices.query.query_transform import HyDEQueryTransform
# Hypothetical Document Embeddings (HyDE)
# Generates hypothetical answer, embeds it, uses for retrieval
hyde = HyDEQueryTransform(include_original=True)
# Transform query
transformed_query = hyde(question)
# Retrieve with transformed query
response = self.query_engine.query(transformed_query)
return self._format_response(response, question)
HyDE Explained:
- Generate hypothetical answer to question
- Embed hypothetical answer
- Use embedding to find similar documents
- Generate actual answer from retrieved documents
HyDE works well when the question uses different terminology than the documents (e.g., asking in layman's terms about technical content).
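LlamaIndex can also apply the transform on every query automatically; a sketch wrapping the base engine with TransformQueryEngine (hyde as defined above):

from llama_index.core.query_engine import TransformQueryEngine

# Every call now runs HyDE before retrieval
hyde_engine = TransformQueryEngine(self.query_engine, query_transform=hyde)
response = hyde_engine.query("plain-language question about technical content")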
4. Response Evaluation and Scoring
def query_with_evaluation(self, question: str) -> Dict:
"""Query with automatic response evaluation"""
from llama_index.core.evaluation import FaithfulnessEvaluator, RelevancyEvaluator
# Execute query
response = self.query_engine.query(question)
# Evaluate faithfulness (answer supported by sources?)
faithfulness_evaluator = FaithfulnessEvaluator()
faithfulness_result = faithfulness_evaluator.evaluate_response(
query=question,
response=response
)
# Evaluate relevancy (answer addresses question?)
relevancy_evaluator = RelevancyEvaluator()
relevancy_result = relevancy_evaluator.evaluate_response(
query=question,
response=response
)
return {
"answer": str(response),
"sources": self._extract_sources(response),
"evaluation": {
"faithfulness": faithfulness_result.score,
"relevancy": relevancy_result.score,
"feedback": {
"faithfulness": faithfulness_result.feedback,
"relevancy": relevancy_result.feedback
}
}
}
Automatic evaluation ensures responses meet quality standards before returning to users - critical for production RAG systems.
Storage and Persistence
Saving and Loading Indexes
def save_index(self, persist_dir: str = "./storage"):
"""Persist index to disk"""
if self.index:
self.index.storage_context.persist(persist_dir=persist_dir)
logger.info(f"✓ Index saved to {persist_dir}")
else:
logger.warning("No index to save")
def load_index(self, persist_dir: str = "./storage"):
"""Load index from disk"""
from llama_index.core import load_index_from_storage
try:
# Load storage context
storage_context = StorageContext.from_defaults(
persist_dir=persist_dir
)
# Load index
self.index = load_index_from_storage(storage_context)
# Setup query engine
self._setup_query_engine()
logger.info(f"✓ Index loaded from {persist_dir}")
return True
except Exception as e:
logger.error(f"Failed to load index: {e}")
return False
Persisting indexes avoids re-embedding documents on every restart, saving significant API costs and startup time.
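A typical startup pattern combines the two methods above; a sketch (the ./storage path and the papers variable are illustrative):

rag = LlamaIndexRAG()
if not rag.load_index("./storage"):
    rag.index_documents(papers)  # papers collected earlier in the workflow
    rag.save_index("./storage")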
Incremental Updates
def add_documents(self, new_papers: List[Dict]) -> Dict:
"""Add new documents to existing index"""
if not self.index:
logger.warning("No existing index, creating new one")
return self.index_documents(new_papers)
# Convert to Documents
new_docs = []
for paper in new_papers:
text = self._format_paper_text(paper)
metadata = self._extract_paper_metadata(paper)
new_docs.append(Document(text=text, metadata=metadata))
# Insert into existing index
for doc in new_docs:
self.index.insert(doc)
logger.info(f"✓ Added {len(new_docs)} documents to index")
# Refresh query engine
self._setup_query_engine()
return {
"documents_added": len(new_docs),
"total_documents": self.index.docstore.get_document_count()
}
Integration: LangGraph + LlamaIndex
Why Use Both?
LangGraph provides:
- Workflow orchestration
- State management
- Conditional logic
- Error handling
LlamaIndex provides:
- Document indexing
- Semantic retrieval
- Response synthesis
- Query optimization
Together, they create a production-grade system where:
- LangGraph manages the overall workflow
- LlamaIndex handles document intelligence
- Each focuses on its strength
Integration Architecture
In the combined system, LangGraph owns the outer workflow graph, while LlamaIndex appears inside it as the indexing node plus a retrieval call within the reasoning node, with everything sharing state through AgentState.
Complete Integration Example
import os
import uuid

class ProductionRAGSystem:
"""Complete system integrating LangGraph and LlamaIndex"""
def __init__(self):
# Initialize LlamaIndex RAG
self.llamaindex = LlamaIndexRAG(
collection_name="research_papers",
use_qdrant=True,
qdrant_url=os.getenv("QDRANT_URL")
)
        # Initialize LangGraph orchestrator
        # (assumes the orchestrator's constructor is extended to accept an
        # injected LlamaIndexRAG instead of building its own instance)
        self.orchestrator = LangGraphOrchestrator(
            llamaindex=self.llamaindex
        )
def research_query(
self,
question: str,
session_id: str = None,
max_papers: int = 10
) -> Dict:
"""Execute complete research workflow"""
# Generate session ID if not provided
session_id = session_id or f"session_{uuid.uuid4()}"
logger.info(f"\n{'='*70}")
logger.info(f"PRODUCTION RAG SYSTEM - Research Query")
logger.info(f"Question: {question}")
logger.info(f"Session: {session_id}")
logger.info(f"{'='*70}\n")
# Execute LangGraph workflow
# This will:
# 1. Collect papers from 7 sources
# 2. Build knowledge graph
# 3. Create vector embeddings
# 4. Index in LlamaIndex
# 5. Generate answer using all sources
# 6. Self-reflect on quality
# 7. Return comprehensive result
result = self.orchestrator.process_query(
query=question,
thread_id=session_id,
max_per_source=max_papers
)
return result
def follow_up_query(
self,
question: str,
session_id: str
) -> Dict:
"""Ask follow-up question in same session"""
# LangGraph will load state from session_id
# LlamaIndex will query already-indexed documents
# No need to re-collect papers!
logger.info(f"Follow-up query in session: {session_id}")
result = self.orchestrator.process_query(
query=question,
thread_id=session_id,
max_per_source=0 # Don't collect new papers
)
return result
Usage Example
# Initialize system
rag_system = ProductionRAGSystem()
# First query
result1 = rag_system.research_query(
question="What are the latest advances in multi-agent RAG systems?",
session_id="research_session_1",
max_papers=15
)
print(f"Answer: {result1['answer']}")
print(f"Sources: {len(result1['sources'])} papers")
print(f"Quality Score: {result1['quality_score']}")
print(f"Stages: {list(result1['stage_outputs'].keys())}")
# Follow-up query (reuses indexed documents)
result2 = rag_system.follow_up_query(
question="How do these systems handle conflicting information?",
session_id="research_session_1"
)
print(f"Follow-up Answer: {result2['answer']}")
Output:
====================================================================
PRODUCTION RAG SYSTEM - Research Query
Question: What are the latest advances in multi-agent RAG systems?
Session: research_session_1
====================================================================
✓ Completed node: data_collection
✓ Completed node: graph_processing
✓ Completed node: vector_processing
✓ Completed node: llamaindex_indexing
✓ Completed node: reasoning
✓ Completed node: self_reflection
✓ Completed node: critic_review
Answer: Recent advances in multi-agent RAG systems focus on...
Sources: 15 papers
Quality Score: 0.87
Stages: ['data_collection', 'graph_processing', 'vector_processing',
'llamaindex_indexing', 'reasoning', 'self_reflection', 'critic_review']
Framework Comparison
LangGraph vs Traditional Orchestration
| Feature | LangGraph | Traditional Code |
|---|---|---|
| State Management | Built-in StateGraph | Manual tracking |
| Persistence | Automatic checkpointing | Custom database code |
| Conditional Routing | Declarative edges | If/else logic |
| Error Recovery | Built-in retry mechanisms | Manual try/catch |
| Visualization | Graph visualization tools | No built-in tools |
| Testing | Mock individual nodes | Mock entire workflow |
LlamaIndex vs Manual RAG
| Feature | LlamaIndex | Manual Implementation |
|---|---|---|
| Vector Store Abstraction | Single API for all stores | Custom per store |
| Document Parsing | 100+ loaders | Custom parsers |
| Retrieval Strategies | 10+ built-in | Implement from scratch |
| Response Synthesis | 5 modes | Manual generation |
| Evaluation | Built-in evaluators | Custom metrics |
| Metadata Filtering | Declarative filters | Custom SQL/queries |
When to Use Each Framework
Use LangGraph when:
- Building multi-step agent workflows
- Need conversation memory/persistence
- Require conditional workflow logic
- Want visualization and debugging tools
Use LlamaIndex when:
- Building RAG applications
- Need document indexing and retrieval
- Want to abstract vector database complexity
- Require advanced query optimization
Use Both when:
- Building production-grade multi-agent RAG systems
- Need orchestration AND intelligent retrieval
- Want best-in-class for each concern
Production Deployment Considerations
Performance Optimization
1. Batch Processing
def index_documents_batch(
self,
papers: List[Dict],
batch_size: int = 100
) -> Dict:
"""Index documents in batches to avoid memory issues"""
total = len(papers)
indexed = 0
for i in range(0, total, batch_size):
batch = papers[i:i + batch_size]
logger.info(f"Indexing batch {i//batch_size + 1}/{(total + batch_size - 1)//batch_size}")
if i == 0:
# Create initial index
self.index_documents(batch)
else:
# Add to existing index
self.add_documents(batch)
indexed += len(batch)
return {"total_indexed": indexed}
2. Caching
import hashlib

# Note: functools.lru_cache is a poor fit here - it would cache on a hash
# argument while the method also needs the raw question text to run the query
def query_with_cache(self, question: str, top_k: int = 5) -> Dict:
    """Query with a simple in-memory cache"""
    # (assumes self._query_cache = {} is created in __init__)
    cache_key = (hashlib.md5(question.encode()).hexdigest(), top_k)
    if cache_key in self._query_cache:
        return {"answer": self._query_cache[cache_key], "cached": True}
    # Cache miss - execute the actual query
    response = self.query_engine.query(question)
    self._query_cache[cache_key] = str(response)
    return {"answer": self._query_cache[cache_key], "cached": False}
3. Async Operations
import asyncio
from llama_index.core.query_engine import RetrieverQueryEngine
async def async_query(self, question: str) -> Dict:
"""Asynchronous querying for concurrent requests"""
# Use async query engine
response = await self.query_engine.aquery(question)
return self._format_response(response, question)
async def batch_queries(self, questions: List[str]) -> List[Dict]:
"""Process multiple queries concurrently"""
tasks = [self.async_query(q) for q in questions]
results = await asyncio.gather(*tasks)
return results
Monitoring and Observability
LangGraph Workflow Monitoring
def process_query_with_monitoring(
self,
query: str,
thread_id: str
) -> Dict:
"""Execute workflow with detailed monitoring"""
import time
# Track execution metrics
metrics = {
"start_time": time.time(),
"node_durations": {},
"node_errors": []
}
    # Execute with monitoring
    last_checkpoint = time.time()
    for step_state in self.workflow.stream(initial_state, config):
        for node_name, node_output in step_state.items():
            # Duration = time elapsed since the previous node finished
            duration = time.time() - last_checkpoint
            last_checkpoint = time.time()
            # Check for errors
            if node_output.get("error"):
                metrics["node_errors"].append({
                    "node": node_name,
                    "error": node_output["error"]
                })
            # Log metrics
            metrics["node_durations"][node_name] = duration
            logger.info(f"Node {node_name}: {duration:.2f}s")
metrics["total_duration"] = time.time() - metrics["start_time"]
    # Send metrics to monitoring system
    self._send_metrics(metrics)
    return {"metrics": metrics}
LlamaIndex Query Monitoring
def query_with_monitoring(
self,
question: str,
top_k: int = 5
) -> Dict:
"""Query with detailed metrics"""
import time
start_time = time.time()
# Execute query
response = self.query_engine.query(question)
# Calculate metrics
retrieval_time = time.time() - start_time
num_tokens = len(str(response).split())
# Log to monitoring
metrics = {
"retrieval_time_ms": retrieval_time * 1000,
"num_sources": len(response.source_nodes),
"num_tokens": num_tokens,
"avg_source_score": sum(n.score for n in response.source_nodes) / len(response.source_nodes),
"timestamp": time.time()
}
logger.info(f"Query metrics: {metrics}")
# Return with metrics
result = self._format_response(response, question)
result["metrics"] = metrics
return result
Error Handling
Graceful Degradation
def robust_reasoning_node(self, state: AgentState) -> AgentState:
"""Reasoning with fallback strategies"""
query = state["query"]
# Try LlamaIndex RAG first
try:
if state.get("llamaindex_data", {}).get("documents_indexed", 0) > 0:
rag_result = self.llamaindex.query(query, top_k=3)
context = rag_result["answer"]
else:
raise Exception("LlamaIndex not available")
except Exception as e:
logger.warning(f"LlamaIndex failed: {e}, trying vector search")
# Fallback to vector search
try:
vector_docs = state["vector_data"].get("documents", [])
context = "\n".join([doc["text"] for doc in vector_docs[:3]])
except Exception as e2:
logger.warning(f"Vector search failed: {e2}, using graph only")
# Final fallback to knowledge graph
context = self._extract_graph_context(state["graph_data"])
# Generate answer with whatever context we got
answer = self._generate_answer(query, context)
state["reasoning_result"] = {"answer": answer}
return state
Scaling Strategies
1. Horizontal Scaling
# Deploy multiple instances of the RAG system
# Use load balancer to distribute requests
# Share Qdrant vector store across instances
# Example with Docker Compose
"""
services:
rag-system-1:
image: researcherai:latest
environment:
- QDRANT_URL=http://qdrant:6333
rag-system-2:
image: researcherai:latest
environment:
- QDRANT_URL=http://qdrant:6333
rag-system-3:
image: researcherai:latest
environment:
- QDRANT_URL=http://qdrant:6333
qdrant:
image: qdrant/qdrant
volumes:
- qdrant_storage:/qdrant/storage
load-balancer:
image: nginx
ports:
- "80:80"
depends_on:
- rag-system-1
- rag-system-2
- rag-system-3
"""
2. Index Sharding
def create_sharded_indexes(
self,
papers: List[Dict],
num_shards: int = 4
) -> List[LlamaIndexRAG]:
"""Shard documents across multiple indexes"""
# Divide papers by topic/source/year
shards = self._shard_papers(papers, num_shards)
# Create index per shard
indexes = []
for i, shard in enumerate(shards):
rag = LlamaIndexRAG(
collection_name=f"research_papers_shard_{i}"
)
rag.index_documents(shard)
indexes.append(rag)
return indexes
def query_sharded(
self,
question: str,
indexes: List[LlamaIndexRAG]
) -> Dict:
"""Query all shards and merge results"""
all_results = []
# Query each shard
for index in indexes:
result = index.query(question, top_k=2)
all_results.extend(result["sources"])
# Merge and re-rank
all_results.sort(key=lambda x: x["score"], reverse=True)
top_results = all_results[:5]
# Synthesize final answer from top results
final_answer = self._synthesize_from_sources(question, top_results)
return {
"answer": final_answer,
"sources": top_results,
"shards_queried": len(indexes)
}
Best Practices
1. State Design in LangGraph
# ✅ GOOD: Flat, simple state structure
class AgentState(TypedDict):
query: str
papers: List[Dict]
answer: str
error: str | None
# ❌ BAD: Nested, complex state (hard for nodes to update atomically)
class AgentState(TypedDict):
    data: Dict[str, Dict]  # {"input": {...}, "intermediate": {...}, "output": {...}}
2. Document Metadata in LlamaIndex
# ✅ GOOD: Searchable, structured metadata
metadata = {
"title": "Paper Title",
"year": 2024,
"authors": ["Author 1", "Author 2"],
"citations": 150,
"source": "arxiv"
}
# ❌ BAD: Non-searchable, unstructured metadata
metadata = {
"info": "Paper Title by Author 1, Author 2 (2024) - 150 citations - arxiv"
}
3. Error Handling
# ✅ GOOD: Specific error handling with fallbacks
try:
result = self.llamaindex.query(question)
except ConnectionError:  # e.g., the Qdrant client cannot reach the server
    logger.warning("Qdrant unavailable, using in-memory")
result = self.fallback_index.query(question)
except Exception as e:
logger.error(f"Query failed: {e}")
result = {"error": str(e), "answer": "Unable to process query"}
# ❌ BAD: Catch-all with no recovery
try:
result = self.llamaindex.query(question)
except:
pass
4. Resource Management
# ✅ GOOD: Cleanup and resource limits
def __del__(self):
    """Best-effort cleanup of resources"""
    if getattr(self, "index", None):
        self.index.storage_context.persist()
    if getattr(self, "qdrant_client", None):
        self.qdrant_client.close()
# Set memory limits
Settings.chunk_size = 512 # Not too large
Settings.num_output = 256 # Limit response length
# ❌ BAD: No cleanup, unlimited resources
# (Just let everything run forever and consume all memory)
5. Testing
# ✅ GOOD: Test individual components
def test_llamaindex_indexing():
"""Test document indexing"""
rag = LlamaIndexRAG()
papers = [{"title": "Test", "abstract": "Test abstract"}]
result = rag.index_documents(papers)
assert result["documents_indexed"] == 1
assert rag.index is not None
def test_langgraph_node():
"""Test individual workflow node"""
orchestrator = LangGraphOrchestrator()
state = {"query": "test", "papers": []}
new_state = orchestrator.data_collection_node(state)
assert len(new_state["papers"]) > 0
# ❌ BAD: Only test end-to-end
def test_everything():
system = ProductionRAGSystem()
result = system.research_query("test")
assert result # Too vague!
Common Pitfalls and Solutions
Pitfall 1: Infinite Loops in LangGraph
Problem: Conditional edges can create infinite loops
# ❌ DANGEROUS
workflow.add_conditional_edges(
"quality_check",
lambda state: "improve" if state["score"] < 0.9 else "end",
{"improve": "reasoning", "end": END}
)
workflow.add_edge("reasoning", "quality_check")
# If score never reaches 0.9, infinite loop!
Solution: Add iteration limits
# ✅ SAFE
def should_continue(state: AgentState) -> str:
    if state["retry_count"] >= 3:
        return "end"  # Maximum 3 iterations
    if state["score"] >= 0.75:  # Reasonable threshold
        return "end"
    return "improve"

# Increment the counter in a node rather than in the routing function,
# so the update is checkpointed along with the rest of the state
def improve_node(state: AgentState) -> AgentState:
    state["retry_count"] = state.get("retry_count", 0) + 1
    # ... generate improved answer ...
    return state
Pitfall 2: Vector Store Memory Leaks
Problem: Creating new indexes without cleanup
# ❌ MEMORY LEAK
for batch in large_dataset:
index = VectorStoreIndex.from_documents(batch)
# Never cleaned up, memory grows
Solution: Reuse indexes and cleanup
# ✅ MEMORY EFFICIENT
index = None
for batch_count, batch in enumerate(large_dataset, start=1):
    if index is None:
        index = VectorStoreIndex.from_documents(batch)
    else:
        for doc in batch:
            index.insert(doc)
    # Periodically persist to disk
    if batch_count % 10 == 0:
        index.storage_context.persist()
Pitfall 3: Embedding Dimension Mismatches
Problem: Mixing embedding models with different dimensions
# ❌ DIMENSION MISMATCH
# Index created with all-MiniLM-L6-v2 (384 dim)
Settings.embed_model = HuggingFaceEmbedding("sentence-transformers/all-MiniLM-L6-v2")
index = VectorStoreIndex.from_documents(docs)
# Later, query with a different model (768 dim) - FAILS!
Settings.embed_model = HuggingFaceEmbedding("sentence-transformers/all-mpnet-base-v2")
response = index.as_query_engine().query("question")  # Error: dimension mismatch
Solution: Lock embedding model
# ✅ CONSISTENT
class LlamaIndexRAG:
def __init__(self, embedding_model: str):
self.embedding_model_name = embedding_model
Settings.embed_model = HuggingFaceEmbedding(embedding_model)
# Save with index
self.index_metadata = {"embedding_model": embedding_model}
    def load_index(self, persist_dir: str):
        # Verify the embedding model matches before loading
        with open(f"{persist_dir}/metadata.json") as f:
            metadata = json.load(f)
        if metadata["embedding_model"] != self.embedding_model_name:
            raise ValueError("Embedding model mismatch!")
Pitfall 4: Blocking Operations in Async Contexts
Problem: Using sync operations in async workflows
# ❌ BLOCKS EVENT LOOP
async def async_query_handler(question: str):
# This blocks!
response = self.query_engine.query(question)
return response
Solution: Use async methods
# ✅ NON-BLOCKING
async def async_query_handler(question: str):
# Proper async
response = await self.query_engine.aquery(question)
return response
Real-World Performance Metrics
ResearcherAI Production Stats
System Configuration:
- LangGraph 0.2.45
- LlamaIndex 0.12.0
- Qdrant vector store (cloud)
- HuggingFace embeddings (all-MiniLM-L6-v2)
Performance Metrics (from production deployment):
| Operation | Average Time | Notes |
|---|---|---|
| Document Indexing (100 papers) | 12.3s | Including embedding generation |
| LangGraph Full Workflow | 24.7s | All 8 nodes |
| LlamaIndex Query (top-5) | 0.8s | Cached embeddings |
| End-to-End Research Query | 28.5s | First query in session |
| Follow-up Query | 2.1s | Using cached index |
Accuracy Metrics:
| Metric | Score | Measurement |
|---|---|---|
| Answer Relevance | 0.87 | LlamaIndex evaluator |
| Source Faithfulness | 0.92 | LlamaIndex evaluator |
| Self-Reflection Quality | 0.84 | LangGraph critic node |
| User Satisfaction | 4.2/5 | Manual evaluation |
Resource Usage:
| Resource | Usage | Configuration |
|---|---|---|
| Memory (per instance) | 2.1 GB | With 1000 docs indexed |
| CPU (average) | 15% | 2 vCPUs |
| Qdrant Storage | 450 MB | 1000 papers, 384-dim vectors |
| Request Throughput | 12 req/min | Single instance |
Summary
This chapter covered the two foundational frameworks powering ResearcherAI:
LangGraph
- Purpose: Stateful workflow orchestration for multi-agent systems
- Key Features: State graphs, conditional routing, checkpointing, streaming
- Best For: Complex multi-step workflows, conversation memory, adaptive logic
- Production Ready: Built-in persistence, error handling, monitoring hooks
LlamaIndex
- Purpose: Advanced RAG framework for document indexing and retrieval
- Key Features: Vector store abstraction, query optimization, response synthesis
- Best For: Semantic search, document Q&A, knowledge retrieval
- Production Ready: Multiple vector stores, evaluation tools, async support
Integration Benefits
- LangGraph handles macro-level orchestration
- LlamaIndex handles micro-level document intelligence
- Together: Production-grade multi-agent RAG system
- Proven: 19/19 tests passing, deployed in production
Next Steps
In the next chapter, we'll dive into the backend implementation, exploring how these frameworks integrate with:
- Neo4j knowledge graphs
- Qdrant vector databases
- Apache Kafka event streaming
- FastAPI web services
We'll see how the theoretical concepts from this chapter translate into production code that scales.
- LangGraph Documentation: https://langchain-ai.github.io/langgraph/
- LlamaIndex Documentation: https://docs.llamaindex.ai/
- ResearcherAI Implementation: See agents/langgraph_orchestrator.py and agents/llamaindex_rag.py
- Test Suite: See test_langgraph_llamaindex.py for working examples