Scaling LangGraph and Pydantic AI Systems: From Prototype to Production
2025-06-15

We took an AI research platform from 10 requests per minute to 10,000 concurrent users. P95 latency dropped below 2 seconds. Infrastructure costs fell by 60%. The secret was not throwing more servers at it. We rewrote three layers of the stack: caching, serialization, and parallel execution.

Langgraph and Pydantic AI make it deceptively easy to build sophisticated agent workflows. You wire up a graph, validate every input and output with Pydantic schemas, and the prototype works beautifully on your laptop. Then you deploy it. Fifteen concurrent users and the whole thing falls over. Response times spike, memory spirals, and your cloud bill triples overnight.

We learned this the hard way. Our first production deployment of a Langgraph-based document processing pipeline looked perfect in staging. Day one in production, it crashed under real load because every single state transition was serializing to JSON, every Pydantic model was validating fields that had already been validated upstream, and no two LLM calls ran in parallel. We were doing everything sequentially in a framework designed for parallelism.

The fix required rethinking how we approach scaling at every layer of the stack, from memory management to persistence. Here is what we learned.

Where the Time Actually Goes#

The Unique Challenge of Scaling Agent Systems#

Scaling a Langgraph pipeline is nothing like scaling a traditional web API. You are not serving static content. A single user request can trigger dozens of operations across your workflow graph: LLM calls, data validations, external service lookups, state serializations. Each of those operations has different scaling characteristics, and a bottleneck in one cascades through the entire pipeline.

Langgraph orchestrates data flow through processing nodes like a traffic controller. Each node might call an LLM, run a validation, or hit an external API. Pydantic ensures type safety at every boundary, catching bad data before it propagates. Both are powerful. Both add overhead that compounds at scale.
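
If you have not seen the two together, here is a minimal sketch of that division of labor: LangGraph routes state between nodes while a Pydantic model guards the boundary. The state schema, the Document model, and the trivial summarize node are illustrative stand-ins, not code from our pipeline:

from typing import TypedDict
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field

class Document(BaseModel):
    """Pydantic catches bad input at the boundary, before it propagates."""
    doc_id: str
    text: str = Field(min_length=1)

class PipelineState(TypedDict, total=False):
    document: dict
    summary: str

def summarize(state: PipelineState) -> PipelineState:
    doc = Document(**state["document"])   # validate once, at the node boundary
    return {"summary": doc.text[:200]}    # placeholder for a real LLM call

builder = StateGraph(PipelineState)
builder.add_node("summarize", summarize)
builder.set_entry_point("summarize")
builder.add_edge("summarize", END)
graph = builder.compile()

result = graph.invoke({"document": {"doc_id": "d1", "text": "Quarterly revenue grew 12 percent..."}})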

Through profiling multiple production deployments, we identified where time and resources actually go:

Bottleneck Type        | Impact                          | Typical Contribution
-----------------------|---------------------------------|-------------------------
LLM API Calls          | High latency, API rate limits   | 40-60% of response time
Pydantic Validation    | CPU overhead, memory allocation | 10-20% of response time
State Serialization    | I/O overhead, memory usage      | 15-25% of response time
Graph Traversal        | Coordination overhead           | 5-10% of response time
External Service Calls | Network latency, reliability    | 10-30% of response time

Look at those numbers. LLM calls dominate. Yet we spent our first two weeks optimizing graph traversal. Total waste. Profile first, then optimize the thing that actually matters.

KEY INSIGHT: Always profile before optimizing. If 60% of your response time is LLM latency, no amount of graph traversal tuning will save you.
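
A profile does not need to be elaborate. Here is a minimal per-node timing sketch, assuming your nodes are async callables that accept and return partial state dicts; the names and report format are illustrative:

import asyncio
import time
from typing import Any, Callable, Dict

async def profile_nodes(
    nodes: Dict[str, Callable[[Dict[str, Any]], Any]],
    state: Dict[str, Any],
) -> Dict[str, float]:
    """Run nodes in order, recording wall-clock seconds per node."""
    timings: Dict[str, float] = {}
    for name, node in nodes.items():
        start = time.perf_counter()
        result = await node(state)
        timings[name] = time.perf_counter() - start
        if isinstance(result, dict):
            state.update(result)
    total = sum(timings.values()) or 1.0
    for name, seconds in sorted(timings.items(), key=lambda kv: -kv[1]):
        print(f"{name:<24} {seconds:6.2f}s  {100 * seconds / total:5.1f}%")
    return timings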

Layered Architecture for Production Scale#

The Four Layers You Need to Get Right#

Scaling these systems effectively means thinking in layers. Each layer has distinct responsibilities and different optimization levers.

Figure 1: Langgraph and Pydantic System Architecture — Four layers from memory management through persistence, each with distinct optimization opportunities. Separating synchronous and asynchronous execution paths allows flexible scaling based on workload type.

Memory Management Layer: Reference-based management passes pointers between components, minimizing copies. Copy-based management ensures isolation at the cost of higher memory use. The choice here directly controls your concurrency ceiling.

Serialization Layer: Every time state moves between nodes or gets persisted, serialization fires. JSON is readable but slow. MessagePack or Protocol Buffers run 3-5x faster. We will show the benchmarks below.

Execution Layer: Langgraph supports both synchronous and asynchronous execution. Async unlocks parallel node processing, which is transformative for I/O-bound workloads like LLM calls.

Persistence Layer: In-memory storage is fast but volatile. Document stores offer flexibility. Relational databases give you ACID guarantees. Pick the right tool for each type of state.
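
To make the execution-layer point concrete, here is a minimal sketch of sequential versus concurrent awaits for I/O-bound work. The call_llm coroutine is a stand-in for a real client, and the one-second sleep only simulates network latency:

import asyncio
import time

async def call_llm(prompt: str) -> str:
    await asyncio.sleep(1.0)          # stand-in for a real LLM request
    return f"response to: {prompt}"

async def sequential(prompts: list) -> list:
    # Total time grows linearly with the number of calls
    return [await call_llm(p) for p in prompts]

async def concurrent(prompts: list) -> list:
    # Total time is roughly the single slowest call
    return list(await asyncio.gather(*(call_llm(p) for p in prompts)))

async def main() -> None:
    prompts = [f"question {i}" for i in range(5)]
    start = time.perf_counter()
    await sequential(prompts)
    print(f"sequential: {time.perf_counter() - start:.1f}s")   # ~5s
    start = time.perf_counter()
    await concurrent(prompts)
    print(f"concurrent: {time.perf_counter() - start:.1f}s")   # ~1s

asyncio.run(main())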

Vertical vs. Horizontal: Pick Your Scaling Path#

Vertical Scaling (Scaling Up) maximizes single-instance performance. Here we optimize Pydantic models to skip redundant validation for trusted internal data:

# Example: Optimizing Pydantic models for vertical scaling
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
import ujson # Faster JSON library
class OptimizedModel(BaseModel):
"""Pydantic model optimized for performance."""
class Config:
# Disable validation on assignment for performance
validate_assignment = False
# Use faster JSON library
json_loads = ujson.loads
json_dumps = ujson.dumps
# Keep instances in memory longer
keep_untouched = True
id: str
data: Dict[str, Any]
def __init__(self, **data):
# Skip validation entirely for trusted internal data
if data.pop('_skip_validation', False):
# Bypass __init__ validation (similar to BaseModel.construct)
object.__setattr__(self, '__dict__', data)
object.__setattr__(self, '__fields_set__', set(data.keys()))
else:
super().__init__(**data)

Horizontal Scaling (Scaling Out) distributes work across multiple instances. This executor partitions graph nodes across a process pool:

# Example: Distributed Langgraph execution
from langgraph.graph import StateGraph
from typing import Dict, List
import asyncio
from concurrent.futures import ProcessPoolExecutor
class DistributedGraphExecutor:
"""Execute Langgraph nodes across multiple processes."""
def __init__(self, graph: StateGraph, num_workers: int = 4):
self.graph = graph
self.executor = ProcessPoolExecutor(max_workers=num_workers)
self.node_assignments = self._partition_nodes()
def _partition_nodes(self) -> Dict[str, int]:
"""Assign nodes to workers for balanced execution."""
nodes = list(self.graph.nodes.keys())
assignments = {}
for i, node in enumerate(nodes):
# Simple round-robin assignment
# In practice, use workload characteristics
assignments[node] = i % self.executor._max_workers
return assignments
async def execute_distributed(self, initial_state: Dict) -> Dict:
"""Execute graph with distributed node processing."""
state = initial_state.copy()
# Track node execution across workers
futures = {}
for node_name in self.graph.execution_order:
worker_id = self.node_assignments[node_name]
# Submit node execution to assigned worker
future = self.executor.submit(
self._execute_node_isolated,
node_name,
state
)
futures[node_name] = future
# Wait for dependencies before continuing
if self._has_dependencies(node_name):
await self._wait_for_dependencies(node_name, futures)
# Update state with results
result = await asyncio.wrap_future(future)
state.update(result)
return state

Performance Optimization Techniques#

Memory: The Silent Killer#

Memory management can make or break a scaled deployment. We learned this when our document pipeline started OOM-killing pods at 200 concurrent users. The culprit: every Pydantic model was eagerly validating all 47 fields on construction, even though most code paths only touched 3-5 fields.

Lazy Validation Pattern — validate only when a field is actually accessed:

from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
from functools import lru_cache
class LazyValidationModel(BaseModel):
"""Model that delays validation until access."""
_raw_data: Dict[str, Any] = {}
_validated_fields: set = set()
class Config:
arbitrary_types_allowed = True
def __getattribute__(self, name):
# Use the default lookup for internals to avoid infinite recursion
fields = super().__getattribute__('__fields__')
validated = super().__getattribute__('_validated_fields')
# Check if this is a field that needs validation
if name in fields and name not in validated:
# Validate just this field
super().__getattribute__('_validate_field')(name)
validated.add(name)
return super().__getattribute__(name)
def _validate_field(self, field_name: str):
"""Validate a single field on demand."""
field = self.__fields__[field_name]
raw_value = self._raw_data.get(field_name)
# Apply field validation
validated_value, errors = field.validate(
raw_value, {}, loc=(field_name,)
)
if errors:
raise ValueError(f"Validation error for {field_name}: {errors}")
# Store validated value
setattr(self, field_name, validated_value)
# Usage example
large_dataset = {"field1": "value1", "field2": {"nested": "data"}}  # ...plus many more fields
model = LazyValidationModel(_raw_data=large_dataset)
# No validation happens until you access a field
print(model.field1) # Validates only field1

Object Pooling — reuse model instances instead of allocating new ones on every request:

from typing import Any, Dict, Generic, List, TypeVar
from threading import Lock
from pydantic import BaseModel, Field
import weakref
T = TypeVar('T')
class ObjectPool(Generic[T]):
"""Thread-safe object pool for Pydantic models."""
def __init__(self, factory: callable, max_size: int = 100):
self._factory = factory
self._pool: List[T] = []
self._max_size = max_size
self._lock = Lock()
self._in_use: weakref.WeakSet = weakref.WeakSet()
def acquire(self) -> T:
"""Get an object from the pool or create a new one."""
with self._lock:
if self._pool:
obj = self._pool.pop()
else:
obj = self._factory()
self._in_use.add(obj)
return obj
def release(self, obj: T):
"""Return an object to the pool."""
with self._lock:
if obj in self._in_use:
self._in_use.remove(obj)
# Reset object state before returning to pool
if hasattr(obj, 'clear'):
obj.clear()
if len(self._pool) < self._max_size:
self._pool.append(obj)
# Example usage with Pydantic models
class PooledModel(BaseModel):
data: Dict[str, Any] = Field(default_factory=dict)
def clear(self):
"""Reset model state for reuse."""
self.data.clear()
# Create a pool for frequently used models
model_pool = ObjectPool(PooledModel, max_size=50)
# In your hot path
model = model_pool.acquire()
try:
model.data = process_data(raw_input)
# Use model
finally:
model_pool.release(model)

Serialization: The Overlooked Bottleneck#

Serialization accounted for 22% of our total response time. We did not even realize it until we profiled. Every state transition between Langgraph nodes was round-tripping through JSON — readable, yes, but painfully slow at volume.

Switching to MessagePack cut serialization time by 3-5x:

import msgpack
from pydantic import BaseModel
from typing import Dict, Any
import time
class OptimizedSerializationMixin:
"""Mixin for fast binary serialization."""
def to_msgpack(self) -> bytes:
"""Serialize to MessagePack format."""
# Get dict representation
data = self.dict()
# Add type hint for deserialization
data['__model__'] = self.__class__.__name__
# Use MessagePack for 3-5x faster serialization
return msgpack.packb(data, use_bin_type=True)
@classmethod
def from_msgpack(cls, data: bytes):
"""Deserialize from MessagePack format."""
unpacked = msgpack.unpackb(data, raw=False)
# Remove type hint
unpacked.pop('__model__', None)
# Create instance
return cls(**unpacked)
class FastModel(BaseModel, OptimizedSerializationMixin):
"""Model with optimized serialization."""
id: str
data: Dict[str, Any]
metadata: Dict[str, str]
# Performance comparison
def benchmark_serialization():
test_data = {
"id": "test-123",
"data": {"key": "value" * 100},
"metadata": {f"meta_{i}": f"value_{i}" for i in range(10)}
}
model = FastModel(**test_data)
# JSON serialization
start = time.time()
for _ in range(10000):
json_data = model.json()
FastModel.parse_raw(json_data)
json_time = time.time() - start
# MessagePack serialization
start = time.time()
for _ in range(10000):
msgpack_data = model.to_msgpack()
FastModel.from_msgpack(msgpack_data)
msgpack_time = time.time() - start
print(f"JSON: {json_time:.2f}s, MessagePack: {msgpack_time:.2f}s")
print(f"Speedup: {json_time/msgpack_time:.2f}x")

KEY INSIGHT: Serialization overhead hides in plain sight. Profile your state transitions — you may find 15-25% of response time is just encoding and decoding data between nodes.

Parallel Processing: Use the Graph#

Langgraph’s graph structure practically begs for parallel execution. Yet most teams run everything sequentially by default. Here are three patterns that unlock real concurrency.

Figure 2: Parallel Processing Patterns — Map-Reduce splits data for parallel processing then combines results. Fan-out/Fan-in distributes work to multiple service endpoints. Work-stealing dynamically rebalances load by letting idle workers pull tasks from busy ones.

Map-Reduce splits large datasets into chunks, processes them in parallel, then combines results:

from langgraph.graph import StateGraph
from typing import List, Dict, Any
import asyncio
import os
class MapReduceNode:
"""Node that implements map-reduce pattern for parallel processing."""
def __init__(self, map_func: callable, reduce_func: callable):
self.map_func = map_func
self.reduce_func = reduce_func
async def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""Execute map-reduce on input data."""
input_data = state.get('data', [])
# Split data into chunks for parallel processing (one chunk per CPU core)
num_workers = os.cpu_count() or 4
chunk_size = max(1, len(input_data) // num_workers)
chunks = [input_data[i:i + chunk_size] for i in range(0, len(input_data), chunk_size)]
# Map phase - process chunks in parallel
map_tasks = [
asyncio.create_task(self._map_chunk(chunk))
for chunk in chunks
]
mapped_results = await asyncio.gather(*map_tasks)
# Reduce phase - combine results
final_result = await self._reduce_results(mapped_results)
return {
'processed_data': final_result,
'chunks_processed': len(chunks)
}
async def _map_chunk(self, chunk: List[Any]) -> List[Any]:
"""Process a single chunk of data."""
loop = asyncio.get_running_loop()
# Run CPU-intensive map function in thread pool
return await loop.run_in_executor(
None,
lambda: [self.map_func(item) for item in chunk]
)
async def _reduce_results(self, mapped_results: List[List[Any]]) -> Any:
"""Combine mapped results."""
# Flatten results
all_results = []
for chunk_results in mapped_results:
all_results.extend(chunk_results)
# Apply reduce function
return self.reduce_func(all_results)
# Example usage in Langgraph
def create_parallel_workflow():
builder = StateGraph()
# Define map and reduce functions
def process_item(item):
# CPU-intensive processing
return {"id": item["id"], "score": calculate_score(item)}
def combine_scores(results):
# Aggregate scores
total_score = sum(r["score"] for r in results)
return {"average_score": total_score / len(results)}
# Add map-reduce node
builder.add_node(
"parallel_scoring",
MapReduceNode(process_item, combine_scores)
)
return builder.compile()

Fan-out/Fan-in fires requests to multiple services simultaneously and collects whatever comes back:

class FanOutFanInNode:
"""Node that fans out to multiple services and collects results."""
def __init__(self, service_configs: List[Dict[str, Any]]):
self.service_configs = service_configs
async def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""Fan out requests to multiple services."""
query = state.get('query')
# Create tasks for each service
service_tasks = []
for config in self.service_configs:
task = asyncio.create_task(
self._call_service(config, query)
)
service_tasks.append(task)
# Wait for all services with timeout
results = await asyncio.gather(
*service_tasks,
return_exceptions=True
)
# Process results
successful_results = []
failed_services = []
for config, result in zip(self.service_configs, results):
if isinstance(result, Exception):
failed_services.append(config['name'])
else:
successful_results.append(result)
return {
'service_results': successful_results,
'failed_services': failed_services,
'success_rate': len(successful_results) / len(self.service_configs)
}
async def _call_service(self, config: Dict[str, Any], query: str):
"""Call a single service with timeout and retry."""
max_retries = config.get('max_retries', 3)
timeout = config.get('timeout', 5.0)
for attempt in range(max_retries):
try:
async with asyncio.timeout(timeout):
# Simulate service call
result = await self._make_request(
config['url'],
query
)
return result
except asyncio.TimeoutError:
if attempt == max_retries - 1:
raise
# Exponential backoff
await asyncio.sleep(2 ** attempt)
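
Work-stealing, the third pattern in Figure 2, is easiest to approximate in asyncio with a shared queue that every worker pulls from, so one slow item never strands the rest of the backlog. A minimal sketch, assuming the task payloads and handler coroutine come from the caller:

import asyncio
from typing import Any, Awaitable, Callable, List

async def work_stealing_pool(
    tasks: List[Any],
    handler: Callable[[Any], Awaitable[Any]],
    num_workers: int = 8,
) -> List[Any]:
    """Workers pull from a shared queue until it is empty."""
    queue: asyncio.Queue = asyncio.Queue()
    for task in tasks:
        queue.put_nowait(task)
    results: List[Any] = []

    async def worker() -> None:
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return                      # nothing left to steal
            results.append(await handler(item))
            queue.task_done()

    await asyncio.gather(*(worker() for _ in range(num_workers)))
    return results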

Database Integration and Caching#

Multi-Level Caching: The 90% Solution#

Caching delivered our single biggest performance win. A two-tier cache (in-memory L1 plus Redis L2) reduced database load by 90% and cut average response time in half.

Figure 3: Multi-Level Caching Architecture — L1 (in-memory) and L2 (distributed Redis) intercept requests before they reach the database. Separating read and write models allows optimized schemas, while an event stream enables eventual consistency between replicas.

Here is the full implementation of our multi-level cache with LRU eviction and a decorator for transparently caching Langgraph node results:

from typing import Any, Dict, List, Optional
import asyncio
from datetime import datetime, timedelta
import redis
import pickle
from functools import wraps
class MultiLevelCache:
"""High-performance multi-level caching system."""
def __init__(
self,
l1_max_size: int = 1000,
l1_ttl_seconds: int = 300,
redis_client: redis.Redis = None
):
# L1: In-memory LRU cache
self.l1_cache: Dict[str, tuple[Any, datetime]] = {}
self.l1_max_size = l1_max_size
self.l1_ttl = timedelta(seconds=l1_ttl_seconds)
self.l1_access_order: List[str] = []
# L2: Redis distributed cache
self.redis_client = redis_client or redis.Redis(
host='localhost',
port=6379,
decode_responses=False
)
# Metrics
self.metrics = {
'l1_hits': 0,
'l1_misses': 0,
'l2_hits': 0,
'l2_misses': 0
}
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache with multi-level lookup."""
# Check L1 cache
if key in self.l1_cache:
value, timestamp = self.l1_cache[key]
if datetime.now() - timestamp < self.l1_ttl:
self.metrics['l1_hits'] += 1
self._update_lru(key)
return value
else:
# Expired: drop the stale entry and its LRU slot so eviction stays in sync
del self.l1_cache[key]
if key in self.l1_access_order:
self.l1_access_order.remove(key)
self.metrics['l1_misses'] += 1
# Check L2 cache (Redis)
try:
redis_value = await asyncio.to_thread(
self.redis_client.get, key
)
if redis_value:
self.metrics['l2_hits'] += 1
value = pickle.loads(redis_value)
# Promote to L1
self._set_l1(key, value)
return value
except Exception as e:
print(f"Redis error: {e}")
self.metrics['l2_misses'] += 1
return None
async def set(
self,
key: str,
value: Any,
ttl_seconds: int = 3600
):
"""Set value in both cache levels."""
# Set in L1
self._set_l1(key, value)
# Set in L2 (Redis) asynchronously
try:
serialized = pickle.dumps(value)
await asyncio.to_thread(
self.redis_client.setex,
key,
ttl_seconds,
serialized
)
except Exception as e:
print(f"Redis write error: {e}")
def _set_l1(self, key: str, value: Any):
"""Set value in L1 cache with LRU eviction."""
# Evict if at capacity
if len(self.l1_cache) >= self.l1_max_size:
oldest = self.l1_access_order.pop(0)
del self.l1_cache[oldest]
self.l1_cache[key] = (value, datetime.now())
self._update_lru(key)
def _update_lru(self, key: str):
"""Update LRU access order."""
if key in self.l1_access_order:
self.l1_access_order.remove(key)
self.l1_access_order.append(key)
def get_hit_rate(self) -> Dict[str, float]:
"""Calculate cache hit rates."""
l1_total = self.metrics['l1_hits'] + self.metrics['l1_misses']
l2_total = self.metrics['l2_hits'] + self.metrics['l2_misses']
return {
'l1_hit_rate': self.metrics['l1_hits'] / l1_total if l1_total > 0 else 0,
'l2_hit_rate': self.metrics['l2_hits'] / l2_total if l2_total > 0 else 0,
'overall_hit_rate': (self.metrics['l1_hits'] + self.metrics['l2_hits']) /
(l1_total + l2_total) if (l1_total + l2_total) > 0 else 0
}
# Cache decorator for Langgraph nodes
def cached_node(cache: MultiLevelCache, ttl_seconds: int = 3600):
"""Decorator to cache Langgraph node results."""
def decorator(func):
@wraps(func)
async def wrapper(state: Dict[str, Any]) -> Dict[str, Any]:
# Generate cache key from state
cache_key = f"{func.__name__}:{hash(str(sorted(state.items())))}"
# Try cache first
cached_result = await cache.get(cache_key)
if cached_result is not None:
return cached_result
# Execute function
result = await func(state)
# Cache result
await cache.set(cache_key, result, ttl_seconds)
return result
return wrapper
return decorator
# Example usage
cache = MultiLevelCache()
@cached_node(cache, ttl_seconds=1800)
async def expensive_analysis_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""Node with expensive computation that benefits from caching."""
# Simulate expensive operation
await asyncio.sleep(2)
return {
'analysis_result': 'complex_computation_result',
'timestamp': datetime.now().isoformat()
}

State Persistence for Long-Running Workflows#

When workflows run for minutes or hours, you need durable state that survives crashes. Event sourcing gives you that durability plus a complete audit trail of every state change:

from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
from pydantic import BaseModel
import json
class EventType(Enum):
WORKFLOW_STARTED = "workflow_started"
NODE_EXECUTED = "node_executed"
STATE_UPDATED = "state_updated"
WORKFLOW_COMPLETED = "workflow_completed"
ERROR_OCCURRED = "error_occurred"
class WorkflowEvent(BaseModel):
"""Immutable event representing a state change."""
event_id: str
workflow_id: str
event_type: EventType
timestamp: datetime
data: Dict[str, Any]
node_name: Optional[str] = None
class EventSourcingStateManager:
"""Manage workflow state using event sourcing."""
def __init__(self, event_store):
self.event_store = event_store
self._state_cache = {}
async def save_event(self, event: WorkflowEvent):
"""Persist an event to the event store."""
await self.event_store.append(event)
# Invalidate cache
if event.workflow_id in self._state_cache:
del self._state_cache[event.workflow_id]
async def reconstruct_state(self, workflow_id: str) -> Dict[str, Any]:
"""Reconstruct current state from events."""
# Check cache first
if workflow_id in self._state_cache:
return self._state_cache[workflow_id]
# Replay events
events = await self.event_store.get_events(workflow_id)
state = {}
for event in events:
state = self._apply_event(state, event)
# Cache reconstructed state
self._state_cache[workflow_id] = state
return state
def _apply_event(
self,
state: Dict[str, Any],
event: WorkflowEvent
) -> Dict[str, Any]:
"""Apply an event to the current state."""
if event.event_type == EventType.WORKFLOW_STARTED:
return event.data
elif event.event_type == EventType.STATE_UPDATED:
# Merge state updates
new_state = state.copy()
new_state.update(event.data)
return new_state
elif event.event_type == EventType.NODE_EXECUTED:
# Track node execution
if 'executed_nodes' not in state:
state['executed_nodes'] = []
state['executed_nodes'].append({
'node': event.node_name,
'timestamp': event.timestamp.isoformat(),
'result': event.data
})
return state
return state
async def get_workflow_history(
self,
workflow_id: str
) -> List[WorkflowEvent]:
"""Get complete workflow history."""
return await self.event_store.get_events(workflow_id)
# Integrate with Langgraph
class EventSourcingWorkflow:
"""Langgraph workflow with event sourcing."""
def __init__(self, graph: StateGraph, state_manager: EventSourcingStateManager):
self.graph = graph
self.state_manager = state_manager
async def execute(
self,
workflow_id: str,
initial_state: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute workflow with event sourcing."""
# Record workflow start
await self.state_manager.save_event(
WorkflowEvent(
event_id=generate_id(),
workflow_id=workflow_id,
event_type=EventType.WORKFLOW_STARTED,
timestamp=datetime.now(),
data=initial_state
)
)
# Execute graph with event recording
state = initial_state
for node_name in self.graph.execution_order:
# Execute node
node_func = self.graph.nodes[node_name]
result = await node_func(state)
# Record execution
await self.state_manager.save_event(
WorkflowEvent(
event_id=generate_id(),
workflow_id=workflow_id,
event_type=EventType.NODE_EXECUTED,
timestamp=datetime.now(),
node_name=node_name,
data=result
)
)
# Update state
state.update(result)
return state

Benchmarking: Measure Before You Cut#

Building a Benchmarking Framework#

Our first optimization attempt was a disaster. We “optimized” the wrong thing, made the system 15% slower, and did not notice for a week because we had no benchmarks. After that, we built a proper measurement framework before touching another line of production code.

Figure 4: Iterative Benchmarking Process — Starting with baseline establishment, the cycle moves through hypothesis, testing (load tests, A/B tests, profiling), analysis, optimization, and verification. Every change gets measured against the baseline before it ships.

Here is the benchmarking suite we now run before and after every optimization:

import time
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from langgraph.graph import StateGraph
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import psutil
import matplotlib.pyplot as plt
@dataclass
class BenchmarkResult:
"""Results from a benchmark run."""
operation: str
duration_seconds: float
throughput_ops_per_sec: float
latency_p50_ms: float
latency_p95_ms: float
latency_p99_ms: float
memory_used_mb: float
cpu_percent: float
@dataclass
class BenchmarkConfig:
"""Configuration for benchmark runs."""
name: str
warm_up_iterations: int = 10
test_iterations: int = 100
concurrent_workers: int = 1
duration_seconds: Optional[int] = None
class LanggraphBenchmark:
"""Comprehensive benchmark suite for Langgraph + Pydantic AI."""
def __init__(self):
self.results: List[BenchmarkResult] = []
self.process = psutil.Process()
async def benchmark_workflow(
self,
workflow: StateGraph,
test_states: List[Dict[str, Any]],
config: BenchmarkConfig
) -> BenchmarkResult:
"""Benchmark a complete workflow."""
# Warm-up phase
print(f"Warming up {config.name}...")
for i in range(config.warm_up_iterations):
await workflow.ainvoke(test_states[i % len(test_states)])
# Reset metrics
latencies = []
start_time = time.time()
operations_completed = 0
# Initial resource snapshot
initial_memory = self.process.memory_info().rss / 1024 / 1024
self.process.cpu_percent() # Initialize CPU monitoring
# Run benchmark
print(f"Running {config.name} benchmark...")
if config.duration_seconds:
# Time-based benchmark
end_time = start_time + config.duration_seconds
while time.time() < end_time:
operation_start = time.time()
state = test_states[operations_completed % len(test_states)]
await workflow.ainvoke(state)
latency = (time.time() - operation_start) * 1000
latencies.append(latency)
operations_completed += 1
else:
# Iteration-based benchmark
for i in range(config.test_iterations):
operation_start = time.time()
state = test_states[i % len(test_states)]
await workflow.ainvoke(state)
latency = (time.time() - operation_start) * 1000
latencies.append(latency)
operations_completed += 1
# Calculate metrics
total_duration = time.time() - start_time
# Sort for percentiles
latencies.sort()
result = BenchmarkResult(
operation=config.name,
duration_seconds=total_duration,
throughput_ops_per_sec=operations_completed / total_duration,
latency_p50_ms=latencies[int(len(latencies) * 0.50)],
latency_p95_ms=latencies[int(len(latencies) * 0.95)],
latency_p99_ms=latencies[int(len(latencies) * 0.99)],
memory_used_mb=self.process.memory_info().rss / 1024 / 1024 - initial_memory,
cpu_percent=self.process.cpu_percent()
)
self.results.append(result)
return result
async def benchmark_parallel_execution(
self,
workflow: StateGraph,
test_states: List[Dict[str, Any]],
worker_counts: List[int] = [1, 2, 4, 8, 16]
) -> Dict[int, BenchmarkResult]:
"""Benchmark workflow with different parallelism levels."""
results = {}
for worker_count in worker_counts:
config = BenchmarkConfig(
name=f"Parallel-{worker_count}",
test_iterations=100,
concurrent_workers=worker_count
)
# Run concurrent workflows
async def run_worker(worker_id: int):
worker_latencies = []
for i in range(config.test_iterations // worker_count):
start = time.time()
await workflow.ainvoke(test_states[i % len(test_states)])
worker_latencies.append((time.time() - start) * 1000)
return worker_latencies
start = time.time()
all_latencies = await asyncio.gather(*[
run_worker(i) for i in range(worker_count)
])
duration = time.time() - start
# Flatten latencies
latencies = []
for worker_latencies in all_latencies:
latencies.extend(worker_latencies)
latencies.sort()
results[worker_count] = BenchmarkResult(
operation=f"Parallel-{worker_count}",
duration_seconds=duration,
throughput_ops_per_sec=config.test_iterations / duration,
latency_p50_ms=latencies[int(len(latencies) * 0.50)],
latency_p95_ms=latencies[int(len(latencies) * 0.95)],
latency_p99_ms=latencies[int(len(latencies) * 0.99)],
memory_used_mb=0, # Not measured for parallel
cpu_percent=0
)
return results
def generate_report(self, output_file: str = "benchmark_report.png"):
"""Generate visual benchmark report."""
if not self.results:
print("No benchmark results to report")
return
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
# Throughput comparison
operations = [r.operation for r in self.results]
throughputs = [r.throughput_ops_per_sec for r in self.results]
ax1.bar(operations, throughputs, color='blue', alpha=0.7)
ax1.set_xlabel('Operation')
ax1.set_ylabel('Throughput (ops/sec)')
ax1.set_title('Throughput Comparison')
ax1.tick_params(axis='x', rotation=45)
# Latency percentiles
p50s = [r.latency_p50_ms for r in self.results]
p95s = [r.latency_p95_ms for r in self.results]
p99s = [r.latency_p99_ms for r in self.results]
x = np.arange(len(operations))
width = 0.25
ax2.bar(x - width, p50s, width, label='P50', alpha=0.7)
ax2.bar(x, p95s, width, label='P95', alpha=0.7)
ax2.bar(x + width, p99s, width, label='P99', alpha=0.7)
ax2.set_xlabel('Operation')
ax2.set_ylabel('Latency (ms)')
ax2.set_title('Latency Percentiles')
ax2.set_xticks(x)
ax2.set_xticklabels(operations, rotation=45)
ax2.legend()
# Memory usage
memory_usage = [r.memory_used_mb for r in self.results]
ax3.bar(operations, memory_usage, color='green', alpha=0.7)
ax3.set_xlabel('Operation')
ax3.set_ylabel('Memory Used (MB)')
ax3.set_title('Memory Usage')
ax3.tick_params(axis='x', rotation=45)
# CPU usage
cpu_usage = [r.cpu_percent for r in self.results]
ax4.bar(operations, cpu_usage, color='red', alpha=0.7)
ax4.set_xlabel('Operation')
ax4.set_ylabel('CPU Usage (%)')
ax4.set_title('CPU Utilization')
ax4.tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.savefig(output_file)
plt.close()
# Print summary
print("\nBenchmark Summary:")
print("-" * 80)
for result in self.results:
print(f"\n{result.operation}:")
print(f" Throughput: {result.throughput_ops_per_sec:.2f} ops/sec")
print(f" Latency P50: {result.latency_p50_ms:.2f}ms")
print(f" Latency P95: {result.latency_p95_ms:.2f}ms")
print(f" Latency P99: {result.latency_p99_ms:.2f}ms")
print(f" Memory Used: {result.memory_used_mb:.2f}MB")
print(f" CPU Usage: {result.cpu_percent:.1f}%")

Production Monitoring: Keep Your Eyes Open#

Benchmarks tell you where you started. Production monitoring tells you where you are right now. We use Prometheus metrics on every workflow and node, so regressions surface in hours, not weeks:

from prometheus_client import Counter, Histogram, Gauge, Summary
import asyncio
import time
from functools import wraps
from typing import Any, Dict
# Define metrics
workflow_duration = Histogram(
'langgraph_workflow_duration_seconds',
'Time spent processing workflow',
['workflow_name', 'status']
)
node_duration = Histogram(
'langgraph_node_duration_seconds',
'Time spent in each node',
['workflow_name', 'node_name']
)
validation_errors = Counter(
'pydantic_validation_errors_total',
'Total validation errors',
['model_name', 'field_name']
)
active_workflows = Gauge(
'langgraph_active_workflows',
'Number of currently active workflows'
)
cache_hit_rate = Gauge(
'cache_hit_rate',
'Cache hit rate',
['cache_level']
)
def monitor_workflow(workflow_name: str):
"""Decorator to monitor workflow execution."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
active_workflows.inc()
start_time = time.time()
status = 'success'
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
status = 'error'
raise
finally:
duration = time.time() - start_time
workflow_duration.labels(
workflow_name=workflow_name,
status=status
).observe(duration)
active_workflows.dec()
return wrapper
return decorator
def monitor_node(workflow_name: str, node_name: str):
"""Decorator to monitor individual node execution."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
node_duration.labels(
workflow_name=workflow_name,
node_name=node_name
).observe(duration)
return wrapper
return decorator
# Integrate with Pydantic validation
from pydantic import ValidationError, BaseModel
class MonitoredModel(BaseModel):
"""Base model with validation monitoring."""
@classmethod
def parse_obj(cls, obj):
try:
return super().parse_obj(obj)
except ValidationError as e:
# Record validation errors
for error in e.errors():
validation_errors.labels(
model_name=cls.__name__,
field_name=error['loc'][0] if error['loc'] else 'unknown'
).inc()
raise
# Example monitored workflow
@monitor_workflow('document_processing')
async def process_document_monitored(document: Dict[str, Any]):
"""Example workflow with full monitoring."""
state = {'document': document}
# Each node is monitored
@monitor_node('document_processing', 'validation')
async def validate_node(state):
model = MonitoredModel.parse_obj(state['document'])
return {'validated': True}
@monitor_node('document_processing', 'analysis')
async def analysis_node(state):
# Simulate analysis
await asyncio.sleep(0.1)
return {'analysis_complete': True}
# Execute nodes
state.update(await validate_node(state))
state.update(await analysis_node(state))
return state

Putting It All Together: A Research Platform at Scale#

From 10 Users to 10,000#

Here is where all the techniques converge. We built an AI research platform that started as a prototype handling a handful of queries. After applying multi-level caching, parallel source searching, lazy validation, and rate limiting, it handles 10,000 concurrent users with p95 latency under 2 seconds.

The full implementation below shows how each optimization layer integrates into a single coherent system:

from langgraph.graph import StateGraph
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
import asyncio
import time
import redis
from datetime import datetime
# Domain models
class ResearchQuery(BaseModel):
"""User research query with validation."""
query_id: str
user_id: str
question: str = Field(..., min_length=10, max_length=500)
max_sources: int = Field(default=10, ge=1, le=50)
domains: List[str] = Field(default_factory=list)
class ResearchResult(BaseModel):
"""Structured research output."""
query_id: str
findings: List[Dict[str, Any]]
summary: str
confidence_score: float = Field(ge=0, le=1)
sources_used: int
processing_time_ms: float
# Scalable research platform
class ScalableResearchPlatform:
def __init__(
self,
cache: MultiLevelCache,
max_concurrent_queries: int = 100
):
self.cache = cache
self.semaphore = asyncio.Semaphore(max_concurrent_queries)
self.workflow = self._build_workflow()
def _build_workflow(self) -> StateGraph:
"""Build the research workflow graph."""
builder = StateGraph()
# Define nodes with caching and parallel execution
builder.add_node(
"parse_query",
cached_node(self.cache, ttl_seconds=3600)(self._parse_query)
)
builder.add_node(
"search_sources",
self._search_sources_parallel
)
builder.add_node(
"analyze_findings",
cached_node(self.cache, ttl_seconds=1800)(self._analyze_findings)
)
builder.add_node(
"generate_summary",
self._generate_summary
)
# Define flow
builder.set_entry_point("parse_query")
builder.add_edge("parse_query", "search_sources")
builder.add_edge("search_sources", "analyze_findings")
builder.add_edge("analyze_findings", "generate_summary")
return builder.compile()
async def process_query(self, query: ResearchQuery) -> ResearchResult:
"""Process a research query with rate limiting."""
async with self.semaphore:
start_time = time.time()
# Initialize state
state = {
'query': query.dict(),
'start_time': start_time
}
try:
# Execute workflow
result = await self.workflow.ainvoke(state)
# Build response
return ResearchResult(
query_id=query.query_id,
findings=result['findings'],
summary=result['summary'],
confidence_score=result['confidence_score'],
sources_used=len(result['findings']),
processing_time_ms=(time.time() - start_time) * 1000
)
except Exception as e:
# Log error and return partial result
return ResearchResult(
query_id=query.query_id,
findings=[],
summary=f"Error processing query: {str(e)}",
confidence_score=0.0,
sources_used=0,
processing_time_ms=(time.time() - start_time) * 1000
)
async def _parse_query(self, state: Dict) -> Dict:
"""Parse and enhance the query."""
query = ResearchQuery(**state['query'])
# Extract key terms and enhance query
enhanced_terms = await self._extract_key_terms(query.question)
return {
'parsed_query': query.dict(),
'search_terms': enhanced_terms
}
async def _search_sources_parallel(self, state: Dict) -> Dict:
"""Search multiple sources in parallel."""
search_terms = state['search_terms']
max_sources = state['parsed_query']['max_sources']
# Create search tasks for different sources
search_tasks = []
# Academic sources
search_tasks.append(
self._search_academic(search_terms, max_sources // 3)
)
# News sources
search_tasks.append(
self._search_news(search_terms, max_sources // 3)
)
# General web
search_tasks.append(
self._search_web(search_terms, max_sources // 3)
)
# Execute all searches in parallel
all_results = await asyncio.gather(*search_tasks)
# Combine and deduplicate results
findings = []
seen_urls = set()
for results in all_results:
for result in results:
if result['url'] not in seen_urls:
findings.append(result)
seen_urls.add(result['url'])
return {'findings': findings[:max_sources]}
async def _analyze_findings(self, state: Dict) -> Dict:
"""Analyze findings for relevance and quality."""
findings = state['findings']
# Score each finding
scored_findings = []
for finding in findings:
score = await self._score_relevance(
finding,
state['parsed_query']['question']
)
finding['relevance_score'] = score
scored_findings.append(finding)
# Sort by relevance
scored_findings.sort(
key=lambda x: x['relevance_score'],
reverse=True
)
# Calculate confidence (guard against an empty result set)
avg_score = sum(f['relevance_score'] for f in scored_findings) / len(scored_findings) if scored_findings else 0.0
return {
'analyzed_findings': scored_findings,
'confidence_score': avg_score
}
async def _generate_summary(self, state: Dict) -> Dict:
"""Generate final summary from findings."""
findings = state['analyzed_findings']
query = state['parsed_query']['question']
# Use only top findings for summary
top_findings = findings[:5]
# Generate summary (simplified for example)
summary_points = []
for finding in top_findings:
summary_points.append(
f"- {finding['title']}: {finding['snippet']}"
)
summary = f"Based on {len(findings)} sources, here are the key findings:\n"
summary += "\n".join(summary_points)
return {
'summary': summary,
'findings': findings
}
# Deployment configuration for scale
async def deploy_research_platform():
"""Deploy the research platform with scaling configurations."""
# Initialize cache with Redis cluster
cache = MultiLevelCache(
l1_max_size=10000, # Large L1 for hot queries
l1_ttl_seconds=300,
redis_client=redis.RedisCluster(
startup_nodes=[
{"host": "redis-1", "port": 6379},
{"host": "redis-2", "port": 6379},
{"host": "redis-3", "port": 6379}
]
)
)
# Create platform instance
platform = ScalableResearchPlatform(
cache=cache,
max_concurrent_queries=100
)
# Set up monitoring
from aiohttp import web
from prometheus_client import generate_latest
async def metrics(request):
return web.Response(
body=generate_latest(),
content_type="text/plain"
)
# Create web app
app = web.Application()
app.router.add_get('/metrics', metrics)
# Research endpoint
async def research_endpoint(request):
data = await request.json()
query = ResearchQuery(**data)
result = await platform.process_query(query)
return web.json_response(result.dict())
app.router.add_post('/research', research_endpoint)
# Run with gunicorn for production
return app

The combined impact of these techniques: multi-level caching reduced database load by 90%, parallel source searching improved response time by 3x, lazy validation and object pooling cut memory usage by 40%, and rate limiting prevented cascade failures under spike traffic.

What Worked, What Hurt, and What We Would Do Differently#

The Wins#

After applying these optimizations across multiple production deployments, four results stand out:

  1. 10-100x throughput improvement: Parallelization and caching together transformed systems that could barely handle 10 users into platforms serving thousands concurrently.
  2. Predictable tail latency: Rate limiting and resource management eliminated the death spirals that plague unoptimized systems under load.
  3. 50-70% cost reduction: Optimized serialization and caching slashed infrastructure spend compared to the naive “just add servers” approach.
  4. Proactive scaling: Comprehensive monitoring meant we spotted capacity issues days before users did.

The Pain#

Honesty matters here. Every one of these optimizations added complexity.

Your elegant prototype becomes a distributed system with cache invalidation bugs, distributed tracing requirements, and operational overhead you did not sign up for. When something breaks in a highly optimized system, finding the root cause is detective work. Caches improve speed but risk serving stale data. Parallelization increases throughput but makes error handling harder. The operational burden of dashboards, alerting, and cache invalidation strategies is real and ongoing.

KEY INSIGHT: Every optimization is a trade-off. Add caching only after you have proven you need it, and always ship the invalidation strategy alongside the cache itself.
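
To make that concrete, here is a minimal sketch of shipping invalidation with the write path, using the MultiLevelCache shown earlier. The helper reaches into the cache's L1 structures and Redis client directly because the class above exposes no delete method; save_document and the key scheme are assumptions for illustration:

async def invalidate(cache: MultiLevelCache, key: str) -> None:
    """Drop a key from both cache levels when its source data changes."""
    cache.l1_cache.pop(key, None)
    if key in cache.l1_access_order:
        cache.l1_access_order.remove(key)
    await asyncio.to_thread(cache.redis_client.delete, key)

async def update_document(cache: MultiLevelCache, doc_id: str, new_doc: dict) -> None:
    await save_document(doc_id, new_doc)            # assumed persistence helper
    await invalidate(cache, f"analysis:{doc_id}")   # assumed cache-key scheme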

What Comes Next#

Emerging Patterns Worth Watching#

Four trends are reshaping how we scale agent systems:

  1. Serverless agent architectures: Deploying individual Langgraph nodes as serverless functions for automatic scaling and pay-per-use pricing.
  2. Edge computing for agents: Running lighter agent workloads closer to users to cut latency and reduce centralized load.
  3. Adaptive optimization: Systems that auto-tune caching, parallelization, and resource allocation based on observed workload patterns.
  4. Federated agent networks: Distributed agent systems that collaborate across organizational boundaries while preserving data privacy.

Five Things You Can Do Monday Morning#

  1. Profile first, optimize second: Run cProfile and memory_profiler on your hottest workflow. Find the actual bottleneck before you touch a line of code.
  2. Start with caching: A well-designed cache delivers the biggest return per hour of engineering time. Begin with simple in-memory caching and evolve from there.
  3. Build observability from day one: Add Prometheus metrics to your workflows now, not after the first outage. You cannot fix what you cannot see.
  4. Go async everywhere: Langgraph’s async support is powerful. Use it for every I/O-bound operation, especially LLM calls.
  5. Load test before production, not after: Use Locust or K6 to hammer your workflows weekly (a minimal Locust sketch follows below). Discovering scaling issues in CI is cheap. Discovering them in production is expensive.
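
For that last point, a minimal Locust sketch against the research platform above, assuming a local instance exposing the /research endpoint; save it as locustfile.py and run locust -f locustfile.py:

import uuid
from locust import HttpUser, task, between

class ResearchUser(HttpUser):
    """Simulated user firing research queries at the platform."""
    wait_time = between(1, 3)

    @task
    def research_query(self) -> None:
        self.client.post("/research", json={
            "query_id": str(uuid.uuid4()),
            "user_id": "load-test",
            "question": "What are current best practices for scaling agent workflows?",
            "max_sources": 10,
        })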

Conclusion#

Scaling Langgraph and Pydantic AI systems from prototype to production requires methodical work across architecture, optimization, and monitoring. Multi-level caching, parallel processing patterns, lazy validation, and event-sourced state management form a toolkit that can take a struggling prototype to a system handling thousands of concurrent users.

The core lesson from every deployment we have done: scaling is not about handling more load. Scaling is about maintaining reliability, type safety, and predictability while meeting performance targets. Profile your bottlenecks, apply targeted optimizations, measure the results, and iterate. The frameworks give you the building blocks. The architecture decisions determine whether those blocks hold up under real-world pressure.

