Advanced Testing Strategies for LangGraph and Pydantic AI Agent Systems

We shipped an AI agent that passed every unit test in the suite. Green across the board. Then it hit production and started routing customer complaints to the sales team, hallucinating product names that didn’t exist, and occasionally responding in French. The tests never caught any of it because we were testing the wrong things at the wrong level. After rebuilding our entire test strategy around LangGraph’s state machine model and Pydantic AI’s validation primitives, we cut production incidents by 90% and dropped our test execution time from 12 minutes to 40 seconds.

That experience taught us a hard lesson. Traditional testing assumes deterministic inputs and outputs. AI agents laugh at that assumption. The same prompt can produce different responses across runs. State mutates as it flows through graph nodes. External LLM calls fail in ways you can’t predict. You need a testing strategy built specifically for this kind of system, one that accounts for probabilistic behavior, validates state transitions, and catches the subtle orchestration bugs that only surface three nodes deep in a workflow.

The good news: LangGraph and Pydantic AI give you the tools to make this tractable. LangGraph’s explicit state graphs turn implicit agent behavior into something you can inspect and verify at every step. Pydantic AI’s TestModel and FunctionModel let you rip out the LLM entirely and test your logic in milliseconds. Together, they turn “hope it works” into “prove it works.”

Why AI Agent Testing Breaks Traditional Approaches

Five Layers of Complexity You Didn’t Ask For

When we first tried to test our LangGraph agents with standard pytest patterns, we kept running into the same wall. The tests passed, but the system still broke. Here is why.

Stateful complexity is the biggest culprit. LangGraph maintains state as information flows through nodes, and that state mutates at each step. A bug in node two might not manifest until node five, when the corrupted state triggers an unexpected conditional branch. You cannot catch this with isolated unit tests.

LLM non-determinism compounds the problem. Even with temperature set to 0, language models produce slightly different outputs for the same input. Your tests need to verify the intent and structure of responses, not exact string matches.

Validation boundaries in Pydantic AI double your test surface. You need to verify that valid data passes through and that invalid data gets properly rejected with the right error messages. Testing only the happy path is testing half the system.

Tool integration introduces external dependencies that break test isolation. When your agent calls APIs, queries databases, or hits other services, you need strategies that test the integration logic without creating brittle or expensive external calls.

Orchestration logic demands testing at multiple levels. Individual nodes might work perfectly in isolation but fail when composed into a graph, because the routing conditions, state merges, or error propagation paths contain bugs that only appear during full execution.

The Testing Pyramid for AI Agents

To catch bugs at every level, we need a layered strategy where each tier builds on the one below.

Figure 1: AI Agent Testing Pyramid. Unit tests form the base, verifying individual node functions and validation rules. Integration tests check component interactions. Workflow tests validate graph execution and state transitions. End-to-end tests at the top verify complete system behavior. Each layer catches different failure modes.

Unit tests verify individual components: node functions, validation rules, tool interfaces. They run in milliseconds and catch basic logic errors.

Integration tests validate interactions between components. Can a Pydantic AI agent call its tools correctly? Does a state update from one LangGraph node arrive intact at the next? These catch the subtle “components don’t quite fit together” bugs.

Workflow tests exercise complete LangGraph graphs. They validate state transitions, conditional routing, and error propagation across multiple nodes. These are where orchestration bugs surface.

End-to-end tests confirm the system works from the user’s perspective. They simulate full conversations and verify that all pieces deliver the expected result together.

You need all four layers. Unit tests alone miss orchestration bugs. End-to-end tests alone make it impossible to pinpoint which component failed.

KEY INSIGHT: Test AI agents at every layer of the pyramid. Unit tests that pass tell you components work in isolation. Only workflow and E2E tests tell you the system actually works.
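
In practice we mirror the pyramid in the repository layout and mark the expensive layers so CI can select them separately. The directory names and markers below are our own conventions, not anything LangGraph or Pydantic AI prescribes; the CI pipeline later in this post selects on the same markers.

# Layout sketch (our conventions):
#   tests/unit/         -> node functions, validators, tool interfaces
#   tests/integration/  -> agent + tools, node-to-node state handoff
#   tests/workflows/    -> compiled graphs, routing, error propagation
#   tests/e2e/          -> full conversations, nightly, against real LLMs
import pytest

@pytest.mark.slow
@pytest.mark.llm_required
def test_complaint_routes_to_support_not_sales():
    """Sketch only: the real body would drive a full conversation end to end."""
    ...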

Testing Pydantic AI Components

Unit Testing with TestModel

Pydantic AI’s TestModel changed how we write agent tests. Instead of making expensive, non-deterministic LLM calls during testing, TestModel provides predictable responses. Tests run in milliseconds, and they actually test your logic instead of the LLM’s mood that day.

from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
from pydantic import BaseModel, Field
from typing import List, Optional
import pytest


# First, let's define a structured output model
class CustomerAnalysis(BaseModel):
    """Structured analysis of customer sentiment and needs."""
    sentiment: float = Field(..., ge=-1, le=1, description="Sentiment score from -1 to 1")
    primary_issue: str = Field(..., description="Main customer concern")
    urgency_level: int = Field(..., ge=1, le=5, description="Urgency from 1-5")
    recommended_actions: List[str] = Field(..., min_items=1, max_items=3)


# Create our agent with structured output
customer_agent = Agent(
    'openai:gpt-4o',
    result_type=CustomerAnalysis,
    system_prompt="You are a customer service analyst. Analyze customer messages for sentiment, issues, and recommended actions."
)


# Now let's write comprehensive tests
class TestCustomerAgent:
    def test_basic_analysis(self):
        """Test that agent produces valid structured output."""
        # TestModel returns a simple response by default
        with customer_agent.override(model=TestModel()):
            result = customer_agent.run_sync("I'm frustrated with the slow shipping")
            # The result is guaranteed to be a CustomerAnalysis instance
            assert isinstance(result.data, CustomerAnalysis)
            assert -1 <= result.data.sentiment <= 1
            assert 1 <= result.data.urgency_level <= 5
            assert len(result.data.recommended_actions) >= 1
    def test_custom_responses(self):
        """Test agent with specific mock responses."""
        # Create a TestModel with a specific response
        mock_response = CustomerAnalysis(
            sentiment=-0.8,
            primary_issue="Shipping delays",
            urgency_level=4,
            recommended_actions=["Expedite shipping", "Offer compensation", "Follow up"]
        )
        # TestModel can return structured data directly; the keyword is
        # custom_output_args in current pydantic-ai (older releases call it
        # custom_result_args), so adjust to the version you are pinned to
        test_model = TestModel(custom_output_args=mock_response.model_dump())
        with customer_agent.override(model=test_model):
            result = customer_agent.run_sync("My order is 2 weeks late!")
            # Verify we got our expected response
            assert result.data.sentiment == -0.8
            assert result.data.primary_issue == "Shipping delays"
            assert "Expedite shipping" in result.data.recommended_actions

    def test_error_handling(self):
        """Test that agent handles errors gracefully."""
        # TestModel has no hook for raising; simulating a provider failure is
        # easier with a FunctionModel whose function raises
        from pydantic_ai.models.function import FunctionModel

        def raise_api_error(messages, info):
            raise RuntimeError("API Error")

        error_model = FunctionModel(raise_api_error)
        with customer_agent.override(model=error_model):
            with pytest.raises(Exception) as exc_info:
                customer_agent.run_sync("Test message")
        assert "API Error" in str(exc_info.value)

The pattern here is straightforward: override the model, run your agent, assert on the structured output. You are testing how your agent handles responses, validates data, and deals with errors, all without touching an external LLM.

Controlling Agent Behavior with FunctionModel

When you need finer-grained control over how your mock agent responds, FunctionModel lets you write custom response logic. We used this to test multi-step tool workflows where the agent’s behavior depends on what happened in previous turns.

from pydantic_ai.models.function import FunctionModel, AgentInfo
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart, ToolCallPart
import asyncio
# Let's create a more complex agent with tools
agent_with_tools = Agent(
'openai:gpt-4o',
system_prompt="You are a helpful assistant with access to various tools."
)
@agent_with_tools.tool
async def check_inventory(product_id: str) -> int:
"""Check inventory levels for a product."""
# In tests, this might return mock data
return 42
@agent_with_tools.tool
async def calculate_shipping(weight: float, destination: str) -> float:
"""Calculate shipping cost."""
return weight * 2.5 # Simplified calculation
# Now create sophisticated test scenarios
async def test_multi_tool_workflow():
"""Test complex workflows involving multiple tool calls."""
call_sequence = []
async def custom_model_behavior(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
"""Simulate specific model behavior based on conversation state."""
# Track what's been called
nonlocal call_sequence
# Get the last user message
last_message = messages[-1].content
if "check stock" in last_message.lower():
# First, indicate we'll check inventory
call_sequence.append("intent_recognized")
return ModelResponse(
parts=[
TextPart("I'll check the inventory for you."),
ToolCallPart(
tool_name="check_inventory",
args={"product_id": "PROD-123"}
)
]
)
elif any(part.part_kind == "tool-return" for part in messages[-1].parts):
# We got tool results back, now calculate shipping
call_sequence.append("tool_result_processed")
return ModelResponse(
parts=[
TextPart("The product is in stock. Let me calculate shipping."),
ToolCallPart(
tool_name="calculate_shipping",
args={"weight": 2.5, "destination": "New York"}
)
]
)
else:
# Final response after all tools
call_sequence.append("final_response")
return ModelResponse(
parts=[TextPart("Product PROD-123 is in stock (42 units) with shipping cost of $6.25 to New York.")]
)
# Run the test with our custom function
with agent_with_tools.override(model=FunctionModel(custom_model_behavior)):
result = await agent_with_tools.run("Check stock for PROD-123 and shipping to New York")
# Verify the workflow executed correctly
assert call_sequence == ["intent_recognized", "tool_result_processed", "final_response"]
assert "42 units" in result.data
assert "$6.25" in result.data
# Test error recovery in tool execution
async def test_tool_error_recovery():
"""Test how agent handles tool failures."""
async def model_with_fallback(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
# Check if we just got a tool error
last_message = messages[-1]
if hasattr(last_message, 'parts'):
for part in last_message.parts:
if part.part_kind == "tool-return" and "error" in str(part.content).lower():
# Provide helpful fallback response
return ModelResponse(
parts=[TextPart("I encountered an error checking inventory, but I can help you place a backorder instead.")]
)
# Initial tool call that will fail
return ModelResponse(
parts=[ToolCallPart(tool_name="check_inventory", args={"product_id": "INVALID"})]
)
# Override the tool to simulate failure
async def failing_inventory_check(product_id: str) -> int:
raise ValueError("Product not found")
agent_with_tools._tools["check_inventory"].func = failing_inventory_check
with agent_with_tools.override(model=FunctionModel(model_with_fallback)):
result = await agent_with_tools.run("Check stock for INVALID")
assert "backorder" in result.data.lower()

With FunctionModel, you control exactly what the agent “thinks” at each step. You can simulate multi-step tool chains, test error recovery paths, and verify conversation flow, all deterministically and all in milliseconds.

Figure 2: Pydantic AI Testing Flow. TestModel provides predetermined responses for fast, simple tests. FunctionModel lets you write custom response logic based on conversation state, enabling sophisticated scenario testing. Both approaches avoid real LLM calls while thoroughly exercising your agent logic.

KEY INSIGHT: Use TestModel for “does the plumbing work” tests and FunctionModel for “does the agent behave correctly across multi-turn interactions” tests. Together they cover the full range of agent behavior without a single LLM call.

Validation Testing at the Boundaries

One of Pydantic AI’s biggest strengths is its validation layer. But we learned the hard way that validation only protects you if you test both sides of every boundary. Our first agent accepted malformed meeting requests for weeks before anyone noticed, because we only tested the happy path.

Here is how we test validation comprehensively now:

from pydantic import BaseModel, Field, validator, model_validator
from typing import List, Optional, Dict
import json
import pytest
from datetime import datetime, timedelta
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
class MeetingScheduleRequest(BaseModel):
"""Complex model with multiple validation rules."""
title: str = Field(..., min_length=3, max_length=100)
attendees: List[str] = Field(..., min_items=2, max_items=20)
duration_minutes: int = Field(..., ge=15, le=480) # 15 min to 8 hours
proposed_time: datetime
meeting_type: str = Field(..., pattern="^(video|audio|in-person)$")
@validator('attendees')
def validate_attendees(cls, v):
"""Ensure all attendees have valid email format."""
import re
email_pattern = re.compile(r'^[\w\.-]+@[\w\.-]+\.\w+$')
invalid_emails = [email for email in v if not email_pattern.match(email)]
if invalid_emails:
raise ValueError(f"Invalid email addresses: {invalid_emails}")
# Check for duplicates
if len(set(v)) != len(v):
raise ValueError("Duplicate attendees not allowed")
return v
@validator('proposed_time')
def validate_future_time(cls, v):
"""Ensure meeting is scheduled in the future."""
if v <= datetime.now():
raise ValueError("Meeting must be scheduled in the future")
# Not too far in the future
if v > datetime.now() + timedelta(days=365):
raise ValueError("Cannot schedule meetings more than a year in advance")
return v
@model_validator(mode='after')
def validate_video_meeting_duration(self):
"""Video meetings shouldn't exceed 2 hours."""
if self.meeting_type == 'video' and self.duration_minutes > 120:
raise ValueError("Video meetings should not exceed 2 hours")
return self
# Comprehensive validation tests
class TestMeetingValidation:
def test_boundary_values(self):
"""Test validation at the edges of acceptable ranges."""
base_data = {
"title": "Test Meeting",
"attendees": ["user1@example.com", "user2@example.com"],
"proposed_time": datetime.now() + timedelta(hours=1),
"meeting_type": "video"
}
# Test duration boundaries
# Minimum valid duration
valid_min = MeetingScheduleRequest(**{**base_data, "duration_minutes": 15})
assert valid_min.duration_minutes == 15
# Maximum valid duration
valid_max = MeetingScheduleRequest(**{**base_data, "duration_minutes": 480})
assert valid_max.duration_minutes == 480
# Below minimum
with pytest.raises(ValueError) as exc_info:
MeetingScheduleRequest(**{**base_data, "duration_minutes": 14})
assert "greater than or equal to 15" in str(exc_info.value)
# Above maximum
with pytest.raises(ValueError) as exc_info:
MeetingScheduleRequest(**{**base_data, "duration_minutes": 481})
assert "less than or equal to 480" in str(exc_info.value)
def test_custom_validators(self):
"""Test complex custom validation logic."""
base_data = {
"title": "Team Standup",
"duration_minutes": 30,
"proposed_time": datetime.now() + timedelta(days=1),
"meeting_type": "video"
}
# Test invalid email format
with pytest.raises(ValueError) as exc_info:
MeetingScheduleRequest(**{
**base_data,
"attendees": ["valid@example.com", "invalid-email"]
})
assert "Invalid email addresses" in str(exc_info.value)
# Test duplicate attendees
with pytest.raises(ValueError) as exc_info:
MeetingScheduleRequest(**{
**base_data,
"attendees": ["user@example.com", "user@example.com"]
})
assert "Duplicate attendees" in str(exc_info.value)
# Test past meeting time
with pytest.raises(ValueError) as exc_info:
MeetingScheduleRequest(**{
**base_data,
"attendees": ["user1@example.com", "user2@example.com"],
"proposed_time": datetime.now() - timedelta(hours=1)
})
assert "must be scheduled in the future" in str(exc_info.value)
def test_model_level_validation(self):
"""Test validation that depends on multiple fields."""
base_data = {
"title": "Long Video Call",
"attendees": ["user1@example.com", "user2@example.com"],
"proposed_time": datetime.now() + timedelta(hours=1)
}
# Video meeting within 2-hour limit - should pass
valid_video = MeetingScheduleRequest(**{
**base_data,
"meeting_type": "video",
"duration_minutes": 120
})
assert valid_video.meeting_type == "video"
# Video meeting exceeding 2-hour limit - should fail
with pytest.raises(ValueError) as exc_info:
MeetingScheduleRequest(**{
**base_data,
"meeting_type": "video",
"duration_minutes": 150
})
assert "should not exceed 2 hours" in str(exc_info.value)
# In-person meeting can exceed 2 hours - should pass
valid_in_person = MeetingScheduleRequest(**{
**base_data,
"meeting_type": "in-person",
"duration_minutes": 240
})
assert valid_in_person.duration_minutes == 240
# Test validation in the context of an agent
async def test_agent_validation_handling():
"""Test how agents handle validation errors."""
scheduling_agent = Agent(
'openai:gpt-4o',
result_type=MeetingScheduleRequest,
system_prompt="You are a meeting scheduler. Extract meeting details from requests."
)
# Create a test scenario where the model returns invalid data
invalid_response = {
"title": "Quick Sync",
"attendees": ["only-one@example.com"], # Too few attendees
"duration_minutes": 30,
"proposed_time": "2024-01-01T10:00:00", # Likely in the past
"meeting_type": "video"
}
test_model = TestModel(response=json.dumps(invalid_response))
with scheduling_agent.override(model=test_model):
with pytest.raises(ValueError) as exc_info:
await scheduling_agent.run("Schedule a quick sync with just me")
# The validation error should bubble up
assert "at least 2 items" in str(exc_info.value)

The key patterns here: test boundaries (15 and 480 minutes, not just 60), test custom validators with intentionally bad data, test cross-field validation (video meetings have different duration limits than in-person), and test that validation errors propagate correctly through the agent.
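
Hand-picked boundary values can also be complemented with property-based testing. Here is a sketch using the hypothesis library, an extra dev dependency that neither Pydantic nor Pydantic AI requires; it assumes MeetingScheduleRequest from above is importable. It sweeps durations across a wide range and asserts that exactly 15 to 480 minutes is accepted.

from datetime import datetime, timedelta

import pytest
from hypothesis import given, strategies as st
from pydantic import ValidationError


@given(duration=st.integers(min_value=-1000, max_value=1000))
def test_duration_boundary_property(duration: int):
    data = {
        "title": "Property Test Meeting",
        "attendees": ["a@example.com", "b@example.com"],
        "proposed_time": datetime.now() + timedelta(hours=1),
        "meeting_type": "audio",  # avoid the video-specific 2-hour rule
        "duration_minutes": duration,
    }
    if 15 <= duration <= 480:
        assert MeetingScheduleRequest(**data).duration_minutes == duration
    else:
        with pytest.raises(ValidationError):
            MeetingScheduleRequest(**data)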

Testing LangGraph Workflows

Verifying Nodes in Isolation

LangGraph workflows are built from node functions, and those nodes are where you start testing. Each node takes a state dict, does some work, and returns a partial state update. Test them as pure functions first.

from langgraph.graph import StateGraph
from typing import TypedDict, List, Optional, Dict, Any
import pytest
# Define a complex state structure
class DocumentProcessingState(TypedDict):
document_id: str
content: str
entities: Optional[List[Dict[str, str]]]
sentiment_analysis: Optional[Dict[str, float]]
summary: Optional[str]
processing_errors: List[str]
metadata: Dict[str, Any]
# Individual node functions to test
def extract_entities_node(state: DocumentProcessingState) -> Dict:
"""Extract named entities from document content."""
try:
# In production, this would use NLP models
# For testing, we'll simulate the extraction
content = state["content"]
# Simulate entity extraction
entities = []
if "Apple" in content:
entities.append({"text": "Apple", "type": "ORG", "confidence": 0.95})
if "Tim Cook" in content:
entities.append({"text": "Tim Cook", "type": "PERSON", "confidence": 0.98})
if "Cupertino" in content:
entities.append({"text": "Cupertino", "type": "LOC", "confidence": 0.92})
return {
"entities": entities,
"metadata": {**state.get("metadata", {}), "entities_extracted": True}
}
except Exception as e:
return {
"processing_errors": state.get("processing_errors", []) + [f"Entity extraction failed: {str(e)}"]
}
def analyze_sentiment_node(state: DocumentProcessingState) -> Dict:
"""Analyze document sentiment."""
try:
content = state["content"]
# Simulate sentiment analysis
# In reality, this would use a sentiment model
sentiment_keywords = {
"positive": ["excellent", "amazing", "innovative", "breakthrough"],
"negative": ["disappointing", "failed", "problem", "issue"],
"neutral": ["announced", "stated", "reported", "mentioned"]
}
scores = {"positive": 0.0, "negative": 0.0, "neutral": 0.0}
for category, keywords in sentiment_keywords.items():
for keyword in keywords:
if keyword in content.lower():
scores[category] += 0.25
# Normalize scores
total = sum(scores.values()) or 1.0
normalized_scores = {k: v / total for k, v in scores.items()}
return {
"sentiment_analysis": normalized_scores,
"metadata": {**state.get("metadata", {}), "sentiment_analyzed": True}
}
except Exception as e:
return {
"processing_errors": state.get("processing_errors", []) + [f"Sentiment analysis failed: {str(e)}"]
}
# Comprehensive node-level tests
class TestDocumentProcessingNodes:
def test_entity_extraction_success(self):
"""Test successful entity extraction."""
test_state = DocumentProcessingState(
document_id="doc123",
content="Apple CEO Tim Cook announced new products in Cupertino today.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
)
result = extract_entities_node(test_state)
# Verify entities were extracted
assert "entities" in result
assert len(result["entities"]) == 3
# Check specific entities
entity_texts = {e["text"] for e in result["entities"]}
assert "Apple" in entity_texts
assert "Tim Cook" in entity_texts
assert "Cupertino" in entity_texts
# Verify metadata update
assert result["metadata"]["entities_extracted"] is True
def test_entity_extraction_empty_content(self):
"""Test entity extraction with no recognizable entities."""
test_state = DocumentProcessingState(
document_id="doc124",
content="This is a generic statement with no specific entities.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
)
result = extract_entities_node(test_state)
assert "entities" in result
assert len(result["entities"]) == 0
assert result["metadata"]["entities_extracted"] is True
def test_sentiment_analysis_mixed(self):
"""Test sentiment analysis with mixed sentiment."""
test_state = DocumentProcessingState(
document_id="doc125",
content="The product launch was amazing but faced some disappointing technical issues.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
)
result = analyze_sentiment_node(test_state)
assert "sentiment_analysis" in result
scores = result["sentiment_analysis"]
# Should have both positive and negative sentiment
assert scores["positive"] > 0
assert scores["negative"] > 0
# Verify scores sum to 1 (normalized)
assert abs(sum(scores.values()) - 1.0) < 0.001
def test_node_error_handling(self):
"""Test that nodes handle errors gracefully."""
# Create state that will cause an error (missing required field)
test_state = {
"document_id": "doc126",
# Missing 'content' field
"processing_errors": [],
"metadata": {}
}
result = extract_entities_node(test_state)
# Should add error to processing_errors
assert "processing_errors" in result
assert len(result["processing_errors"]) > 0
assert "Entity extraction failed" in result["processing_errors"][0]
# Test node composition and data flow
def test_node_composition():
"""Test that nodes can be composed and data flows correctly."""
initial_state = DocumentProcessingState(
document_id="doc127",
content="Apple's innovative products continue to amaze customers worldwide.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={"source": "test"}
)
# Process through multiple nodes
state = initial_state.copy()
# First node: entity extraction
entity_result = extract_entities_node(state)
state.update(entity_result)
# Second node: sentiment analysis
sentiment_result = analyze_sentiment_node(state)
state.update(sentiment_result)
# Verify cumulative results
assert state["entities"] is not None
assert len(state["entities"]) > 0
assert state["sentiment_analysis"] is not None
assert state["sentiment_analysis"]["positive"] > state["sentiment_analysis"]["negative"]
# Verify metadata accumulation
assert state["metadata"]["entities_extracted"] is True
assert state["metadata"]["sentiment_analyzed"] is True
assert state["metadata"]["source"] == "test" # Original metadata preserved

Notice the error handling test. We pass a state dict missing the content key and verify the node catches the exception and appends to processing_errors instead of crashing. Nodes in a LangGraph workflow must be resilient, because a crash in one node can take down the entire graph execution.
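
To make that resilience guarantee explicit, we parametrize over several malformed states. A sketch, assuming extract_entities_node from above is importable:

import pytest

# Feed several malformed states through the node and assert that each one is
# converted into a processing_errors entry rather than an unhandled exception.
MALFORMED_STATES = [
    {"document_id": "doc-missing-content", "processing_errors": [], "metadata": {}},
    {"document_id": "doc-none-content", "content": None, "processing_errors": [], "metadata": {}},
    {"document_id": "doc-wrong-type", "content": 12345, "processing_errors": [], "metadata": {}},
]

@pytest.mark.parametrize("state", MALFORMED_STATES)
def test_extract_entities_never_raises(state):
    result = extract_entities_node(state)
    assert "processing_errors" in result
    assert any("Entity extraction failed" in err for err in result["processing_errors"])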

Figure 3: LangGraph Testing Architecture. Node-level tests verify individual functions with controlled inputs. Graph-level tests check full workflow execution from initial to final state. State transition tests validate conditional routing and verify the correct execution paths fire based on state conditions.

Testing Complete Graph Execution

Once individual nodes pass their tests, you move up the pyramid to graph-level testing. Here we compile an actual LangGraph workflow and run it end-to-end with controlled inputs.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
import asyncio
def create_document_processing_workflow(checkpointer=None):
"""Create a complete document processing workflow."""
builder = StateGraph(DocumentProcessingState)
# Add nodes
builder.add_node("extract_entities", extract_entities_node)
builder.add_node("analyze_sentiment", analyze_sentiment_node)
builder.add_node("generate_summary", generate_summary_node)
builder.add_node("quality_check", quality_check_node)
# Define the flow
builder.add_edge(START, "extract_entities")
builder.add_edge("extract_entities", "analyze_sentiment")
# Conditional edge based on sentiment
def route_after_sentiment(state: DocumentProcessingState) -> str:
if state.get("processing_errors"):
return "quality_check"
sentiment = state.get("sentiment_analysis", {})
# Only generate summary for positive/neutral content
if sentiment.get("negative", 0) > 0.7:
return "quality_check"
return "generate_summary"
builder.add_conditional_edges(
"analyze_sentiment",
route_after_sentiment,
{
"generate_summary": "generate_summary",
"quality_check": "quality_check"
}
)
builder.add_edge("generate_summary", "quality_check")
builder.add_edge("quality_check", END)
return builder.compile(checkpointer=checkpointer)
# Additional nodes for complete workflow
def generate_summary_node(state: DocumentProcessingState) -> Dict:
"""Generate document summary."""
try:
content = state["content"]
entities = state.get("entities", [])
# Simple summary generation (in production, use LLM)
entity_names = [e["text"] for e in entities]
summary = f"Document discusses {', '.join(entity_names)}. " if entity_names else ""
summary += f"Content length: {len(content)} characters."
return {
"summary": summary,
"metadata": {**state.get("metadata", {}), "summary_generated": True}
}
except Exception as e:
return {
"processing_errors": state.get("processing_errors", []) + [f"Summary generation failed: {str(e)}"]
}
def quality_check_node(state: DocumentProcessingState) -> Dict:
"""Perform final quality checks."""
issues = []
# Check for processing errors
if state.get("processing_errors"):
issues.append("Processing errors encountered")
# Check completeness
if not state.get("entities"):
issues.append("No entities extracted")
if not state.get("sentiment_analysis"):
issues.append("No sentiment analysis performed")
# For negative content, flag for review
sentiment = state.get("sentiment_analysis", {})
if sentiment.get("negative", 0) > 0.7:
issues.append("High negative sentiment detected")
return {
"metadata": {
**state.get("metadata", {}),
"quality_check_completed": True,
"quality_issues": issues
}
}
# Comprehensive graph-level tests
class TestDocumentWorkflow:
async def test_complete_positive_flow(self):
"""Test the happy path through the workflow."""
workflow = create_document_processing_workflow()
initial_state = DocumentProcessingState(
document_id="test001",
content="Apple announced groundbreaking innovations at their Cupertino headquarters. CEO Tim Cook expressed excitement about the future.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={"test_case": "positive_flow"}
)
# Execute the workflow
result = await workflow.ainvoke(initial_state)
# Verify all steps completed successfully
assert result["entities"] is not None
assert len(result["entities"]) == 3 # Apple, Cupertino, Tim Cook
assert result["sentiment_analysis"] is not None
assert result["sentiment_analysis"]["positive"] > result["sentiment_analysis"]["negative"]
assert result["summary"] is not None
assert "Apple" in result["summary"]
# Check metadata for execution tracking
assert result["metadata"]["entities_extracted"] is True
assert result["metadata"]["sentiment_analyzed"] is True
assert result["metadata"]["summary_generated"] is True
assert result["metadata"]["quality_check_completed"] is True
assert len(result["metadata"]["quality_issues"]) == 0
async def test_negative_sentiment_routing(self):
"""Test that negative content skips summary generation."""
workflow = create_document_processing_workflow()
initial_state = DocumentProcessingState(
document_id="test002",
content="The product launch was a complete disaster. Multiple critical failures and disappointed customers.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={"test_case": "negative_flow"}
)
result = await workflow.ainvoke(initial_state)
# Should have completed sentiment analysis
assert result["sentiment_analysis"] is not None
assert result["sentiment_analysis"]["negative"] > 0.5
# Should NOT have generated summary due to negative sentiment
assert result["summary"] is None
assert "summary_generated" not in result["metadata"]
# Should have quality issues flagged
assert "High negative sentiment detected" in result["metadata"]["quality_issues"]
async def test_error_propagation(self):
"""Test that errors are properly propagated through the workflow."""
workflow = create_document_processing_workflow()
# Create state that will cause errors
initial_state = DocumentProcessingState(
document_id="test003",
content="", # Empty content should cause issues
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=["Pre-existing error"],
metadata={"test_case": "error_flow"}
)
result = await workflow.ainvoke(initial_state)
# Errors should be accumulated
assert len(result["processing_errors"]) >= 1
assert "Pre-existing error" in result["processing_errors"]
# Quality check should flag the errors
assert "Processing errors encountered" in result["metadata"]["quality_issues"]
# Test with checkpointing for complex workflows
async def test_workflow_checkpointing():
"""Test workflow execution with checkpointing."""
checkpointer = MemorySaver()
# The factory already compiles the graph, so pass the checkpointer through it
app = create_document_processing_workflow(checkpointer=checkpointer)
initial_state = DocumentProcessingState(
document_id="test004",
content="Test content for checkpointing workflow.",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
)
# Execute with thread ID for checkpointing
thread_config = {"configurable": {"thread_id": "test-thread-001"}}
# First execution
result1 = await app.ainvoke(initial_state, config=thread_config)
# Verify checkpoint was saved
saved_state = await checkpointer.aget(thread_config)
assert saved_state is not None
# Simulate resuming from checkpoint
# In a real scenario, this might be after a failure
result2 = await app.ainvoke(None, config=thread_config)
# Results should be consistent
assert result2["document_id"] == result1["document_id"]
assert result2["entities"] == result1["entities"]

The negative sentiment routing test is the critical one. It verifies that when sentiment exceeds the 0.7 negative threshold, the workflow skips summary generation and routes directly to quality check. That conditional branch is exactly the kind of logic that unit tests cannot catch, because it only manifests during full graph execution.
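
The same threshold can also be pinned down cheaply at the unit level if you factor route_after_sentiment out of the factory to module level (an assumption on our part; above it is defined inline):

def test_route_after_sentiment_threshold():
    base = {"processing_errors": [], "sentiment_analysis": {}}

    # Errors always short-circuit to quality_check
    assert route_after_sentiment({**base, "processing_errors": ["boom"]}) == "quality_check"

    # At the threshold the document still gets a summary
    assert route_after_sentiment({**base, "sentiment_analysis": {"negative": 0.7}}) == "generate_summary"

    # Above the threshold summary generation is skipped
    assert route_after_sentiment({**base, "sentiment_analysis": {"negative": 0.71}}) == "quality_check"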

Validating State Transitions and Routing

State transitions are where LangGraph bugs hide. We built a dedicated test pattern that tracks which nodes execute and in what order, giving us full visibility into the routing logic.

class TestStateTransitions:
def test_conditional_routing_logic(self):
"""Test that conditional edges route correctly based on state."""
# Create a simple workflow with conditional routing
builder = StateGraph(DocumentProcessingState)
# Track which nodes were visited
visited_nodes = []
def track_node(name: str):
def node_func(state):
visited_nodes.append(name)
return state
return node_func
builder.add_node("start", track_node("start"))
builder.add_node("path_a", track_node("path_a"))
builder.add_node("path_b", track_node("path_b"))
builder.add_node("end", track_node("end"))
# Conditional routing based on document length
def route_by_length(state):
if len(state.get("content", "")) > 100:
return "path_a"
return "path_b"
builder.add_edge(START, "start")
builder.add_conditional_edges(
"start",
route_by_length,
{
"path_a": "path_a",
"path_b": "path_b"
}
)
builder.add_edge("path_a", "end")
builder.add_edge("path_b", "end")
builder.add_edge("end", END)
workflow = builder.compile()
# Test path A (long content)
visited_nodes.clear()
long_content_state = DocumentProcessingState(
document_id="test",
content="x" * 200, # Long content
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
)
workflow.invoke(long_content_state)
assert visited_nodes == ["start", "path_a", "end"]
# Test path B (short content)
visited_nodes.clear()
short_content_state = DocumentProcessingState(
document_id="test",
content="short", # Short content
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
)
workflow.invoke(short_content_state)
assert visited_nodes == ["start", "path_b", "end"]
async def test_parallel_execution_paths(self):
"""Test workflows with parallel execution branches."""
builder = StateGraph(DocumentProcessingState)
execution_times = {}
async def slow_node_a(state):
start = asyncio.get_event_loop().time()
await asyncio.sleep(0.1) # Simulate work
execution_times["node_a"] = asyncio.get_event_loop().time() - start
return {"metadata": {**state.get("metadata", {}), "node_a_completed": True}}
async def slow_node_b(state):
start = asyncio.get_event_loop().time()
await asyncio.sleep(0.1) # Simulate work
execution_times["node_b"] = asyncio.get_event_loop().time() - start
return {"metadata": {**state.get("metadata", {}), "node_b_completed": True}}
def merge_results(state):
# This node runs after both parallel nodes complete
return {
"metadata": {
**state.get("metadata", {}),
"merge_completed": True,
"parallel_execution_verified": True
}
}
# Build workflow with parallel execution
builder.add_node("split", lambda x: x) # Pass-through node
builder.add_node("node_a", slow_node_a)
builder.add_node("node_b", slow_node_b)
builder.add_node("merge", merge_results)
builder.add_edge(START, "split")
builder.add_edge("split", "node_a")
builder.add_edge("split", "node_b")
builder.add_edge("node_a", "merge")
builder.add_edge("node_b", "merge")
builder.add_edge("merge", END)
workflow = builder.compile()
# Execute workflow
start_time = asyncio.get_event_loop().time()
result = await workflow.ainvoke(DocumentProcessingState(
document_id="parallel_test",
content="test",
entities=None,
sentiment_analysis=None,
summary=None,
processing_errors=[],
metadata={}
))
total_time = asyncio.get_event_loop().time() - start_time
# Verify both nodes executed
assert result["metadata"]["node_a_completed"] is True
assert result["metadata"]["node_b_completed"] is True
assert result["metadata"]["merge_completed"] is True
# Verify parallel execution (total time should be ~0.1s, not ~0.2s)
assert total_time < 0.15 # Allow some overhead
assert execution_times["node_a"] >= 0.1
assert execution_times["node_b"] >= 0.1

The track_node pattern is one of our most reused test utilities. Wrap each node in a tracker, run the workflow, then assert on the exact sequence. The parallel execution test goes further: it verifies that LangGraph actually runs the branches concurrently by checking that total wall-clock time is roughly equal to one branch, not both added together.

KEY INSIGHT: Build “node visitor” tracking into your LangGraph tests. Asserting on the exact sequence of visited nodes catches routing bugs that would be invisible if you only checked final state.
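
If you use the visitor pattern a lot, it is worth pulling into a small helper. A sketch follows (the helper name is ours, not a LangGraph API, and it handles sync nodes only): wrap your node functions before adding them to the builder, run the graph, then assert on the returned list exactly as in the tests above.

from typing import Callable, Dict, List, Tuple

def make_tracked_nodes(node_funcs: Dict[str, Callable]) -> Tuple[Dict[str, Callable], List[str]]:
    """Wrap sync node functions so each call appends its node name to `visited`."""
    visited: List[str] = []

    def wrap(name: str, func: Callable) -> Callable:
        def tracked(state):
            visited.append(name)
            return func(state)
        return tracked

    wrapped = {name: wrap(name, func) for name, func in node_funcs.items()}
    return wrapped, visited

Each wrapped function goes into builder.add_node as before; after the workflow runs, the visited list holds the execution order.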

Bringing It All Together: End-to-End Testing

Full Pipeline Verification

End-to-end tests wire together everything: Pydantic AI agents, LangGraph workflows, mocked external services, and validation logic. These tests verify that the entire pipeline behaves correctly from input to output.

from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from typing import TypedDict, List, Dict, Optional
from unittest.mock import MagicMock, AsyncMock
import json
# Define our domain models
class NewsArticle(BaseModel):
"""Structured news article data."""
headline: str = Field(..., min_length=10, max_length=200)
content: str = Field(..., min_length=50)
category: str = Field(..., pattern="^(tech|business|health|sports)$")
entities: List[str] = Field(default_factory=list)
sentiment_score: float = Field(default=0.0, ge=-1, le=1)
class AnalysisResult(BaseModel):
"""Result of article analysis."""
article_id: str
key_insights: List[str] = Field(..., min_items=1, max_items=5)
recommended_actions: List[str] = Field(default_factory=list)
risk_level: str = Field(default="low", pattern="^(low|medium|high)$")
# Create specialized agents
content_analyzer = Agent(
'openai:gpt-4o',
result_type=AnalysisResult,
system_prompt="Analyze news articles for key insights and actionable recommendations."
)
# Define workflow state
class NewsAnalysisState(TypedDict):
article: NewsArticle
raw_content: str
analysis: Optional[AnalysisResult]
external_data: Optional[Dict]
notifications_sent: List[str]
# Mock external services
class ExternalServices:
def __init__(self):
self.database = AsyncMock()
self.notification_service = AsyncMock()
self.enrichment_api = AsyncMock()
# Comprehensive end-to-end test
class TestNewsAnalysisSystem:
async def test_complete_article_processing_flow(self):
"""Test the entire article processing pipeline end-to-end."""
# Set up mocks
services = ExternalServices()
# Mock database responses
services.database.fetch_article.return_value = {
"id": "article-123",
"raw_content": "Apple announced record profits today. CEO Tim Cook stated that innovation continues to drive growth. The tech giant's stock rose 5% in after-hours trading.",
"metadata": {"source": "Reuters", "timestamp": "2024-01-15T10:00:00Z"}
}
# Mock enrichment API
services.enrichment_api.enrich.return_value = {
"related_articles": ["article-120", "article-121"],
"market_data": {"AAPL": {"change": "+5%", "volume": "high"}}
}
# Create workflow with mocked services
def create_workflow(services):
builder = StateGraph(NewsAnalysisState)
async def fetch_article(state):
# Fetch from database
article_data = await services.database.fetch_article("article-123")
return {"raw_content": article_data["raw_content"]}
async def parse_article(state):
# In real system, this would use an LLM
# For testing, we create structured data
article = NewsArticle(
headline="Apple Reports Record Profits",
content=state["raw_content"],
category="tech",
entities=["Apple", "Tim Cook"],
sentiment_score=0.8
)
return {"article": article}
async def enrich_data(state):
# Call external enrichment service
enrichment = await services.enrichment_api.enrich(
entities=state["article"].entities
)
return {"external_data": enrichment}
async def analyze_article(state):
# Use our Pydantic AI agent
with content_analyzer.override(model=TestModel()):
# Simulate agent response
analysis = AnalysisResult(
article_id="article-123",
key_insights=[
"Apple shows strong financial performance",
"Positive market reaction with 5% stock increase",
"Leadership emphasizes continued innovation"
],
recommended_actions=[
"Monitor competitor responses",
"Track sustained stock performance"
],
risk_level="low"
)
return {"analysis": analysis}
async def send_notifications(state):
# Send notifications based on analysis
notifications = []
if state["analysis"].risk_level == "high":
await services.notification_service.send_alert(
"High risk article detected",
state["article"].headline
)
notifications.append("risk_alert")
if state["article"].sentiment_score < -0.5:
await services.notification_service.send_alert(
"Negative sentiment detected",
state["article"].headline
)
notifications.append("sentiment_alert")
return {"notifications_sent": notifications}
# Build the workflow
builder.add_node("fetch", fetch_article)
builder.add_node("parse", parse_article)
builder.add_node("enrich", enrich_data)
builder.add_node("analyze", analyze_article)
builder.add_node("notify", send_notifications)
# Define flow
builder.add_edge(START, "fetch")
builder.add_edge("fetch", "parse")
builder.add_edge("parse", "enrich")
builder.add_edge("enrich", "analyze")
builder.add_edge("analyze", "notify")
builder.add_edge("notify", END)
return builder.compile()
# Execute the workflow
workflow = create_workflow(services)
initial_state = NewsAnalysisState(
article=None,
raw_content="",
analysis=None,
external_data=None,
notifications_sent=[]
)
result = await workflow.ainvoke(initial_state)
# Comprehensive assertions
# 1. Verify article was fetched and parsed
assert result["article"] is not None
assert result["article"].headline == "Apple Reports Record Profits"
assert result["article"].category == "tech"
assert len(result["article"].entities) == 2
# 2. Verify enrichment occurred
assert result["external_data"] is not None
assert "related_articles" in result["external_data"]
assert "market_data" in result["external_data"]
# 3. Verify analysis was performed
assert result["analysis"] is not None
assert len(result["analysis"].key_insights) == 3
assert result["analysis"].risk_level == "low"
# 4. Verify service interactions
services.database.fetch_article.assert_called_once_with("article-123")
services.enrichment_api.enrich.assert_called_once()
# 5. Verify notifications (none sent for positive low-risk article)
assert len(result["notifications_sent"]) == 0
services.notification_service.send_alert.assert_not_called()
async def test_error_handling_and_recovery(self):
"""Test system behavior when components fail."""
services = ExternalServices()
# Configure enrichment API to fail
services.enrichment_api.enrich.side_effect = Exception("API timeout")
# Create workflow with error handling
def create_resilient_workflow(services):
builder = StateGraph(NewsAnalysisState)
async def enrich_with_fallback(state):
try:
enrichment = await services.enrichment_api.enrich(
entities=state["article"].entities
)
return {"external_data": enrichment}
except Exception as e:
# Fallback to basic data
return {
"external_data": {
"error": str(e),
"fallback": True,
"related_articles": []
}
}
# ... (other nodes remain the same)
builder.add_node("enrich", enrich_with_fallback)
# ... (build rest of workflow)
return builder.compile()
workflow = create_resilient_workflow(services)
result = await workflow.ainvoke(initial_state)
# Verify graceful degradation
assert result["external_data"] is not None
assert result["external_data"]["fallback"] is True
assert "error" in result["external_data"]
# Verify workflow continued despite enrichment failure
assert result["analysis"] is not None

The error recovery test is particularly important. When the enrichment API times out, the workflow should degrade gracefully, filling in fallback data and continuing rather than crashing. We check both that the fallback was used and that downstream nodes still executed successfully.
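
The fallback idea in enrich_with_fallback generalizes into a small decorator if several nodes talk to flaky services. A sketch (the name and shape are ours, not a LangGraph feature), assuming async nodes that return partial state updates:

import functools
from typing import Any, Callable, Dict

def with_fallback(make_fallback: Callable[[Exception], Dict[str, Any]]) -> Callable:
    """Sketch: if the wrapped async node raises, return the partial state update
    produced by make_fallback(exc) instead of letting the graph run crash."""
    def decorator(node_func: Callable) -> Callable:
        @functools.wraps(node_func)
        async def wrapper(state):
            try:
                return await node_func(state)
            except Exception as exc:
                return make_fallback(exc)
        return wrapper
    return decorator

# Usage sketch for the enrichment node above:
# enrich = with_fallback(
#     lambda exc: {"external_data": {"error": str(exc), "fallback": True, "related_articles": []}}
# )(enrich_data)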

CI/CD Pipeline for Agent Testing

Running these tests consistently requires a CI pipeline that separates fast deterministic tests from slow LLM-dependent ones. Here is the GitHub Actions configuration we use:

.github/workflows/agent-testing.yml
name: Langgraph + Pydantic AI Test Suite

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
  schedule:
    # Run nightly tests with live LLMs
    - cron: '0 2 * * *'

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.9', '3.10', '3.11']
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Cache dependencies
        uses: actions/cache@v3
        with:
          path: |
            ~/.cache/pip
            .venv
          key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-asyncio pytest-cov pytest-timeout
      - name: Run unit tests with coverage
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml --timeout=30
        env:
          TESTING: "true"
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    services:
      redis:
        image: redis:alpine
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v3
      - name: Run integration tests
        run: |
          pytest tests/integration/ -v --timeout=60
        env:
          REDIS_URL: redis://localhost:6379
          USE_TEST_MODELS: "true"

  workflow-tests:
    runs-on: ubuntu-latest
    needs: integration-tests
    steps:
      - uses: actions/checkout@v3
      - name: Run workflow tests
        run: |
          pytest tests/workflows/ -v -m "not slow" --timeout=120

  nightly-llm-tests:
    if: github.event_name == 'schedule'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests with real LLMs
        run: |
          pytest tests/e2e/ -v -m "llm_required" --timeout=300
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          RUN_EXPENSIVE_TESTS: "true"

  performance-benchmarks:
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Run performance benchmarks
        run: |
          python -m pytest tests/benchmarks/ -v --benchmark-only
      - name: Store benchmark results
        uses: benchmark-action/github-action-benchmark@v1
        with:
          tool: 'pytest'
          output-file-path: output.json
          github-token: ${{ secrets.GITHUB_TOKEN }}
          auto-push: true

The pipeline has five layers. Unit tests run on every push across 3 Python versions with a 30-second timeout. Integration tests spin up Redis and use TestModel. Workflow tests exercise full LangGraph graphs. Nightly tests hit real LLMs to catch model behavior changes. Performance benchmarks run on main branch pushes and track regressions over time.
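
The marker names the pipeline selects on (slow, llm_required) should be registered, and the expensive tests should skip themselves when the nightly environment variables are absent. A minimal conftest.py sketch (file placement and wording are our assumptions):

import os
import pytest

def pytest_configure(config):
    # Register the markers the CI jobs select with -m, so pytest does not
    # warn about unknown markers
    config.addinivalue_line("markers", "slow: long-running workflow tests excluded from PR runs")
    config.addinivalue_line("markers", "llm_required: tests that call real LLM APIs (nightly job only)")

def pytest_collection_modifyitems(config, items):
    # Outside the nightly job, skip anything that needs a live model
    if os.environ.get("RUN_EXPENSIVE_TESTS") != "true":
        skip_llm = pytest.mark.skip(reason="RUN_EXPENSIVE_TESTS is not set")
        for item in items:
            if "llm_required" in item.keywords:
                item.add_marker(skip_llm)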

Building an Evaluation Framework

Beyond pass/fail testing, AI agents need quality evaluation. We built a lightweight evaluation framework that scores agent responses across multiple dimensions: required entities, forbidden phrases, and flexible output matching.

from typing import List, Dict, Tuple
from contextlib import nullcontext
from dataclasses import dataclass
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
import numpy as np
@dataclass
class EvaluationCase:
"""Single test case for agent evaluation."""
input_text: str
expected_outputs: List[str] # Multiple acceptable outputs
required_entities: List[str] # Entities that must be detected
forbidden_phrases: List[str] # Phrases that shouldn't appear
min_quality_score: float = 0.8
@dataclass
class EvaluationResult:
"""Results from evaluating an agent."""
total_cases: int
passed_cases: int
failed_cases: List[Tuple[str, str]] # (input, reason)
accuracy: float
average_quality_score: float
performance_metrics: Dict[str, float]
class AgentEvaluator:
"""Comprehensive evaluation framework for AI agents."""
def __init__(self, agent: Agent, test_cases: List[EvaluationCase]):
self.agent = agent
self.test_cases = test_cases
async def evaluate(self, use_test_model: bool = True) -> EvaluationResult:
"""Run comprehensive evaluation of the agent."""
passed = 0
failed_cases = []
quality_scores = []
# Override with TestModel for consistent evaluation
if use_test_model:
context_manager = self.agent.override(model=TestModel())
else:
context_manager = nullcontext() # No override
with context_manager:
for case in self.test_cases:
try:
# Run agent
result = await self.agent.run(case.input_text)
# Evaluate result
evaluation = self._evaluate_single_case(result.data, case)
if evaluation["passed"]:
passed += 1
quality_scores.append(evaluation["quality_score"])
else:
failed_cases.append((case.input_text, evaluation["reason"]))
except Exception as e:
failed_cases.append((case.input_text, f"Exception: {str(e)}"))
return EvaluationResult(
total_cases=len(self.test_cases),
passed_cases=passed,
failed_cases=failed_cases,
accuracy=passed / len(self.test_cases),
average_quality_score=np.mean(quality_scores) if quality_scores else 0.0,
performance_metrics=self._calculate_performance_metrics()
)
def _evaluate_single_case(self, output: str, case: EvaluationCase) -> Dict:
"""Evaluate a single test case result."""
reasons = []
quality_score = 1.0
# Check for required entities
missing_entities = []
for entity in case.required_entities:
if entity.lower() not in output.lower():
missing_entities.append(entity)
quality_score -= 0.1
if missing_entities:
reasons.append(f"Missing entities: {missing_entities}")
# Check for forbidden phrases
found_forbidden = []
for phrase in case.forbidden_phrases:
if phrase.lower() in output.lower():
found_forbidden.append(phrase)
quality_score -= 0.2
if found_forbidden:
reasons.append(f"Contains forbidden phrases: {found_forbidden}")
# Check if output matches any expected outputs
matched_expected = False
for expected in case.expected_outputs:
# Flexible matching - could be substring, semantic similarity, etc.
if self._flexible_match(output, expected):
matched_expected = True
break
if not matched_expected and case.expected_outputs:
reasons.append("Output doesn't match expected patterns")
quality_score -= 0.3
# Ensure quality score is within bounds
quality_score = max(0.0, min(1.0, quality_score))
return {
"passed": quality_score >= case.min_quality_score and not reasons,
"quality_score": quality_score,
"reason": "; ".join(reasons) if reasons else "Passed"
}
def _flexible_match(self, output: str, expected: str) -> bool:
"""Flexible matching that handles variations."""
# Simple implementation - in practice, use semantic similarity
output_lower = output.lower().strip()
expected_lower = expected.lower().strip()
# Exact match
if output_lower == expected_lower:
return True
# Substring match
if expected_lower in output_lower:
return True
# Key phrases match (80% of words present)
expected_words = set(expected_lower.split())
output_words = set(output_lower.split())
overlap = len(expected_words.intersection(output_words))
return overlap / len(expected_words) >= 0.8 if expected_words else False
def _calculate_performance_metrics(self) -> Dict[str, float]:
"""Calculate additional performance metrics."""
# In a real implementation, track timing, token usage, etc.
return {
"avg_response_time": 0.1, # seconds
"avg_tokens_used": 150,
"error_rate": 0.02
}
# Example usage
async def evaluate_customer_service_agent():
"""Evaluate a customer service agent comprehensively."""
# Define evaluation cases
test_cases = [
EvaluationCase(
input_text="My order #12345 hasn't arrived and it's been 2 weeks!",
expected_outputs=[
"I sincerely apologize for the delay with order #12345",
"I'm sorry to hear about the delay with your order #12345"
],
required_entities=["#12345", "apologize"],
forbidden_phrases=["calm down", "not my problem"],
min_quality_score=0.8
),
EvaluationCase(
input_text="How do I return a defective product?",
expected_outputs=[
"To return a defective product, please follow these steps",
"I'll help you with the return process for your defective product"
],
required_entities=["return", "defective"],
forbidden_phrases=["figure it out yourself", "too bad"],
min_quality_score=0.85
),
# ... more test cases
]
# Create agent
agent = Agent(
'openai:gpt-4o',
system_prompt="You are a helpful customer service representative."
)
# Run evaluation
evaluator = AgentEvaluator(agent, test_cases)
results = await evaluator.evaluate(use_test_model=True)
# Report results
print(f"Evaluation Results:")
print(f"- Accuracy: {results.accuracy:.1%}")
print(f"- Average Quality: {results.average_quality_score:.2f}")
print(f"- Failed Cases: {len(results.failed_cases)}")
for input_text, reason in results.failed_cases[:3]: # Show first 3 failures
print(f" - Input: '{input_text[:50]}...'")
print(f" Reason: {reason}")

Performance Testing Under Load

Measuring What Matters in Production

Performance testing for AI agent systems goes beyond simple response time. You need to track latency percentiles, throughput under concurrency, per-node execution time within LangGraph workflows, and resource consumption over sustained load.

Figure 4: Performance Testing Framework. Test queries of varying complexity feed into the test executor, which runs the agent system and collects raw data. The analysis covers four dimensions: latency metrics (avg, p50, p95, p99), throughput (requests per second), resource usage (memory and CPU), and per-node execution times. Bottleneck identification drives optimization priorities.

Here is the benchmarking framework we use:

import asyncio
import statistics
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import psutil


@dataclass
class PerformanceMetrics:
    """Comprehensive performance metrics for a single agent execution."""
    request_id: str
    start_time: float
    end_time: float
    total_duration: float
    node_timings: Dict[str, float] = field(default_factory=dict)
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0
    tokens_used: int = 0
    error_occurred: bool = False
    error_message: Optional[str] = None


@dataclass
class BenchmarkResult:
    """Aggregated benchmark results."""
    total_requests: int
    successful_requests: int
    failed_requests: int
    avg_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    throughput_rps: float
    avg_memory_mb: float
    peak_memory_mb: float
    avg_cpu_percent: float
    node_performance: Dict[str, Dict[str, float]]


class PerformanceBenchmark:
    """Comprehensive performance benchmarking for LangGraph + Pydantic AI systems."""

    def __init__(self, agent_system, test_data: List[Dict]):
        self.agent_system = agent_system
        self.test_data = test_data
        self.metrics: List[PerformanceMetrics] = []

    async def run_benchmark(
        self,
        duration_seconds: int = 60,
        concurrent_requests: int = 10,
        warmup_requests: int = 5
    ) -> BenchmarkResult:
        """Run a comprehensive performance benchmark."""
        # Warmup phase
        print(f"Running {warmup_requests} warmup requests...")
        for i in range(warmup_requests):
            await self._execute_single_request(f"warmup-{i}", self.test_data[0])

        # Clear warmup metrics
        self.metrics.clear()

        # Main benchmark
        print(f"Running benchmark for {duration_seconds} seconds with {concurrent_requests} concurrent requests...")
        start_time = time.time()
        end_time = start_time + duration_seconds
        request_count = 0

        # Each worker pulls the next test case in round-robin order until time runs out
        async def request_worker(worker_id: int):
            nonlocal request_count
            while time.time() < end_time:
                test_case = self.test_data[request_count % len(self.test_data)]
                request_id = f"req-{worker_id}-{request_count}"
                request_count += 1
                await self._execute_single_request(request_id, test_case)

        # Run concurrent workers
        workers = [request_worker(i) for i in range(concurrent_requests)]
        await asyncio.gather(*workers)

        # Calculate results
        return self._calculate_results(time.time() - start_time)

    async def _execute_single_request(self, request_id: str, test_input: Dict) -> PerformanceMetrics:
        """Execute a single request and collect metrics."""
        # Initialize metrics
        metrics = PerformanceMetrics(
            request_id=request_id,
            start_time=time.time(),
            end_time=0,
            total_duration=0
        )

        # Monitor system resources
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        try:
            # Track node execution times if using LangGraph
            if hasattr(self.agent_system, '_graph'):
                node_timings = {}
                # Monkey-patch nodes to track timing
                original_nodes = {}
                for node_name, node_func in self.agent_system._graph.nodes.items():
                    original_nodes[node_name] = node_func

                    async def timed_node(state, _node_name=node_name, _original=node_func):
                        node_start = time.time()
                        result = await _original(state) if asyncio.iscoroutinefunction(_original) else _original(state)
                        node_timings[_node_name] = time.time() - node_start
                        return result

                    self.agent_system._graph.nodes[node_name] = timed_node

                # Execute request
                result = await self.agent_system.ainvoke(test_input)

                # Restore original nodes
                for node_name, node_func in original_nodes.items():
                    self.agent_system._graph.nodes[node_name] = node_func

                metrics.node_timings = node_timings
            else:
                # Direct agent execution
                result = await self.agent_system.run(test_input['query'])

            # Collect resource usage
            metrics.memory_usage_mb = process.memory_info().rss / 1024 / 1024 - initial_memory
            metrics.cpu_usage_percent = process.cpu_percent(interval=0.1)

            # Extract token usage if available
            if hasattr(result, 'usage'):
                metrics.tokens_used = result.usage.get('total_tokens', 0)

        except Exception as e:
            metrics.error_occurred = True
            metrics.error_message = str(e)

        metrics.end_time = time.time()
        metrics.total_duration = metrics.end_time - metrics.start_time
        self.metrics.append(metrics)
        return metrics

    def _calculate_results(self, total_duration: float) -> BenchmarkResult:
        """Calculate aggregate benchmark results."""
        successful_metrics = [m for m in self.metrics if not m.error_occurred]
        failed_count = len([m for m in self.metrics if m.error_occurred])
        if not successful_metrics:
            raise ValueError("No successful requests to analyze")

        # Calculate latency percentiles
        latencies = [m.total_duration * 1000 for m in successful_metrics]  # Convert to ms
        latencies.sort()

        # Calculate node performance
        node_performance = {}
        all_nodes = set()
        for m in successful_metrics:
            all_nodes.update(m.node_timings.keys())
        for node in all_nodes:
            node_times = sorted(
                m.node_timings[node] * 1000 for m in successful_metrics if node in m.node_timings
            )
            if node_times:
                node_performance[node] = {
                    'avg_ms': statistics.mean(node_times),
                    'p95_ms': node_times[int(len(node_times) * 0.95)],
                    'percentage': statistics.mean(node_times) / statistics.mean(latencies) * 100
                }

        return BenchmarkResult(
            total_requests=len(self.metrics),
            successful_requests=len(successful_metrics),
            failed_requests=failed_count,
            avg_latency_ms=statistics.mean(latencies),
            p50_latency_ms=latencies[int(len(latencies) * 0.50)],
            p95_latency_ms=latencies[int(len(latencies) * 0.95)],
            p99_latency_ms=latencies[int(len(latencies) * 0.99)],
            throughput_rps=len(successful_metrics) / total_duration,
            avg_memory_mb=statistics.mean([m.memory_usage_mb for m in successful_metrics]),
            peak_memory_mb=max([m.memory_usage_mb for m in successful_metrics]),
            avg_cpu_percent=statistics.mean([m.cpu_usage_percent for m in successful_metrics]),
            node_performance=node_performance
        )

    def generate_report(self, result: BenchmarkResult, output_file: str = "benchmark_report.png"):
        """Generate a visual performance report."""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

        # Latency distribution
        latencies = [m.total_duration * 1000 for m in self.metrics if not m.error_occurred]
        ax1.hist(latencies, bins=50, alpha=0.7, color='blue', edgecolor='black')
        ax1.axvline(result.p50_latency_ms, color='red', linestyle='--', label=f'P50: {result.p50_latency_ms:.1f}ms')
        ax1.axvline(result.p95_latency_ms, color='orange', linestyle='--', label=f'P95: {result.p95_latency_ms:.1f}ms')
        ax1.set_xlabel('Latency (ms)')
        ax1.set_ylabel('Frequency')
        ax1.set_title('Latency Distribution')
        ax1.legend()

        # Throughput over time (requests completed per one-second bucket)
        time_buckets = {}
        for m in self.metrics:
            bucket = int(m.start_time - self.metrics[0].start_time)
            time_buckets[bucket] = time_buckets.get(bucket, 0) + 1
        times = sorted(time_buckets.keys())
        throughputs = [time_buckets[t] for t in times]
        ax2.plot(times, throughputs, marker='o')
        ax2.set_xlabel('Time (seconds)')
        ax2.set_ylabel('Requests per second')
        ax2.set_title('Throughput Over Time')
        ax2.grid(True, alpha=0.3)

        # Node performance breakdown
        if result.node_performance:
            nodes = list(result.node_performance.keys())
            avg_times = [result.node_performance[n]['avg_ms'] for n in nodes]
            ax3.barh(nodes, avg_times, color='green', alpha=0.7)
            ax3.set_xlabel('Average Time (ms)')
            ax3.set_title('Node Performance Breakdown')
            ax3.grid(True, alpha=0.3)

        # Resource usage
        memory_usage = [m.memory_usage_mb for m in self.metrics if not m.error_occurred]
        cpu_usage = [m.cpu_usage_percent for m in self.metrics if not m.error_occurred]
        ax4_twin = ax4.twinx()
        ax4.plot(range(len(memory_usage)), memory_usage, 'b-', label='Memory (MB)')
        ax4_twin.plot(range(len(cpu_usage)), cpu_usage, 'r-', label='CPU (%)')
        ax4.set_xlabel('Request Number')
        ax4.set_ylabel('Memory (MB)', color='b')
        ax4_twin.set_ylabel('CPU (%)', color='r')
        ax4.set_title('Resource Usage')
        ax4.tick_params(axis='y', labelcolor='b')
        ax4_twin.tick_params(axis='y', labelcolor='r')

        plt.tight_layout()
        plt.savefig(output_file)
        plt.close()

        # Print summary
        print("\n" + "=" * 50)
        print("PERFORMANCE BENCHMARK RESULTS")
        print("=" * 50)
        print(f"Total Requests: {result.total_requests}")
        print(f"Successful: {result.successful_requests} ({result.successful_requests / result.total_requests * 100:.1f}%)")
        print(f"Failed: {result.failed_requests}")
        print("\nLatency Metrics:")
        print(f"  Average: {result.avg_latency_ms:.1f}ms")
        print(f"  P50: {result.p50_latency_ms:.1f}ms")
        print(f"  P95: {result.p95_latency_ms:.1f}ms")
        print(f"  P99: {result.p99_latency_ms:.1f}ms")
        print(f"\nThroughput: {result.throughput_rps:.1f} requests/second")
        print("\nResource Usage:")
        print(f"  Avg Memory: {result.avg_memory_mb:.1f}MB")
        print(f"  Peak Memory: {result.peak_memory_mb:.1f}MB")
        print(f"  Avg CPU: {result.avg_cpu_percent:.1f}%")
        if result.node_performance:
            print("\nNode Performance:")
            for node, perf in sorted(result.node_performance.items(), key=lambda x: x[1]['avg_ms'], reverse=True):
                print(f"  {node}: {perf['avg_ms']:.1f}ms avg, {perf['p95_ms']:.1f}ms p95 ({perf['percentage']:.1f}% of total)")


# Example usage
async def benchmark_document_processing_system():
    """Benchmark a complete document processing system."""
    # Create test data
    test_documents = [
        {
            "document_id": f"doc-{i}",
            "content": f"Sample document {i} with various content..." * 50,
            "processing_options": {
                "extract_entities": True,
                "analyze_sentiment": True,
                "generate_summary": i % 2 == 0  # Only half generate summaries
            }
        }
        for i in range(10)
    ]

    # Create your agent system (workflow or agent)
    document_processor = create_document_processing_workflow()

    # Run benchmark
    benchmark = PerformanceBenchmark(document_processor, test_documents)
    result = await benchmark.run_benchmark(
        duration_seconds=60,
        concurrent_requests=5,
        warmup_requests=10
    )

    # Generate report
    benchmark.generate_report(result)

The node-level timing breakdown is the most actionable part of this framework. When your p95 latency spikes, you can immediately see which node is the bottleneck. In our system, we discovered that the entity extraction node was taking 4x longer than expected because of an unoptimized regex, something we never would have found with aggregate timing alone.

Strategies That Hold Up in Production#

Eight Practices We Learned the Hard Way#

After building and testing multiple LangGraph + Pydantic AI systems, here are the practices that made the biggest difference.

Isolate LLM dependencies. Always use TestModel or FunctionModel in your test suite. We save real LLM calls for nightly evaluation runs. Our unit tests went from 12 minutes and $3 per run to 40 seconds and $0.
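
A minimal sketch of what that looks like with TestModel; the agent and prompt here are placeholders, and depending on your Pydantic AI version the result payload is exposed as output or data:

from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

def test_classification_logic_without_an_llm():
    # Build the agent against TestModel so no API key or network call is needed.
    # In a real suite you would keep your production agent and swap the model
    # with agent.override(model=TestModel()) inside the test instead.
    agent = Agent(TestModel(), system_prompt="Classify the support request.")
    result = agent.run_sync("My invoice is wrong")
    assert result.output is not None  # older pydantic-ai releases expose result.data

FunctionModel works the same way when the test needs the fake model to return a specific, scripted response rather than TestModel's auto-generated one.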

Test at every pyramid level. Unit tests catch logic errors in seconds. Integration tests verify component handoffs. Workflow tests expose routing bugs. E2E tests confirm the user experience. Skip any layer and bugs slip through.

Validate state transitions explicitly. In LangGraph, the conditional edges are where the logic lives and where the bugs hide. Use the node-visitor tracking pattern to assert on exact execution paths, not just final outputs.
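
Here is a minimal sketch of that pattern with made-up node and route names: each node appends its name to a visited list in state, and the test asserts on the exact path the graph took.

from typing import List, TypedDict

from langgraph.graph import END, START, StateGraph

class TicketState(TypedDict):
    text: str
    visited: List[str]

def classify(state: TicketState) -> dict:
    return {"visited": state["visited"] + ["classify"]}

def escalate(state: TicketState) -> dict:
    return {"visited": state["visited"] + ["escalate"]}

def respond(state: TicketState) -> dict:
    return {"visited": state["visited"] + ["respond"]}

def route(state: TicketState) -> str:
    # The conditional edge under test
    return "escalate" if "refund" in state["text"] else "respond"

builder = StateGraph(TicketState)
builder.add_node("classify", classify)
builder.add_node("escalate", escalate)
builder.add_node("respond", respond)
builder.add_edge(START, "classify")
builder.add_conditional_edges("classify", route, {"escalate": "escalate", "respond": "respond"})
builder.add_edge("escalate", END)
builder.add_edge("respond", END)
app = builder.compile()

def test_refund_requests_take_the_escalation_path():
    result = app.invoke({"text": "I want a refund", "visited": []})
    assert result["visited"] == ["classify", "escalate"]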

Test both sides of every validation boundary. With Pydantic AI, test that valid data passes and that invalid data fails with the correct error message. Our meeting scheduler accepted solo meetings for three weeks because we never tested the minimum attendees constraint.
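
For example, a pair of tests around a hypothetical MeetingRequest model (Pydantic v2 syntax):

import pytest
from pydantic import BaseModel, Field, ValidationError

class MeetingRequest(BaseModel):
    title: str
    attendees: list[str] = Field(min_length=2)  # the constraint we once forgot to test

def test_valid_meeting_passes():
    request = MeetingRequest(title="Sprint review", attendees=["ana", "ben"])
    assert request.attendees == ["ana", "ben"]

def test_solo_meeting_is_rejected():
    with pytest.raises(ValidationError) as exc_info:
        MeetingRequest(title="Sprint review", attendees=["ana"])
    # Assert on the error detail, not just that something failed
    assert "attendees" in str(exc_info.value)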

Mock external services with dependency injection. Pass services as parameters to your workflow factory function. In tests, inject AsyncMock instances. In production, inject real clients. No monkeypatching, no test pollution.
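
The shape looks roughly like this, with hypothetical service and factory names (the async test assumes pytest-asyncio is installed):

from unittest.mock import AsyncMock

import pytest

def create_support_workflow(crm_client, ticket_store):
    """Hypothetical factory: the workflow only ever sees the injected services."""
    async def handle(request: dict) -> dict:
        customer = await crm_client.get_customer(request["customer_id"])
        ticket_id = await ticket_store.create(customer["id"], request["message"])
        return {"ticket_id": ticket_id, "customer_tier": customer["tier"]}
    return handle

@pytest.mark.asyncio
async def test_workflow_with_injected_fakes():
    crm_client = AsyncMock()
    crm_client.get_customer.return_value = {"id": "c-1", "tier": "gold"}
    ticket_store = AsyncMock()
    ticket_store.create.return_value = "t-42"

    workflow = create_support_workflow(crm_client, ticket_store)
    result = await workflow({"customer_id": "c-1", "message": "Broken login"})

    assert result == {"ticket_id": "t-42", "customer_tier": "gold"}
    ticket_store.create.assert_awaited_once_with("c-1", "Broken login")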

Separate test tiers in CI. Fast tests on every push, integration tests on PR, workflow tests on merge, LLM tests on a nightly schedule. Developers get feedback in seconds, not minutes.
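
One simple way to wire the tiers is with custom pytest markers; the marker names here are just the ones we use:

# conftest.py -- register the tier markers so pytest doesn't warn about them
def pytest_configure(config):
    config.addinivalue_line("markers", "integration: cross-component tests, run on PRs")
    config.addinivalue_line("markers", "workflow: full graph execution tests, run on merge")
    config.addinivalue_line("markers", "llm: real-model evaluation tests, run nightly")

# In test files, tag anything slower than a unit test:
#   @pytest.mark.workflow
#   def test_full_routing_path(): ...
#
# CI then selects by tier:
#   every push:  pytest -m "not integration and not workflow and not llm"
#   nightly:     pytest -m llm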

Establish performance baselines early. Run benchmarks from week one. A 50ms regression in a hot path compounds into seconds at scale. Track p95 and p99, not just averages, because tail latency is where users feel pain.
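
A lightweight regression guard can live right in the suite; the runner below is a placeholder you would replace with your own TestModel-backed agent call, and the 250ms threshold is whatever baseline fits your hardware:

import statistics
import time

def run_agent_with_test_model(query: str) -> None:
    # Placeholder for your TestModel-backed agent call; substitute your own runner here.
    time.sleep(0.001)

def test_p95_latency_stays_under_baseline():
    latencies_ms = []
    for _ in range(100):
        start = time.perf_counter()
        run_agent_with_test_model("benchmark query")
        latencies_ms.append((time.perf_counter() - start) * 1000)

    # statistics.quantiles with n=100 returns 99 cut points; index 94 is the 95th percentile
    p95 = statistics.quantiles(latencies_ms, n=100)[94]
    assert p95 < 250, f"p95 latency regressed to {p95:.1f}ms"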

Test error handling as thoroughly as happy paths. AI systems fail in unique ways: API timeouts, rate limits, malformed model outputs, hallucinated tool calls. Every failure mode needs a test that verifies graceful degradation, not just crash-free execution.
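
For instance, here is a sketch that forces a timeout from a mocked tool and checks for a degraded but valid answer rather than a crash; the workflow factory and response fields are hypothetical, and the async test again assumes pytest-asyncio:

import asyncio
from unittest.mock import AsyncMock

import pytest

def create_order_lookup_workflow(search_client):
    """Hypothetical workflow: wraps the tool call so failures degrade instead of crashing."""
    async def handle(request: dict) -> dict:
        try:
            hits = await asyncio.wait_for(search_client.query(request["message"]), timeout=2.0)
            return {"status": "ok", "reply": f"Found {len(hits)} matching orders."}
        except (asyncio.TimeoutError, ConnectionError):
            return {"status": "degraded", "reply": "Order search is unavailable, please try again shortly."}
    return handle

@pytest.mark.asyncio
async def test_search_timeout_degrades_gracefully():
    search_client = AsyncMock()
    search_client.query.side_effect = asyncio.TimeoutError()

    workflow = create_order_lookup_workflow(search_client)
    result = await workflow({"message": "Where is my order?"})

    # Graceful degradation, not a crash and not a hallucinated tool result
    assert result["status"] == "degraded"
    assert "try again" in result["reply"]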

KEY INSIGHT: The highest-value tests for AI agent systems are workflow-level tests that verify conditional routing and state transitions. Unit tests prove components work. Workflow tests prove the system works.

What Comes Next#

The AI agent testing landscape is moving fast. Automated test generation using LLMs to write test cases for other LLMs is already practical. Adversarial testing frameworks that probe agents for prompt injection vulnerabilities and edge-case failures are maturing. Semantic verification, where test assertions check meaning rather than string equality, is replacing brittle exact-match patterns.

The biggest shift ahead is distributed testing for multi-agent systems. As agent architectures scale across services, we will need chaos engineering approaches adapted for AI, injecting failures, latency, and malformed responses to verify that agent orchestration degrades gracefully under real-world conditions.

The fundamentals, though, stay the same. Test at every level. Isolate what you can control. Verify the paths your system actually takes. Measure what matters in production. If you build your testing strategy on those principles, you will have agent systems that are reliable enough to trust with real users and real stakes.

