We shipped an AI agent that passed every unit test in the suite. Green across the board. Then it hit production and started routing customer complaints to the sales team, hallucinating product names that didn’t exist, and occasionally responding in French. The tests never caught any of it because we were testing the wrong things at the wrong level. After rebuilding our entire test strategy around LangGraph’s state machine model and Pydantic AI’s validation primitives, we cut production incidents by 90% and dropped our test execution time from 12 minutes to 40 seconds.
That experience taught us a hard lesson. Traditional testing assumes deterministic inputs and outputs. AI agents laugh at that assumption. The same prompt can produce different responses across runs. State mutates as it flows through graph nodes. External LLM calls fail in ways you can’t predict. You need a testing strategy built specifically for this kind of system, one that accounts for probabilistic behavior, validates state transitions, and catches the subtle orchestration bugs that only surface three nodes deep in a workflow.
The good news: LangGraph and Pydantic AI give you the tools to make this tractable. LangGraph’s explicit state graphs turn implicit agent behavior into something you can inspect and verify at every step. Pydantic AI’s TestModel and FunctionModel let you rip out the LLM entirely and test your logic in milliseconds. Together, they turn “hope it works” into “prove it works.”
Why AI Agent Testing Breaks Traditional Approaches
Five Layers of Complexity You Didn’t Ask For
When we first tried to test our LangGraph agents with standard pytest patterns, we kept running into the same wall. The tests passed, but the system still broke. Here is why.
Stateful complexity is the biggest culprit. LangGraph maintains state as information flows through nodes, and that state mutates at each step. A bug in node two might not manifest until node five, when the corrupted state triggers an unexpected conditional branch. You cannot catch this with isolated unit tests.
LLM non-determinism compounds the problem. Even with temperature set to 0, language models produce slightly different outputs for the same input. Your tests need to verify the intent and structure of responses, not exact string matches.
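Concretely, that means asserting on fields, categories, and key phrases rather than exact strings. Here is a minimal sketch; the summarize_ticket helper and its output fields are hypothetical, used only to illustrate the assertion style:

# Hypothetical helper for illustration: returns a dict like {"category": ..., "reply": ...}
from support_bot import summarize_ticket  # assumed import, not part of the system described here

def test_reply_structure_not_wording():
    result = summarize_ticket("My package arrived broken")

    # Brittle: exact wording shifts between runs, even at temperature 0
    # assert result["reply"] == "I'm sorry your package arrived broken."

    # Robust: assert on structure and intent instead
    assert result["category"] in {"damaged_item", "shipping_issue"}
    assert "sorry" in result["reply"].lower()
    assert len(result["reply"]) > 20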
Validation boundaries in Pydantic AI double your test surface. You need to verify that valid data passes through and that invalid data gets properly rejected with the right error messages. Testing only the happy path is testing half the system.
Tool integration introduces external dependencies that break test isolation. When your agent calls APIs, queries databases, or hits other services, you need strategies that test the integration logic without creating brittle or expensive external calls.
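One pattern that helps here is dependency injection: pass the external client into the tool so tests can substitute a fake. This is a sketch under assumed names (get_order_status and FakeOrdersClient are hypothetical), showing the tool's own logic tested without any network call:

class FakeOrdersClient:
    """Stands in for the real HTTP client; the interface is illustrative."""
    def get_status(self, order_id: str) -> str:
        return "shipped"

def get_order_status(client, order_id: str) -> str:
    """Hypothetical tool body: the formatting and error handling are what we test."""
    status = client.get_status(order_id)
    return f"Order {order_id} is {status}"

def test_order_status_tool_uses_injected_client():
    fake = FakeOrdersClient()
    assert get_order_status(fake, "A-1") == "Order A-1 is shipped"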
Orchestration logic demands testing at multiple levels. Individual nodes might work perfectly in isolation but fail when composed into a graph, because the routing conditions, state merges, or error propagation paths contain bugs that only appear during full execution.
The Testing Pyramid for AI Agents
To catch bugs at every level, we need a layered strategy where each tier builds on the one below.

Figure 1: AI Agent Testing Pyramid. Unit tests form the base, verifying individual node functions and validation rules. Integration tests check component interactions. Workflow tests validate graph execution and state transitions. End-to-end tests at the top verify complete system behavior. Each layer catches different failure modes.
Unit tests verify individual components: node functions, validation rules, tool interfaces. They run in milliseconds and catch basic logic errors.
Integration tests validate interactions between components. Can a Pydantic AI agent call its tools correctly? Does a state update from one LangGraph node arrive intact at the next? These catch the subtle “components don’t quite fit together” bugs.
Workflow tests exercise complete LangGraph graphs. They validate state transitions, conditional routing, and error propagation across multiple nodes. These are where orchestration bugs surface.
End-to-end tests confirm the system works from the user’s perspective. They simulate full conversations and verify that all pieces deliver the expected result together.
You need all four layers. Unit tests alone miss orchestration bugs. End-to-end tests alone make it impossible to pinpoint which component failed.
KEY INSIGHT: Test AI agents at every layer of the pyramid. Unit tests that pass tell you components work in isolation. Only workflow and E2E tests tell you the system actually works.
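One way to make the layers explicit in a pytest project is to register a marker per tier and keep each tier in its own directory. This is a sketch; the directory names mirror those used by the CI configuration later in this article, and the llm_required marker matches the nightly job's filter:

# conftest.py -- one marker per pyramid layer (names are illustrative)
def pytest_configure(config):
    config.addinivalue_line("markers", "unit: fast, isolated component tests (tests/unit/)")
    config.addinivalue_line("markers", "integration: component-interaction tests (tests/integration/)")
    config.addinivalue_line("markers", "workflow: full LangGraph graph executions (tests/workflows/)")
    config.addinivalue_line("markers", "llm_required: end-to-end tests that call real LLMs (tests/e2e/)")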
Testing Pydantic AI Components
Unit Testing with TestModel
Pydantic AI’s TestModel changed how we write agent tests. Instead of making expensive, non-deterministic LLM calls during testing, TestModel provides predictable responses. Tests run in milliseconds, and they actually test your logic instead of the LLM’s mood that day.
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
from pydantic import BaseModel, Field
from typing import List, Optional
import pytest

# First, let's define a structured output model
class CustomerAnalysis(BaseModel):
    """Structured analysis of customer sentiment and needs."""
    sentiment: float = Field(..., ge=-1, le=1, description="Sentiment score from -1 to 1")
    primary_issue: str = Field(..., description="Main customer concern")
    urgency_level: int = Field(..., ge=1, le=5, description="Urgency from 1-5")
    recommended_actions: List[str] = Field(..., min_items=1, max_items=3)

# Create our agent with structured output
customer_agent = Agent(
    'openai:gpt-4o',
    result_type=CustomerAnalysis,
    system_prompt="You are a customer service analyst. Analyze customer messages for sentiment, issues, and recommended actions."
)

# Now let's write comprehensive tests
class TestCustomerAgent:
    def test_basic_analysis(self):
        """Test that agent produces valid structured output."""
        # TestModel returns a simple response by default
        with customer_agent.override(model=TestModel()):
            result = customer_agent.run_sync("I'm frustrated with the slow shipping")

        # The result is guaranteed to be a CustomerAnalysis instance
        assert isinstance(result.data, CustomerAnalysis)
        assert -1 <= result.data.sentiment <= 1
        assert 1 <= result.data.urgency_level <= 5
        assert len(result.data.recommended_actions) >= 1

    def test_custom_responses(self):
        """Test agent with specific mock responses."""
        # Create a TestModel with a specific response
        mock_response = CustomerAnalysis(
            sentiment=-0.8,
            primary_issue="Shipping delays",
            urgency_level=4,
            recommended_actions=["Expedite shipping", "Offer compensation", "Follow up"]
        )

        # TestModel can return structured data directly
        test_model = TestModel(response=mock_response.model_dump_json())

        with customer_agent.override(model=test_model):
            result = customer_agent.run_sync("My order is 2 weeks late!")

        # Verify we got our expected response
        assert result.data.sentiment == -0.8
        assert result.data.primary_issue == "Shipping delays"
        assert "Expedite shipping" in result.data.recommended_actions

    def test_error_handling(self):
        """Test that agent handles errors gracefully."""
        # TestModel can simulate errors too
        error_model = TestModel(response=Exception("API Error"))

        with customer_agent.override(model=error_model):
            with pytest.raises(Exception) as exc_info:
                customer_agent.run_sync("Test message")

        assert "API Error" in str(exc_info.value)

The pattern here is straightforward: override the model, run your agent, assert on the structured output. You are testing how your agent handles responses, validates data, and deals with errors, all without touching an external LLM.
Controlling Agent Behavior with FunctionModel
When you need finer-grained control over how your mock agent responds, FunctionModel lets you write custom response logic. We used this to test multi-step tool workflows where the agent’s behavior depends on what happened in previous turns.
from pydantic_ai.models.function import FunctionModel, AgentInfo
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart, ToolCallPart
import asyncio

# Let's create a more complex agent with tools
agent_with_tools = Agent(
    'openai:gpt-4o',
    system_prompt="You are a helpful assistant with access to various tools."
)

@agent_with_tools.tool
async def check_inventory(product_id: str) -> int:
    """Check inventory levels for a product."""
    # In tests, this might return mock data
    return 42

@agent_with_tools.tool
async def calculate_shipping(weight: float, destination: str) -> float:
    """Calculate shipping cost."""
    return weight * 2.5  # Simplified calculation

# Now create sophisticated test scenarios
async def test_multi_tool_workflow():
    """Test complex workflows involving multiple tool calls."""
    call_sequence = []

    async def custom_model_behavior(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
        """Simulate specific model behavior based on conversation state."""
        # Track what's been called
        nonlocal call_sequence

        # Get the last user message
        last_message = messages[-1].content

        if "check stock" in last_message.lower():
            # First, indicate we'll check inventory
            call_sequence.append("intent_recognized")
            return ModelResponse(
                parts=[
                    TextPart("I'll check the inventory for you."),
                    ToolCallPart(
                        tool_name="check_inventory",
                        args={"product_id": "PROD-123"}
                    )
                ]
            )
        elif any(part.part_kind == "tool-return" for part in messages[-1].parts):
            # We got tool results back, now calculate shipping
            call_sequence.append("tool_result_processed")
            return ModelResponse(
                parts=[
                    TextPart("The product is in stock. Let me calculate shipping."),
                    ToolCallPart(
                        tool_name="calculate_shipping",
                        args={"weight": 2.5, "destination": "New York"}
                    )
                ]
            )
        else:
            # Final response after all tools
            call_sequence.append("final_response")
            return ModelResponse(
                parts=[TextPart("Product PROD-123 is in stock (42 units) with shipping cost of $6.25 to New York.")]
            )

    # Run the test with our custom function
    with agent_with_tools.override(model=FunctionModel(custom_model_behavior)):
        result = await agent_with_tools.run("Check stock for PROD-123 and shipping to New York")

    # Verify the workflow executed correctly
    assert call_sequence == ["intent_recognized", "tool_result_processed", "final_response"]
    assert "42 units" in result.data
    assert "$6.25" in result.data

# Test error recovery in tool execution
async def test_tool_error_recovery():
    """Test how agent handles tool failures."""

    async def model_with_fallback(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
        # Check if we just got a tool error
        last_message = messages[-1]
        if hasattr(last_message, 'parts'):
            for part in last_message.parts:
                if part.part_kind == "tool-return" and "error" in str(part.content).lower():
                    # Provide helpful fallback response
                    return ModelResponse(
                        parts=[TextPart("I encountered an error checking inventory, but I can help you place a backorder instead.")]
                    )

        # Initial tool call that will fail
        return ModelResponse(
            parts=[ToolCallPart(tool_name="check_inventory", args={"product_id": "INVALID"})]
        )

    # Override the tool to simulate failure
    async def failing_inventory_check(product_id: str) -> int:
        raise ValueError("Product not found")

    agent_with_tools._tools["check_inventory"].func = failing_inventory_check

    with agent_with_tools.override(model=FunctionModel(model_with_fallback)):
        result = await agent_with_tools.run("Check stock for INVALID")
        assert "backorder" in result.data.lower()

With FunctionModel, you control exactly what the agent “thinks” at each step. You can simulate multi-step tool chains, test error recovery paths, and verify conversation flow, all deterministically and all in milliseconds.

Figure 2: Pydantic AI Testing Flow. TestModel provides predetermined responses for fast, simple tests. FunctionModel lets you write custom response logic based on conversation state, enabling sophisticated scenario testing. Both approaches avoid real LLM calls while thoroughly exercising your agent logic.
KEY INSIGHT: Use TestModel for “does the plumbing work” tests and FunctionModel for “does the agent behave correctly across multi-turn interactions” tests. Together they cover the full range of agent behavior without a single LLM call.
Validation Testing at the Boundaries
One of Pydantic AI’s biggest strengths is its validation layer. But we learned the hard way that validation only protects you if you test both sides of every boundary. Our first agent accepted malformed meeting requests for weeks before anyone noticed, because we only tested the happy path.
Here is how we test validation comprehensively now:
from pydantic import BaseModel, Field, validator, model_validator
from typing import List, Optional, Dict
from datetime import datetime, timedelta
import json
import pytest

class MeetingScheduleRequest(BaseModel):
    """Complex model with multiple validation rules."""
    title: str = Field(..., min_length=3, max_length=100)
    attendees: List[str] = Field(..., min_items=2, max_items=20)
    duration_minutes: int = Field(..., ge=15, le=480)  # 15 min to 8 hours
    proposed_time: datetime
    meeting_type: str = Field(..., pattern="^(video|audio|in-person)$")

    @validator('attendees')
    def validate_attendees(cls, v):
        """Ensure all attendees have valid email format."""
        import re
        email_pattern = re.compile(r'^[\w\.-]+@[\w\.-]+\.\w+$')

        invalid_emails = [email for email in v if not email_pattern.match(email)]
        if invalid_emails:
            raise ValueError(f"Invalid email addresses: {invalid_emails}")

        # Check for duplicates
        if len(set(v)) != len(v):
            raise ValueError("Duplicate attendees not allowed")

        return v

    @validator('proposed_time')
    def validate_future_time(cls, v):
        """Ensure meeting is scheduled in the future."""
        if v <= datetime.now():
            raise ValueError("Meeting must be scheduled in the future")

        # Not too far in the future
        if v > datetime.now() + timedelta(days=365):
            raise ValueError("Cannot schedule meetings more than a year in advance")

        return v

    @model_validator(mode='after')
    def validate_video_meeting_duration(self):
        """Video meetings shouldn't exceed 2 hours."""
        if self.meeting_type == 'video' and self.duration_minutes > 120:
            raise ValueError("Video meetings should not exceed 2 hours")
        return self

# Comprehensive validation tests
class TestMeetingValidation:
    def test_boundary_values(self):
        """Test validation at the edges of acceptable ranges."""
        base_data = {
            "title": "Test Meeting",
            "attendees": ["user1@example.com", "user2@example.com"],
            "proposed_time": datetime.now() + timedelta(hours=1),
            "meeting_type": "video"
        }

        # Test duration boundaries
        # Minimum valid duration
        valid_min = MeetingScheduleRequest(**{**base_data, "duration_minutes": 15})
        assert valid_min.duration_minutes == 15

        # Maximum valid duration
        valid_max = MeetingScheduleRequest(**{**base_data, "duration_minutes": 480})
        assert valid_max.duration_minutes == 480

        # Below minimum
        with pytest.raises(ValueError) as exc_info:
            MeetingScheduleRequest(**{**base_data, "duration_minutes": 14})
        assert "greater than or equal to 15" in str(exc_info.value)

        # Above maximum
        with pytest.raises(ValueError) as exc_info:
            MeetingScheduleRequest(**{**base_data, "duration_minutes": 481})
        assert "less than or equal to 480" in str(exc_info.value)

    def test_custom_validators(self):
        """Test complex custom validation logic."""
        base_data = {
            "title": "Team Standup",
            "duration_minutes": 30,
            "proposed_time": datetime.now() + timedelta(days=1),
            "meeting_type": "video"
        }

        # Test invalid email format
        with pytest.raises(ValueError) as exc_info:
            MeetingScheduleRequest(**{
                **base_data,
                "attendees": ["valid@example.com", "invalid-email"]
            })
        assert "Invalid email addresses" in str(exc_info.value)

        # Test duplicate attendees
        with pytest.raises(ValueError) as exc_info:
            MeetingScheduleRequest(**{
                **base_data,
                "attendees": ["user@example.com", "user@example.com"]
            })
        assert "Duplicate attendees" in str(exc_info.value)

        # Test past meeting time
        with pytest.raises(ValueError) as exc_info:
            MeetingScheduleRequest(**{
                **base_data,
                "attendees": ["user1@example.com", "user2@example.com"],
                "proposed_time": datetime.now() - timedelta(hours=1)
            })
        assert "must be scheduled in the future" in str(exc_info.value)

    def test_model_level_validation(self):
        """Test validation that depends on multiple fields."""
        base_data = {
            "title": "Long Video Call",
            "attendees": ["user1@example.com", "user2@example.com"],
            "proposed_time": datetime.now() + timedelta(hours=1)
        }

        # Video meeting within 2-hour limit - should pass
        valid_video = MeetingScheduleRequest(**{
            **base_data,
            "meeting_type": "video",
            "duration_minutes": 120
        })
        assert valid_video.meeting_type == "video"

        # Video meeting exceeding 2-hour limit - should fail
        with pytest.raises(ValueError) as exc_info:
            MeetingScheduleRequest(**{
                **base_data,
                "meeting_type": "video",
                "duration_minutes": 150
            })
        assert "should not exceed 2 hours" in str(exc_info.value)

        # In-person meeting can exceed 2 hours - should pass
        valid_in_person = MeetingScheduleRequest(**{
            **base_data,
            "meeting_type": "in-person",
            "duration_minutes": 240
        })
        assert valid_in_person.duration_minutes == 240

# Test validation in the context of an agent
async def test_agent_validation_handling():
    """Test how agents handle validation errors."""

    scheduling_agent = Agent(
        'openai:gpt-4o',
        result_type=MeetingScheduleRequest,
        system_prompt="You are a meeting scheduler. Extract meeting details from requests."
    )

    # Create a test scenario where the model returns invalid data
    invalid_response = {
        "title": "Quick Sync",
        "attendees": ["only-one@example.com"],  # Too few attendees
        "duration_minutes": 30,
        "proposed_time": "2024-01-01T10:00:00",  # Likely in the past
        "meeting_type": "video"
    }

    test_model = TestModel(response=json.dumps(invalid_response))

    with scheduling_agent.override(model=test_model):
        with pytest.raises(ValueError) as exc_info:
            await scheduling_agent.run("Schedule a quick sync with just me")

    # The validation error should bubble up
    assert "at least 2 items" in str(exc_info.value)

The key patterns here: test boundaries (15 and 480 minutes, not just 60), test custom validators with intentionally bad data, test cross-field validation (video meetings have different duration limits than in-person), and test that validation errors propagate correctly through the agent.
Testing LangGraph Workflows
Verifying Nodes in Isolation
LangGraph workflows are built from node functions, and those nodes are where you start testing. Each node takes a state dict, does some work, and returns a partial state update. Test them as pure functions first.
from langgraph.graph import StateGraph
from typing import TypedDict, List, Optional, Dict, Any
import pytest

# Define a complex state structure
class DocumentProcessingState(TypedDict):
    document_id: str
    content: str
    entities: Optional[List[Dict[str, str]]]
    sentiment_analysis: Optional[Dict[str, float]]
    summary: Optional[str]
    processing_errors: List[str]
    metadata: Dict[str, Any]

# Individual node functions to test
def extract_entities_node(state: DocumentProcessingState) -> Dict:
    """Extract named entities from document content."""
    try:
        # In production, this would use NLP models
        # For testing, we'll simulate the extraction
        content = state["content"]

        # Simulate entity extraction
        entities = []
        if "Apple" in content:
            entities.append({"text": "Apple", "type": "ORG", "confidence": 0.95})
        if "Tim Cook" in content:
            entities.append({"text": "Tim Cook", "type": "PERSON", "confidence": 0.98})
        if "Cupertino" in content:
            entities.append({"text": "Cupertino", "type": "LOC", "confidence": 0.92})

        return {
            "entities": entities,
            "metadata": {**state.get("metadata", {}), "entities_extracted": True}
        }
    except Exception as e:
        return {
            "processing_errors": state.get("processing_errors", []) + [f"Entity extraction failed: {str(e)}"]
        }

def analyze_sentiment_node(state: DocumentProcessingState) -> Dict:
    """Analyze document sentiment."""
    try:
        content = state["content"]

        # Simulate sentiment analysis
        # In reality, this would use a sentiment model
        sentiment_keywords = {
            "positive": ["excellent", "amazing", "innovative", "breakthrough"],
            "negative": ["disappointing", "failed", "problem", "issue"],
            "neutral": ["announced", "stated", "reported", "mentioned"]
        }

        scores = {"positive": 0.0, "negative": 0.0, "neutral": 0.0}

        for category, keywords in sentiment_keywords.items():
            for keyword in keywords:
                if keyword in content.lower():
                    scores[category] += 0.25

        # Normalize scores
        total = sum(scores.values()) or 1.0
        normalized_scores = {k: v / total for k, v in scores.items()}

        return {
            "sentiment_analysis": normalized_scores,
            "metadata": {**state.get("metadata", {}), "sentiment_analyzed": True}
        }
    except Exception as e:
        return {
            "processing_errors": state.get("processing_errors", []) + [f"Sentiment analysis failed: {str(e)}"]
        }

# Comprehensive node-level tests
class TestDocumentProcessingNodes:
    def test_entity_extraction_success(self):
        """Test successful entity extraction."""
        test_state = DocumentProcessingState(
            document_id="doc123",
            content="Apple CEO Tim Cook announced new products in Cupertino today.",
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={}
        )

        result = extract_entities_node(test_state)

        # Verify entities were extracted
        assert "entities" in result
        assert len(result["entities"]) == 3

        # Check specific entities
        entity_texts = {e["text"] for e in result["entities"]}
        assert "Apple" in entity_texts
        assert "Tim Cook" in entity_texts
        assert "Cupertino" in entity_texts

        # Verify metadata update
        assert result["metadata"]["entities_extracted"] is True

    def test_entity_extraction_empty_content(self):
        """Test entity extraction with no recognizable entities."""
        test_state = DocumentProcessingState(
            document_id="doc124",
            content="This is a generic statement with no specific entities.",
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={}
        )

        result = extract_entities_node(test_state)

        assert "entities" in result
        assert len(result["entities"]) == 0
        assert result["metadata"]["entities_extracted"] is True

    def test_sentiment_analysis_mixed(self):
        """Test sentiment analysis with mixed sentiment."""
        test_state = DocumentProcessingState(
            document_id="doc125",
            content="The product launch was amazing but faced some disappointing technical issues.",
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={}
        )

        result = analyze_sentiment_node(test_state)

        assert "sentiment_analysis" in result
        scores = result["sentiment_analysis"]

        # Should have both positive and negative sentiment
        assert scores["positive"] > 0
        assert scores["negative"] > 0

        # Verify scores sum to 1 (normalized)
        assert abs(sum(scores.values()) - 1.0) < 0.001

    def test_node_error_handling(self):
        """Test that nodes handle errors gracefully."""
        # Create state that will cause an error (missing required field)
        test_state = {
            "document_id": "doc126",
            # Missing 'content' field
            "processing_errors": [],
            "metadata": {}
        }

        result = extract_entities_node(test_state)

        # Should add error to processing_errors
        assert "processing_errors" in result
        assert len(result["processing_errors"]) > 0
        assert "Entity extraction failed" in result["processing_errors"][0]

# Test node composition and data flow
def test_node_composition():
    """Test that nodes can be composed and data flows correctly."""
    initial_state = DocumentProcessingState(
        document_id="doc127",
        content="Apple's innovative products continue to amaze customers worldwide.",
        entities=None,
        sentiment_analysis=None,
        summary=None,
        processing_errors=[],
        metadata={"source": "test"}
    )

    # Process through multiple nodes
    state = initial_state.copy()

    # First node: entity extraction
    entity_result = extract_entities_node(state)
    state.update(entity_result)

    # Second node: sentiment analysis
    sentiment_result = analyze_sentiment_node(state)
    state.update(sentiment_result)

    # Verify cumulative results
    assert state["entities"] is not None
    assert len(state["entities"]) > 0
    assert state["sentiment_analysis"] is not None
    assert state["sentiment_analysis"]["positive"] > state["sentiment_analysis"]["negative"]

    # Verify metadata accumulation
    assert state["metadata"]["entities_extracted"] is True
    assert state["metadata"]["sentiment_analyzed"] is True
    assert state["metadata"]["source"] == "test"  # Original metadata preserved

Notice the error handling test. We pass a state dict missing the content key and verify the node catches the exception and appends to processing_errors instead of crashing. Nodes in a LangGraph workflow must be resilient, because a crash in one node can take down the entire graph execution.

Figure 3: LangGraph Testing Architecture. Node-level tests verify individual functions with controlled inputs. Graph-level tests check full workflow execution from initial to final state. State transition tests validate conditional routing and verify the correct execution paths fire based on state conditions.
Testing Complete Graph Execution
Once individual nodes pass their tests, you move up the pyramid to graph-level testing. Here we compile an actual LangGraph workflow and run it end-to-end with controlled inputs.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
import asyncio

def create_document_processing_workflow(checkpointer=None):
    """Create a complete document processing workflow."""
    builder = StateGraph(DocumentProcessingState)

    # Add nodes
    builder.add_node("extract_entities", extract_entities_node)
    builder.add_node("analyze_sentiment", analyze_sentiment_node)
    builder.add_node("generate_summary", generate_summary_node)
    builder.add_node("quality_check", quality_check_node)

    # Define the flow
    builder.add_edge(START, "extract_entities")
    builder.add_edge("extract_entities", "analyze_sentiment")

    # Conditional edge based on sentiment
    def route_after_sentiment(state: DocumentProcessingState) -> str:
        if state.get("processing_errors"):
            return "quality_check"

        sentiment = state.get("sentiment_analysis", {})
        # Only generate summary for positive/neutral content
        if sentiment.get("negative", 0) > 0.7:
            return "quality_check"
        return "generate_summary"

    builder.add_conditional_edges(
        "analyze_sentiment",
        route_after_sentiment,
        {
            "generate_summary": "generate_summary",
            "quality_check": "quality_check"
        }
    )

    builder.add_edge("generate_summary", "quality_check")
    builder.add_edge("quality_check", END)

    return builder.compile(checkpointer=checkpointer)

# Additional nodes for complete workflow
def generate_summary_node(state: DocumentProcessingState) -> Dict:
    """Generate document summary."""
    try:
        content = state["content"]
        entities = state.get("entities", [])

        # Simple summary generation (in production, use LLM)
        entity_names = [e["text"] for e in entities]
        summary = f"Document discusses {', '.join(entity_names)}. " if entity_names else ""
        summary += f"Content length: {len(content)} characters."

        return {
            "summary": summary,
            "metadata": {**state.get("metadata", {}), "summary_generated": True}
        }
    except Exception as e:
        return {
            "processing_errors": state.get("processing_errors", []) + [f"Summary generation failed: {str(e)}"]
        }

def quality_check_node(state: DocumentProcessingState) -> Dict:
    """Perform final quality checks."""
    issues = []

    # Check for processing errors
    if state.get("processing_errors"):
        issues.append("Processing errors encountered")

    # Check completeness
    if not state.get("entities"):
        issues.append("No entities extracted")
    if not state.get("sentiment_analysis"):
        issues.append("No sentiment analysis performed")

    # For negative content, flag for review
    sentiment = state.get("sentiment_analysis", {})
    if sentiment.get("negative", 0) > 0.7:
        issues.append("High negative sentiment detected")

    return {
        "metadata": {
            **state.get("metadata", {}),
            "quality_check_completed": True,
            "quality_issues": issues
        }
    }

# Comprehensive graph-level tests
class TestDocumentWorkflow:
    async def test_complete_positive_flow(self):
        """Test the happy path through the workflow."""
        workflow = create_document_processing_workflow()

        initial_state = DocumentProcessingState(
            document_id="test001",
            content="Apple announced groundbreaking innovations at their Cupertino headquarters. CEO Tim Cook expressed excitement about the future.",
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={"test_case": "positive_flow"}
        )

        # Execute the workflow
        result = await workflow.ainvoke(initial_state)

        # Verify all steps completed successfully
        assert result["entities"] is not None
        assert len(result["entities"]) == 3  # Apple, Cupertino, Tim Cook

        assert result["sentiment_analysis"] is not None
        assert result["sentiment_analysis"]["positive"] > result["sentiment_analysis"]["negative"]

        assert result["summary"] is not None
        assert "Apple" in result["summary"]

        # Check metadata for execution tracking
        assert result["metadata"]["entities_extracted"] is True
        assert result["metadata"]["sentiment_analyzed"] is True
        assert result["metadata"]["summary_generated"] is True
        assert result["metadata"]["quality_check_completed"] is True
        assert len(result["metadata"]["quality_issues"]) == 0

    async def test_negative_sentiment_routing(self):
        """Test that negative content skips summary generation."""
        workflow = create_document_processing_workflow()

        initial_state = DocumentProcessingState(
            document_id="test002",
            content="The product launch was a complete disaster. Multiple critical failures and disappointed customers.",
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={"test_case": "negative_flow"}
        )

        result = await workflow.ainvoke(initial_state)

        # Should have completed sentiment analysis
        assert result["sentiment_analysis"] is not None
        assert result["sentiment_analysis"]["negative"] > 0.5

        # Should NOT have generated summary due to negative sentiment
        assert result["summary"] is None
        assert "summary_generated" not in result["metadata"]

        # Should have quality issues flagged
        assert "High negative sentiment detected" in result["metadata"]["quality_issues"]

    async def test_error_propagation(self):
        """Test that errors are properly propagated through the workflow."""
        workflow = create_document_processing_workflow()

        # Create state that will cause errors
        initial_state = DocumentProcessingState(
            document_id="test003",
            content="",  # Empty content should cause issues
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=["Pre-existing error"],
            metadata={"test_case": "error_flow"}
        )

        result = await workflow.ainvoke(initial_state)

        # Errors should be accumulated
        assert len(result["processing_errors"]) >= 1
        assert "Pre-existing error" in result["processing_errors"]

        # Quality check should flag the errors
        assert "Processing errors encountered" in result["metadata"]["quality_issues"]

# Test with checkpointing for complex workflows
async def test_workflow_checkpointing():
    """Test workflow execution with checkpointing."""
    checkpointer = MemorySaver()

    # Compile with the checkpointer attached
    app = create_document_processing_workflow(checkpointer=checkpointer)

    initial_state = DocumentProcessingState(
        document_id="test004",
        content="Test content for checkpointing workflow.",
        entities=None,
        sentiment_analysis=None,
        summary=None,
        processing_errors=[],
        metadata={}
    )

    # Execute with thread ID for checkpointing
    thread_config = {"configurable": {"thread_id": "test-thread-001"}}

    # First execution
    result1 = await app.ainvoke(initial_state, config=thread_config)

    # Verify checkpoint was saved
    saved_state = await checkpointer.aget(thread_config)
    assert saved_state is not None

    # Simulate resuming from checkpoint
    # In a real scenario, this might be after a failure
    result2 = await app.ainvoke(None, config=thread_config)

    # Results should be consistent
    assert result2["document_id"] == result1["document_id"]
    assert result2["entities"] == result1["entities"]

The negative sentiment routing test is the critical one. It verifies that when sentiment exceeds the 0.7 negative threshold, the workflow skips summary generation and routes directly to quality check. That conditional branch is exactly the kind of logic that unit tests cannot catch, because it only manifests during full graph execution.
Validating State Transitions and Routing
State transitions are where LangGraph bugs hide. We built a dedicated test pattern that tracks which nodes execute and in what order, giving us full visibility into the routing logic.
class TestStateTransitions:
    def test_conditional_routing_logic(self):
        """Test that conditional edges route correctly based on state."""
        # Create a simple workflow with conditional routing
        builder = StateGraph(DocumentProcessingState)

        # Track which nodes were visited
        visited_nodes = []

        def track_node(name: str):
            def node_func(state):
                visited_nodes.append(name)
                return state
            return node_func

        builder.add_node("start", track_node("start"))
        builder.add_node("path_a", track_node("path_a"))
        builder.add_node("path_b", track_node("path_b"))
        builder.add_node("end", track_node("end"))

        # Conditional routing based on document length
        def route_by_length(state):
            if len(state.get("content", "")) > 100:
                return "path_a"
            return "path_b"

        builder.add_edge(START, "start")
        builder.add_conditional_edges(
            "start",
            route_by_length,
            {
                "path_a": "path_a",
                "path_b": "path_b"
            }
        )
        builder.add_edge("path_a", "end")
        builder.add_edge("path_b", "end")
        builder.add_edge("end", END)

        workflow = builder.compile()

        # Test path A (long content)
        visited_nodes.clear()
        long_content_state = DocumentProcessingState(
            document_id="test",
            content="x" * 200,  # Long content
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={}
        )

        workflow.invoke(long_content_state)
        assert visited_nodes == ["start", "path_a", "end"]

        # Test path B (short content)
        visited_nodes.clear()
        short_content_state = DocumentProcessingState(
            document_id="test",
            content="short",  # Short content
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={}
        )

        workflow.invoke(short_content_state)
        assert visited_nodes == ["start", "path_b", "end"]

    async def test_parallel_execution_paths(self):
        """Test workflows with parallel execution branches."""
        builder = StateGraph(DocumentProcessingState)

        execution_times = {}

        async def slow_node_a(state):
            start = asyncio.get_event_loop().time()
            await asyncio.sleep(0.1)  # Simulate work
            execution_times["node_a"] = asyncio.get_event_loop().time() - start
            return {"metadata": {**state.get("metadata", {}), "node_a_completed": True}}

        async def slow_node_b(state):
            start = asyncio.get_event_loop().time()
            await asyncio.sleep(0.1)  # Simulate work
            execution_times["node_b"] = asyncio.get_event_loop().time() - start
            return {"metadata": {**state.get("metadata", {}), "node_b_completed": True}}

        def merge_results(state):
            # This node runs after both parallel nodes complete
            return {
                "metadata": {
                    **state.get("metadata", {}),
                    "merge_completed": True,
                    "parallel_execution_verified": True
                }
            }

        # Build workflow with parallel execution
        builder.add_node("split", lambda x: x)  # Pass-through node
        builder.add_node("node_a", slow_node_a)
        builder.add_node("node_b", slow_node_b)
        builder.add_node("merge", merge_results)

        builder.add_edge(START, "split")
        builder.add_edge("split", "node_a")
        builder.add_edge("split", "node_b")
        builder.add_edge("node_a", "merge")
        builder.add_edge("node_b", "merge")
        builder.add_edge("merge", END)

        workflow = builder.compile()

        # Execute workflow
        start_time = asyncio.get_event_loop().time()
        result = await workflow.ainvoke(DocumentProcessingState(
            document_id="parallel_test",
            content="test",
            entities=None,
            sentiment_analysis=None,
            summary=None,
            processing_errors=[],
            metadata={}
        ))
        total_time = asyncio.get_event_loop().time() - start_time

        # Verify both nodes executed
        assert result["metadata"]["node_a_completed"] is True
        assert result["metadata"]["node_b_completed"] is True
        assert result["metadata"]["merge_completed"] is True

        # Verify parallel execution (total time should be ~0.1s, not ~0.2s)
        assert total_time < 0.15  # Allow some overhead
        assert execution_times["node_a"] >= 0.1
        assert execution_times["node_b"] >= 0.1

The track_node pattern is one of our most reused test utilities. Wrap each node in a tracker, run the workflow, then assert on the exact sequence. The parallel execution test goes further: it verifies that LangGraph actually runs the branches concurrently by checking that total wall-clock time is roughly equal to one branch, not both added together.
KEY INSIGHT: Build “node visitor” tracking into your LangGraph tests. Asserting on the exact sequence of visited nodes catches routing bugs that would be invisible if you only checked final state.
Bringing It All Together: End-to-End Testing
Full Pipeline Verification
End-to-end tests wire together everything: Pydantic AI agents, LangGraph workflows, mocked external services, and validation logic. These tests verify that the entire pipeline behaves correctly from input to output.
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from typing import List, Dict, Optional, TypedDict
from unittest.mock import MagicMock, AsyncMock
import json

# Define our domain models
class NewsArticle(BaseModel):
    """Structured news article data."""
    headline: str = Field(..., min_length=10, max_length=200)
    content: str = Field(..., min_length=50)
    category: str = Field(..., pattern="^(tech|business|health|sports)$")
    entities: List[str] = Field(default_factory=list)
    sentiment_score: float = Field(default=0.0, ge=-1, le=1)

class AnalysisResult(BaseModel):
    """Result of article analysis."""
    article_id: str
    key_insights: List[str] = Field(..., min_items=1, max_items=5)
    recommended_actions: List[str] = Field(default_factory=list)
    risk_level: str = Field(default="low", pattern="^(low|medium|high)$")

# Create specialized agents
content_analyzer = Agent(
    'openai:gpt-4o',
    result_type=AnalysisResult,
    system_prompt="Analyze news articles for key insights and actionable recommendations."
)

# Define workflow state
class NewsAnalysisState(TypedDict):
    article: NewsArticle
    raw_content: str
    analysis: Optional[AnalysisResult]
    external_data: Optional[Dict]
    notifications_sent: List[str]

# Mock external services
class ExternalServices:
    def __init__(self):
        self.database = AsyncMock()
        self.notification_service = AsyncMock()
        self.enrichment_api = AsyncMock()

# Comprehensive end-to-end test
class TestNewsAnalysisSystem:
    async def test_complete_article_processing_flow(self):
        """Test the entire article processing pipeline end-to-end."""
        # Set up mocks
        services = ExternalServices()

        # Mock database responses
        services.database.fetch_article.return_value = {
            "id": "article-123",
            "raw_content": "Apple announced record profits today. CEO Tim Cook stated that innovation continues to drive growth. The tech giant's stock rose 5% in after-hours trading.",
            "metadata": {"source": "Reuters", "timestamp": "2024-01-15T10:00:00Z"}
        }

        # Mock enrichment API
        services.enrichment_api.enrich.return_value = {
            "related_articles": ["article-120", "article-121"],
            "market_data": {"AAPL": {"change": "+5%", "volume": "high"}}
        }

        # Create workflow with mocked services
        def create_workflow(services):
            builder = StateGraph(NewsAnalysisState)

            async def fetch_article(state):
                # Fetch from database
                article_data = await services.database.fetch_article("article-123")
                return {"raw_content": article_data["raw_content"]}

            async def parse_article(state):
                # In real system, this would use an LLM
                # For testing, we create structured data
                article = NewsArticle(
                    headline="Apple Reports Record Profits",
                    content=state["raw_content"],
                    category="tech",
                    entities=["Apple", "Tim Cook"],
                    sentiment_score=0.8
                )
                return {"article": article}

            async def enrich_data(state):
                # Call external enrichment service
                enrichment = await services.enrichment_api.enrich(
                    entities=state["article"].entities
                )
                return {"external_data": enrichment}

            async def analyze_article(state):
                # Use our Pydantic AI agent
                with content_analyzer.override(model=TestModel()):
                    # Simulate agent response
                    analysis = AnalysisResult(
                        article_id="article-123",
                        key_insights=[
                            "Apple shows strong financial performance",
                            "Positive market reaction with 5% stock increase",
                            "Leadership emphasizes continued innovation"
                        ],
                        recommended_actions=[
                            "Monitor competitor responses",
                            "Track sustained stock performance"
                        ],
                        risk_level="low"
                    )
                return {"analysis": analysis}

            async def send_notifications(state):
                # Send notifications based on analysis
                notifications = []

                if state["analysis"].risk_level == "high":
                    await services.notification_service.send_alert(
                        "High risk article detected",
                        state["article"].headline
                    )
                    notifications.append("risk_alert")

                if state["article"].sentiment_score < -0.5:
                    await services.notification_service.send_alert(
                        "Negative sentiment detected",
                        state["article"].headline
                    )
                    notifications.append("sentiment_alert")

                return {"notifications_sent": notifications}

            # Build the workflow
            builder.add_node("fetch", fetch_article)
            builder.add_node("parse", parse_article)
            builder.add_node("enrich", enrich_data)
            builder.add_node("analyze", analyze_article)
            builder.add_node("notify", send_notifications)

            # Define flow
            builder.add_edge(START, "fetch")
            builder.add_edge("fetch", "parse")
            builder.add_edge("parse", "enrich")
            builder.add_edge("enrich", "analyze")
            builder.add_edge("analyze", "notify")
            builder.add_edge("notify", END)

            return builder.compile()

        # Execute the workflow
        workflow = create_workflow(services)
        initial_state = NewsAnalysisState(
            article=None,
            raw_content="",
            analysis=None,
            external_data=None,
            notifications_sent=[]
        )

        result = await workflow.ainvoke(initial_state)

        # Comprehensive assertions
        # 1. Verify article was fetched and parsed
        assert result["article"] is not None
        assert result["article"].headline == "Apple Reports Record Profits"
        assert result["article"].category == "tech"
        assert len(result["article"].entities) == 2

        # 2. Verify enrichment occurred
        assert result["external_data"] is not None
        assert "related_articles" in result["external_data"]
        assert "market_data" in result["external_data"]

        # 3. Verify analysis was performed
        assert result["analysis"] is not None
        assert len(result["analysis"].key_insights) == 3
        assert result["analysis"].risk_level == "low"

        # 4. Verify service interactions
        services.database.fetch_article.assert_called_once_with("article-123")
        services.enrichment_api.enrich.assert_called_once()

        # 5. Verify notifications (none sent for positive low-risk article)
        assert len(result["notifications_sent"]) == 0
        services.notification_service.send_alert.assert_not_called()

    async def test_error_handling_and_recovery(self):
        """Test system behavior when components fail."""
        services = ExternalServices()

        # Configure enrichment API to fail
        services.enrichment_api.enrich.side_effect = Exception("API timeout")

        # Create workflow with error handling
        def create_resilient_workflow(services):
            builder = StateGraph(NewsAnalysisState)

            async def enrich_with_fallback(state):
                try:
                    enrichment = await services.enrichment_api.enrich(
                        entities=state["article"].entities
                    )
                    return {"external_data": enrichment}
                except Exception as e:
                    # Fallback to basic data
                    return {
                        "external_data": {
                            "error": str(e),
                            "fallback": True,
                            "related_articles": []
                        }
                    }

            # ... (other nodes remain the same)

            builder.add_node("enrich", enrich_with_fallback)
            # ... (build rest of workflow)

            return builder.compile()

        workflow = create_resilient_workflow(services)
        result = await workflow.ainvoke(initial_state)

        # Verify graceful degradation
        assert result["external_data"] is not None
        assert result["external_data"]["fallback"] is True
        assert "error" in result["external_data"]

        # Verify workflow continued despite enrichment failure
        assert result["analysis"] is not None

The error recovery test is particularly important. When the enrichment API times out, the workflow should degrade gracefully, filling in fallback data and continuing rather than crashing. We check both that the fallback was used and that downstream nodes still executed successfully.
CI/CD Pipeline for Agent Testing
Running these tests consistently requires a CI pipeline that separates fast deterministic tests from slow LLM-dependent ones. Here is the GitHub Actions configuration we use:
name: Langgraph + Pydantic AI Test Suite

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
  schedule:
    # Run nightly tests with live LLMs
    - cron: '0 2 * * *'

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.9', '3.10', '3.11']

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Cache dependencies
        uses: actions/cache@v3
        with:
          path: |
            ~/.cache/pip
            .venv
          key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-asyncio pytest-cov pytest-timeout

      - name: Run unit tests with coverage
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml --timeout=30
        env:
          TESTING: "true"

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests

    services:
      redis:
        image: redis:alpine
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v3

      - name: Run integration tests
        run: |
          pytest tests/integration/ -v --timeout=60
        env:
          REDIS_URL: redis://localhost:6379
          USE_TEST_MODELS: "true"

  workflow-tests:
    runs-on: ubuntu-latest
    needs: integration-tests

    steps:
      - uses: actions/checkout@v3

      - name: Run workflow tests
        run: |
          pytest tests/workflows/ -v -m "not slow" --timeout=120

  nightly-llm-tests:
    if: github.event_name == 'schedule'
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Run tests with real LLMs
        run: |
          pytest tests/e2e/ -v -m "llm_required" --timeout=300
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          RUN_EXPENSIVE_TESTS: "true"

  performance-benchmarks:
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v3

      - name: Run performance benchmarks
        run: |
          python -m pytest tests/benchmarks/ -v --benchmark-only

      - name: Store benchmark results
        uses: benchmark-action/github-action-benchmark@v1
        with:
          tool: 'pytest'
          output-file-path: output.json
          github-token: ${{ secrets.GITHUB_TOKEN }}
          auto-push: true

The pipeline has five layers. Unit tests run on every push across 3 Python versions with a 30-second timeout. Integration tests spin up Redis and use TestModel. Workflow tests exercise full LangGraph graphs. Nightly tests hit real LLMs to catch model behavior changes. Performance benchmarks run on main branch pushes and track regressions over time.
Building an Evaluation Framework
Beyond pass/fail testing, AI agents need quality evaluation. We built a lightweight evaluation framework that scores agent responses across multiple dimensions: required entities, forbidden phrases, and flexible output matching.
from typing import List, Dict, Tuple
from dataclasses import dataclass
from contextlib import nullcontext
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
import numpy as np

@dataclass
class EvaluationCase:
    """Single test case for agent evaluation."""
    input_text: str
    expected_outputs: List[str]  # Multiple acceptable outputs
    required_entities: List[str]  # Entities that must be detected
    forbidden_phrases: List[str]  # Phrases that shouldn't appear
    min_quality_score: float = 0.8

@dataclass
class EvaluationResult:
    """Results from evaluating an agent."""
    total_cases: int
    passed_cases: int
    failed_cases: List[Tuple[str, str]]  # (input, reason)
    accuracy: float
    average_quality_score: float
    performance_metrics: Dict[str, float]

class AgentEvaluator:
    """Comprehensive evaluation framework for AI agents."""

    def __init__(self, agent: Agent, test_cases: List[EvaluationCase]):
        self.agent = agent
        self.test_cases = test_cases

    async def evaluate(self, use_test_model: bool = True) -> EvaluationResult:
        """Run comprehensive evaluation of the agent."""
        passed = 0
        failed_cases = []
        quality_scores = []

        # Override with TestModel for consistent evaluation
        if use_test_model:
            context_manager = self.agent.override(model=TestModel())
        else:
            context_manager = nullcontext()  # No override

        with context_manager:
            for case in self.test_cases:
                try:
                    # Run agent
                    result = await self.agent.run(case.input_text)

                    # Evaluate result
                    evaluation = self._evaluate_single_case(result.data, case)

                    if evaluation["passed"]:
                        passed += 1
                        quality_scores.append(evaluation["quality_score"])
                    else:
                        failed_cases.append((case.input_text, evaluation["reason"]))

                except Exception as e:
                    failed_cases.append((case.input_text, f"Exception: {str(e)}"))

        return EvaluationResult(
            total_cases=len(self.test_cases),
            passed_cases=passed,
            failed_cases=failed_cases,
            accuracy=passed / len(self.test_cases),
            average_quality_score=np.mean(quality_scores) if quality_scores else 0.0,
            performance_metrics=self._calculate_performance_metrics()
        )

    def _evaluate_single_case(self, output: str, case: EvaluationCase) -> Dict:
        """Evaluate a single test case result."""
        reasons = []
        quality_score = 1.0

        # Check for required entities
        missing_entities = []
        for entity in case.required_entities:
            if entity.lower() not in output.lower():
                missing_entities.append(entity)
                quality_score -= 0.1

        if missing_entities:
            reasons.append(f"Missing entities: {missing_entities}")

        # Check for forbidden phrases
        found_forbidden = []
        for phrase in case.forbidden_phrases:
            if phrase.lower() in output.lower():
                found_forbidden.append(phrase)
                quality_score -= 0.2

        if found_forbidden:
            reasons.append(f"Contains forbidden phrases: {found_forbidden}")

        # Check if output matches any expected outputs
        matched_expected = False
        for expected in case.expected_outputs:
            # Flexible matching - could be substring, semantic similarity, etc.
            if self._flexible_match(output, expected):
                matched_expected = True
                break

        if not matched_expected and case.expected_outputs:
            reasons.append("Output doesn't match expected patterns")
            quality_score -= 0.3

        # Ensure quality score is within bounds
        quality_score = max(0.0, min(1.0, quality_score))

        return {
            "passed": quality_score >= case.min_quality_score and not reasons,
            "quality_score": quality_score,
            "reason": "; ".join(reasons) if reasons else "Passed"
        }

    def _flexible_match(self, output: str, expected: str) -> bool:
        """Flexible matching that handles variations."""
        # Simple implementation - in practice, use semantic similarity
        output_lower = output.lower().strip()
        expected_lower = expected.lower().strip()

        # Exact match
        if output_lower == expected_lower:
            return True

        # Substring match
        if expected_lower in output_lower:
            return True

        # Key phrases match (80% of words present)
        expected_words = set(expected_lower.split())
        output_words = set(output_lower.split())
        overlap = len(expected_words.intersection(output_words))

        return overlap / len(expected_words) >= 0.8 if expected_words else False

    def _calculate_performance_metrics(self) -> Dict[str, float]:
        """Calculate additional performance metrics."""
        # In a real implementation, track timing, token usage, etc.
        return {
            "avg_response_time": 0.1,  # seconds
            "avg_tokens_used": 150,
            "error_rate": 0.02
        }

# Example usage
async def evaluate_customer_service_agent():
    """Evaluate a customer service agent comprehensively."""

    # Define evaluation cases
    test_cases = [
        EvaluationCase(
            input_text="My order #12345 hasn't arrived and it's been 2 weeks!",
            expected_outputs=[
                "I sincerely apologize for the delay with order #12345",
                "I'm sorry to hear about the delay with your order #12345"
            ],
            required_entities=["#12345", "apologize"],
            forbidden_phrases=["calm down", "not my problem"],
            min_quality_score=0.8
        ),
        EvaluationCase(
            input_text="How do I return a defective product?",
            expected_outputs=[
                "To return a defective product, please follow these steps",
                "I'll help you with the return process for your defective product"
            ],
            required_entities=["return", "defective"],
            forbidden_phrases=["figure it out yourself", "too bad"],
            min_quality_score=0.85
        ),
        # ... more test cases
    ]

    # Create agent
    agent = Agent(
        'openai:gpt-4o',
        system_prompt="You are a helpful customer service representative."
    )

    # Run evaluation
    evaluator = AgentEvaluator(agent, test_cases)
    results = await evaluator.evaluate(use_test_model=True)

    # Report results
    print(f"Evaluation Results:")
    print(f"- Accuracy: {results.accuracy:.1%}")
    print(f"- Average Quality: {results.average_quality_score:.2f}")
    print(f"- Failed Cases: {len(results.failed_cases)}")

    for input_text, reason in results.failed_cases[:3]:  # Show first 3 failures
        print(f"  - Input: '{input_text[:50]}...'")
        print(f"    Reason: {reason}")

Performance Testing Under Load
Measuring What Matters in Production
Performance testing for AI agent systems goes beyond simple response time. You need to track latency percentiles, throughput under concurrency, per-node execution time within LangGraph workflows, and resource consumption over sustained load.

Figure 4: Performance Testing Framework. Test queries of varying complexity feed into the test executor, which runs the agent system and collects raw data. The analysis covers four dimensions: latency metrics (avg, p50, p95, p99), throughput (requests per second), resource usage (memory and CPU), and per-node execution times. Bottleneck identification drives optimization priorities.
Here is the benchmarking framework we use:
import asyncioimport timeimport psutilimport statisticsfrom dataclasses import dataclass, fieldfrom typing import List, Dict, Optional, Callablefrom concurrent.futures import ThreadPoolExecutorimport matplotlib.pyplot as pltfrom datetime import datetime
@dataclassclass PerformanceMetrics: """Comprehensive performance metrics for agent execution.""" request_id: str start_time: float end_time: float total_duration: float node_timings: Dict[str, float] = field(default_factory=dict) memory_usage_mb: float = 0.0 cpu_usage_percent: float = 0.0 tokens_used: int = 0 error_occurred: bool = False error_message: Optional[str] = None
@dataclassclass BenchmarkResult: """Aggregated benchmark results.""" total_requests: int successful_requests: int failed_requests: int avg_latency_ms: float p50_latency_ms: float p95_latency_ms: float p99_latency_ms: float throughput_rps: float avg_memory_mb: float peak_memory_mb: float avg_cpu_percent: float node_performance: Dict[str, Dict[str, float]]
class PerformanceBenchmark: """Comprehensive performance benchmarking for Langgraph + Pydantic AI systems."""
def __init__(self, agent_system, test_data: List[Dict]): self.agent_system = agent_system self.test_data = test_data self.metrics: List[PerformanceMetrics] = []
async def run_benchmark( self, duration_seconds: int = 60, concurrent_requests: int = 10, warmup_requests: int = 5 ) -> BenchmarkResult: """Run a comprehensive performance benchmark."""
# Warmup phase print(f"Running {warmup_requests} warmup requests...") for i in range(warmup_requests): await self._execute_single_request(f"warmup-{i}", self.test_data[0])
# Clear warmup metrics self.metrics.clear()
# Main benchmark print(f"Running benchmark for {duration_seconds} seconds with {concurrent_requests} concurrent requests...")
start_time = time.time() end_time = start_time + duration_seconds request_count = 0
# Create a pool of requests async def request_worker(worker_id: int): nonlocal request_count while time.time() < end_time: test_case = self.test_data[request_count % len(self.test_data)] request_id = f"req-{worker_id}-{request_count}" request_count += 1
await self._execute_single_request(request_id, test_case)
# Run concurrent workers workers = [request_worker(i) for i in range(concurrent_requests)] await asyncio.gather(*workers)
# Calculate results return self._calculate_results(time.time() - start_time)
    async def _execute_single_request(self, request_id: str, test_input: Dict) -> PerformanceMetrics:
        """Execute a single request and collect metrics."""
        metrics = PerformanceMetrics(
            request_id=request_id,
            start_time=time.time(),
            end_time=0,
            total_duration=0
        )

        # Monitor system resources for this process
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        try:
            # Track per-node execution times when the system exposes a LangGraph graph.
            # This assumes the wrapped system keeps a mutable node mapping on `_graph.nodes`.
            if hasattr(self.agent_system, '_graph'):
                node_timings = {}

                # Wrap each node so it records its own execution time
                original_nodes = {}
                for node_name, node_func in self.agent_system._graph.nodes.items():
                    original_nodes[node_name] = node_func

                    async def timed_node(state, _node_name=node_name, _original=node_func):
                        node_start = time.time()
                        if asyncio.iscoroutinefunction(_original):
                            result = await _original(state)
                        else:
                            result = _original(state)
                        node_timings[_node_name] = time.time() - node_start
                        return result

                    self.agent_system._graph.nodes[node_name] = timed_node

                try:
                    # Execute the request through the graph
                    result = await self.agent_system.ainvoke(test_input)
                finally:
                    # Always restore the original nodes, even if the request fails
                    for node_name, node_func in original_nodes.items():
                        self.agent_system._graph.nodes[node_name] = node_func

                metrics.node_timings = node_timings
            else:
                # Direct agent execution
                result = await self.agent_system.run(test_input['query'])

            # Collect resource usage
            metrics.memory_usage_mb = process.memory_info().rss / 1024 / 1024 - initial_memory
            metrics.cpu_usage_percent = process.cpu_percent(interval=0.1)

            # Extract token usage if the result exposes it
            if hasattr(result, 'usage'):
                metrics.tokens_used = result.usage.get('total_tokens', 0)

        except Exception as e:
            metrics.error_occurred = True
            metrics.error_message = str(e)

        metrics.end_time = time.time()
        metrics.total_duration = metrics.end_time - metrics.start_time

        self.metrics.append(metrics)
        return metrics
    def _calculate_results(self, total_duration: float) -> BenchmarkResult:
        """Calculate aggregate benchmark results."""
        successful_metrics = [m for m in self.metrics if not m.error_occurred]
        failed_count = len(self.metrics) - len(successful_metrics)

        if not successful_metrics:
            raise ValueError("No successful requests to analyze")

        # Latency percentiles (sorted, in milliseconds)
        latencies = sorted(m.total_duration * 1000 for m in successful_metrics)

        # Per-node performance
        node_performance = {}
        all_nodes = set()
        for m in successful_metrics:
            all_nodes.update(m.node_timings.keys())

        for node in all_nodes:
            # Sort node timings so the p95 index is meaningful
            node_times = sorted(
                m.node_timings[node] * 1000
                for m in successful_metrics
                if node in m.node_timings
            )
            if node_times:
                node_performance[node] = {
                    'avg_ms': statistics.mean(node_times),
                    'p95_ms': node_times[int(len(node_times) * 0.95)],
                    'percentage': statistics.mean(node_times) / statistics.mean(latencies) * 100
                }

        return BenchmarkResult(
            total_requests=len(self.metrics),
            successful_requests=len(successful_metrics),
            failed_requests=failed_count,
            avg_latency_ms=statistics.mean(latencies),
            p50_latency_ms=latencies[int(len(latencies) * 0.50)],
            p95_latency_ms=latencies[int(len(latencies) * 0.95)],
            p99_latency_ms=latencies[int(len(latencies) * 0.99)],
            throughput_rps=len(successful_metrics) / total_duration,
            avg_memory_mb=statistics.mean(m.memory_usage_mb for m in successful_metrics),
            peak_memory_mb=max(m.memory_usage_mb for m in successful_metrics),
            avg_cpu_percent=statistics.mean(m.cpu_usage_percent for m in successful_metrics),
            node_performance=node_performance
        )
    def generate_report(self, result: BenchmarkResult, output_file: str = "benchmark_report.png"):
        """Generate a visual performance report."""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

        # Latency distribution
        latencies = [m.total_duration * 1000 for m in self.metrics if not m.error_occurred]
        ax1.hist(latencies, bins=50, alpha=0.7, color='blue', edgecolor='black')
        ax1.axvline(result.p50_latency_ms, color='red', linestyle='--',
                    label=f'P50: {result.p50_latency_ms:.1f}ms')
        ax1.axvline(result.p95_latency_ms, color='orange', linestyle='--',
                    label=f'P95: {result.p95_latency_ms:.1f}ms')
        ax1.set_xlabel('Latency (ms)')
        ax1.set_ylabel('Frequency')
        ax1.set_title('Latency Distribution')
        ax1.legend()

        # Throughput over time (requests started per one-second bucket)
        time_buckets = {}
        for m in self.metrics:
            bucket = int(m.start_time - self.metrics[0].start_time)
            time_buckets[bucket] = time_buckets.get(bucket, 0) + 1

        times = sorted(time_buckets.keys())
        throughputs = [time_buckets[t] for t in times]
        ax2.plot(times, throughputs, marker='o')
        ax2.set_xlabel('Time (seconds)')
        ax2.set_ylabel('Requests per second')
        ax2.set_title('Throughput Over Time')
        ax2.grid(True, alpha=0.3)

        # Node performance breakdown
        if result.node_performance:
            nodes = list(result.node_performance.keys())
            avg_times = [result.node_performance[n]['avg_ms'] for n in nodes]
            ax3.barh(nodes, avg_times, color='green', alpha=0.7)
            ax3.set_xlabel('Average Time (ms)')
            ax3.set_title('Node Performance Breakdown')
            ax3.grid(True, alpha=0.3)

        # Resource usage per request
        memory_usage = [m.memory_usage_mb for m in self.metrics if not m.error_occurred]
        cpu_usage = [m.cpu_usage_percent for m in self.metrics if not m.error_occurred]

        ax4_twin = ax4.twinx()
        ax4.plot(range(len(memory_usage)), memory_usage, 'b-', label='Memory (MB)')
        ax4_twin.plot(range(len(cpu_usage)), cpu_usage, 'r-', label='CPU (%)')
        ax4.set_xlabel('Request Number')
        ax4.set_ylabel('Memory (MB)', color='b')
        ax4_twin.set_ylabel('CPU (%)', color='r')
        ax4.set_title('Resource Usage')
        ax4.tick_params(axis='y', labelcolor='b')
        ax4_twin.tick_params(axis='y', labelcolor='r')

        plt.tight_layout()
        plt.savefig(output_file)
        plt.close()

        # Print summary
        print("\n" + "=" * 50)
        print("PERFORMANCE BENCHMARK RESULTS")
        print("=" * 50)
        print(f"Total Requests: {result.total_requests}")
        print(f"Successful: {result.successful_requests} "
              f"({result.successful_requests / result.total_requests * 100:.1f}%)")
        print(f"Failed: {result.failed_requests}")
        print("\nLatency Metrics:")
        print(f"  Average: {result.avg_latency_ms:.1f}ms")
        print(f"  P50: {result.p50_latency_ms:.1f}ms")
        print(f"  P95: {result.p95_latency_ms:.1f}ms")
        print(f"  P99: {result.p99_latency_ms:.1f}ms")
        print(f"\nThroughput: {result.throughput_rps:.1f} requests/second")
        print("\nResource Usage:")
        print(f"  Avg Memory: {result.avg_memory_mb:.1f}MB")
        print(f"  Peak Memory: {result.peak_memory_mb:.1f}MB")
        print(f"  Avg CPU: {result.avg_cpu_percent:.1f}%")

        if result.node_performance:
            print("\nNode Performance:")
            for node, perf in sorted(
                result.node_performance.items(),
                key=lambda x: x[1]['avg_ms'],
                reverse=True
            ):
                print(f"  {node}: {perf['avg_ms']:.1f}ms avg, "
                      f"{perf['p95_ms']:.1f}ms p95 ({perf['percentage']:.1f}% of total)")
# Example usage
async def benchmark_document_processing_system():
    """Benchmark a complete document processing system."""
    # Create test data
    test_documents = [
        {
            "document_id": f"doc-{i}",
            "content": f"Sample document {i} with various content..." * 50,
            "processing_options": {
                "extract_entities": True,
                "analyze_sentiment": True,
                "generate_summary": i % 2 == 0  # Only half generate summaries
            }
        }
        for i in range(10)
    ]

    # Create your agent system (workflow or agent)
    document_processor = create_document_processing_workflow()

    # Run benchmark
    benchmark = PerformanceBenchmark(document_processor, test_documents)
    result = await benchmark.run_benchmark(
        duration_seconds=60,
        concurrent_requests=5,
        warmup_requests=10
    )

    # Generate report
    benchmark.generate_report(result)

The node-level timing breakdown is the most actionable part of this framework. When your p95 latency spikes, you can immediately see which node is the bottleneck. In our system, we discovered that the entity extraction node was taking 4x longer than expected because of an unoptimized regex, something we never would have found with aggregate timing alone.
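For instance, once a run completes you can surface the worst offender straight from the report data. A small sketch, assuming result is the BenchmarkResult returned by the run above:

# Surface the slowest node from the benchmark result
slowest_node, perf = max(result.node_performance.items(), key=lambda kv: kv[1]['avg_ms'])
print(f"Bottleneck: {slowest_node} at {perf['avg_ms']:.1f}ms avg "
      f"({perf['percentage']:.1f}% of total latency)")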
Strategies That Hold Up in Production
Eight Practices We Learned the Hard Way
We have built and tested multiple LangGraph + Pydantic AI systems; these are the eight practices that made the biggest difference.
Isolate LLM dependencies. Always use TestModel or FunctionModel in your test suite. We save real LLM calls for nightly evaluation runs. Our unit tests went from 12 minutes and $3 per run to 40 seconds and $0.
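A minimal sketch of that isolation with Pydantic AI's TestModel; the agent definition and prompt are placeholders, and the result attribute is .data on older pydantic-ai versions:

from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

# Hypothetical agent; in a real suite this lives in the application package
support_agent = Agent('openai:gpt-4o', system_prompt='Route customer messages.')

def test_routing_logic_without_llm():
    # override() swaps the real model for TestModel, so no network call is made
    with support_agent.override(model=TestModel()):
        result = support_agent.run_sync('My order never arrived')
    # TestModel returns synthetic but schema-valid output in milliseconds
    assert result.output is not None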
Test at every pyramid level. Unit tests catch logic errors in seconds. Integration tests verify component handoffs. Workflow tests expose routing bugs. E2E tests confirm the user experience. Skip any layer and bugs slip through.
Validate state transitions explicitly. In LangGraph, the conditional edges are where the logic lives and where the bugs hide. Use the node-visitor tracking pattern to assert on exact execution paths, not just final outputs.
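A minimal sketch of that tracking pattern; the state shape, node names, and routing rule are illustrative. Each node appends its name to the state, and the test asserts on the exact path taken:

from typing import List, TypedDict
from langgraph.graph import StateGraph, START, END

class AgentState(TypedDict):
    query: str
    visited_nodes: List[str]

def classify(state: AgentState) -> dict:
    return {"visited_nodes": state["visited_nodes"] + ["classify"]}

def escalate(state: AgentState) -> dict:
    return {"visited_nodes": state["visited_nodes"] + ["escalate"]}

def route(state: AgentState) -> str:
    # Illustrative condition: refund requests go to escalation
    return "escalate" if "refund" in state["query"].lower() else END

graph = StateGraph(AgentState)
graph.add_node("classify", classify)
graph.add_node("escalate", escalate)
graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route)
graph.add_edge("escalate", END)
workflow = graph.compile()

def test_refund_requests_take_the_escalation_path():
    result = workflow.invoke({"query": "I want a refund", "visited_nodes": []})
    # Assert the exact execution path, not just the final output
    assert result["visited_nodes"] == ["classify", "escalate"]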
Test both sides of every validation boundary. With Pydantic AI, test that valid data passes and that invalid data fails with the correct error message. Our meeting scheduler accepted solo meetings for three weeks because we never tested the minimum attendees constraint.
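A sketch of testing both sides of a boundary; the MeetingRequest model and its minimum-attendees rule are illustrative stand-ins for your own schemas:

import pytest
from pydantic import BaseModel, Field, ValidationError

class MeetingRequest(BaseModel):
    # Illustrative constraint: a meeting needs at least two attendees
    title: str
    attendees: list[str] = Field(min_length=2)

def test_valid_meeting_passes():
    request = MeetingRequest(title="Planning", attendees=["ana@x.com", "raj@x.com"])
    assert len(request.attendees) == 2

def test_solo_meeting_is_rejected_with_the_right_error():
    with pytest.raises(ValidationError) as exc_info:
        MeetingRequest(title="Planning", attendees=["ana@x.com"])
    # Verify the failure points at the attendees constraint, not just that it failed
    assert "attendees" in str(exc_info.value)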
Mock external services with dependency injection. Pass services as parameters to your workflow factory function. In tests, inject AsyncMock instances. In production, inject real clients. No monkeypatching, no test pollution.
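A minimal sketch of that injection pattern; create_support_workflow and the CRM/ticketing clients are placeholders for whatever your workflow actually depends on:

import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_workflow_uses_injected_services():
    # In production these are real clients; in tests they are AsyncMock instances
    crm_client = AsyncMock()
    crm_client.lookup_customer.return_value = {"id": "cust-42", "tier": "gold"}
    ticket_client = AsyncMock()

    # Hypothetical factory that accepts its dependencies as parameters
    workflow = create_support_workflow(crm_client=crm_client, ticket_client=ticket_client)
    await workflow.ainvoke({"query": "Where is my order?"})

    # Verify the integration logic without hitting a real CRM or ticketing system
    crm_client.lookup_customer.assert_awaited_once()
    ticket_client.create_ticket.assert_not_awaited()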
Separate test tiers in CI. Fast tests on every push, integration tests on PR, workflow tests on merge, LLM tests on a nightly schedule. Developers get feedback in seconds, not minutes.
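One way to wire that up is with pytest markers; the marker names below are illustrative:

# pytest.ini — one marker per tier, selected with `pytest -m` in each CI job
[pytest]
markers =
    integration: component-boundary tests (run on pull requests)
    workflow: full graph execution tests (run on merge)
    llm: tests that call a real model (nightly schedule only)

Each CI job then selects its tier, for example pytest -m "not integration and not workflow and not llm" on every push and pytest -m llm on the nightly schedule.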
Establish performance baselines early. Run benchmarks from week one. A 50ms regression in a hot path compounds into seconds at scale. Track p95 and p99, not just averages, because tail latency is where users feel pain.
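Those baselines can sit right next to the benchmark harness above as a regression gate. A sketch, with illustrative budgets and the test_documents fixture from the earlier example:

import pytest

# Illustrative budgets; tune them to your own baseline measurements
P95_BUDGET_MS = 800
P99_BUDGET_MS = 1500

@pytest.mark.workflow
@pytest.mark.asyncio
async def test_latency_stays_within_baseline():
    benchmark = PerformanceBenchmark(create_document_processing_workflow(), test_documents)
    result = await benchmark.run_benchmark(duration_seconds=30, concurrent_requests=5)

    # Gate on tail latency, not the average
    assert result.p95_latency_ms < P95_BUDGET_MS
    assert result.p99_latency_ms < P99_BUDGET_MS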
Test error handling as thoroughly as happy paths. AI systems fail in unique ways: API timeouts, rate limits, malformed model outputs, hallucinated tool calls. Every failure mode needs a test that verifies graceful degradation, not just crash-free execution.
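A sketch of one such failure-mode test, reusing the injected-client pattern above; the workflow factory and the degraded-response fields are assumptions about your own system:

import asyncio
import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_crm_timeout_degrades_gracefully():
    crm_client = AsyncMock()
    # Simulate the external dependency timing out
    crm_client.lookup_customer.side_effect = asyncio.TimeoutError()

    workflow = create_support_workflow(crm_client=crm_client, ticket_client=AsyncMock())
    result = await workflow.ainvoke({"query": "Where is my order?"})

    # Graceful degradation: the user still gets a response, flagged as degraded
    assert result["response"] is not None
    assert result["degraded"] is True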
KEY INSIGHT: The highest-value tests for AI agent systems are workflow-level tests that verify conditional routing and state transitions. Unit tests prove components work. Workflow tests prove the system works.
What Comes Next
The AI agent testing landscape is moving fast. Automated test generation using LLMs to write test cases for other LLMs is already practical. Adversarial testing frameworks that probe agents for prompt injection vulnerabilities and edge-case failures are maturing. Semantic verification, where test assertions check meaning rather than string equality, is replacing brittle exact-match patterns.
The biggest shift ahead is distributed testing for multi-agent systems. As agent architectures scale across services, we will need chaos engineering approaches adapted for AI, injecting failures, latency, and malformed responses to verify that agent orchestration degrades gracefully under real-world conditions.
The fundamentals, though, stay the same. Test at every level. Isolate what you can control. Verify the paths your system actually takes. Measure what matters in production. If you build your testing strategy on those principles, you will have agent systems that are reliable enough to trust with real users and real stakes.