Skip to main content

Chapter 4: Orchestration Frameworks - LangGraph and LlamaIndex

Introduction

Building production-grade multi-agent systems requires robust orchestration frameworks that can manage complex workflows, state persistence, and intelligent document retrieval. In this chapter, we'll explore two foundational frameworks that power ResearcherAI:

  • LangGraph: A stateful workflow orchestration framework for building sophisticated agent systems
  • LlamaIndex: An advanced RAG (Retrieval-Augmented Generation) framework for intelligent document indexing and retrieval

These frameworks work together to create a powerful, production-ready system that can handle complex research queries across multiple data sources.

For Web Developers

Think of LangGraph as a state machine orchestrator (like Redux for backend workflows) and LlamaIndex as a smart search engine with built-in AI capabilities (like Elasticsearch + GPT combined).

Why These Frameworks Matter

The Challenges They Solve

  1. State Management Complexity

    • Multi-step workflows need to maintain state across operations
    • Conversations require context preservation
    • Error recovery needs state rollback capabilities
  2. Document Retrieval Accuracy

    • Simple keyword search isn't enough for research queries
    • Need semantic understanding of document content
    • Require intelligent ranking and filtering
  3. Scalability and Performance

    • Large document collections need efficient indexing
    • Complex workflows need parallel execution
    • Production systems need fault tolerance

What Makes Them Production-Ready

LangGraph:

  • Built-in state persistence with checkpointing
  • Conditional routing for adaptive workflows
  • Stream processing for real-time feedback
  • Error handling and retry mechanisms

LlamaIndex:

  • Vector database abstraction layer
  • Advanced query optimization
  • Metadata-aware retrieval
  • Response synthesis and post-processing

LangGraph: Stateful Workflow Orchestration

Core Concepts

LangGraph treats your multi-agent system as a directed graph where:

  • Nodes represent operations (agent actions)
  • Edges represent transitions between operations
  • State flows through the graph and is updated at each node
  • Checkpointing enables persistence and recovery
Web Developer Analogy

If you've used React Router or state machines, LangGraph is similar but for backend agent workflows. Each node is like a route handler, and edges are like navigation rules.

Architecture Overview

Implementation Details

State Definition

The first step is defining your workflow state - the data structure that flows through your graph:

from typing import TypedDict, List, Dict
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
"""State shared across all agents in the workflow"""
# User input
query: str

# Data collected from sources
papers: List[Dict]

# Processing results
graph_data: Dict # Knowledge graph representation
vector_data: Dict # Vector embeddings
llamaindex_data: Dict # LlamaIndex RAG data

# Reasoning outputs
reasoning_result: Dict
critic_feedback: Dict
reflection_feedback: Dict

# Workflow metadata
messages: List[str] # Execution log
current_step: str # Current node name
error: str | None # Error tracking
retry_count: int # Retry attempts
stage_outputs: Dict # Per-stage results
For Web Developers

This is like defining a TypeScript interface for your Redux state. Each field represents a piece of data that different "reducers" (nodes) can read and update.

Building the Workflow Graph

class LangGraphOrchestrator:
"""Production-grade LangGraph workflow orchestrator"""

def __init__(self):
# Initialize agent dependencies
self.data_collector = DataCollectorAgent()
self.graph_agent = KnowledgeGraphAgent()
self.vector_agent = VectorAgent()
self.llamaindex = LlamaIndexRAG()
self.reasoning_agent = ReasoningAgent()
self.evaluator = EvaluatorAgent()

# Create workflow
self.workflow = self._build_workflow()

# State persistence
self.memory = MemorySaver()

def _build_workflow(self) -> CompiledGraph:
"""Build and compile the LangGraph workflow"""

# Create the graph
workflow = StateGraph(AgentState)

# Add nodes (operations)
workflow.add_node("data_collection", self.data_collection_node)
workflow.add_node("graph_processing", self.graph_processing_node)
workflow.add_node("vector_processing", self.vector_processing_node)
workflow.add_node("llamaindex_indexing", self.llamaindex_indexing_node)
workflow.add_node("reasoning", self.reasoning_node)
workflow.add_node("self_reflection", self.self_reflection_node)
workflow.add_node("critic_review", self.critic_review_node)
workflow.add_node("correction", self.correction_node)

# Define edges (transitions)
workflow.set_entry_point("data_collection")

# Linear flow for data processing
workflow.add_edge("data_collection", "graph_processing")
workflow.add_edge("graph_processing", "vector_processing")
workflow.add_edge("vector_processing", "llamaindex_indexing")
workflow.add_edge("llamaindex_indexing", "reasoning")

# Self-reflection and quality control
workflow.add_edge("reasoning", "self_reflection")
workflow.add_edge("self_reflection", "critic_review")

# Conditional routing based on quality
workflow.add_conditional_edges(
"critic_review",
self._should_continue,
{
"continue": "correction",
"end": END
}
)

workflow.add_edge("correction", "reasoning")

# Compile with state persistence
return workflow.compile(checkpointer=self.memory)
Key Features
  • MemorySaver: Enables conversation persistence across sessions
  • Conditional Edges: Dynamic routing based on runtime conditions
  • Checkpointing: Automatic state snapshots at each node

Implementing Workflow Nodes

Each node is a function that receives state, performs an operation, and updates state:

1. Data Collection Node

def data_collection_node(self, state: AgentState) -> AgentState:
"""Collect research papers from multiple sources"""
query = state["query"]
max_per_source = state.get("max_per_source", 3)

logger.info(f"DATA COLLECTION: Starting for query: {query}")

# Collect from 7 sources
papers = self.data_collector.collect_papers(
query=query,
max_per_source=max_per_source
)

# Update state
state["papers"] = papers
state["messages"].append(f"Collected {len(papers)} papers")
state["stage_outputs"]["data_collection"] = {
"papers_collected": len(papers),
"sources": ["arxiv", "semantic_scholar", "pubmed", "zenodo",
"web", "huggingface", "kaggle"]
}

return state
For Web Developers

Each node is like an Express.js middleware function - it receives the current state (like req), performs operations, and passes updated state to the next node (like calling next()).

2. LlamaIndex Indexing Node

def llamaindex_indexing_node(self, state: AgentState) -> AgentState:
"""Index documents in LlamaIndex for RAG retrieval"""
papers = state["papers"]

logger.info(f"LLAMAINDEX INDEXING: Indexing {len(papers)} papers")

try:
# Use LlamaIndex to create vector index
result = self.llamaindex.index_documents(papers)

state["llamaindex_data"] = result
state["stage_outputs"]["llamaindex_indexing"] = {
"documents_indexed": result.get("documents_indexed", 0),
"vector_store": result.get("vector_store", "Unknown"),
"collection": result.get("collection_name", "N/A"),
"embedding_dim": result.get("embedding_dim", 384)
}
state["messages"].append(
f"Indexed {result.get('documents_indexed', 0)} docs in LlamaIndex"
)

except Exception as e:
logger.error(f"LlamaIndex indexing failed: {e}")
state["error"] = str(e)
state["llamaindex_data"] = {}

return state

3. Reasoning Node with RAG Enhancement

def reasoning_node(self, state: AgentState) -> AgentState:
"""Generate answer using multi-source context"""
query = state["query"]

# Gather context from all sources
graph_context = self._extract_graph_context(state["graph_data"])
vector_context = self._extract_vector_context(state["vector_data"])

# Get additional context from LlamaIndex RAG
llamaindex_context = ""
if state.get("llamaindex_data", {}).get("documents_indexed", 0) > 0:
try:
rag_result = self.llamaindex.query(query, top_k=3)

# Format RAG results
llamaindex_context = "\n\nLlamaIndex RAG Context:\n"
llamaindex_context += f"Answer: {rag_result.get('answer', '')}\n"

# Include sources with scores
for i, source in enumerate(rag_result.get('sources', []), 1):
llamaindex_context += f"\nSource {i} (score: {source['score']:.3f}):\n"
llamaindex_context += f"{source['text']}\n"

except Exception as e:
logger.warning(f"LlamaIndex query failed: {e}")

# Generate comprehensive answer
prompt = f"""
Based on the research question and multiple sources of context, provide a comprehensive answer.

Question: {query}

Knowledge Graph Context:
{graph_context}

Vector Search Context:
{vector_context}
{llamaindex_context}

Provide a detailed, well-cited answer:"""

response = self.model.generate_content(prompt)
answer = response.text.strip()

# Store results
state["reasoning_result"] = {
"answer": answer,
"sources_used": {
"graph": len(state["graph_data"].get("nodes", [])),
"vector": len(state["vector_data"].get("documents", [])),
"llamaindex": len(rag_result.get('sources', []))
}
}

return state
Multi-Source RAG

This demonstrates how LangGraph orchestrates multiple RAG sources (knowledge graph, vector search, LlamaIndex) to provide comprehensive answers with diverse perspectives.

4. Self-Reflection Node

def self_reflection_node(self, state: AgentState) -> AgentState:
"""Self-critical evaluation of generated answer"""
answer = state["reasoning_result"].get("answer", "")
query = state["query"]

reflection_prompt = f"""
Evaluate the quality of this answer critically.

Question: {query}

Answer: {answer}

Provide a JSON evaluation with:
{{
"quality_score": <0-1>,
"completeness": <0-1>,
"accuracy_confidence": <0-1>,
"suggestions": ["improvement 1", "improvement 2"],
"needs_improvement": <true/false>
}}
"""

response = self.model.generate_content(reflection_prompt)

try:
reflection = json.loads(response.text)
state["reflection_feedback"] = reflection

logger.info(f"Self-reflection score: {reflection.get('quality_score', 0)}")

except json.JSONDecodeError:
# Fallback if JSON parsing fails
state["reflection_feedback"] = {
"quality_score": 0.7,
"needs_improvement": False
}

return state

5. Conditional Routing Function

def _should_continue(self, state: AgentState) -> str:
"""Decide whether to continue improvement or end"""

# Check quality thresholds
quality_score = state.get("reflection_feedback", {}).get("quality_score", 0.8)
retry_count = state.get("retry_count", 0)

# Maximum 2 improvement iterations
if retry_count >= 2:
logger.info("Maximum retries reached, ending workflow")
return "end"

# Quality threshold
if quality_score >= 0.75:
logger.info(f"Quality acceptable ({quality_score:.2f}), ending workflow")
return "end"
else:
logger.info(f"Quality low ({quality_score:.2f}), attempting correction")
state["retry_count"] = retry_count + 1
return "continue"
Production Pattern

Conditional routing enables adaptive workflows that adjust based on quality metrics, preventing infinite loops while ensuring high-quality outputs.

Executing the Workflow

def process_query(
self,
query: str,
thread_id: str = "default",
max_per_source: int = 3
) -> Dict:
"""Execute the workflow for a research query"""

# Initial state
initial_state: AgentState = {
"query": query,
"papers": [],
"graph_data": {},
"vector_data": {},
"llamaindex_data": {},
"reasoning_result": {},
"critic_feedback": {},
"reflection_feedback": {},
"messages": [],
"current_step": "",
"error": None,
"retry_count": 0,
"stage_outputs": {}
}

# Configuration for state persistence
config = {
"configurable": {"thread_id": thread_id},
"max_per_source": max_per_source,
"recursion_limit": 15
}

# Execute workflow with streaming
logger.info(f"\n{'='*60}")
logger.info(f"Starting LangGraph workflow for: {query}")
logger.info(f"Thread ID: {thread_id}")
logger.info(f"{'='*60}\n")

final_state = None

try:
# Stream execution step-by-step
for step_state in self.workflow.stream(initial_state, config):
# Log progress
for node_name, node_output in step_state.items():
logger.info(f"✓ Completed node: {node_name}")

# Store final state
final_state = node_output

# Extract results
if final_state:
return {
"answer": final_state.get("reasoning_result", {}).get("answer", ""),
"sources": final_state.get("papers", []),
"stage_outputs": final_state.get("stage_outputs", {}),
"messages": final_state.get("messages", []),
"quality_score": final_state.get("reflection_feedback", {}).get("quality_score", 0),
"iterations": final_state.get("retry_count", 0)
}

except Exception as e:
logger.error(f"Workflow execution failed: {e}")
return {
"error": str(e),
"answer": "Workflow execution encountered an error"
}
Streaming Execution

The workflow.stream() method enables real-time progress monitoring, useful for UIs that need to show incremental progress during long-running operations.

State Persistence and Memory

One of LangGraph's most powerful features is built-in state persistence:

from langgraph.checkpoint.memory import MemorySaver

# Initialize with memory
self.memory = MemorySaver()
workflow = workflow.compile(checkpointer=self.memory)

# Execute with thread ID for persistence
config = {"configurable": {"thread_id": "user_123_session_5"}}
result = workflow.invoke(initial_state, config)

# Later, continue the same conversation
config = {"configurable": {"thread_id": "user_123_session_5"}}
result2 = workflow.invoke(new_state, config)

Benefits:

  • Conversation context preservation across requests
  • Ability to resume interrupted workflows
  • Multi-turn dialogues with memory
  • Session management for multiple users
For Web Developers

This is similar to session storage in web applications - each thread_id is like a session ID that maintains state across multiple HTTP requests.

Advanced Features

1. Error Handling and Graceful Degradation

def vector_processing_node(self, state: AgentState) -> AgentState:
"""Process papers with error handling"""
try:
result = self.vector_agent.process_papers(state["papers"])
state["vector_data"] = result

except Exception as e:
logger.error(f"Vector processing failed: {e}")

# Graceful degradation - continue with empty vector data
state["vector_data"] = {
"documents": [],
"vectors": [],
"error": str(e)
}
state["messages"].append(f"Vector processing failed (non-critical): {e}")

return state

2. Parallel Node Execution

While the current implementation uses sequential execution, LangGraph supports parallel execution:

# Fork workflow for parallel processing
workflow.add_edge("data_collection", "graph_processing")
workflow.add_edge("data_collection", "vector_processing")
workflow.add_edge("data_collection", "llamaindex_indexing")

# Join after parallel execution
workflow.add_edge(["graph_processing", "vector_processing", "llamaindex_indexing"],
"reasoning")

3. Dynamic Graph Modification

def _build_dynamic_workflow(self, config: Dict) -> CompiledGraph:
"""Build workflow based on configuration"""
workflow = StateGraph(AgentState)

# Add nodes conditionally
if config.get("use_graph", True):
workflow.add_node("graph_processing", self.graph_processing_node)

if config.get("use_vectors", True):
workflow.add_node("vector_processing", self.vector_processing_node)

if config.get("use_llamaindex", True):
workflow.add_node("llamaindex_indexing", self.llamaindex_indexing_node)

# Dynamic edge connections
# ...

return workflow.compile(checkpointer=self.memory)

LlamaIndex: Advanced RAG Framework

Core Concepts

LlamaIndex provides a comprehensive framework for building RAG (Retrieval-Augmented Generation) systems with:

  • Document Loaders: Ingest from 100+ data sources
  • Node Parsers: Intelligent document chunking
  • Vector Stores: Abstraction over vector databases
  • Retrievers: Sophisticated retrieval strategies
  • Query Engines: End-to-end question answering
  • Response Synthesizers: Generate answers from retrieved context
Web Developer Analogy

LlamaIndex is like a full-stack ORM for documents - similar to how Prisma abstracts database operations, LlamaIndex abstracts document indexing, retrieval, and querying.

Architecture Overview

Implementation Details

Complete RAG System Setup

from llama_index.core import (
VectorStoreIndex,
Document,
StorageContext,
Settings,
get_response_synthesizer
)
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from qdrant_client import QdrantClient

class LlamaIndexRAG:
"""Production-grade RAG system using LlamaIndex"""

def __init__(
self,
collection_name: str = "research_papers_llamaindex",
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
use_qdrant: bool = True,
qdrant_url: str = None,
qdrant_api_key: str = None
):
self.collection_name = collection_name

# Configure global settings
Settings.embed_model = HuggingFaceEmbedding(
model_name=embedding_model
)
Settings.chunk_size = 512
Settings.chunk_overlap = 50

# Setup vector store
self.storage_context = self._setup_vector_store(
use_qdrant, qdrant_url, qdrant_api_key
)

self.index = None
self.query_engine = None
Configuration
  • chunk_size: 512 tokens balances context preservation and retrieval precision
  • chunk_overlap: 50 tokens prevents loss of context at chunk boundaries
  • embedding_model: all-MiniLM-L6-v2 provides good quality at 384 dimensions

Vector Store Configuration

def _setup_vector_store(
self,
use_qdrant: bool,
qdrant_url: str,
qdrant_api_key: str
) -> StorageContext:
"""Setup vector store with Qdrant or in-memory fallback"""

if use_qdrant:
try:
# Connect to Qdrant
if qdrant_url:
client = QdrantClient(
url=qdrant_url,
api_key=qdrant_api_key
)
else:
# Local Qdrant
client = QdrantClient(host="localhost", port=6333)

# Create Qdrant vector store
vector_store = QdrantVectorStore(
client=client,
collection_name=self.collection_name
)

logger.info(f"✓ Connected to Qdrant: {self.collection_name}")

return StorageContext.from_defaults(
vector_store=vector_store
)

except Exception as e:
logger.warning(f"Qdrant connection failed: {e}, using in-memory")

# Fallback to in-memory vector store
logger.info("Using in-memory vector store")
return StorageContext.from_defaults()
Production vs Development
  • Production: Use Qdrant for persistent, scalable vector storage
  • Development: Use in-memory for quick testing without infrastructure

Document Indexing

def index_documents(self, papers: List[Dict]) -> Dict:
"""Index research papers into LlamaIndex"""

# Convert papers to LlamaIndex Documents
documents = []
for paper in papers:
# Create rich document text
text = f"Title: {paper.get('title', '')}\n\n"
text += f"Abstract: {paper.get('abstract', '')}\n\n"
text += f"Authors: {', '.join(paper.get('authors', []))}\n"
text += f"Year: {paper.get('year', 'Unknown')}\n"

# Full text if available
if paper.get('full_text'):
text += f"\n\nFull Text:\n{paper['full_text']}\n"

# Metadata for filtering and ranking
metadata = {
"title": paper.get("title", ""),
"authors": paper.get("authors", []),
"year": paper.get("year", ""),
"source": paper.get("source", ""),
"url": paper.get("url", ""),
"paper_id": paper.get("id", ""),
"citations": paper.get("citations", 0)
}

# Create Document with metadata
doc = Document(
text=text,
metadata=metadata,
excluded_embed_metadata_keys=["url", "paper_id"] # Don't embed URLs
)
documents.append(doc)

logger.info(f"Indexing {len(documents)} documents...")

# Create vector index
self.index = VectorStoreIndex.from_documents(
documents,
storage_context=self.storage_context,
show_progress=True
)

# Setup query engine
self._setup_query_engine()

return {
"documents_indexed": len(documents),
"collection_name": self.collection_name,
"vector_store": "Qdrant" if self.storage_context.vector_store else "In-Memory",
"embedding_dim": 384 # all-MiniLM-L6-v2 dimension
}
Metadata Strategy
  • Include searchable metadata (title, authors, year) for filtering
  • Exclude non-semantic metadata (URLs, IDs) from embeddings to reduce noise
  • Use citations count for ranking boost

Query Engine Configuration

def _setup_query_engine(self, top_k: int = 5) -> None:
"""Configure advanced query engine"""

# Retriever with similarity-based top-k
retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=top_k
)

# Response synthesis strategy
response_synthesizer = get_response_synthesizer(
response_mode="compact" # Options: compact, refine, tree_summarize
)

# Query engine with post-processing
self.query_engine = RetrieverQueryEngine(
retriever=retriever,
response_synthesizer=response_synthesizer,
node_postprocessors=[
SimilarityPostprocessor(similarity_cutoff=0.7) # Filter low-relevance
]
)

logger.info(f"Query engine configured with top_k={top_k}")

Response Modes Explained:

ModeDescriptionBest For
compactConcatenates chunks and generates single responseMost queries, fast
refineIteratively refines answer with each chunkComplex questions
tree_summarizeBuilds tree of summariesLong documents
For Web Developers

Response modes are like different SQL query strategies - compact is like a simple JOIN, refine is like iterative aggregation, and tree_summarize is like hierarchical rollups.

Advanced Querying

def query(
self,
question: str,
top_k: int = 5,
filters: Dict = None
) -> Dict:
"""Query indexed documents with advanced features"""

if not self.query_engine:
return {
"error": "No documents indexed yet",
"answer": "",
"sources": []
}

logger.info(f"Querying: {question} (top_k={top_k})")

try:
# Execute query
response = self.query_engine.query(question)

# Extract sources with metadata and scores
sources = []
for node in response.source_nodes:
sources.append({
"text": node.text[:200] + "..." if len(node.text) > 200 else node.text,
"score": node.score,
"metadata": node.metadata,
"node_id": node.node_id
})

# Sort by relevance score
sources.sort(key=lambda x: x["score"], reverse=True)

result = {
"answer": str(response),
"sources": sources,
"num_sources": len(sources),
"query": question
}

logger.info(f"✓ Retrieved {len(sources)} relevant sources")

return result

except Exception as e:
logger.error(f"Query failed: {e}")
return {
"error": str(e),
"answer": "Query execution failed",
"sources": []
}

Metadata Filtering

def query_with_filters(
self,
question: str,
year_min: int = None,
year_max: int = None,
authors: List[str] = None,
min_citations: int = None
) -> Dict:
"""Query with metadata filters"""

from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter, FilterOperator

# Build filter list
filters_list = []

if year_min:
filters_list.append(
ExactMatchFilter(key="year", value=year_min, operator=FilterOperator.GTE)
)

if year_max:
filters_list.append(
ExactMatchFilter(key="year", value=year_max, operator=FilterOperator.LTE)
)

if authors:
for author in authors:
filters_list.append(
ExactMatchFilter(key="authors", value=author, operator=FilterOperator.CONTAINS)
)

if min_citations:
filters_list.append(
ExactMatchFilter(key="citations", value=min_citations, operator=FilterOperator.GTE)
)

# Create metadata filters
metadata_filters = MetadataFilters(filters=filters_list)

# Create filtered retriever
filtered_retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=5,
filters=metadata_filters
)

# Execute query with filters
query_engine = RetrieverQueryEngine(
retriever=filtered_retriever,
response_synthesizer=self.query_engine._response_synthesizer
)

response = query_engine.query(question)

# Format response
return self._format_response(response, question)
Advanced Filtering

Metadata filtering enables precise queries like "papers by X published after 2020 with >100 citations" - critical for research applications.

Advanced RAG Patterns

1. Hybrid Search (Vector + Keyword)

from llama_index.core.retrievers import BaseRetriever
from llama_index.retrievers.bm25 import BM25Retriever

def create_hybrid_retriever(self) -> BaseRetriever:
"""Combine vector similarity and BM25 keyword search"""

# Vector retriever
vector_retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=10
)

# BM25 keyword retriever
bm25_retriever = BM25Retriever.from_defaults(
index=self.index,
similarity_top_k=10
)

# Combine with reciprocal rank fusion
from llama_index.core.retrievers import QueryFusionRetriever

hybrid_retriever = QueryFusionRetriever(
[vector_retriever, bm25_retriever],
similarity_top_k=5,
num_queries=1, # No query generation
mode="reciprocal_rerank", # Fusion strategy
use_async=False
)

return hybrid_retriever

Benefits:

  • Vector search: Captures semantic similarity
  • BM25: Captures exact keyword matches
  • Fusion: Combines best of both approaches

2. Hierarchical Document Structure

def create_hierarchical_index(self, papers: List[Dict]):
"""Create parent-child document structure"""

from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.core.node_parser import get_leaf_nodes

# Parse documents into hierarchy
node_parser = HierarchicalNodeParser.from_defaults(
chunk_sizes=[2048, 512, 128] # Parent -> Child -> Leaf
)

# Create documents
documents = [Document(text=paper['full_text']) for paper in papers]

# Parse into hierarchical nodes
nodes = node_parser.get_nodes_from_documents(documents)

# Get leaf nodes for indexing
leaf_nodes = get_leaf_nodes(nodes)

# Create index from leaf nodes
self.index = VectorStoreIndex(
leaf_nodes,
storage_context=self.storage_context
)
Use Case

Hierarchical indexing enables zooming from document summaries (parent) to specific details (leaf) - useful for long research papers.

3. Query Transformations

def query_with_transformations(self, question: str) -> Dict:
"""Apply query transformations before retrieval"""

from llama_index.core.indices.query.query_transform import HyDEQueryTransform

# Hypothetical Document Embeddings (HyDE)
# Generates hypothetical answer, embeds it, uses for retrieval
hyde = HyDEQueryTransform(include_original=True)

# Transform query
transformed_query = hyde(question)

# Retrieve with transformed query
response = self.query_engine.query(transformed_query)

return self._format_response(response, question)

HyDE Explained:

  1. Generate hypothetical answer to question
  2. Embed hypothetical answer
  3. Use embedding to find similar documents
  4. Generate actual answer from retrieved documents
When to Use

HyDE works well when the question uses different terminology than the documents (e.g., asking in layman's terms about technical content).

4. Response Evaluation and Scoring

def query_with_evaluation(self, question: str) -> Dict:
"""Query with automatic response evaluation"""

from llama_index.core.evaluation import FaithfulnessEvaluator, RelevancyEvaluator

# Execute query
response = self.query_engine.query(question)

# Evaluate faithfulness (answer supported by sources?)
faithfulness_evaluator = FaithfulnessEvaluator()
faithfulness_result = faithfulness_evaluator.evaluate_response(
query=question,
response=response
)

# Evaluate relevancy (answer addresses question?)
relevancy_evaluator = RelevancyEvaluator()
relevancy_result = relevancy_evaluator.evaluate_response(
query=question,
response=response
)

return {
"answer": str(response),
"sources": self._extract_sources(response),
"evaluation": {
"faithfulness": faithfulness_result.score,
"relevancy": relevancy_result.score,
"feedback": {
"faithfulness": faithfulness_result.feedback,
"relevancy": relevancy_result.feedback
}
}
}
Production Quality

Automatic evaluation ensures responses meet quality standards before returning to users - critical for production RAG systems.

Storage and Persistence

Saving and Loading Indexes

def save_index(self, persist_dir: str = "./storage"):
"""Persist index to disk"""

if self.index:
self.index.storage_context.persist(persist_dir=persist_dir)
logger.info(f"✓ Index saved to {persist_dir}")
else:
logger.warning("No index to save")

def load_index(self, persist_dir: str = "./storage"):
"""Load index from disk"""

from llama_index.core import load_index_from_storage

try:
# Load storage context
storage_context = StorageContext.from_defaults(
persist_dir=persist_dir
)

# Load index
self.index = load_index_from_storage(storage_context)

# Setup query engine
self._setup_query_engine()

logger.info(f"✓ Index loaded from {persist_dir}")

return True

except Exception as e:
logger.error(f"Failed to load index: {e}")
return False
Cost Optimization

Persisting indexes avoids re-embedding documents on every restart, saving significant API costs and startup time.

Incremental Updates

def add_documents(self, new_papers: List[Dict]) -> Dict:
"""Add new documents to existing index"""

if not self.index:
logger.warning("No existing index, creating new one")
return self.index_documents(new_papers)

# Convert to Documents
new_docs = []
for paper in new_papers:
text = self._format_paper_text(paper)
metadata = self._extract_paper_metadata(paper)
new_docs.append(Document(text=text, metadata=metadata))

# Insert into existing index
for doc in new_docs:
self.index.insert(doc)

logger.info(f"✓ Added {len(new_docs)} documents to index")

# Refresh query engine
self._setup_query_engine()

return {
"documents_added": len(new_docs),
"total_documents": self.index.docstore.get_document_count()
}

Integration: LangGraph + LlamaIndex

Why Use Both?

LangGraph provides:

  • Workflow orchestration
  • State management
  • Conditional logic
  • Error handling

LlamaIndex provides:

  • Document indexing
  • Semantic retrieval
  • Response synthesis
  • Query optimization

Together, they create a production-grade system where:

  • LangGraph manages the overall workflow
  • LlamaIndex handles document intelligence
  • Each focuses on its strength

Integration Architecture

Complete Integration Example

class ProductionRAGSystem:
"""Complete system integrating LangGraph and LlamaIndex"""

def __init__(self):
# Initialize LlamaIndex RAG
self.llamaindex = LlamaIndexRAG(
collection_name="research_papers",
use_qdrant=True,
qdrant_url=os.getenv("QDRANT_URL")
)

# Initialize LangGraph orchestrator
self.orchestrator = LangGraphOrchestrator(
llamaindex=self.llamaindex
)

def research_query(
self,
question: str,
session_id: str = None,
max_papers: int = 10
) -> Dict:
"""Execute complete research workflow"""

# Generate session ID if not provided
session_id = session_id or f"session_{uuid.uuid4()}"

logger.info(f"\n{'='*70}")
logger.info(f"PRODUCTION RAG SYSTEM - Research Query")
logger.info(f"Question: {question}")
logger.info(f"Session: {session_id}")
logger.info(f"{'='*70}\n")

# Execute LangGraph workflow
# This will:
# 1. Collect papers from 7 sources
# 2. Build knowledge graph
# 3. Create vector embeddings
# 4. Index in LlamaIndex
# 5. Generate answer using all sources
# 6. Self-reflect on quality
# 7. Return comprehensive result

result = self.orchestrator.process_query(
query=question,
thread_id=session_id,
max_per_source=max_papers
)

return result

def follow_up_query(
self,
question: str,
session_id: str
) -> Dict:
"""Ask follow-up question in same session"""

# LangGraph will load state from session_id
# LlamaIndex will query already-indexed documents
# No need to re-collect papers!

logger.info(f"Follow-up query in session: {session_id}")

result = self.orchestrator.process_query(
query=question,
thread_id=session_id,
max_per_source=0 # Don't collect new papers
)

return result

Usage Example

# Initialize system
rag_system = ProductionRAGSystem()

# First query
result1 = rag_system.research_query(
question="What are the latest advances in multi-agent RAG systems?",
session_id="research_session_1",
max_papers=15
)

print(f"Answer: {result1['answer']}")
print(f"Sources: {len(result1['sources'])} papers")
print(f"Quality Score: {result1['quality_score']}")
print(f"Stages: {list(result1['stage_outputs'].keys())}")

# Follow-up query (reuses indexed documents)
result2 = rag_system.follow_up_query(
question="How do these systems handle conflicting information?",
session_id="research_session_1"
)

print(f"Follow-up Answer: {result2['answer']}")

Output:

====================================================================
PRODUCTION RAG SYSTEM - Research Query
Question: What are the latest advances in multi-agent RAG systems?
Session: research_session_1
====================================================================

✓ Completed node: data_collection
✓ Completed node: graph_processing
✓ Completed node: vector_processing
✓ Completed node: llamaindex_indexing
✓ Completed node: reasoning
✓ Completed node: self_reflection
✓ Completed node: critic_review

Answer: Recent advances in multi-agent RAG systems focus on...
Sources: 15 papers
Quality Score: 0.87
Stages: ['data_collection', 'graph_processing', 'vector_processing',
'llamaindex_indexing', 'reasoning', 'self_reflection', 'critic_review']

Framework Comparison

LangGraph vs Traditional Orchestration

FeatureLangGraphTraditional Code
State ManagementBuilt-in StateGraphManual tracking
PersistenceAutomatic checkpointingCustom database code
Conditional RoutingDeclarative edgesIf/else logic
Error RecoveryBuilt-in retry mechanismsManual try/catch
VisualizationGraph visualization toolsNo built-in tools
TestingMock individual nodesMock entire workflow

LlamaIndex vs Manual RAG

FeatureLlamaIndexManual Implementation
Vector Store AbstractionSingle API for all storesCustom per store
Document Parsing100+ loadersCustom parsers
Retrieval Strategies10+ built-inImplement from scratch
Response Synthesis5 modesManual generation
EvaluationBuilt-in evaluatorsCustom metrics
Metadata FilteringDeclarative filtersCustom SQL/queries

When to Use Each Framework

Use LangGraph when:

  • Building multi-step agent workflows
  • Need conversation memory/persistence
  • Require conditional workflow logic
  • Want visualization and debugging tools

Use LlamaIndex when:

  • Building RAG applications
  • Need document indexing and retrieval
  • Want to abstract vector database complexity
  • Require advanced query optimization

Use Both when:

  • Building production-grade multi-agent RAG systems
  • Need orchestration AND intelligent retrieval
  • Want best-in-class for each concern

Production Deployment Considerations

Performance Optimization

1. Batch Processing

def index_documents_batch(
self,
papers: List[Dict],
batch_size: int = 100
) -> Dict:
"""Index documents in batches to avoid memory issues"""

total = len(papers)
indexed = 0

for i in range(0, total, batch_size):
batch = papers[i:i + batch_size]

logger.info(f"Indexing batch {i//batch_size + 1}/{(total + batch_size - 1)//batch_size}")

if i == 0:
# Create initial index
self.index_documents(batch)
else:
# Add to existing index
self.add_documents(batch)

indexed += len(batch)

return {"total_indexed": indexed}

2. Caching

from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def cached_query(self, question_hash: str, top_k: int) -> str:
"""Cache query results"""
# Actual query executed only on cache miss
response = self.query_engine.query(question)
return str(response)

def query_with_cache(self, question: str, top_k: int = 5) -> Dict:
"""Query with caching"""
# Create hash of question
question_hash = hashlib.md5(question.encode()).hexdigest()

# Check cache
cached_answer = self.cached_query(question_hash, top_k)

return {"answer": cached_answer, "cached": True}

3. Async Operations

import asyncio
from llama_index.core.query_engine import RetrieverQueryEngine

async def async_query(self, question: str) -> Dict:
"""Asynchronous querying for concurrent requests"""

# Use async query engine
response = await self.query_engine.aquery(question)

return self._format_response(response, question)

async def batch_queries(self, questions: List[str]) -> List[Dict]:
"""Process multiple queries concurrently"""

tasks = [self.async_query(q) for q in questions]
results = await asyncio.gather(*tasks)

return results

Monitoring and Observability

LangGraph Workflow Monitoring

def process_query_with_monitoring(
self,
query: str,
thread_id: str
) -> Dict:
"""Execute workflow with detailed monitoring"""

import time

# Track execution metrics
metrics = {
"start_time": time.time(),
"node_durations": {},
"node_errors": []
}

# Execute with monitoring
for step_state in self.workflow.stream(initial_state, config):
for node_name, node_output in step_state.items():
# Track duration
node_start = time.time()

# Check for errors
if node_output.get("error"):
metrics["node_errors"].append({
"node": node_name,
"error": node_output["error"]
})

# Log metrics
duration = time.time() - node_start
metrics["node_durations"][node_name] = duration

logger.info(f"Node {node_name}: {duration:.2f}s")

metrics["total_duration"] = time.time() - metrics["start_time"]

# Send metrics to monitoring system
self._send_metrics(metrics)

return result

LlamaIndex Query Monitoring

def query_with_monitoring(
self,
question: str,
top_k: int = 5
) -> Dict:
"""Query with detailed metrics"""

import time

start_time = time.time()

# Execute query
response = self.query_engine.query(question)

# Calculate metrics
retrieval_time = time.time() - start_time
num_tokens = len(str(response).split())

# Log to monitoring
metrics = {
"retrieval_time_ms": retrieval_time * 1000,
"num_sources": len(response.source_nodes),
"num_tokens": num_tokens,
"avg_source_score": sum(n.score for n in response.source_nodes) / len(response.source_nodes),
"timestamp": time.time()
}

logger.info(f"Query metrics: {metrics}")

# Return with metrics
result = self._format_response(response, question)
result["metrics"] = metrics

return result

Error Handling

Graceful Degradation

def robust_reasoning_node(self, state: AgentState) -> AgentState:
"""Reasoning with fallback strategies"""

query = state["query"]

# Try LlamaIndex RAG first
try:
if state.get("llamaindex_data", {}).get("documents_indexed", 0) > 0:
rag_result = self.llamaindex.query(query, top_k=3)
context = rag_result["answer"]
else:
raise Exception("LlamaIndex not available")

except Exception as e:
logger.warning(f"LlamaIndex failed: {e}, trying vector search")

# Fallback to vector search
try:
vector_docs = state["vector_data"].get("documents", [])
context = "\n".join([doc["text"] for doc in vector_docs[:3]])

except Exception as e2:
logger.warning(f"Vector search failed: {e2}, using graph only")

# Final fallback to knowledge graph
context = self._extract_graph_context(state["graph_data"])

# Generate answer with whatever context we got
answer = self._generate_answer(query, context)

state["reasoning_result"] = {"answer": answer}
return state

Scaling Strategies

1. Horizontal Scaling

# Deploy multiple instances of the RAG system
# Use load balancer to distribute requests
# Share Qdrant vector store across instances

# Example with Docker Compose
"""
services:
rag-system-1:
image: researcherai:latest
environment:
- QDRANT_URL=http://qdrant:6333

rag-system-2:
image: researcherai:latest
environment:
- QDRANT_URL=http://qdrant:6333

rag-system-3:
image: researcherai:latest
environment:
- QDRANT_URL=http://qdrant:6333

qdrant:
image: qdrant/qdrant
volumes:
- qdrant_storage:/qdrant/storage

load-balancer:
image: nginx
ports:
- "80:80"
depends_on:
- rag-system-1
- rag-system-2
- rag-system-3
"""

2. Index Sharding

def create_sharded_indexes(
self,
papers: List[Dict],
num_shards: int = 4
) -> List[LlamaIndexRAG]:
"""Shard documents across multiple indexes"""

# Divide papers by topic/source/year
shards = self._shard_papers(papers, num_shards)

# Create index per shard
indexes = []
for i, shard in enumerate(shards):
rag = LlamaIndexRAG(
collection_name=f"research_papers_shard_{i}"
)
rag.index_documents(shard)
indexes.append(rag)

return indexes

def query_sharded(
self,
question: str,
indexes: List[LlamaIndexRAG]
) -> Dict:
"""Query all shards and merge results"""

all_results = []

# Query each shard
for index in indexes:
result = index.query(question, top_k=2)
all_results.extend(result["sources"])

# Merge and re-rank
all_results.sort(key=lambda x: x["score"], reverse=True)
top_results = all_results[:5]

# Synthesize final answer from top results
final_answer = self._synthesize_from_sources(question, top_results)

return {
"answer": final_answer,
"sources": top_results,
"shards_queried": len(indexes)
}

Best Practices

1. State Design in LangGraph

# ✅ GOOD: Flat, simple state structure
class AgentState(TypedDict):
query: str
papers: List[Dict]
answer: str
error: str | None

# ❌ BAD: Nested, complex state
class AgentState(TypedDict):
data: {
"input": {"query": str, "params": Dict},
"intermediate": {"step1": Dict, "step2": Dict},
"output": {"answer": str, "meta": Dict}
}

2. Document Metadata in LlamaIndex

# ✅ GOOD: Searchable, structured metadata
metadata = {
"title": "Paper Title",
"year": 2024,
"authors": ["Author 1", "Author 2"],
"citations": 150,
"source": "arxiv"
}

# ❌ BAD: Non-searchable, unstructured metadata
metadata = {
"info": "Paper Title by Author 1, Author 2 (2024) - 150 citations - arxiv"
}

3. Error Handling

# ✅ GOOD: Specific error handling with fallbacks
try:
result = self.llamaindex.query(question)
except QdrantConnectionError:
logger.warning("Qdrant unavailable, using in-memory")
result = self.fallback_index.query(question)
except Exception as e:
logger.error(f"Query failed: {e}")
result = {"error": str(e), "answer": "Unable to process query"}

# ❌ BAD: Catch-all with no recovery
try:
result = self.llamaindex.query(question)
except:
pass

4. Resource Management

# ✅ GOOD: Cleanup and resource limits
def __del__(self):
"""Cleanup resources"""
if self.index:
self.index.storage_context.persist()
if self.qdrant_client:
self.qdrant_client.close()

# Set memory limits
Settings.chunk_size = 512 # Not too large
Settings.num_output = 256 # Limit response length

# ❌ BAD: No cleanup, unlimited resources
# (Just let everything run forever and consume all memory)

5. Testing

# ✅ GOOD: Test individual components
def test_llamaindex_indexing():
"""Test document indexing"""
rag = LlamaIndexRAG()
papers = [{"title": "Test", "abstract": "Test abstract"}]

result = rag.index_documents(papers)

assert result["documents_indexed"] == 1
assert rag.index is not None

def test_langgraph_node():
"""Test individual workflow node"""
orchestrator = LangGraphOrchestrator()
state = {"query": "test", "papers": []}

new_state = orchestrator.data_collection_node(state)

assert len(new_state["papers"]) > 0

# ❌ BAD: Only test end-to-end
def test_everything():
system = ProductionRAGSystem()
result = system.research_query("test")
assert result # Too vague!

Common Pitfalls and Solutions

Pitfall 1: Infinite Loops in LangGraph

Problem: Conditional edges can create infinite loops

# ❌ DANGEROUS
workflow.add_conditional_edges(
"quality_check",
lambda state: "improve" if state["score"] < 0.9 else "end",
{"improve": "reasoning", "end": END}
)
workflow.add_edge("reasoning", "quality_check")
# If score never reaches 0.9, infinite loop!

Solution: Add iteration limits

# ✅ SAFE
def should_continue(state: AgentState) -> str:
if state["retry_count"] >= 3:
return "end" # Maximum 3 iterations
if state["score"] >= 0.75: # Reasonable threshold
return "end"
state["retry_count"] += 1
return "improve"

Pitfall 2: Vector Store Memory Leaks

Problem: Creating new indexes without cleanup

# ❌ MEMORY LEAK
for batch in large_dataset:
index = VectorStoreIndex.from_documents(batch)
# Never cleaned up, memory grows

Solution: Reuse indexes and cleanup

# ✅ MEMORY EFFICIENT
index = None
for batch in large_dataset:
if index is None:
index = VectorStoreIndex.from_documents(batch)
else:
for doc in batch:
index.insert(doc)

# Periodically persist and clear memory
if batch_count % 10 == 0:
index.storage_context.persist()

Pitfall 3: Embedding Dimension Mismatches

Problem: Mixing embedding models with different dimensions

# ❌ DIMENSION MISMATCH
# Index created with all-MiniLM-L6-v2 (384 dim)
Settings.embed_model = HuggingFaceEmbedding("all-MiniLM-L6-v2")
index = VectorStoreIndex.from_documents(docs)

# Later, query with different model (768 dim) - FAILS!
Settings.embed_model = HuggingFaceEmbedding("all-mpnet-base-v2")
response = index.query("question") # Error: dimension mismatch

Solution: Lock embedding model

# ✅ CONSISTENT
class LlamaIndexRAG:
def __init__(self, embedding_model: str):
self.embedding_model_name = embedding_model
Settings.embed_model = HuggingFaceEmbedding(embedding_model)
# Save with index
self.index_metadata = {"embedding_model": embedding_model}

def load_index(self, persist_dir: str):
# Verify embedding model matches
metadata = json.load(open(f"{persist_dir}/metadata.json"))
if metadata["embedding_model"] != self.embedding_model_name:
raise ValueError(f"Embedding model mismatch!")

Pitfall 4: Blocking Operations in Async Contexts

Problem: Using sync operations in async workflows

# ❌ BLOCKS EVENT LOOP
async def async_query_handler(question: str):
# This blocks!
response = self.query_engine.query(question)
return response

Solution: Use async methods

# ✅ NON-BLOCKING
async def async_query_handler(question: str):
# Proper async
response = await self.query_engine.aquery(question)
return response

Real-World Performance Metrics

ResearcherAI Production Stats

System Configuration:

  • LangGraph 0.2.45
  • LlamaIndex 0.12.0
  • Qdrant vector store (cloud)
  • HuggingFace embeddings (all-MiniLM-L6-v2)

Performance Metrics (from production deployment):

OperationAverage TimeNotes
Document Indexing (100 papers)12.3sIncluding embedding generation
LangGraph Full Workflow24.7sAll 8 nodes
LlamaIndex Query (top-5)0.8sCached embeddings
End-to-End Research Query28.5sFirst query in session
Follow-up Query2.1sUsing cached index

Accuracy Metrics:

MetricScoreMeasurement
Answer Relevance0.87LlamaIndex evaluator
Source Faithfulness0.92LlamaIndex evaluator
Self-Reflection Quality0.84LangGraph critic node
User Satisfaction4.2/5Manual evaluation

Resource Usage:

ResourceUsageConfiguration
Memory (per instance)2.1 GBWith 1000 docs indexed
CPU (average)15%2 vCPUs
Qdrant Storage450 MB1000 papers, 384-dim vectors
Request Throughput12 req/minSingle instance

Summary

This chapter covered the two foundational frameworks powering ResearcherAI:

LangGraph

  • Purpose: Stateful workflow orchestration for multi-agent systems
  • Key Features: State graphs, conditional routing, checkpointing, streaming
  • Best For: Complex multi-step workflows, conversation memory, adaptive logic
  • Production Ready: Built-in persistence, error handling, monitoring hooks

LlamaIndex

  • Purpose: Advanced RAG framework for document indexing and retrieval
  • Key Features: Vector store abstraction, query optimization, response synthesis
  • Best For: Semantic search, document Q&A, knowledge retrieval
  • Production Ready: Multiple vector stores, evaluation tools, async support

Integration Benefits

  • LangGraph handles macro-level orchestration
  • LlamaIndex handles micro-level document intelligence
  • Together: Production-grade multi-agent RAG system
  • Proven: 19/19 tests passing, deployed in production

Next Steps

In the next chapter, we'll dive into the backend implementation, exploring how these frameworks integrate with:

  • Neo4j knowledge graphs
  • Qdrant vector databases
  • Apache Kafka event streaming
  • FastAPI web services

We'll see how the theoretical concepts from this chapter translate into production code that scales.


Chapter Resources