Architecture Design
With requirements in hand, it was time to design the system. This chapter walks through my architecture decisions and why I made them.
The Big Picture
Here's the 30,000-foot view of ResearcherAI:
Simple, right? Let me break down each component and explain the design decisions.
Multi-Agent Architecture
Why Multiple Agents?
I could have built one big agent that does everything. But that would be:
- ❌ Hard to test (too many responsibilities)
- ❌ Hard to scale (can't scale parts independently)
- ❌ Hard to maintain (changes ripple everywhere)
- ❌ Hard to reason about (too complex)
Instead, I followed the Single Responsibility Principle and created specialized agents.
The Six Agents
1. OrchestratorAgent
class OrchestratorAgent:
    """Coordinates all other agents and manages sessions"""

    def collect_data(self, query, sources) -> Result:
        """Orchestrates the data collection workflow"""

    def ask(self, question) -> Answer:
        """Orchestrates the RAG query workflow"""

    def save_session(self) -> None:
        """Persists session state"""
Role: The conductor of the orchestra
- Receives user requests
- Delegates to specialized agents
- Manages conversation state
- Handles session persistence
2. DataCollectorAgent
class DataCollectorAgent:
    """Collects papers from multiple sources"""

    def collect_from_arxiv(self, query) -> Papers: ...
    def collect_from_semantic_scholar(self, query) -> Papers: ...
    def collect_from_pubmed(self, query) -> Papers: ...
    # ... 7 sources total
Role: The data gatherer
- Queries 7 academic sources
- Handles rate limiting
- Deduplicates results
- Publishes collection events
3. KnowledgeGraphAgent
class KnowledgeGraphAgent:
    """Manages knowledge graph (Neo4j or NetworkX)"""

    def add_papers(self, papers) -> Stats: ...
    def add_relationship(self, from_id, to_id, rel_type) -> None: ...
    def query(self, cypher_or_pattern) -> Results: ...
    def visualize(self) -> Graph: ...
Role: The knowledge organizer
- Extracts entities (papers, authors, topics)
- Creates relationships
- Supports graph queries
- Generates visualizations
4. VectorAgent
class VectorAgent:
    """Manages vector embeddings (Qdrant or FAISS)"""

    def add_documents(self, docs) -> int: ...
    def search(self, query, top_k) -> Results: ...
    def get_similar(self, doc_id, top_k) -> Results: ...
Role: The semantic searcher
- Generates embeddings
- Stores in vector DB
- Similarity search
- Retrieves relevant context
5. ReasoningAgent
class ReasoningAgent:
    """Generates answers using LLM + RAG"""

    def answer(self, question, context, history) -> Answer: ...
    def summarize(self, documents) -> Summary: ...
Role: The thinker
- Synthesizes information
- Maintains conversation memory
- Generates citations
- Provides explanations
6. SchedulerAgent
class SchedulerAgent:
    """Automates periodic data collection"""

    def start(self, interval, queries) -> None: ...
    def stop(self) -> None: ...
    def get_stats(self) -> Stats: ...
Role: The automator
- Background data collection
- Configurable schedules
- Health monitoring
Orchestration Frameworks
While the six agents above handle specific responsibilities, coordinating them requires robust orchestration. ResearcherAI uses two powerful frameworks:
LangGraph: Workflow Orchestration
LangGraph provides stateful workflow orchestration for complex multi-agent interactions:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class LangGraphOrchestrator:
    """Stateful workflow management"""

    def _build_workflow(self):
        workflow = StateGraph(AgentState)

        # Define workflow nodes
        workflow.add_node("data_collection", self.data_collection_node)
        workflow.add_node("graph_processing", self.graph_processing_node)
        workflow.add_node("vector_processing", self.vector_processing_node)
        workflow.add_node("llamaindex_indexing", self.llamaindex_indexing_node)
        workflow.add_node("reasoning", self.reasoning_node)
        workflow.add_node("self_reflection", self.self_reflection_node)

        # Connect nodes with edges
        workflow.set_entry_point("data_collection")
        workflow.add_edge("data_collection", "graph_processing")
        workflow.add_edge("graph_processing", "vector_processing")
        # ... more edges

        # Compile with state persistence
        return workflow.compile(checkpointer=MemorySaver())
Key Features:
- State Management: Automatic state flow through workflow nodes
- Conditional Routing: Dynamic workflow paths based on runtime conditions
- Checkpointing: Built-in conversation persistence across sessions
- Error Recovery: Graceful handling of failures with retry mechanisms
Why LangGraph?
- Treats workflows as directed graphs (easier to visualize and debug)
- Built-in memory for multi-turn conversations
- Stream processing for real-time progress updates
- Production-ready with minimal boilerplate
LangGraph is like a state machine orchestrator for backend workflows - similar to Redux for frontend state management, but for agent coordination.
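The conditional-routing feature deserves a concrete sketch. The routing function, node bodies, and state field below are my own placeholders, not ResearcherAI's actual code:

from typing import TypedDict

from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    confidence: float

def route_after_reasoning(state: AgentState) -> str:
    # Placeholder logic: loop back for more data if the answer looks weak
    return "needs_more_data" if state["confidence"] < 0.5 else "done"

workflow = StateGraph(AgentState)
workflow.add_node("reasoning", lambda state: {"confidence": 0.9})        # placeholder node
workflow.add_node("data_collection", lambda state: {"confidence": 0.0})  # placeholder node
workflow.set_entry_point("reasoning")

# Dynamic path chosen at runtime from the state the reasoning node produced
workflow.add_conditional_edges(
    "reasoning",
    route_after_reasoning,
    {"needs_more_data": "data_collection", "done": END},
)
workflow.add_edge("data_collection", "reasoning")
app = workflow.compile()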
LlamaIndex: Advanced RAG Framework
LlamaIndex powers the document intelligence layer:
from typing import Dict, List

from llama_index.core import VectorStoreIndex, Document
from llama_index.vector_stores.qdrant import QdrantVectorStore

class LlamaIndexRAG:
    """Production-grade RAG system"""

    def index_documents(self, papers: List[Dict]):
        # Convert to LlamaIndex documents
        documents = [
            Document(
                text=paper['abstract'],
                metadata={
                    'title': paper['title'],
                    'authors': paper['authors'],
                    'year': paper['year']
                }
            )
            for paper in papers
        ]

        # Create vector index (self.storage_context wraps the Qdrant store)
        self.index = VectorStoreIndex.from_documents(
            documents,
            storage_context=self.storage_context
        )

    def query(self, question: str, top_k: int = 5):
        # Advanced retrieval with post-processing
        query_engine = self.index.as_query_engine(similarity_top_k=top_k)
        response = query_engine.query(question)
        return {
            'answer': str(response),
            'sources': [node.metadata for node in response.source_nodes]
        }
Key Features:
- Vector Store Abstraction: Single API for Qdrant, FAISS, Pinecone, etc.
- Query Optimization: Automatic query rewriting and enhancement
- Response Synthesis: Multiple strategies (compact, refine, tree_summarize)
- Metadata Filtering: Filter by year, author, citations, etc.
Why LlamaIndex?
- Abstracts away vector database complexity
- Built-in evaluation metrics for RAG quality
- Advanced retrieval strategies (hybrid search, HyDE, etc.)
- Production-tested across thousands of applications
LlamaIndex is like an ORM for documents - similar to how Prisma abstracts database operations, LlamaIndex abstracts document indexing and retrieval.
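As a quick illustration of the synthesis strategies mentioned above, here is a minimal, self-contained sketch. It uses an in-memory index rather than Qdrant and assumes an embedding model and LLM are configured via LlamaIndex Settings; it is not ResearcherAI's production path:

from llama_index.core import Document, VectorStoreIndex

# Tiny in-memory index just for the example
documents = [Document(
    text="Transformers rely entirely on self-attention.",
    metadata={"title": "Attention Is All You Need", "year": 2017},
)]
index = VectorStoreIndex.from_documents(documents)

# response_mode picks the synthesis strategy: "compact" packs chunks into few
# LLM calls, "refine" iterates chunk by chunk, "tree_summarize" builds a tree.
query_engine = index.as_query_engine(similarity_top_k=3, response_mode="compact")
response = query_engine.query("What mechanism do transformers rely on?")

print(str(response))
print([node.metadata for node in response.source_nodes])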
Integration: LangGraph + LlamaIndex
The two frameworks work together seamlessly:
Division of Responsibilities:
- LangGraph: Manages workflow state, agent coordination, conversation memory
- LlamaIndex: Handles document chunking, embedding, retrieval, and synthesis
- Together: Production-grade multi-agent RAG system
Real-World Benefits:
# Single query orchestrates everything
orchestrator = LangGraphOrchestrator()
result = orchestrator.process_query(
query="What are the latest advances in multi-agent systems?",
thread_id="session_123" # Conversation memory
)
# Behind the scenes:
# 1. LangGraph routes through workflow nodes
# 2. DataCollector gathers papers from 7 sources
# 3. LlamaIndex indexes documents in Qdrant
# 4. KnowledgeGraph builds relationships
# 5. VectorAgent creates embeddings
# 6. LlamaIndex retrieves relevant context
# 7. ReasoningAgent synthesizes answer
# 8. Self-reflection evaluates quality
# 9. LangGraph persists conversation state
Performance Impact:
- First query: ~28.5s (includes indexing 30+ papers)
- Follow-up query: ~2.1s (reuses indexed documents)
- Memory efficiency: State checkpointing prevents memory leaks
- Scalability: Both frameworks support horizontal scaling
From ResearcherAI production deployment:
- Documents indexed: 1,000+ research papers
- Average retrieval time: 0.8s (LlamaIndex + Qdrant)
- Workflow completion: 24.7s (all 8 LangGraph nodes)
- Answer quality: 0.87/1.0 (LlamaIndex relevance evaluator)
For a deep dive into these frameworks with complete code examples, see Chapter 4: Orchestration Frameworks.
Dual-Backend Strategy
This was one of my favorite architectural decisions.
The Problem
Development with production infrastructure is painful:
# Production setup
docker-compose up # 30 seconds, 7 containers, 4GB RAM
pytest # 5 minutes (Docker overhead)
But I'll need production infrastructure eventually. How do I get both?
The Solution: Abstraction Layer
I created interfaces that both backends implement:
from abc import ABC, abstractmethod

import networkx as nx
from neo4j import GraphDatabase

# Abstract interface
class GraphBackend(ABC):
    @abstractmethod
    def add_node(self, node_id, properties): pass

    @abstractmethod
    def add_edge(self, from_id, to_id, relationship): pass

    @abstractmethod
    def query(self, pattern): pass

# Development backend (add_edge and query omitted for brevity)
class NetworkXBackend(GraphBackend):
    """In-memory graph, instant startup"""

    def __init__(self):
        self.graph = nx.MultiDiGraph()

    def add_node(self, node_id, properties):
        self.graph.add_node(node_id, **properties)

# Production backend (add_edge and query omitted for brevity)
class Neo4jBackend(GraphBackend):
    """Persistent graph, Cypher queries"""

    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def add_node(self, node_id, properties):
        self.driver.execute_query(
            "CREATE (n:Node {id: $id}) SET n += $props",
            id=node_id, props=properties
        )
Then I select the backend based on environment variables:
if os.getenv("USE_NEO4J") == "true":
    graph = Neo4jBackend(uri, user, password)
else:
    graph = NetworkXBackend()
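The vector store gets the same treatment. A small factory keeps the selection logic in one place; the QdrantBackend/FAISSBackend names and the extra environment variables below are illustrative, not the project's actual identifiers:

import os

def make_backends():
    """Pick graph and vector backends from environment flags."""
    if os.getenv("USE_NEO4J", "false").lower() == "true":
        graph = Neo4jBackend(
            uri=os.getenv("NEO4J_URI", "bolt://localhost:7687"),
            user=os.getenv("NEO4J_USER", "neo4j"),
            password=os.getenv("NEO4J_PASSWORD", ""),
        )
    else:
        graph = NetworkXBackend()  # in-memory, instant startup

    if os.getenv("USE_QDRANT", "false").lower() == "true":
        vectors = QdrantBackend(url=os.getenv("QDRANT_URL", "http://localhost:6333"))
    else:
        vectors = FAISSBackend()  # in-memory, no Docker required

    return graph, vectors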
The Benefits
Development Mode
export USE_NEO4J=false USE_QDRANT=false
python main.py # Instant startup, no Docker
pytest # 90 seconds, no infrastructure
Production Mode
export USE_NEO4J=true USE_QDRANT=true
docker-compose up # Full infrastructure
python main.py # Same code, different backends
Same Code, Different Scale:
- ⚡ 0s startup vs 30s
- 💰 $0 infra vs $50/month
- 🧪 Fast tests vs slow tests
- 🔄 Zero code changes to switch
Event-Driven Architecture
Why Events?
Without events, agents call each other directly:
# Synchronous, tightly coupled
papers = data_collector.collect("transformers")
graph_stats = graph_agent.add_papers(papers) # Blocks
vector_stats = vector_agent.add_papers(papers) # Blocks
Problems:
- ❌ Graph and vector operations run sequentially (slow)
- ❌ If graph_agent fails, vector_agent never runs
- ❌ Can't replay or debug easily
- ❌ Tight coupling between agents
With events:
# Async, loosely coupled
papers = data_collector.collect("transformers")
event_bus.publish("papers.collected", papers)
# Separate consumers process in parallel
graph_agent.on("papers.collected", add_to_graph)
vector_agent.on("papers.collected", add_to_vectors)
Benefits:
- ✅ Graph and vector run in parallel (3x faster)
- ✅ One failure doesn't break everything
- ✅ Can replay events for debugging
- ✅ Loose coupling, easy to add consumers
Event Topics
I designed 18 event topics, organized by workflow:
Query Lifecycle
query.submitted → User asks question
query.validated → Question is valid
query.completed → Answer generated
query.failed → Error occurred
Data Collection
data.collection.started → Collection begins
data.collection.completed → Papers collected
data.collection.failed → Collection error
Graph Processing
graph.processing.started → Adding to graph
graph.processing.completed → Graph updated
graph.processing.failed → Graph error
Vector Processing
vector.processing.started → Generating embeddings
vector.processing.completed → Vectors stored
vector.processing.failed → Embedding error
Reasoning
reasoning.started → LLM invoked
reasoning.completed → Answer generated
reasoning.failed → LLM error
System
agent.health.check → Periodic health ping
agent.error → Critical error
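To keep topic strings consistent across publishers and consumers, it helps to define them once as constants. A short sketch (the constant names are mine, not the project's):

class Topics:
    """Central registry of event topic names."""
    QUERY_SUBMITTED = "query.submitted"
    QUERY_COMPLETED = "query.completed"
    DATA_COLLECTION_COMPLETED = "data.collection.completed"
    GRAPH_PROCESSING_COMPLETED = "graph.processing.completed"
    VECTOR_PROCESSING_COMPLETED = "vector.processing.completed"
    REASONING_FAILED = "reasoning.failed"
    AGENT_ERROR = "agent.error"

# Agents reference the constant, never a raw string:
# event_bus.publish(Topics.DATA_COLLECTION_COMPLETED, papers)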
Kafka vs Synchronous
I made Kafka optional because:
- Development: Overkill for single-user testing
- CI/CD: Adds complexity to test pipelines
- Simple deployments: Not everyone needs Kafka
The code falls back gracefully:
if kafka_available():
    event_bus = KafkaEventBus()
else:
    event_bus = SynchronousEventBus()  # Direct calls

# Same API for both
event_bus.publish("papers.collected", papers)
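Here is a minimal sketch of what the synchronous fallback can look like, assuming the publish/on API shown above; ResearcherAI's real implementation may differ in details:

from collections import defaultdict
from typing import Any, Callable, Dict, List

class SynchronousEventBus:
    """In-process event bus: publish() calls subscribers immediately."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def on(self, topic: str, handler: Callable[[Any], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: Any) -> None:
        for handler in self._subscribers[topic]:
            try:
                handler(payload)  # direct call, no broker involved
            except Exception as exc:
                # one failing consumer shouldn't break the others
                print(f"handler failed for {topic}: {exc}")

# Usage mirrors the Kafka-backed bus:
bus = SynchronousEventBus()
bus.on("papers.collected", lambda papers: print(f"{len(papers)} papers"))
bus.publish("papers.collected", [{"title": "Attention Is All You Need"}])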
RAG Architecture
The RAG Pipeline
When a user asks a question, here's what happens:
1. Question → Embedding
"What are transformers?"
→ [0.2, -0.5, 0.8, ...]
2. Vector Search → Relevant Chunks
Top 5 paper chunks about transformers
3. Graph Traversal → Related Entities
- Paper: "Attention Is All You Need"
- Authors: Vaswani et al.
- Topics: attention, seq2seq
4. Context Building → Prompt
Combine vectors + graph results + history
5. LLM Reasoning → Answer
Gemini 2.0 synthesizes answer with citations
Why Both Graphs and Vectors?
I get asked this a lot. Why not just one?
Vectors are great for:
- ✅ Semantic similarity ("transformers" ≈ "attention mechanisms")
- ✅ Finding relevant passages
- ✅ Fuzzy matching
But vectors are bad for:
- ❌ Relationships ("who authored this paper?")
- ❌ Structure ("papers citing this paper")
- ❌ Multi-hop queries ("papers by colleagues of X")
Graphs are great for:
- ✅ Relationships and structure
- ✅ Multi-hop traversals
- ✅ Explicit connections
But graphs are bad for:
- ❌ Semantic similarity
- ❌ Fuzzy matching
- ❌ Unstructured text search
Together, they're powerful:
# Hybrid search
def answer_question(question):
    # 1. Vector search for semantic relevance
    relevant_chunks = vector_agent.search(question, top_k=10)

    # 2. Graph traversal for related entities
    papers = [chunk.paper_id for chunk in relevant_chunks]
    related = graph_agent.get_related(papers, depth=2)

    # 3. Combine both for context
    context = build_context(relevant_chunks, related)

    # 4. LLM generates answer
    return reasoning_agent.answer(question, context)
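The build_context step referenced above merges the two result sets into one prompt. A rough sketch, assuming chunks expose text and paper_id attributes and graph results arrive as (subject, relation, object) triples:

def build_context(relevant_chunks, related, max_chars=8000):
    """Merge vector hits and graph facts into a single prompt context."""
    parts = ["## Relevant passages"]
    for chunk in relevant_chunks:
        parts.append(f"[{chunk.paper_id}] {chunk.text}")

    parts.append("## Related entities")
    for subject, relation, obj in related:
        parts.append(f"{subject} --{relation}--> {obj}")

    # Trim to a rough character budget so the prompt stays within token limits
    return "\n".join(parts)[:max_chars]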
Session Management
Users need to:
- Work on multiple research topics
- Save progress
- Resume later
I designed a session system:
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

@dataclass
class Conversation:
    question: str
    answer: str
    sources: List[str]
    timestamp: datetime

@dataclass
class Session:
    id: str
    name: str
    created_at: datetime
    papers_collected: int
    conversations: List[Conversation]
    metadata: Dict
Sessions are saved as JSON:
{
  "id": "research-transformers-20251101",
  "name": "Transformer Research",
  "created_at": "2025-11-01T10:00:00Z",
  "papers_collected": 47,
  "conversations": [
    {
      "question": "What are transformers?",
      "answer": "Transformers are...",
      "sources": ["arxiv:1706.03762"],
      "timestamp": "2025-11-01T10:05:00Z"
    }
  ]
}
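Persisting a session is then a straightforward JSON round-trip. A sketch using the dataclasses above and an assumed local sessions/ directory (the real storage path may differ):

import json
from dataclasses import asdict
from pathlib import Path

SESSIONS_DIR = Path("sessions")

def save_session(session: Session) -> Path:
    SESSIONS_DIR.mkdir(exist_ok=True)
    path = SESSIONS_DIR / f"{session.id}.json"
    # default=str converts datetime fields to ISO-ish strings
    path.write_text(json.dumps(asdict(session), default=str, indent=2))
    return path

def load_session(session_id: str) -> dict:
    return json.loads((SESSIONS_DIR / f"{session_id}.json").read_text())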
The OrchestratorAgent manages sessions:
# Create new session
orch = OrchestratorAgent("my-research")
# Collect data (adds to session)
orch.collect_data("transformers")
# Ask questions (added to session)
orch.ask("What are transformers?")
# Save session
orch.save_session()
# Load later
orch = OrchestratorAgent("my-research") # Auto-loads
orch.get_stats() # Shows history
Production-Grade Patterns
From the start, I implemented patterns that matter at scale.
1. Circuit Breakers
Prevent cascade failures when external APIs fail:
import time

class CircuitBreakerOpen(Exception):
    """Raised when calls are rejected while the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.timeout = recovery_timeout
        self.opened_at = 0.0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func):
        if self.state == "OPEN":
            if time.time() - self.opened_at > self.timeout:
                self.state = "HALF_OPEN"  # Try again
            else:
                raise CircuitBreakerOpen()
        try:
            result = func()
            self.failures = 0
            self.state = "CLOSED"
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "OPEN"
                self.opened_at = time.time()
            raise
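Using it looks like this; fetch_arxiv_page is a hypothetical stand-in for any flaky external call:

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

def fetch_arxiv_page(query):
    """Hypothetical HTTP call that sometimes times out."""
    ...

try:
    papers = breaker.call(lambda: fetch_arxiv_page("transformers"))
except CircuitBreakerOpen:
    papers = []  # Fail fast: skip the source instead of hammering a dead API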
Result: 94% error reduction in tests with flaky APIs.
2. Token Budgets
Prevent runaway LLM costs:
class BudgetExceeded(Exception):
    """Raised when a request would blow past a token limit."""

class TokenBudget:
    def __init__(self, per_request=10000, per_user=100000):
        self.per_request = per_request
        self.per_user = per_user
        self.usage = {}

    def check(self, user_id, estimated_tokens):
        if estimated_tokens > self.per_request:
            raise BudgetExceeded("Request exceeds per-request limit")
        if self.usage.get(user_id, 0) + estimated_tokens > self.per_user:
            raise BudgetExceeded("User exceeds daily limit")

    def record(self, user_id, actual_tokens):
        self.usage[user_id] = self.usage.get(user_id, 0) + actual_tokens
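In use, the budget wraps every LLM call; call_llm below is a hypothetical helper, and the token estimate is a rough characters-per-token heuristic:

budget = TokenBudget(per_request=10_000, per_user=100_000)

def answer_with_budget(user_id: str, prompt: str) -> str:
    estimated = len(prompt) // 4             # rough heuristic: ~4 chars per token
    budget.check(user_id, estimated)         # raises BudgetExceeded if over a limit
    answer, tokens_used = call_llm(prompt)   # hypothetical LLM helper
    budget.record(user_id, tokens_used)      # track actual spend per user
    return answer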
Result: No surprise bills, predictable costs.
3. Intelligent Caching
Dual-tier cache (memory + disk) with TTLs:
import diskcache

class DualTierCache:
    def __init__(self):
        self.memory = {}                        # Fast, volatile
        self.disk = diskcache.Cache("./cache")  # Slower, persistent

    def get(self, key):
        # Try memory first
        if key in self.memory:
            return self.memory[key]
        # Fall back to disk
        if key in self.disk:
            value = self.disk[key]
            self.memory[key] = value  # Promote to memory
            return value
        return None

    def set(self, key, value, ttl=3600):
        self.memory[key] = value
        self.disk.set(key, value, expire=ttl)
Result: 40% cost reduction on repeated queries.
4. Dynamic Model Selection
Use the cheapest model that meets requirements:
def select_model(task_type, max_latency=None, min_quality=None):
    models = [
        {"name": "gemini-flash", "cost": 0.35, "latency": 1.5, "quality": 0.85},
        {"name": "gemini-pro", "cost": 1.00, "latency": 2.5, "quality": 0.92},
        {"name": "gpt-4", "cost": 30.00, "latency": 3.0, "quality": 0.95},
    ]

    # Filter by constraints
    candidates = [m for m in models
                  if (not max_latency or m["latency"] <= max_latency)
                  and (not min_quality or m["quality"] >= min_quality)]

    # Sort by cost, return cheapest
    return min(candidates, key=lambda m: m["cost"])
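Two quick calls show how the constraints play out against the table above:

# A 2-second latency budget and a 0.80 quality floor leave only gemini-flash.
select_model("summarize", max_latency=2.0, min_quality=0.80)
# -> {'name': 'gemini-flash', 'cost': 0.35, 'latency': 1.5, 'quality': 0.85}

# Raising the quality floor to 0.90 rules out flash; gemini-pro is the
# cheapest model that still qualifies.
select_model("qa", min_quality=0.90)
# -> {'name': 'gemini-pro', 'cost': 1.0, 'latency': 2.5, 'quality': 0.92}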
Result: 70% cost savings vs always using GPT-4.
Airflow Integration
I added Apache Airflow for production-grade ETL orchestration.
Why Airflow?
Before Airflow: Sequential collection
# Slow, sequential, no retries
for source in sources:
    papers = collect(source)  # Blocks until done
With Airflow: Parallel DAG
# Fast, parallel, auto-retry
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG("research_paper_etl"):
    arxiv = PythonOperator(task_id="arxiv", python_callable=collect_arxiv)
    pubmed = PythonOperator(task_id="pubmed", python_callable=collect_pubmed)
    semantic = PythonOperator(task_id="semantic", python_callable=collect_semantic)
    # All run in parallel!
Results:
- ⚡ 3-4x faster (5-10s vs 19-38s)
- 🔄 Automatic retries with exponential backoff
- 📊 Visual monitoring at http://localhost:8080
- 📈 Scalable with Celery workers
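For completeness, here is roughly what the retry configuration looks like; the schedule, dates, and retry values are placeholders rather than ResearcherAI's actual settings:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="research_paper_etl",
    start_date=datetime(2025, 1, 1),
    schedule=None,      # Airflow 2.4+; older versions use schedule_interval
    catchup=False,
) as dag:
    arxiv = PythonOperator(
        task_id="arxiv",
        python_callable=collect_arxiv,       # collector defined elsewhere
        retries=3,                           # automatic retries on failure
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,      # 30s, 60s, 120s between attempts
    )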
Architecture Evolution
The architecture didn't start perfect. Here's how it evolved:
Version 1.0 (Week 1)
- Single agent
- Only arXiv
- NetworkX only
- No caching
- No error handling
Version 1.5 (Week 2)
- Multi-agent split
- 3 data sources
- Added Neo4j/Qdrant
- Basic caching
- Circuit breakers
Version 2.0 (Week 3)
- 6 specialized agents
- 7 data sources
- Kafka events
- Airflow ETL
- Full production patterns
- 96% test coverage
Key Learning: Start simple, add complexity as needed.
Lessons Learned
✅ What Worked
- Dual backends: Saved countless hours
- Event-driven: Enabled parallel processing
- Production patterns early: No painful refactoring
- Hybrid RAG: Graphs + vectors are more powerful together
🤔 What I'd Change
- Should have profiled earlier: Some optimization was premature
- Over-engineered caching: Simple LRU cache would have been fine for v1
- Event schema versioning: Will be needed if this scales
💡 Key Insights
Good architecture enables fast development and easy scaling.
Abstractions are worth it when they provide real optionality.
Production patterns implemented early > Bolt-on later.
Measure before optimizing. I wasted time optimizing things that weren't bottlenecks.
Next: Backend Development
Now that you understand the architecture, let's build it! In the next section, I'll walk through implementing each agent and integrating the data sources.