We wired three Pydantic AI agents into a LangGraph workflow, hit run, and watched a research pipeline extract entities, score sentiment, and generate a validated summary — all in a single async pass with zero malformed outputs. The secret was letting each framework do exactly what it does best: LangGraph handles the when and where of execution, while Pydantic AI enforces the what at every data boundary. The result is agent systems that are simultaneously flexible and bulletproof.
If you have built multi-agent pipelines before, you know the usual tradeoff. You can have autonomous agents that adapt to messy real-world inputs, or you can have rigid pipelines that guarantee valid output formats. Pick one. We spent months stuck in that false dichotomy, chaining LLM calls together with duct-tape parsing and silent prayer. One malformed JSON response three steps deep would blow up the entire run, and the stack trace told us nothing useful.
LangGraph and Pydantic AI dissolve that tradeoff entirely. LangGraph models your workflow as a directed graph where nodes are processing steps and edges control execution flow, including conditional branches, parallel paths, and cycles. Pydantic AI wraps every agent call in schema validation, so you know exactly what shape data will have when it arrives at the next node — malformed output fails loudly at the boundary instead of corrupting later steps. Together, they give you the orchestration flexibility of a full workflow engine with the data guarantees of a strongly typed interface.
The Two Halves of the Problem
What LangGraph and Pydantic AI Actually Do
LangGraph is a framework for building stateful, multi-agent applications with LLMs. It models AI workflows as directed graphs where nodes represent distinct processing steps and edges define execution flow. Unlike simple sequential chains, LangGraph treats both nodes and transitions as first-class citizens. You can build workflows with conditional branching, parallel execution, and cycles, all while maintaining persistent state across the entire execution.
Pydantic AI brings type safety and data validation to LLM outputs. Built on the foundation of Pydantic (the same library powering FastAPI), it ensures that every piece of data flowing through your system conforms to well-defined schemas. When an LLM generates a response, Pydantic AI validates it against your models and catches errors before they propagate downstream.
The combination works because these tools occupy different layers of the stack. LangGraph decides which agent runs next, manages shared state between steps, and routes execution down different paths based on conditions. Pydantic AI ensures that at every handoff point, the data conforms to expectations. No more wondering if the LLM included all required fields. No more hoping the format is correct.
Where We Got It Wrong First
Our first attempt at combining these frameworks was a mess. We defined Pydantic models for every possible output, wired them into a LangGraph state machine, and tried to force every piece of intermediate data through strict validation. The result? A system that rejected half its own outputs. An agent would return a perfectly reasonable summary, but because it included an extra field or formatted a date slightly differently, Pydantic would throw a ValidationError and the workflow would halt.
The fix was counterintuitive: we stopped validating everything and started validating only at the boundaries between agents. Internal scratch-pad data could be loose. But every time data crossed from one agent to another, it had to pass through a Pydantic model. That single architectural decision cut our error rate by an order of magnitude while keeping the pipeline flexible enough to handle unexpected LLM behavior.
KEY INSIGHT: Validate at agent boundaries, not inside them. Strict typing between nodes plus loose handling within nodes gives you reliability without brittleness.
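To make the boundary rule concrete, here is a minimal sketch (the model and function names are illustrative, not from our codebase) of loose data inside a node and strict validation only at the handoff:

```python
from pydantic import BaseModel, Field, ValidationError
from typing import List

class AgentHandoff(BaseModel):
    """Schema enforced only where data crosses from one agent to the next."""
    summary: str
    key_points: List[str] = Field(..., min_items=1)

def finish_node(scratchpad: dict) -> AgentHandoff:
    """Inside the node, the scratchpad can hold anything; the boundary is strict."""
    try:
        # Extra keys in the scratchpad are simply ignored at the boundary
        return AgentHandoff(
            summary=scratchpad.get("draft_summary", ""),
            key_points=scratchpad.get("points", []),
        )
    except ValidationError as exc:
        # Fail loudly at the handoff, not three nodes later
        raise RuntimeError(f"Handoff rejected: {exc}") from exc
```

Anything extra in the scratchpad is ignored; anything missing or malformed fails right at the handoff, where the stack trace still means something.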
Seeing the Integration in Action
Here is what it looks like when these two frameworks work together. A research query goes in, validated data comes out, and LangGraph manages the flow between steps:
```python
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import List, Dict, Optional, TypedDict

# Define our data models with Pydantic
class ResearchQuery(BaseModel):
    """Structured representation of a research query."""
    topic: str = Field(..., description="Main research topic")
    subtopics: List[str] = Field(default_factory=list, description="Specific areas to explore")
    max_sources: int = Field(default=5, ge=1, le=20, description="Maximum number of sources to find")

class ResearchResult(BaseModel):
    """Validated research output."""
    summary: str = Field(..., description="Executive summary of findings")
    key_insights: List[str] = Field(..., min_items=1, description="Main discoveries")
    sources: List[Dict[str, str]] = Field(..., description="Citations with title and URL")
    confidence_score: float = Field(..., ge=0, le=1, description="Confidence in the findings")

# Create Pydantic AI agents for each step
research_agent = Agent(
    'openai:gpt-4o',
    result_type=ResearchResult,
    system_prompt="You are a research specialist. Analyze information and extract key insights.",
)

# Define the LangGraph state
class ResearchState(TypedDict):
    query: ResearchQuery
    raw_data: Optional[str]
    research_result: Optional[ResearchResult]
    status: str

# Build the workflow graph
workflow = StateGraph(ResearchState)

# Node functions that leverage Pydantic AI agents
async def gather_data(state: ResearchState) -> Dict:
    """Gather raw data based on the research query."""
    # In a real implementation, this might call APIs or search databases
    # For now, we'll simulate with a simple prompt
    query = state["query"]
    data = f"Simulated data about {query.topic} covering {', '.join(query.subtopics)}"
    return {"raw_data": data, "status": "data_gathered"}

async def analyze_data(state: ResearchState) -> Dict:
    """Use Pydantic AI agent to analyze and structure the data."""
    result = await research_agent.run(
        f"Analyze this data and extract insights: {state['raw_data']}"
    )
    # The result is guaranteed to be a valid ResearchResult
    return {"research_result": result.data, "status": "analysis_complete"}

# Wire up the graph
workflow.add_node("gather", gather_data)
workflow.add_node("analyze", analyze_data)
workflow.add_edge("gather", "analyze")
workflow.add_edge("analyze", END)
workflow.set_entry_point("gather")

# Compile and use
app = workflow.compile()
```

Notice how naturally these pieces fit. LangGraph handles the flow — deciding what happens when, managing state between steps, routing to different paths based on conditions. Pydantic AI ensures that at every step, the data matches our schemas. The ResearchResult model guarantees that key_insights has at least one entry and confidence_score stays between 0 and 1, without a single manual check in our code.
How LangGraph Compares to Other Orchestration Approaches
To appreciate what LangGraph brings to the table, compare it with traditional approaches:
| Feature | LangGraph | Traditional Chains | Workflow Engines |
|---|---|---|---|
| State Management | Built-in, persistent across nodes | Limited, often passed manually | Varies, often external |
| Conditional Logic | Native support via edges | Requires workarounds | Usually supported |
| Parallel Execution | First-class with proper state handling | Difficult to implement | Depends on engine |
| Error Recovery | Can route to error handlers | Typically fails entire chain | Varies by implementation |
| Debugging | Visualizable graph structure | Linear, harder to trace | Often good tooling |
| LLM Integration | Designed for it | Retrofitted | Usually generic |
LangGraph was built specifically for LLM applications, not adapted from generic workflow engines. That shows in features like checkpointing (saving state for long-running workflows) and native support for the conditional logic common in agent systems.

Figure 1: LangGraph vs. Sequential Chains — LangGraph’s graph-based approach supports conditional branching and multiple paths to the same endpoint, while sequential chains force a rigid, linear flow.
Architecture: Four Layers Working Together
How the Layers Stack Up
Understanding the integration requires looking at the system as four distinct layers, each with a clear responsibility.

Figure 2: Integrated Architecture — Data flows from the client through LangGraph’s orchestration layer, where Pydantic AI validates all inputs and outputs before they interact with LLMs, tools, or storage systems.
Here is what each layer does:
- Orchestration Layer (LangGraph) — The top layer maintains application state and manages transitions between processing nodes. It decides which node to execute next based on current state, manages parallel execution when multiple paths are viable, handles checkpoints for long-running workflows, and provides retry and error recovery mechanisms.
- Validation Layer (Pydantic AI) — Every piece of data flowing between nodes passes through this layer. Pydantic AI confirms that LLM outputs conform to expected schemas, tool inputs are properly formatted before execution, state transitions only happen with valid data, and type mismatches get caught immediately instead of three steps later (see the sketch after this list).
- Execution Layer — Where the actual work happens. This layer coordinates LLM invocations with proper prompt formatting, tool execution with validated parameters, external API calls with error handling, and result aggregation from parallel operations.
- Integration Layer — The bottom layer handles all external interactions: database connections for state persistence, API integrations for external services, message queues for async processing, and monitoring infrastructure.
State Transitions and Data Flow
One of the most powerful aspects of this integration is how state transitions work. Here is a practical document analysis pipeline that extracts entities, scores sentiment, and generates summaries, with Pydantic validating every state transition:
```python
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field, validator
from pydantic_ai import Agent
from typing import List, Dict, Optional, Literal
from enum import Enum

# Define workflow states with Pydantic models
class DocumentStatus(str, Enum):
    PENDING = "pending"
    EXTRACTING = "extracting"
    ANALYZING = "analyzing"
    REVIEWING = "reviewing"
    COMPLETE = "complete"
    ERROR = "error"

class DocumentAnalysisState(BaseModel):
    """State that flows through our document analysis workflow."""
    document_id: str
    content: str
    status: DocumentStatus = DocumentStatus.PENDING
    extracted_entities: Optional[List[str]] = None
    sentiment_score: Optional[float] = None
    summary: Optional[str] = None
    review_notes: Optional[str] = None
    error_message: Optional[str] = None

    @validator('sentiment_score')
    def validate_sentiment(cls, v):
        if v is not None and not -1 <= v <= 1:
            raise ValueError("Sentiment score must be between -1 and 1")
        return v

# Create specialized agents for each task
entity_extractor = Agent(
    'openai:gpt-4o',
    system_prompt="Extract all named entities (people, organizations, locations) from the text.",
)

sentiment_analyzer = Agent(
    'openai:gpt-4o',
    system_prompt="Analyze the sentiment of the text. Return a score between -1 (negative) and 1 (positive).",
)

summarizer = Agent(
    'openai:gpt-4o',
    system_prompt="Create a concise summary of the document's main points.",
)

# Define node functions
async def extract_entities(state: DocumentAnalysisState) -> Dict:
    """Extract entities from the document."""
    try:
        result = await entity_extractor.run(state.content)
        # Parse the result and extract entity list
        entities = parse_entities(result.data)  # Custom parsing function
        return {
            "extracted_entities": entities,
            "status": DocumentStatus.ANALYZING,
        }
    except Exception as e:
        return {
            "status": DocumentStatus.ERROR,
            "error_message": f"Entity extraction failed: {str(e)}",
        }

async def analyze_sentiment(state: DocumentAnalysisState) -> Dict:
    """Analyze document sentiment."""
    try:
        result = await sentiment_analyzer.run(state.content)
        score = float(result.data)  # Pydantic AI ensures this is valid
        return {
            "sentiment_score": score,
            "status": DocumentStatus.REVIEWING,
        }
    except Exception as e:
        return {
            "status": DocumentStatus.ERROR,
            "error_message": f"Sentiment analysis failed: {str(e)}",
        }

async def create_summary(state: DocumentAnalysisState) -> Dict:
    """Generate document summary."""
    # Include extracted entities in the summary for context
    context = f"Document contains entities: {', '.join(state.extracted_entities or [])}"
    prompt = f"{context}\n\nDocument: {state.content}"

    result = await summarizer.run(prompt)
    return {
        "summary": result.data,
        "status": DocumentStatus.COMPLETE,
    }

# Build the workflow
def create_document_workflow():
    workflow = StateGraph(DocumentAnalysisState)

    # Add nodes
    workflow.add_node("extract", extract_entities)
    workflow.add_node("sentiment", analyze_sentiment)
    workflow.add_node("summarize", create_summary)

    # Define the flow
    workflow.set_entry_point("extract")

    # Conditional routing based on status
    def route_after_extraction(state: DocumentAnalysisState) -> str:
        if state.status == DocumentStatus.ERROR:
            return END
        return "sentiment"

    workflow.add_conditional_edges("extract", route_after_extraction)
    workflow.add_edge("sentiment", "summarize")
    workflow.add_edge("summarize", END)

    return workflow.compile()

# Use the workflow
app = create_document_workflow()

# Process a document
initial_state = DocumentAnalysisState(
    document_id="doc123",
    content="Apple Inc. announced record profits in Q4 2024...",
)

result = await app.ainvoke(initial_state)
print(f"Summary: {result.summary}")
print(f"Sentiment: {result.sentiment_score}")
print(f"Entities: {result.extracted_entities}")
```

The critical detail here is how failures get handled gracefully. If entity extraction fails, the workflow routes to an error state instead of crashing the entire pipeline. The state is preserved, so you can resume from where things went wrong. No partial results lost, no silent corruption.
Transition Mechanisms: Conditional Routing and Map-Reduce
LangGraph provides several powerful patterns for controlling flow through your workflow:

Figure 3: Transition Mechanisms in LangGraph — Conditional edges handle dynamic routing while map-reduce handles parallel processing. These two patterns cover the vast majority of real-world workflow needs.
Here is a map-reduce pattern for parallel document processing. We fan out across multiple documents, process each one through the full analysis pipeline, then aggregate the results:
```python
from langgraph.graph import StateGraph
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import List, Dict, Optional
import asyncio

class BatchProcessingState(BaseModel):
    """State for processing multiple documents in parallel."""
    documents: List[Dict[str, str]]
    processed_results: List[Dict] = Field(default_factory=list)
    aggregated_summary: Optional[str] = None

async def map_documents(state: BatchProcessingState) -> Dict:
    """Map phase - process each document in parallel."""
    async def process_single_doc(doc):
        # Create individual workflow state for each document
        doc_state = DocumentAnalysisState(
            document_id=doc["id"],
            content=doc["content"],
        )

        # Process through our document workflow
        result = await app.ainvoke(doc_state)
        return result.dict()

    # Process all documents in parallel
    tasks = [process_single_doc(doc) for doc in state.documents]
    results = await asyncio.gather(*tasks)

    return {"processed_results": results}

async def reduce_results(state: BatchProcessingState) -> Dict:
    """Reduce phase - aggregate results into final summary."""
    # Collect all summaries
    summaries = [r["summary"] for r in state.processed_results if r.get("summary")]

    # Use an agent to create an aggregated summary
    aggregator = Agent(
        'openai:gpt-4o',
        system_prompt="Create a comprehensive summary combining multiple document summaries.",
    )

    combined_text = "\n\n".join(f"Document {i+1}: {s}" for i, s in enumerate(summaries))
    result = await aggregator.run(combined_text)

    return {"aggregated_summary": result.data}

# Create batch processing workflow
batch_workflow = StateGraph(BatchProcessingState)
batch_workflow.add_node("map", map_documents)
batch_workflow.add_node("reduce", reduce_results)
batch_workflow.add_edge("map", "reduce")
batch_workflow.set_entry_point("map")

batch_app = batch_workflow.compile()
```

Instead of processing documents one at a time, you fan out across all of them simultaneously while still maintaining type safety and validation at every step. For a batch of 20 documents, this can cut total processing time from minutes to seconds.
KEY INSIGHT: Map-reduce is the natural scaling pattern for LangGraph workflows. Fan out for parallel agent calls, fan in for aggregation, and let Pydantic validate at both boundaries.
Building Production-Ready Systems
Error Handling and Recovery
LLMs time out. APIs go down. Sometimes the output just does not make sense. Our first production deployment taught us this the hard way: a workflow that ran perfectly for 200 test documents started failing on document 47 in a production batch because a network hiccup caused a timeout that cascaded into a full pipeline crash. We lost 46 documents worth of already-completed work.
The fix required three layers of resilience: retries with exponential backoff, fallback to cheaper models when the primary fails, and partial result preservation so a failure at step 3 does not destroy the work from steps 1 and 2.
```python
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import Optional, List, Dict
import asyncio
from datetime import datetime

class ResilientState(BaseModel):
    """State with built-in error tracking and recovery."""
    task_id: str
    input_data: str
    current_step: str = "start"
    retry_count: int = 0
    max_retries: int = 3
    errors: List[Dict] = Field(default_factory=list)
    partial_results: Dict = Field(default_factory=dict)
    final_result: Optional[str] = None

    def record_error(self, step: str, error: Exception):
        """Record an error for debugging and recovery."""
        self.errors.append({
            "step": step,
            "error": str(error),
            "timestamp": datetime.now().isoformat(),
            "retry_count": self.retry_count,
        })

class RetryableAgent:
    """Wrapper for Pydantic AI agents with retry logic."""

    def __init__(self, agent: Agent, max_retries: int = 3, backoff_factor: float = 2.0):
        self.agent = agent
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def run_with_retry(self, prompt: str, state: ResilientState) -> Optional[str]:
        """Execute agent with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                result = await self.agent.run(prompt)
                return result.data
            except Exception as e:
                state.record_error(state.current_step, e)

                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    wait_time = self.backoff_factor ** attempt
                    await asyncio.sleep(wait_time)
                else:
                    # Final attempt failed
                    return None

        return None

# Create resilient workflow
def create_resilient_workflow():
    workflow = StateGraph(ResilientState)

    # Initialize retryable agents
    analyzer = RetryableAgent(
        Agent('openai:gpt-4o', system_prompt="Analyze the provided text.")
    )

    enhancer = RetryableAgent(
        Agent('openai:gpt-4o', system_prompt="Enhance and improve the analysis.")
    )

    async def analyze_with_fallback(state: ResilientState) -> Dict:
        """Analyze with fallback options."""
        state.current_step = "analysis"

        # Try primary analysis
        result = await analyzer.run_with_retry(state.input_data, state)

        if result:
            return {
                "partial_results": {"analysis": result},
                "current_step": "enhancement",
            }

        # Fallback to simpler analysis
        fallback_agent = Agent(
            'openai:gpt-3.5-turbo',
            system_prompt="Provide a basic analysis of the text.",
        )

        fallback_result = await fallback_agent.run(state.input_data)
        return {
            "partial_results": {"analysis": fallback_result.data, "used_fallback": True},
            "current_step": "enhancement",
        }

    async def enhance_with_recovery(state: ResilientState) -> Dict:
        """Enhance analysis with error recovery."""
        state.current_step = "enhancement"

        if not state.partial_results.get("analysis"):
            return {
                "final_result": "Analysis failed - no results to enhance",
                "current_step": "complete",
            }

        analysis = state.partial_results["analysis"]
        result = await enhancer.run_with_retry(
            f"Enhance this analysis: {analysis}", state
        )

        if result:
            return {
                "final_result": result,
                "current_step": "complete",
            }

        # If enhancement fails, return original analysis
        return {
            "final_result": analysis,
            "current_step": "complete",
        }

    # Build workflow with error paths
    workflow.add_node("analyze", analyze_with_fallback)
    workflow.add_node("enhance", enhance_with_recovery)

    workflow.set_entry_point("analyze")
    workflow.add_edge("analyze", "enhance")
    workflow.add_edge("enhance", END)

    return workflow.compile()
```

This gives you multiple layers of resilience stacked together: automatic retries with exponential backoff, fallback to cheaper models if the primary fails, partial result preservation so you never lose completed work, and detailed error tracking for post-mortem debugging.
State Management That Scales
Managing state effectively is the difference between a demo and a production system. The key challenge: as workflows grow, serializing and deserializing the entire state at every checkpoint becomes a bottleneck. We learned to keep the in-flight state minimal and shunt large data to external storage.
```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
import json

class CheckpointableState(BaseModel):
    """State designed for efficient checkpointing."""

    # Minimal core state
    workflow_id: str
    current_phase: str

    # Separate large data that can be stored externally
    data_references: Dict[str, str] = Field(
        default_factory=dict,
        description="References to external data storage",
    )

    # Computed properties that don't need persistence
    _cache: Dict = {}  # Not persisted (strict Pydantic code would use a PrivateAttr)

    class Config:
        # Only persist defined fields
        fields = {"workflow_id", "current_phase", "data_references"}

    def store_large_data(self, key: str, data: Any) -> str:
        """Store large data externally and keep reference."""
        # In production, this would use S3, Redis, etc.
        reference = f"s3://bucket/{self.workflow_id}/{key}"
        # Simulate storage
        self._cache[reference] = data
        self.data_references[key] = reference
        return reference

    def retrieve_data(self, key: str) -> Any:
        """Retrieve data using reference."""
        reference = self.data_references.get(key)
        if reference:
            # In production, fetch from external storage
            return self._cache.get(reference)
        return None

# Create workflow with checkpointing
def create_checkpointed_workflow():
    # Initialize with memory saver for checkpointing
    checkpointer = MemorySaver()

    workflow = StateGraph(CheckpointableState)

    async def process_phase_1(state: CheckpointableState) -> Dict:
        """First phase - might take a long time."""
        # Simulate processing
        result = {"processed": "large amount of data"}

        # Store large result externally
        state.store_large_data("phase1_result", result)

        return {
            "current_phase": "phase2",
            "data_references": state.data_references,
        }

    async def process_phase_2(state: CheckpointableState) -> Dict:
        """Second phase - uses results from phase 1."""
        # Retrieve previous results
        phase1_data = state.retrieve_data("phase1_result")

        if not phase1_data:
            raise ValueError("Phase 1 data not found")

        # Continue processing
        final_result = f"Completed processing of {phase1_data}"

        return {
            "current_phase": "complete",
        }

    workflow.add_node("phase1", process_phase_1)
    workflow.add_node("phase2", process_phase_2)

    workflow.set_entry_point("phase1")
    workflow.add_edge("phase1", "phase2")
    workflow.add_edge("phase2", END)

    # Compile with checkpointing
    return workflow.compile(checkpointer=checkpointer)

# Use with checkpointing
app = create_checkpointed_workflow()

# Start processing
initial_state = CheckpointableState(
    workflow_id="job-123",
    current_phase="phase1",
)

# This can be interrupted and resumed
config = {"configurable": {"thread_id": "job-123"}}
result = await app.ainvoke(initial_state, config=config)

# Later, resume from checkpoint if needed
# resumed_result = await app.ainvoke(None, config=config)
```

Multi-Agent Orchestration
Real-world applications often require multiple specialized agents collaborating on a single task. Here is a content creation pipeline where a research agent feeds a writing agent, a fact-checker verifies the output, and the workflow can loop back to research if accuracy falls below threshold:

Figure 4: Multi-Agent Orchestration Workflow — Three specialized agents (research, writing, fact-checking) collaborate in a LangGraph workflow. Pydantic AI validates data at every handoff point, and conditional edges route the flow back to research if fact-checking scores drop below 0.7.
```python
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import List, Dict, Optional

# Define specialized schemas for each agent
class ResearchFindings(BaseModel):
    """Output from research agent."""
    topic: str
    key_facts: List[str] = Field(..., min_items=3, max_items=10)
    sources: List[str] = Field(..., min_items=1)
    confidence: float = Field(..., ge=0, le=1)

class WrittenContent(BaseModel):
    """Output from writing agent."""
    title: str
    introduction: str
    body_paragraphs: List[str] = Field(..., min_items=2)
    conclusion: str
    word_count: int

class FactCheckReport(BaseModel):
    """Output from fact-checking agent."""
    verified_facts: List[Dict[str, bool]]
    accuracy_score: float = Field(..., ge=0, le=1)
    corrections_needed: List[str] = Field(default_factory=list)
    recommendation: str

# Create specialized agents
research_agent = Agent(
    'openai:gpt-4o',
    result_type=ResearchFindings,
    system_prompt="""You are a meticulous research specialist.
    Find accurate, relevant information and always cite sources.""",
)

writing_agent = Agent(
    'openai:gpt-4o',
    result_type=WrittenContent,
    system_prompt="""You are a skilled content writer.
    Create engaging, well-structured content based on research findings.""",
)

fact_check_agent = Agent(
    'openai:gpt-4o',
    result_type=FactCheckReport,
    system_prompt="""You are a fact-checking expert.
    Verify claims, check sources, and ensure accuracy.""",
)

# Define the multi-agent workflow state
class ContentCreationState(BaseModel):
    """State for multi-agent content creation workflow."""
    user_request: str
    research_findings: Optional[ResearchFindings] = None
    written_content: Optional[WrittenContent] = None
    fact_check_report: Optional[FactCheckReport] = None
    final_content: Optional[str] = None
    workflow_status: str = "started"

# Implement workflow nodes
async def conduct_research(state: ContentCreationState) -> Dict:
    """Research phase using specialized agent."""
    result = await research_agent.run(
        f"Research this topic thoroughly: {state.user_request}"
    )

    return {
        "research_findings": result.data,
        "workflow_status": "research_complete",
    }

async def write_content(state: ContentCreationState) -> Dict:
    """Writing phase using research findings."""
    research = state.research_findings

    # Prepare context for writing agent
    context = f"""
    Topic: {research.topic}
    Key Facts:
    {chr(10).join(f'- {fact}' for fact in research.key_facts)}

    Sources: {', '.join(research.sources)}

    Write comprehensive content about this topic.
    """

    result = await writing_agent.run(context)

    return {
        "written_content": result.data,
        "workflow_status": "writing_complete",
    }

async def fact_check_content(state: ContentCreationState) -> Dict:
    """Fact-checking phase."""
    content = state.written_content
    research = state.research_findings

    # Prepare fact-checking context
    check_context = f"""
    Original Research Facts:
    {chr(10).join(f'- {fact}' for fact in research.key_facts)}

    Written Content to Verify:
    Title: {content.title}
    Introduction: {content.introduction}
    Body: {chr(10).join(content.body_paragraphs)}

    Verify all claims against the research facts.
    """

    result = await fact_check_agent.run(check_context)

    return {
        "fact_check_report": result.data,
        "workflow_status": "fact_check_complete",
    }

async def finalize_content(state: ContentCreationState) -> Dict:
    """Final phase - incorporate fact-checking feedback."""
    content = state.written_content
    fact_check = state.fact_check_report

    if fact_check.accuracy_score >= 0.9:
        # Content is accurate enough
        final = f"{content.title}\n\n{content.introduction}\n\n"
        final += "\n\n".join(content.body_paragraphs)
        final += f"\n\n{content.conclusion}"
    else:
        # Need to revise based on fact-checking
        revision_agent = Agent(
            'openai:gpt-4o',
            system_prompt="Revise content to address fact-checking concerns.",
        )

        revision_context = f"""
        Original content needs revision.

        Corrections needed:
        {chr(10).join(f'- {correction}' for correction in fact_check.corrections_needed)}

        Original content:
        {content.title}
        {content.introduction}
        {chr(10).join(content.body_paragraphs)}

        Revise to ensure accuracy.
        """

        revision = await revision_agent.run(revision_context)
        final = revision.data

    return {
        "final_content": final,
        "workflow_status": "complete",
    }

# Build the multi-agent workflow
def create_content_workflow():
    workflow = StateGraph(ContentCreationState)

    # Add all nodes
    workflow.add_node("research", conduct_research)
    workflow.add_node("write", write_content)
    workflow.add_node("fact_check", fact_check_content)
    workflow.add_node("finalize", finalize_content)

    # Define the flow
    workflow.set_entry_point("research")
    workflow.add_edge("research", "write")
    workflow.add_edge("write", "fact_check")

    # Conditional edge based on fact-checking results
    def route_after_fact_check(state: ContentCreationState) -> str:
        if state.fact_check_report.accuracy_score < 0.7:
            # Too many errors, go back to research
            return "research"
        return "finalize"

    workflow.add_conditional_edges(
        "fact_check",
        route_after_fact_check,
        {
            "research": "research",
            "finalize": "finalize",
        },
    )

    workflow.add_edge("finalize", END)

    return workflow.compile()

# Use the multi-agent system
content_app = create_content_workflow()

# Create content on any topic
initial_request = ContentCreationState(
    user_request="Write about the latest advances in quantum computing"
)

result = await content_app.ainvoke(initial_request)
print(f"Final content:\n{result.final_content}")
```

Several patterns make this system robust. Each agent has a specialized role with its own validation schema. Data flows between agents are type-safe and validated. The workflow can loop back if quality checks fail. And the entire system is modular — you can swap agents or add new ones without touching the rest of the graph.
Performance Optimization
Parallel Processing at Scale
When production workloads arrive, performance becomes the constraint that matters. The research platform below demonstrates how to split a complex query into independent subtopics, research them all in parallel, and synthesize the results:

Figure 5: Research Platform Architecture — Multiple topics are researched simultaneously through parallel async calls, with results flowing into a synthesis phase that combines findings into a unified report.
```python
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import List, Dict, Optional
import asyncio

class TopicResearchTask(BaseModel):
    """Individual research task."""
    topic_id: str
    topic_name: str
    search_queries: List[str]
    max_sources: int = 5

class TopicResearchResult(BaseModel):
    """Results from researching a single topic."""
    topic_id: str
    topic_name: str
    findings: List[str]
    sources: List[str]
    relevance_score: float

class ResearchPlatformState(BaseModel):
    """State for high-performance research platform."""
    main_query: str
    generated_topics: List[TopicResearchTask] = Field(default_factory=list)
    research_results: List[TopicResearchResult] = Field(default_factory=list)
    synthesis: Optional[str] = None
    performance_metrics: Dict = Field(default_factory=dict)

# Specialized agents
topic_generator = Agent(
    'openai:gpt-4o',
    system_prompt="""Break down complex research queries into specific,
    researchable subtopics. Each topic should be focused and independent.""",
)

topic_researcher = Agent(
    'openai:gpt-4o',
    result_type=TopicResearchResult,
    system_prompt="Research the given topic thoroughly and provide structured findings.",
)

synthesis_agent = Agent(
    'openai:gpt-4o',
    system_prompt="""Synthesize multiple research findings into a comprehensive,
    coherent report that addresses the original query.""",
)

# High-performance workflow implementation
async def generate_research_topics(state: ResearchPlatformState) -> Dict:
    """Generate parallel research topics."""
    import time
    start_time = time.time()

    # Generate topics using the agent
    prompt = f"""
    Break down this research query into 3-5 independent subtopics:
    {state.main_query}

    For each topic, provide:
    1. A focused topic name
    2. 2-3 specific search queries
    """

    result = await topic_generator.run(prompt)

    # Parse result into structured topics
    # In production, you'd have more robust parsing
    topics = []
    for i in range(3):  # Simplified for example
        topics.append(TopicResearchTask(
            topic_id=f"topic_{i}",
            topic_name=f"Subtopic {i+1}",
            search_queries=[f"query {i}.1", f"query {i}.2"],
        ))

    generation_time = time.time() - start_time

    return {
        "generated_topics": topics,
        "performance_metrics": {"topic_generation_time": generation_time},
    }

async def parallel_research(state: ResearchPlatformState) -> Dict:
    """Research all topics in parallel."""
    import time
    start_time = time.time()

    async def research_single_topic(topic: TopicResearchTask) -> TopicResearchResult:
        """Research a single topic asynchronously."""
        # Simulate API calls or database queries
        search_results = await asyncio.gather(*[
            simulate_search(query) for query in topic.search_queries
        ])

        # Use agent to analyze search results
        context = f"""
        Topic: {topic.topic_name}
        Search Results: {search_results}

        Analyze and summarize the findings.
        """

        result = await topic_researcher.run(context)
        return result.data

    # Execute all topic research in parallel
    research_tasks = [
        research_single_topic(topic) for topic in state.generated_topics
    ]

    results = await asyncio.gather(*research_tasks)

    research_time = time.time() - start_time
    metrics = state.performance_metrics.copy()
    metrics["parallel_research_time"] = research_time
    metrics["topics_researched"] = len(results)

    return {
        "research_results": results,
        "performance_metrics": metrics,
    }

async def synthesize_findings(state: ResearchPlatformState) -> Dict:
    """Synthesize all research into final report."""
    # Prepare synthesis context
    findings_text = "\n\n".join([
        f"Topic: {r.topic_name}\n" + "\n".join(f"- {finding}" for finding in r.findings)
        for r in state.research_results
    ])

    synthesis_prompt = f"""
    Original Query: {state.main_query}

    Research Findings:
    {findings_text}

    Create a comprehensive synthesis that addresses the original query.
    """

    result = await synthesis_agent.run(synthesis_prompt)

    return {"synthesis": result.data}

# Simulate external operations
async def simulate_search(query: str) -> str:
    """Simulate an external search operation."""
    await asyncio.sleep(0.1)  # Simulate network delay
    return f"Results for '{query}'"

# Build the high-performance workflow
def create_research_platform():
    workflow = StateGraph(ResearchPlatformState)

    workflow.add_node("generate_topics", generate_research_topics)
    workflow.add_node("parallel_research", parallel_research)
    workflow.add_node("synthesize", synthesize_findings)

    workflow.set_entry_point("generate_topics")
    workflow.add_edge("generate_topics", "parallel_research")
    workflow.add_edge("parallel_research", "synthesize")
    workflow.add_edge("synthesize", END)

    return workflow.compile()

# Performance monitoring wrapper
async def run_with_monitoring(app, state):
    """Run workflow with performance monitoring."""
    import time

    start_time = time.time()
    result = await app.ainvoke(state)
    total_time = time.time() - start_time

    print("Performance Metrics:")
    print(f"- Total execution time: {total_time:.2f}s")
    print(f"- Topic generation: {result.performance_metrics['topic_generation_time']:.2f}s")
    print(f"- Parallel research: {result.performance_metrics['parallel_research_time']:.2f}s")
    print(f"- Topics processed: {result.performance_metrics['topics_researched']}")
    print(f"- Speedup vs sequential: {result.performance_metrics['topics_researched']:.1f}x")

    return result
```

Four Optimization Strategies That Matter
Beyond parallelization, there are four areas that consistently deliver performance gains in production LangGraph + Pydantic AI systems:

Figure 6: Performance Optimization Strategies — Four pillars of optimization: state minimization to reduce checkpoint overhead, validation caching to avoid redundant schema checks, parallelization to maximize throughput, and response caching to eliminate duplicate LLM calls.
Here are implementations of each strategy:
```python
from functools import lru_cache
from typing import Any, Dict, List, Optional
from pydantic_ai import Agent
import asyncio
import hashlib
import json

class OptimizedWorkflowComponents:
    """Collection of optimization techniques for LangGraph + Pydantic AI."""

    def __init__(self):
        self.validation_cache = {}
        self.llm_response_cache = {}

    # 1. State Optimization
    def minimize_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Remove unnecessary data from state before transitions."""
        # Define fields that need to persist
        essential_fields = {'id', 'status', 'current_data', 'next_step'}

        # Store large data externally and keep references
        minimized = {}
        for key, value in state.items():
            if key in essential_fields:
                minimized[key] = value
            elif isinstance(value, (list, dict)) and len(str(value)) > 1000:
                # Store large objects externally
                ref = self.store_large_object(value)
                minimized[f"{key}_ref"] = ref

        return minimized

    def store_large_object(self, obj: Any) -> str:
        """Store large object and return reference."""
        # In production, use S3, Redis, etc.
        obj_hash = hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest()
        # Simulate storage
        return f"ref://{obj_hash}"

    # 2. Validation Optimization
    @lru_cache(maxsize=1000)
    def cached_validation(self, model_class: type, data_hash: str) -> bool:
        """Cache validation results for repeated data."""
        # This would be called with a hash of the data
        # In practice, implement actual validation
        return True

    # 3. Parallel Agent Execution
    async def parallel_agent_calls(self, agents: List[Agent], prompts: List[str]) -> List[Any]:
        """Execute multiple agents in parallel."""
        tasks = [
            agent.run(prompt)
            for agent, prompt in zip(agents, prompts)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle any failures
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                # Fallback or retry logic
                processed_results.append(None)
            else:
                processed_results.append(result.data)

        return processed_results

    # 4. Response Caching
    def get_cached_response(self, agent_id: str, prompt: str) -> Optional[str]:
        """Check if we have a cached response for this prompt."""
        cache_key = f"{agent_id}:{hashlib.md5(prompt.encode()).hexdigest()}"
        return self.llm_response_cache.get(cache_key)

    def cache_response(self, agent_id: str, prompt: str, response: str):
        """Cache an LLM response."""
        cache_key = f"{agent_id}:{hashlib.md5(prompt.encode()).hexdigest()}"
        self.llm_response_cache[cache_key] = response

# Example: Optimized document processing workflow
class OptimizedDocumentProcessor:
    """Document processor with performance optimizations."""

    def __init__(self):
        self.optimizer = OptimizedWorkflowComponents()
        self.summary_agent = Agent(
            'openai:gpt-3.5-turbo',  # Cheaper, faster model
            system_prompt="Summarize documents concisely.",
        )
        self.analysis_agent = Agent(
            'openai:gpt-4o',  # More capable model for complex analysis
            system_prompt="Provide deep analysis of document content.",
        )

    async def process_documents_batch(self, documents: List[str]) -> Dict:
        """Process multiple documents with optimizations."""
        # 1. Use cheaper model for initial filtering
        summaries = await self.optimizer.parallel_agent_calls(
            [self.summary_agent] * len(documents),
            [f"Summarize: {doc[:500]}" for doc in documents],  # Only send first 500 chars
        )

        # 2. Filter documents that need deep analysis
        documents_for_analysis = []
        for i, (doc, summary) in enumerate(zip(documents, summaries)):
            if self.needs_deep_analysis(summary):
                documents_for_analysis.append((i, doc))

        # 3. Parallel deep analysis only for selected documents
        analyses = []  # stays empty if nothing needs deep analysis
        if documents_for_analysis:
            analysis_prompts = [
                f"Analyze this document in detail: {doc}"
                for _, doc in documents_for_analysis
            ]

            analyses = await self.optimizer.parallel_agent_calls(
                [self.analysis_agent] * len(documents_for_analysis),
                analysis_prompts,
            )

        # 4. Combine results
        results = {
            "total_documents": len(documents),
            "analyzed_documents": len(documents_for_analysis),
            "summaries": summaries,
            "detailed_analyses": {
                i: analysis
                for (i, _), analysis in zip(documents_for_analysis, analyses)
            },
        }

        return results

    def needs_deep_analysis(self, summary: str) -> bool:
        """Determine if document needs deep analysis based on summary."""
        if not summary:
            return False  # a failed summary call means we skip deep analysis
        # Simple heuristic - in practice, use more sophisticated logic
        keywords = ['important', 'critical', 'urgent', 'complex']
        return any(keyword in summary.lower() for keyword in keywords)
```

KEY INSIGHT: Use a cheap, fast model (like GPT-3.5) for triage and filtering, then route only the documents that need it to an expensive model (like GPT-4o). This two-tier approach can cut LLM costs by 60-80% on batch workloads.
Real-World Application: E-commerce Order Processing
To ground these patterns in something concrete, here is a full e-commerce order processing system. It handles inventory checks, payment processing, and shipping coordination as a single LangGraph workflow with Pydantic validation at every step:
```python
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field, validator
from pydantic_ai import Agent
from typing import List, Dict, Optional, Literal
from datetime import datetime
from decimal import Decimal

# Domain models
class OrderItem(BaseModel):
    """Individual item in an order."""
    product_id: str
    quantity: int = Field(gt=0)
    unit_price: Decimal = Field(gt=0, decimal_places=2)

    @property
    def total_price(self) -> Decimal:
        return self.quantity * self.unit_price

class Order(BaseModel):
    """E-commerce order with validation."""
    order_id: str
    customer_id: str
    items: List[OrderItem] = Field(min_items=1)
    shipping_address: str
    billing_address: str
    payment_method: Literal["credit_card", "paypal", "bank_transfer"]
    status: str = "pending"

    @property
    def total_amount(self) -> Decimal:
        return sum(item.total_price for item in self.items)

    @validator('items')
    def validate_items(cls, items):
        # Check for duplicate products
        product_ids = [item.product_id for item in items]
        if len(product_ids) != len(set(product_ids)):
            raise ValueError("Duplicate products in order")
        return items

class OrderProcessingState(BaseModel):
    """State for order processing workflow."""
    order: Order
    inventory_checked: bool = False
    payment_verified: bool = False
    shipping_arranged: bool = False
    notifications_sent: List[str] = Field(default_factory=list)
    processing_errors: List[str] = Field(default_factory=list)

# Specialized agents
inventory_agent = Agent(
    'openai:gpt-4o',
    system_prompt="""You are an inventory management specialist.
    Check product availability and reserve items.""",
)

payment_agent = Agent(
    'openai:gpt-4o',
    system_prompt="""You are a payment processing specialist.
    Verify payment methods and process transactions securely.""",
)

shipping_agent = Agent(
    'openai:gpt-4o',
    system_prompt="""You are a shipping coordinator.
    Arrange optimal shipping based on destination and items.""",
)

# Workflow implementation
async def check_inventory(state: OrderProcessingState) -> Dict:
    """Check inventory for all items."""
    # In production, this would query real inventory systems
    inventory_query = "\n".join([
        f"Product {item.product_id}: {item.quantity} units"
        for item in state.order.items
    ])

    result = await inventory_agent.run(
        f"Check availability for:\n{inventory_query}"
    )

    # Simulate inventory check
    all_available = True  # In reality, parse agent response

    if all_available:
        return {
            "inventory_checked": True,
            "order": {**state.order.dict(), "status": "inventory_confirmed"},
        }
    else:
        return {
            "processing_errors": state.processing_errors + ["Insufficient inventory"],
            "order": {**state.order.dict(), "status": "failed"},
        }

async def process_payment(state: OrderProcessingState) -> Dict:
    """Process payment for the order."""
    if not state.inventory_checked:
        return {
            "processing_errors": state.processing_errors + ["Cannot process payment before inventory check"]
        }

    payment_context = f"""
    Order Total: ${state.order.total_amount}
    Payment Method: {state.order.payment_method}
    Customer ID: {state.order.customer_id}
    """

    result = await payment_agent.run(
        f"Process payment:\n{payment_context}"
    )

    # Simulate payment processing
    payment_successful = True  # In reality, integrate with payment gateway

    if payment_successful:
        return {
            "payment_verified": True,
            "order": {**state.order.dict(), "status": "payment_confirmed"},
            "notifications_sent": state.notifications_sent + ["payment_confirmation"],
        }
    else:
        return {
            "processing_errors": state.processing_errors + ["Payment failed"],
            "order": {**state.order.dict(), "status": "payment_failed"},
        }

async def arrange_shipping(state: OrderProcessingState) -> Dict:
    """Arrange shipping for the order."""
    if not state.payment_verified:
        return {
            "processing_errors": state.processing_errors + ["Cannot ship before payment"]
        }

    shipping_context = f"""
    Destination: {state.order.shipping_address}
    Items: {len(state.order.items)} items, Total weight: TBD
    Priority: Standard
    """

    result = await shipping_agent.run(
        f"Arrange shipping:\n{shipping_context}"
    )

    return {
        "shipping_arranged": True,
        "order": {**state.order.dict(), "status": "shipped"},
        "notifications_sent": state.notifications_sent + ["shipping_confirmation"],
    }

# Build the order processing workflow
def create_order_workflow():
    workflow = StateGraph(OrderProcessingState)

    workflow.add_node("inventory", check_inventory)
    workflow.add_node("payment", process_payment)
    workflow.add_node("shipping", arrange_shipping)

    # Define flow with conditional routing
    workflow.set_entry_point("inventory")

    def route_after_inventory(state: OrderProcessingState) -> str:
        if state.inventory_checked and not state.processing_errors:
            return "payment"
        return END

    def route_after_payment(state: OrderProcessingState) -> str:
        if state.payment_verified and not state.processing_errors:
            return "shipping"
        return END

    workflow.add_conditional_edges("inventory", route_after_inventory)
    workflow.add_conditional_edges("payment", route_after_payment)
    workflow.add_edge("shipping", END)

    return workflow.compile()

# Usage example
order_processor = create_order_workflow()

# Process an order
test_order = Order(
    order_id="ORD-12345",
    customer_id="CUST-789",
    items=[
        OrderItem(product_id="PROD-A", quantity=2, unit_price=Decimal("29.99")),
        OrderItem(product_id="PROD-B", quantity=1, unit_price=Decimal("49.99")),
    ],
    shipping_address="123 Main St, Anytown, USA",
    billing_address="123 Main St, Anytown, USA",
    payment_method="credit_card",
)

initial_state = OrderProcessingState(order=test_order)
result = await order_processor.ainvoke(initial_state)

print(f"Order Status: {result.order.status}")
print(f"Notifications sent: {result.notifications_sent}")
```

This example demonstrates the full integration pattern at work: complex business logic with multiple steps, Pydantic validation at every stage, conditional routing based on success or failure, integration points for external systems, and comprehensive error handling that prevents partial failures from corrupting the order state.
Benefits, Challenges, and Honest Tradeoffs
What You Gain
After building several production systems with this stack, five advantages stand out:
- Workflow flexibility — LangGraph’s graph-based approach lets you model complex workflows naturally. Need to add a fraud check step? Add a node. Want parallel processing? Add parallel edges. The graph evolves with your requirements.
- Type safety throughout — Pydantic’s validation ensures data integrity at every step. You are not hoping the LLM returns the right format. You are guaranteeing it.
- Debuggable failures — When something goes wrong, you can see exactly which node failed and why. The combination of structured logs and validated data makes debugging tractable instead of maddening.
- Reusable components — Both frameworks encourage modular design. Agents can be reused across workflows, and workflow patterns can be templated for similar use cases.
- Horizontal scalability — The graph structure naturally supports scaling out. Different nodes can run on different machines, and the state management keeps everything coordinated.
What You Will Struggle With
The challenges are real too:
- Learning curve — If you are coming from simple LLM chains, graph-based thinking requires adjustment. Start with simple linear workflows and gradually add complexity.
- State management complexity — As workflows grow, managing state becomes the hardest part of the system. Use checkpointing features and external storage for large data from the beginning, not as an afterthought.
- Performance overhead — Validation and state management add latency. Profile your workflows and optimize critical paths with caching and parallelization.
- Testing difficulty — Testing stateful workflows is harder than testing individual functions. Build comprehensive test suites that cover both happy paths and edge cases, and invest in deterministic mock agents for CI.
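One way to get those deterministic mock agents is a small stub that mimics the `run()` interface; this sketch assumes your node functions only depend on `await agent.run(prompt)` returning an object with a `.data` attribute, so the stub can be injected in place of a real agent during CI:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class StubResult:
    data: str

class StubAgent:
    """Deterministic stand-in for a Pydantic AI agent in unit tests."""
    def __init__(self, canned_response: str):
        self.canned_response = canned_response
        self.calls: list[str] = []

    async def run(self, prompt: str) -> StubResult:
        self.calls.append(prompt)  # record prompts for assertions
        return StubResult(data=self.canned_response)

async def test_summarize_node():
    agent = StubAgent("canned summary")
    # Imagine injecting this stub wherever the real summarizer would be used
    result = await agent.run("Summarize: hello world")
    assert result.data == "canned summary"
    assert agent.calls == ["Summarize: hello world"]

asyncio.run(test_summarize_node())
```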
What Comes Next
Patterns on the Horizon
Four emerging patterns are worth watching:
- Adaptive workflows — Systems that modify their own graph topology based on performance metrics or user feedback.
- Cross-organization workflows — Federated graphs that span multiple organizations while maintaining security and privacy boundaries.
- Real-time collaboration — Multiple agents and humans working together in the same workflow with live state updates.
- Advanced state persistence — More sophisticated checkpointing with automatic recovery and schema migration capabilities.
Getting Started: A Practical Checklist
If you are ready to build production AI systems with LangGraph and Pydantic AI, here is what we recommend based on our own trial-and-error:
- Sketch the workflow as a graph first — Before writing any code, draw out your nodes, edges, and state requirements. The visual representation catches design flaws early.
- Define your Pydantic models before your agents — Comprehensive data models for all inter-node data is an investment that pays off within the first week.
- Build incrementally — Start with a simple linear workflow. Add conditional edges, then parallel execution, then error recovery. Test thoroughly at each step.
- Instrument everything — Track not just errors but also performance metrics, state transitions, and agent interactions. You cannot optimize what you cannot measure.
- Design for failure from day one — Use conditional edges to handle failures gracefully. Build the resilience patterns (retries, fallbacks, partial results) into the first version, not the second.
KEY INSIGHT: Start with a two-node linear workflow that actually runs in production. Then add complexity one edge at a time. Teams that try to build the full graph on day one never ship.
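For reference, that two-node starting point can be as small as this (node names and state fields are placeholders):

```python
from typing import TypedDict
from langgraph.graph import StateGraph, END

class MiniState(TypedDict):
    text: str
    result: str

def clean(state: MiniState) -> dict:
    # Node 1: trivial preprocessing
    return {"text": state["text"].strip()}

def label(state: MiniState) -> dict:
    # Node 2: trivial "analysis" standing in for an agent call
    return {"result": f"processed:{state['text']}"}

graph = StateGraph(MiniState)
graph.add_node("clean", clean)
graph.add_node("label", label)
graph.set_entry_point("clean")
graph.add_edge("clean", "label")
graph.add_edge("label", END)

app = graph.compile()
print(app.invoke({"text": "  hello  ", "result": ""}))
```

Once this runs end to end, swapping the placeholder nodes for real Pydantic AI agents and adding conditional edges is an incremental change rather than a rewrite.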
Conclusion
LangGraph and Pydantic AI solve different problems that, combined, address the central challenge of production AI systems: how do you build agent workflows that are both flexible enough to handle real-world complexity and reliable enough to trust with real-world data?
LangGraph gives you the orchestration — conditional routing, parallel execution, stateful checkpoints, and error recovery paths. Pydantic AI gives you the guarantees — type-safe outputs, validated state transitions, and schema enforcement at every boundary. Neither framework alone gets you to production-ready. Together, they do.
The future of AI development is not about better models or cleverer prompts. It is about building robust systems that can handle the messiness of production while maintaining the reliability that businesses demand. With LangGraph and Pydantic AI, we finally have the right tools for that job.