We cut our GraphRAG ingestion time from 56 hours to 4.1 hours. Five optimization techniques, applied together, delivered a 13x speedup on a 10,000-document corpus. Node creation jumped from 50-100 nodes/second to 2,000-5,000 nodes/second. Deadlocks dropped from a 15-30% failure rate to zero. And the best part: each technique compounds the others, so the combined result far exceeds what any single fix delivers alone.
The problem was familiar. We had a GraphRAG pipeline that worked beautifully on 50 documents and completely fell apart at 1,000. Transaction timeouts stacked up. CPU cores sat idle while sequential operations crawled forward one chunk at a time. The knowledge graph we were building had incredible potential for contextual AI retrieval, but “come back next week when ingestion finishes” is not a viable product pitch.
We tried the obvious fixes first: bigger batch sizes, more threads, faster hardware. None of it worked. Bigger batches triggered deadlocks. More threads made the deadlocks worse. Faster hardware just ran into the same bottlenecks at slightly higher throughput. The real problem was architectural, and solving it required five distinct techniques that each target a different layer of the pipeline.
Here is what those five techniques are, why they work, and how to implement each one.
How GraphRAG Works (and Where It Breaks)
The Two-Database Architecture
GraphRAG combines vector search with graph traversal. Traditional RAG finds semantically similar documents. GraphRAG does that, then follows the relationships between entities to pull in connected context that pure vector similarity would never surface.
The difference is concrete. Vector search for “machine learning frameworks” returns documents mentioning those words. GraphRAG traces connections like “TensorFlow, developed by Google, who also created JAX, which competes with PyTorch.” You get the complete picture instead of isolated fragments.
```python
# Traditional RAG approach
def traditional_rag_query(query, vector_db):
    """Find semantically similar documents."""
    # Convert query to embedding
    query_embedding = embed_text(query)

    # Find similar documents
    similar_docs = vector_db.similarity_search(query_embedding, k=5)

    # Return content for LLM context
    return [doc.content for doc in similar_docs]


# GraphRAG approach
def graphrag_query(query, vector_db, graph_db):
    """Find semantically similar content AND related information."""
    # Step 1: Find semantically similar content
    query_embedding = embed_text(query)
    entry_points = vector_db.similarity_search(query_embedding, k=3)

    # Step 2: Explore relationships from those entry points
    enriched_context = []
    for doc in entry_points:
        # Get entities from this document
        entities = doc.metadata.get('entities', [])

        # Traverse graph to find related information
        for entity in entities:
            related = graph_db.query("""
                MATCH (e:Entity {id: $entity_id})-[r]-(related)
                RETURN related, r
                LIMIT 10
            """, entity_id=entity)
            enriched_context.extend(related)

    # Step 3: Combine semantic and relational context
    return merge_contexts(entry_points, enriched_context)
```
Figure 1: GraphRAG system architecture. Documents flow through dual pipelines for vector embedding and entity extraction, feeding separate but connected databases. The retrieval engine combines semantic similarity with graph traversal to assemble richer context for the LLM.
Where the Pipeline Stalls
Every component in that architecture can become a bottleneck. Here is what happens when you process just 1,000 technical documents with a naive implementation:
| Operation | Unoptimized Performance | Volume |
|---|---|---|
| Document Chunking | ~20 seconds per document | 20,000+ chunks generated |
| Entity Extraction | ~5 seconds per chunk | 100,000+ entities extracted |
| Relationship Creation | ~0.5 seconds per relationship | 200,000+ relationships |
| Vector Embedding | ~0.1 seconds per chunk | 20,000+ embeddings |
| Total Processing Time | ~25-30 hours | For just 1,000 documents |
We hit exactly these numbers on our first real deployment. Processing a 10,000-document repository was projected at nearly two weeks. That is when we realized the naive implementation simply does not scale.
The bottlenecks compound in frustrating ways:
- Chunking inefficiency: Fixed-size chunking breaks documents at arbitrary points, creating more chunks than necessary and destroying semantic coherence.
- Sequential processing: Each operation waits for the previous one to complete, leaving your multi-core processor mostly idle.
- Database transaction overhead: Creating entities and relationships one at a time generates thousands of individual transactions, each with its own network round-trip and locking cost.
- LLM API bottlenecks: Extracting entities from one chunk at a time means thousands of API calls, each with latency overhead.
- Lock contention: As your graph grows, multiple operations trying to update the same nodes create deadlocks and failed transactions.
We addressed each of these systematically.
Technique 1: Semantic-Aware Chunking
Why Dumb Chunking Costs You Twice
Fixed-size chunking treats documents like strings of characters to be chopped into equal pieces. But documents have structure, meaning, and natural boundaries. When you break a document in the middle of a sentence or separate a code example from its explanation, you pay twice: once in wasted processing (more chunks to handle) and again in degraded quality (the LLM extracts worse entities from broken context).
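For contrast, here is a minimal sketch of the naive baseline (a hypothetical `fixed_size_chunk` helper, not part of our pipeline) that slices purely by character count; everything in the next subsection exists to avoid exactly this behavior.

```python
def fixed_size_chunk(text, chunk_size=1000, overlap=50):
    """Naive baseline: slice by character count, ignoring sentences,
    paragraphs, and code blocks entirely."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])  # may cut mid-sentence or mid-code-block
        start = end - overlap           # fixed overlap, regardless of content
    return chunks
```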

Figure 2: Semantic chunking vs. fixed-size chunking. Fixed-size splitting fragments code blocks and separates related content arbitrarily. Semantic chunking respects natural document boundaries, keeping code examples intact and preserving logical flow. The result is fewer, higher-quality chunks.
Building a Structure-Aware Chunker
We built a chunker that identifies document structure first, then splits along natural boundaries. Code blocks stay intact. Headers mark section transitions. Paragraphs group together up to a configurable maximum size, with sentence-level splitting as a fallback for oversized paragraphs.
```python
import re
from typing import List, Tuple
from dataclasses import dataclass


@dataclass
class Chunk:
    content: str
    chunk_type: str  # 'prose', 'code', 'table', 'header'
    metadata: dict


class SemanticChunker:
    """
    Semantic-aware document chunker that preserves document structure
    and creates coherent chunks for optimal GraphRAG processing.
    """

    def __init__(self,
                 min_chunk_size: int = 100,
                 max_chunk_size: int = 1500,
                 overlap_size: int = 50):
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        self.overlap_size = overlap_size
    def chunk_document(self, text: str) -> List[Chunk]:
        """
        Process a document into semantically coherent chunks.
        """
        chunks = []

        # Step 1: Identify document structure
        sections = self._split_by_headers(text)

        for section_header, section_content in sections:
            # Step 2: Process each section based on content type
            section_chunks = self._process_section(section_header, section_content)
            chunks.extend(section_chunks)

        # Step 3: Add overlap for context continuity
        chunks = self._add_overlap(chunks)

        return chunks
    def _split_by_headers(self, text: str) -> List[Tuple[str, str]]:
        """Split document by markdown headers while preserving hierarchy."""
        # Pattern matches h1-h6 headers
        header_pattern = r'^(#{1,6})\s+(.+)$'

        sections = []
        current_header = "Document Start"
        current_content = []

        for line in text.split('\n'):
            header_match = re.match(header_pattern, line)

            if header_match:
                # Save previous section
                if current_content:
                    sections.append((current_header, '\n'.join(current_content)))

                # Start new section
                current_header = line
                current_content = []
            else:
                current_content.append(line)

        # Don't forget the last section
        if current_content:
            sections.append((current_header, '\n'.join(current_content)))

        return sections
    def _process_section(self, header: str, content: str) -> List[Chunk]:
        """Process a section based on its content type."""
        chunks = []

        # Extract code blocks first (they should remain intact)
        code_blocks = self._extract_code_blocks(content)
        remaining_content = content

        for code_block in code_blocks:
            # Replace each code block with an indexed placeholder
            placeholder = f"[[CODE_BLOCK_{len(chunks)}]]"
            remaining_content = remaining_content.replace(code_block, placeholder, 1)

            # Create a dedicated chunk for the intact code block
            chunks.append(Chunk(
                content=code_block,
                chunk_type='code',
                metadata={'header': header}
            ))

        # Walk the remaining text, chunking prose and re-inserting code
        # chunks at the positions of their placeholders
        final_chunks = []
        for part in re.split(r'\[\[CODE_BLOCK_(\d+)\]\]', remaining_content):
            if part.isdigit() and int(part) < len(chunks):
                final_chunks.append(chunks[int(part)])
            elif part.strip():
                final_chunks.extend(self._chunk_prose(part, header))

        return final_chunks
    def _chunk_prose(self, text: str, header: str) -> List[Chunk]:
        """Chunk prose content at natural boundaries."""
        chunks = []

        # First, try to split by paragraphs
        paragraphs = text.split('\n\n')
        current_chunk = []
        current_size = 0

        for paragraph in paragraphs:
            paragraph_size = len(paragraph)

            # If adding this paragraph exceeds max size, finalize current chunk
            if current_size + paragraph_size > self.max_chunk_size and current_chunk:
                chunks.append(Chunk(
                    content='\n\n'.join(current_chunk),
                    chunk_type='prose',
                    metadata={'header': header}
                ))
                current_chunk = []
                current_size = 0

            # If a single paragraph is too large, split by sentences
            if paragraph_size > self.max_chunk_size:
                sentence_chunks = self._split_by_sentences(paragraph, header)
                chunks.extend(sentence_chunks)
            else:
                current_chunk.append(paragraph)
                current_size += paragraph_size + 2  # +2 for \n\n

        # Don't forget the last chunk
        if current_chunk:
            chunks.append(Chunk(
                content='\n\n'.join(current_chunk),
                chunk_type='prose',
                metadata={'header': header}
            ))

        return chunks

    # (_extract_code_blocks, _split_by_sentences, and _add_overlap are
    # referenced above but omitted from this listing)
```

The chunker adapts to whatever document structure it encounters. Technical documentation with lots of code examples? Those examples stay intact. Research papers with clear section boundaries? It respects those divisions. The chunks make sense both to humans and to the LLMs that will process them.
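As a quick illustration, here is a minimal usage sketch. The sample document is invented, and it assumes the helper methods omitted above are implemented:

```python
sample_doc = """# Installation

Install the package with pip, then verify the CLI is on your PATH.

## Configuration

Create a config file in your project root before the first run.
"""

# Assumes _extract_code_blocks, _split_by_sentences, and _add_overlap exist
chunker = SemanticChunker(max_chunk_size=800)
chunks = chunker.chunk_document(sample_doc)

for chunk in chunks:
    # Prose stays grouped under its section header; code blocks would
    # arrive intact as separate 'code' chunks
    print(chunk.chunk_type, chunk.metadata.get('header'), len(chunk.content))
```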
What Semantic Chunking Actually Delivers
We benchmarked semantic chunking against fixed-size chunking across our test corpus:
```python
import time

# Benchmarking semantic vs. fixed-size chunking
# (simple_chunk and extract_entities stand in for the baseline fixed-size
# chunker and the entity-extraction step used elsewhere in the pipeline)
def benchmark_chunking_approaches(documents):
    """Compare performance and quality metrics between chunking approaches."""
    semantic_chunker = SemanticChunker()

    results = {
        'fixed_size': {'chunks': 0, 'extraction_accuracy': 0, 'processing_time': 0},
        'semantic': {'chunks': 0, 'extraction_accuracy': 0, 'processing_time': 0}
    }

    for doc in documents:
        # Fixed-size chunking
        start_time = time.time()
        fixed_chunks = simple_chunk(doc, chunk_size=1000)
        fixed_entities = extract_entities(fixed_chunks)
        results['fixed_size']['processing_time'] += time.time() - start_time
        results['fixed_size']['chunks'] += len(fixed_chunks)

        # Semantic chunking
        start_time = time.time()
        semantic_chunks = semantic_chunker.chunk_document(doc)
        semantic_entities = extract_entities(semantic_chunks)
        results['semantic']['processing_time'] += time.time() - start_time
        results['semantic']['chunks'] += len(semantic_chunks)

    # Calculate improvements
    chunk_reduction = 1 - (results['semantic']['chunks'] / results['fixed_size']['chunks'])
    time_reduction = 1 - (results['semantic']['processing_time'] / results['fixed_size']['processing_time'])

    print(f"Chunk Reduction: {chunk_reduction:.1%}")
    print(f"Processing Time Reduction: {time_reduction:.1%}")

    return results
```

The numbers were consistent across runs:
- 25-40% fewer chunks while maintaining complete information coverage
- 30-45% faster overall processing because fewer chunks means fewer LLM calls, fewer embeddings, fewer database writes
- 20-35% improvement in entity extraction accuracy because the LLM sees coherent context instead of broken fragments
- Better vector embeddings that capture actual document meaning instead of arbitrary slices
KEY INSIGHT: Semantic chunking is a force multiplier. Every downstream operation benefits from better input, so improving your chunker delivers compounding returns across the entire pipeline.
Technique 2: Batch Database Operations
The Transaction Tax You Did Not Know You Were Paying
Every individual node or relationship creation in Neo4j triggers a cascade of overhead: network communication, transaction management, lock acquisition, commit, lock release, connection return. The actual write takes 1-2ms. The overhead around it takes 20-90ms. That is a 95%+ tax on every operation.
```python
# The naive approach - what NOT to do
def create_entities_naive(entities, neo4j_driver):
    """WARNING: This approach will destroy your performance at scale."""
    created_count = 0

    with neo4j_driver.session() as session:
        for entity in entities:
            # Each iteration creates a new transaction!
            result = session.run("""
                CREATE (n:Entity {id: $id, name: $name, type: $type})
                RETURN n
            """, id=entity.id, name=entity.name, type=entity.type)

            created_count += 1

    return created_count


# What's really happening behind the scenes:
# 1. Open network connection (if pooled: ~1ms, if not: ~10-50ms)
# 2. Begin transaction (~5-10ms)
# 3. Acquire locks (~1-5ms)
# 4. Execute query (~1-2ms)  <-- The actual work!
# 5. Commit transaction (~10-20ms)
# 6. Release locks (~1-2ms)
# 7. Close connection/return to pool (~1ms)
# Total: ~20-90ms for 1-2ms of actual work!
```

Batch Processing with Adaptive Sizing
The fix is to amortize that overhead across hundreds or thousands of operations per transaction. We built a batch processor that uses Neo4j’s UNWIND operator to process entire batches in a single transaction, with adaptive sizing that automatically adjusts based on success rates.
```python
import time
import logging
from typing import List, Dict, Any

from neo4j import GraphDatabase
from neo4j.exceptions import TransientError, SessionExpired


class OptimizedNeo4jBatchProcessor:
    """
    High-performance batch processor for Neo4j with adaptive sizing
    and comprehensive error handling.
    """

    def __init__(self,
                 driver,
                 initial_node_batch_size: int = 500,
                 initial_rel_batch_size: int = 1000,
                 max_retries: int = 3):
        self.driver = driver
        self.node_batch_size = initial_node_batch_size
        self.rel_batch_size = initial_rel_batch_size
        self.max_retries = max_retries
        self.logger = logging.getLogger(__name__)

        # Adaptive sizing parameters
        self.batch_size_history = []
        self.performance_threshold = 0.8  # Target 80% success rate
    def batch_create_nodes(self, nodes: List[Dict[str, Any]],
                           label: str = "Entity") -> int:
        """
        Create nodes in optimized batches with automatic size adjustment.
        """
        total_created = 0
        failed_nodes = []

        # Process nodes in batches
        for i in range(0, len(nodes), self.node_batch_size):
            batch = nodes[i:i + self.node_batch_size]

            for attempt in range(self.max_retries):
                try:
                    created = self._execute_node_batch(batch, label)
                    total_created += created

                    # Record success for adaptive sizing
                    self._record_batch_performance(True, len(batch))
                    break

                except TransientError as e:
                    self.logger.warning(f"Transient error on attempt {attempt + 1}: {e}")
                    if attempt == self.max_retries - 1:
                        failed_nodes.extend(batch)
                        self._record_batch_performance(False, len(batch))
                    else:
                        # Exponential backoff
                        time.sleep(2 ** attempt)

                except Exception as e:
                    self.logger.error(f"Unexpected error in batch creation: {e}")
                    failed_nodes.extend(batch)
                    self._record_batch_performance(False, len(batch))
                    break

        # Adjust batch size based on performance
        self._adjust_batch_size('node')

        # Handle failed nodes with smaller batches
        if failed_nodes:
            self.logger.info(f"Retrying {len(failed_nodes)} failed nodes with smaller batches")
            original_size = self.node_batch_size
            self.node_batch_size = max(10, self.node_batch_size // 10)

            retry_created = self.batch_create_nodes(failed_nodes, label)
            total_created += retry_created

            self.node_batch_size = original_size

        return total_created
    def _execute_node_batch(self, batch: List[Dict], label: str) -> int:
        """Execute a single batch of node creations."""
        with self.driver.session() as session:
            # Use UNWIND for efficient batch processing
            result = session.run(f"""
                UNWIND $batch AS node
                MERGE (n:{label} {{id: node.id}})
                ON CREATE SET n += node.properties
                ON MATCH SET n += node.properties
                RETURN count(n) as created
            """, batch=[{
                'id': node['id'],
                'properties': {k: v for k, v in node.items() if k != 'id'}
            } for node in batch])

            return result.single()['created']
    def batch_create_relationships(self, relationships: List[Dict],
                                   rel_type: str = "RELATES_TO") -> int:
        """
        Create relationships in batches with intelligent grouping.
        """
        # Group relationships by type for better performance
        grouped_rels = self._group_relationships_by_type(relationships)
        total_created = 0

        for rel_type, rels in grouped_rels.items():
            # Further batch by size
            for i in range(0, len(rels), self.rel_batch_size):
                batch = rels[i:i + self.rel_batch_size]

                try:
                    created = self._execute_relationship_batch(batch, rel_type)
                    total_created += created
                    self._record_batch_performance(True, len(batch))

                except Exception as e:
                    self.logger.error(f"Error creating relationship batch: {e}")
                    self._record_batch_performance(False, len(batch))

                    # Try smaller batches for failed relationships
                    if len(batch) > 10:
                        smaller_batches = [batch[j:j+10] for j in range(0, len(batch), 10)]
                        for small_batch in smaller_batches:
                            try:
                                created = self._execute_relationship_batch(small_batch, rel_type)
                                total_created += created
                            except Exception as e2:
                                self.logger.error(f"Failed even with small batch: {e2}")

        # Adjust batch size based on performance
        self._adjust_batch_size('relationship')

        return total_created
    def _execute_relationship_batch(self, batch: List[Dict], rel_type: str) -> int:
        """Execute a single batch of relationship creations."""
        with self.driver.session() as session:
            # Prepare batch data with proper structure
            batch_data = [{
                'source_id': rel['source_id'],
                'target_id': rel['target_id'],
                'properties': rel.get('properties', {})
            } for rel in batch]

            # Use parameterized query for safety and performance
            query = f"""
                UNWIND $batch AS rel
                MATCH (source:Entity {{id: rel.source_id}})
                MATCH (target:Entity {{id: rel.target_id}})
                MERGE (source)-[r:{rel_type}]->(target)
                ON CREATE SET r = rel.properties
                ON MATCH SET r += rel.properties
                RETURN count(r) as created
            """

            result = session.run(query, batch=batch_data)
            return result.single()['created']
    def _adjust_batch_size(self, operation_type: str):
        """
        Dynamically adjust batch size based on recent performance.
        """
        if len(self.batch_size_history) < 10:
            return  # Not enough data yet

        recent_success_rate = sum(1 for success, _ in self.batch_size_history[-10:] if success) / 10

        if operation_type == 'node':
            if recent_success_rate < self.performance_threshold:
                # Reduce batch size
                self.node_batch_size = max(50, int(self.node_batch_size * 0.8))
                self.logger.info(f"Reduced node batch size to {self.node_batch_size}")
            elif recent_success_rate > 0.95:
                # Increase batch size
                self.node_batch_size = min(2000, int(self.node_batch_size * 1.2))
                self.logger.info(f"Increased node batch size to {self.node_batch_size}")

        elif operation_type == 'relationship':
            if recent_success_rate < self.performance_threshold:
                self.rel_batch_size = max(100, int(self.rel_batch_size * 0.8))
                self.logger.info(f"Reduced relationship batch size to {self.rel_batch_size}")
            elif recent_success_rate > 0.95:
                self.rel_batch_size = min(5000, int(self.rel_batch_size * 1.2))
                self.logger.info(f"Increased relationship batch size to {self.rel_batch_size}")

    # (_record_batch_performance and _group_relationships_by_type are
    # referenced above but omitted from this listing)
```

The Batch Processing Payoff
The difference is dramatic:
| Metric | Individual Operations | Batch Operations | Improvement |
|---|---|---|---|
| Node Creation Rate | 50-100 nodes/second | 2,000-5,000 nodes/second | 20-50x |
| Relationship Creation Rate | 30-80 relationships/second | 1,500-4,000 relationships/second | 25-50x |
| Network Utilization | 90%+ overhead | 10-20% overhead | 4-9x efficiency |
| Transaction Success Rate | 60-80% (due to timeouts) | 95-99% | Near-elimination of failures |
The adaptive sizing matters more than you might expect. As the graph grows and becomes more complex, the optimal batch size shifts. Our implementation automatically tunes itself throughout the entire ingestion process rather than relying on a static configuration that was only optimal at the start.
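For orientation, here is a minimal usage sketch; the connection URI, credentials, and entity dictionaries below are placeholders, not values from our deployment:

```python
from neo4j import GraphDatabase

# Placeholder connection details; substitute your own deployment's values
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

processor = OptimizedNeo4jBatchProcessor(driver, initial_node_batch_size=500)

# Entities extracted earlier in the pipeline, flattened to plain dicts
entities = [
    {'id': 'doc1_react', 'name': 'React', 'type': 'Technology'},
    {'id': 'doc1_facebook', 'name': 'Facebook', 'type': 'Organization'},
]

created = processor.batch_create_nodes(entities, label="Entity")
print(f"Created or updated {created} nodes")

driver.close()
```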
KEY INSIGHT: Batch processing is the single highest-ROI optimization for GraphRAG. If you implement only one technique from this article, make it this one. The 20-50x improvement in write throughput transforms what is practical.
Technique 3: Relationship Grouping to Prevent Deadlocks
The Deadlock Spiral
When you scale up to parallel relationship creation, you hit an insidious problem. Two threads try to create relationships involving the same nodes but in opposite order. Thread 1 locks node A, then waits for node B. Thread 2 locks node B, then waits for node A. Deadlock. Both threads wait forever.
```python
# Thread 1 is creating: Alice -> knows -> Bob
# Thread 2 is creating: Bob -> knows -> Alice

# What happens:
# Thread 1: Lock Alice (success) → Try to lock Bob (waiting for Thread 2)
# Thread 2: Lock Bob (success) → Try to lock Alice (waiting for Thread 1)
# Result: DEADLOCK! Both threads waiting forever
```

In a graph with thousands of relationships being created in parallel, these deadlocks become frequent. Our initial parallel implementation had a 15-30% transaction failure rate. We spent more time retrying failed operations than doing actual work.
Graph Coloring to the Rescue
We solved this using graph theory. We treat relationships as nodes in a conflict graph where edges connect any two relationships that share a node. Graph coloring then assigns colors (groups) such that no two conflicting relationships share the same color. Each color group can be processed in parallel with zero risk of deadlocks.
import networkx as nxfrom collections import defaultdictfrom typing import List, Dict, Set, Tuple
class RelationshipGrouper: """ Groups relationships to eliminate deadlocks using graph coloring. This ensures relationships in the same group never conflict. """
def __init__(self, conflict_threshold: int = 1000): self.conflict_threshold = conflict_threshold self.logger = logging.getLogger(__name__)
def group_relationships(self, relationships: List[Tuple[str, str, str, Dict]]) -> Dict[int, List]: """ Group relationships to prevent deadlocks during parallel creation.
Args: relationships: List of (source_id, target_id, rel_type, properties) tuples
Returns: Dictionary mapping group IDs to lists of non-conflicting relationships """ # Step 1: Build conflict graph conflict_graph = self._build_conflict_graph(relationships)
# Step 2: Apply graph coloring coloring = self._color_graph(conflict_graph)
# Step 3: Group relationships by color groups = self._organize_by_color(relationships, coloring)
self.logger.info(f"Grouped {len(relationships)} relationships into {len(groups)} non-conflicting groups")
return groups
def _build_conflict_graph(self, relationships: List[Tuple]) -> nx.Graph: """ Build a graph where nodes are relationships and edges connect relationships that share nodes (and thus could conflict). """ conflict_graph = nx.Graph()
# Track which relationships involve each entity entity_to_rels = defaultdict(set)
# Add all relationships as nodes for idx, rel in enumerate(relationships): source_id, target_id, rel_type, _ = rel
# Add relationship to conflict graph conflict_graph.add_node(idx, relationship=rel)
# Track entity involvement entity_to_rels[source_id].add(idx) entity_to_rels[target_id].add(idx)
# Add edges between conflicting relationships for entity_id, rel_indices in entity_to_rels.items(): # All relationships involving this entity potentially conflict rel_list = list(rel_indices) for i in range(len(rel_list)): for j in range(i + 1, len(rel_list)): conflict_graph.add_edge(rel_list[i], rel_list[j])
self.logger.info(f"Conflict graph has {conflict_graph.number_of_nodes()} nodes " f"and {conflict_graph.number_of_edges()} edges")
return conflict_graph
def _color_graph(self, graph: nx.Graph) -> Dict[int, int]: """ Apply graph coloring to find non-conflicting groups. Uses various strategies based on graph characteristics. """ # For sparse graphs, use a simple greedy algorithm if graph.number_of_edges() < graph.number_of_nodes() * 2: return nx.greedy_color(graph, strategy='largest_first')
# For dense graphs, use a more sophisticated approach # Welsh-Powell algorithm tends to use fewer colors return self._welsh_powell_coloring(graph)
def _welsh_powell_coloring(self, graph: nx.Graph) -> Dict[int, int]: """ Implement Welsh-Powell algorithm for better coloring of dense graphs. """ # Sort nodes by degree (descending) nodes_by_degree = sorted(graph.nodes(), key=lambda n: graph.degree(n), reverse=True)
coloring = {} color = 0
while nodes_by_degree: # Start new color current_color_nodes = [] remaining_nodes = []
for node in nodes_by_degree: # Check if this node conflicts with any node of current color conflicts = False for colored_node in current_color_nodes: if graph.has_edge(node, colored_node): conflicts = True break
if not conflicts: coloring[node] = color current_color_nodes.append(node) else: remaining_nodes.append(node)
nodes_by_degree = remaining_nodes color += 1
return coloring
def _organize_by_color(self, relationships: List[Tuple], coloring: Dict[int, int]) -> Dict[int, List]: """Organize relationships by their assigned color.""" groups = defaultdict(list)
for idx, color in coloring.items(): groups[color].append(relationships[idx])
return dict(groups)
def optimize_for_supernodes(self, relationships: List[Tuple], supernode_threshold: int = 100) -> Dict[int, List]: """ Special handling for graphs with supernodes (highly connected nodes). """ # Identify supernodes node_degrees = defaultdict(int) for source_id, target_id, _, _ in relationships: node_degrees[source_id] += 1 node_degrees[target_id] += 1
supernodes = {node for node, degree in node_degrees.items() if degree > supernode_threshold}
if not supernodes: # No supernodes, use standard grouping return self.group_relationships(relationships)
self.logger.info(f"Identified {len(supernodes)} supernodes")
# Separate supernode relationships supernode_rels = [] regular_rels = []
for rel in relationships: source_id, target_id, _, _ = rel if source_id in supernodes or target_id in supernodes: supernode_rels.append(rel) else: regular_rels.append(rel)
# Group regular relationships normally regular_groups = self.group_relationships(regular_rels)
# Handle supernode relationships with finer granularity supernode_groups = self._group_supernode_relationships(supernode_rels, supernodes)
# Merge groups all_groups = {} group_id = 0
for group in regular_groups.values(): all_groups[group_id] = group group_id += 1
for group in supernode_groups.values(): all_groups[group_id] = group group_id += 1
return all_groups
Figure 3: Relationship grouping via graph coloring. Relationships become nodes in a conflict graph, with edges connecting any two that share an entity. Graph coloring assigns each relationship to a group where no two members conflict. Each color group processes in parallel with zero deadlock risk.
Integrating Grouped Processing into the Pipeline
Here is how the grouper plugs into the batch processor:
```python
def process_relationships_with_grouping(relationships, neo4j_driver):
    """
    Process relationships using grouping to prevent deadlocks.
    """
    # Initialize components
    grouper = RelationshipGrouper()
    batch_processor = OptimizedNeo4jBatchProcessor(neo4j_driver)

    # Group relationships
    groups = grouper.group_relationships(relationships)

    total_created = 0
    processing_times = []

    # Process each group
    for group_id, group_relationships in groups.items():
        start_time = time.time()

        # Convert to format expected by batch processor
        formatted_rels = [
            {
                'source_id': rel[0],
                'target_id': rel[1],
                'properties': rel[3]
            }
            for rel in group_relationships
        ]

        # Process this group (no conflicts within group)
        created = batch_processor.batch_create_relationships(
            formatted_rels,
            rel_type=group_relationships[0][2]  # Assuming same type in group
        )

        total_created += created
        processing_times.append(time.time() - start_time)

        print(f"Group {group_id}: Created {created} relationships "
              f"in {processing_times[-1]:.2f} seconds")

    # Summary statistics
    avg_time = sum(processing_times) / len(processing_times)
    print(f"\nTotal relationships created: {total_created}")
    print(f"Average time per group: {avg_time:.2f} seconds")
    print(f"Total processing time: {sum(processing_times):.2f} seconds")

    return total_created
```

The results from relationship grouping:
- 80-95% reduction in deadlocks for dense graphs
- 3-8x improvement in relationship creation throughput
- More predictable performance with consistent processing times
- Better resource utilization because threads actually do work instead of waiting on locks
Technique 4: Intelligent LLM Extraction Batching
The Cost of One-Chunk-at-a-Time Processing
Extracting entities and relationships using an LLM one chunk at a time is spectacularly wasteful. Each API call carries 50-200ms of network latency on top of the LLM processing time. The context window is massively underutilized — you are sending a small chunk when you could be sending five or ten. And you are making 10,000 separate API calls when 1,000-2,000 would cover the same work.
```python
# The inefficient approach we want to avoid
def extract_entities_one_by_one(chunks, llm_client):
    """WARNING: This will burn through your API budget and patience."""
    all_entities = []

    for chunk in chunks:  # If you have 10,000 chunks...
        # Each call has:
        # - Network latency: ~50-200ms
        # - LLM processing: ~500-2000ms
        # - API rate limiting delays
        # - Context window underutilization
        response = llm_client.complete(
            prompt=f"Extract entities from: {chunk.content}"
        )
        entities = parse_response(response)
        all_entities.extend(entities)

    return all_entities
```

With 10,000 chunks, you are looking at 10,000 API calls and hours of unnecessary waiting.
Multi-Chunk Batching with Parallel Workers
We built an extractor that packs multiple chunks into each LLM call, processes batches in parallel across multiple workers, and adaptively adjusts batch size based on success rates. The structured prompt format ensures the LLM maps extracted entities back to their source chunks.
from typing import List, Dict, Tuple, Optionalimport jsonfrom dataclasses import dataclassfrom concurrent.futures import ThreadPoolExecutorimport backoff
@dataclassclass ExtractionResult: entities: Dict[str, Dict] relationships: List[Dict] metadata: Dict
class IntelligentLLMExtractor: """ Optimized LLM-based extraction with adaptive batching, parallel processing, and quality optimization. """
def __init__(self, llm_client, initial_batch_size: int = 5, max_batch_size: int = 10, min_batch_size: int = 1, parallel_workers: int = 3): self.llm_client = llm_client self.batch_size = initial_batch_size self.max_batch_size = max_batch_size self.min_batch_size = min_batch_size self.parallel_workers = parallel_workers self.logger = logging.getLogger(__name__)
# Performance tracking self.batch_performance = []
def extract_from_chunks(self, chunks: List[Chunk], domain: str = "general") -> ExtractionResult: """ Extract entities and relationships from chunks using optimized batching. """ # Prepare batches batches = self._create_intelligent_batches(chunks)
# Process batches in parallel all_entities = {} all_relationships = []
with ThreadPoolExecutor(max_workers=self.parallel_workers) as executor: # Submit all batches for processing future_to_batch = { executor.submit(self._process_batch, batch, domain): batch for batch in batches }
# Collect results as they complete for future in future_to_batch: try: result = future.result(timeout=60)
# Merge results all_entities.update(result['entities']) all_relationships.extend(result['relationships'])
# Track performance self._update_batch_size(success=True, batch_size=len(future_to_batch[future]))
except Exception as e: self.logger.error(f"Batch processing failed: {e}") self._update_batch_size(success=False, batch_size=len(future_to_batch[future]))
# Reprocess failed batch with smaller size failed_batch = future_to_batch[future] if len(failed_batch) > 1: self._process_failed_batch(failed_batch, all_entities, all_relationships, domain)
# Post-process to resolve duplicates and enhance quality refined_entities, refined_relationships = self._post_process_extraction( all_entities, all_relationships )
return ExtractionResult( entities=refined_entities, relationships=refined_relationships, metadata={'total_chunks': len(chunks), 'batches_processed': len(batches)} )
def _create_intelligent_batches(self, chunks: List[Chunk]) -> List[List[Chunk]]: """ Create batches that optimize for LLM context window usage and semantic coherence. """ batches = [] current_batch = [] current_tokens = 0
# Estimate token limits (leaving room for prompt and response) max_tokens_per_batch = 2000 # Adjust based on your LLM
for chunk in chunks: # Estimate tokens (rough: 1 token ≈ 4 characters) chunk_tokens = len(chunk.content) // 4
# Check if adding this chunk would exceed limits if (len(current_batch) >= self.batch_size or current_tokens + chunk_tokens > max_tokens_per_batch):
if current_batch: batches.append(current_batch) current_batch = [chunk] current_tokens = chunk_tokens else: current_batch.append(chunk) current_tokens += chunk_tokens
# Don't forget the last batch if current_batch: batches.append(current_batch)
self.logger.info(f"Created {len(batches)} batches from {len(chunks)} chunks") return batches
@backoff.on_exception(backoff.expo, Exception, max_tries=3) def _process_batch(self, batch: List[Chunk], domain: str) -> Dict: """ Process a batch of chunks through the LLM with sophisticated prompting. """ # Create structured prompt for batch processing batch_prompt = self._create_batch_prompt(batch, domain)
# Call LLM with structured output format response = self.llm_client.complete( prompt=batch_prompt, temperature=0.1, # Low temperature for consistent extraction max_tokens=2000 )
# Parse structured response return self._parse_batch_response(response, batch)
def _create_batch_prompt(self, batch: List[Chunk], domain: str) -> str: """ Create an optimized prompt for batch extraction with examples. """ # Get domain-specific examples examples = self._get_domain_examples(domain)
prompt = f"""You are an expert at extracting entities and relationships from text.
{examples}
Now extract entities and relationships from the following {len(batch)} text chunks.For each chunk, identify key entities and their relationships.
"""
# Add chunks with clear separation for i, chunk in enumerate(batch): prompt += f"\n--- Chunk {i+1} ---\n{chunk.content}\n"
prompt += """--- Instructions ---Return a JSON object with this exact structure:{ "chunks": [ { "chunk_id": 0, "entities": [ { "id": "unique_id", "name": "Entity Name", "type": "Person|Organization|Location|Concept|Other", "confidence": 0.9 } ], "relationships": [ { "source": "entity_id_1", "target": "entity_id_2", "type": "RELATES_TO|WORKS_FOR|LOCATED_IN|etc", "confidence": 0.8 } ] } ]}
Be comprehensive but precise. Only extract clearly stated information."""
return prompt
def _get_domain_examples(self, domain: str) -> str: """ Provide domain-specific examples to improve extraction quality. """ examples = { "technical": """Example for technical documentation:Text: "The React framework, developed by Facebook, uses a virtual DOM for efficient rendering."Entities: React (Technology), Facebook (Organization), virtual DOM (Concept)Relationships: React -[DEVELOPED_BY]-> Facebook, React -[USES]-> virtual DOM""",
"academic": """Example for academic text:Text: "Dr. Smith from MIT published groundbreaking research on quantum computing in Nature."Entities: Dr. Smith (Person), MIT (Organization), quantum computing (Concept), Nature (Publication)Relationships: Dr. Smith -[AFFILIATED_WITH]-> MIT, Dr. Smith -[RESEARCHES]-> quantum computing""",
"business": """Example for business content:Text: "Apple Inc. acquired Beats Electronics for $3 billion in 2014."Entities: Apple Inc. (Organization), Beats Electronics (Organization), 2014 (Date)Relationships: Apple Inc. -[ACQUIRED]-> Beats Electronics""" }
return examples.get(domain, examples["technical"])
def _parse_batch_response(self, response: str, batch: List[Chunk]) -> Dict: """ Parse the LLM response and map back to original chunks. """ try: # Extract JSON from response json_start = response.find('{') json_end = response.rfind('}') + 1 json_str = response[json_start:json_end]
parsed = json.loads(json_str)
# Process each chunk's results entities = {} relationships = []
for chunk_result in parsed.get('chunks', []): chunk_id = chunk_result.get('chunk_id', 0)
# Process entities for entity in chunk_result.get('entities', []): entity_id = f"{batch[chunk_id].metadata.get('doc_id', 'unknown')}_{entity['id']}" entities[entity_id] = { 'name': entity['name'], 'type': entity['type'], 'confidence': entity.get('confidence', 0.8), 'source_chunk': chunk_id }
# Process relationships for rel in chunk_result.get('relationships', []): relationships.append({ 'source': f"{batch[chunk_id].metadata.get('doc_id', 'unknown')}_{rel['source']}", 'target': f"{batch[chunk_id].metadata.get('doc_id', 'unknown')}_{rel['target']}", 'type': rel['type'], 'confidence': rel.get('confidence', 0.7), 'source_chunk': chunk_id })
return {'entities': entities, 'relationships': relationships}
except Exception as e: self.logger.error(f"Failed to parse LLM response: {e}") return {'entities': {}, 'relationships': []}
def _update_batch_size(self, success: bool, batch_size: int): """ Dynamically adjust batch size based on success rates. """ self.batch_performance.append((success, batch_size))
# Only adjust after sufficient data if len(self.batch_performance) < 10: return
# Calculate recent success rate recent = self.batch_performance[-10:] success_rate = sum(1 for s, _ in recent if s) / 10
if success_rate < 0.7 and self.batch_size > self.min_batch_size: # Reduce batch size self.batch_size = max(self.min_batch_size, self.batch_size - 1) self.logger.info(f"Reduced batch size to {self.batch_size}") elif success_rate > 0.95 and self.batch_size < self.max_batch_size: # Increase batch size self.batch_size = min(self.max_batch_size, self.batch_size + 1) self.logger.info(f"Increased batch size to {self.batch_size}")

Extraction Performance Before and After
| Metric | Single-Chunk Processing | Optimized Batching | Improvement |
|---|---|---|---|
| API Calls (10K chunks) | 10,000 | 1,000-2,000 | 5-10x reduction |
| Total Processing Time | 8-10 hours | 1.5-2 hours | 4-6x faster |
| API Costs | $150-200 | $30-40 | 5x cost reduction |
| Entity Detection Rate | 75-80% | 85-92% | Better context improves quality |
| Relationship Discovery | 60-70% | 80-88% | Cross-chunk relationships found |
The quality improvement surprised us. By processing related chunks together, the LLM identifies relationships that span chunk boundaries, connections that single-chunk processing misses entirely. You get better results AND pay less for them.
KEY INSIGHT: Multi-chunk LLM batching improves quality and cost at the same time. When the LLM sees adjacent chunks together, it discovers cross-boundary relationships that single-chunk processing can never find.
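To tie the pieces together, here is a minimal usage sketch; `llm_client` stands in for whatever completion wrapper you already use, and `chunks` come from the semantic chunker in Technique 1:

```python
# Minimal usage sketch of the extractor shown above
extractor = IntelligentLLMExtractor(
    llm_client,            # your own completion wrapper
    initial_batch_size=5,
    parallel_workers=3,
)

result = extractor.extract_from_chunks(chunks, domain="technical")

print(f"Entities: {len(result.entities)}")
print(f"Relationships: {len(result.relationships)}")
print(f"Batches processed: {result.metadata['batches_processed']}")
```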
Technique 5: Mix and Batch for Parallel Relationship Loading
The Last Bottleneck at Scale
Even with batch operations and relationship grouping, massive graphs hit one final wall: parallel relationship loading at the millions-of-relationships scale. Graph coloring works well for moderate-sized datasets, but when you have 10 million relationships, the coloring algorithm itself becomes expensive and the resulting groups can be unbalanced.
The Mix and Batch technique is a mathematical approach that partitions nodes using consistent hashing, assigns partition codes to relationships, then organizes those codes into non-conflicting batches using diagonal patterns. Each batch can be processed in full parallel with zero deadlock risk.
import mathfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom typing import List, Dict, Set, Tupleimport hashlib
class MixAndBatchLoader: """ Implements the Mix and Batch technique for massively parallel relationship loading without deadlocks. """
def __init__(self, neo4j_driver, num_partitions: int = 10, parallel_workers: int = 4): self.driver = neo4j_driver self.num_partitions = num_partitions self.parallel_workers = parallel_workers self.logger = logging.getLogger(__name__)
def load_relationships_parallel(self, relationships: List[Tuple[str, str, str, Dict]]) -> int: """ Load relationships using Mix and Batch technique for parallel processing.
Args: relationships: List of (source_id, target_id, rel_type, properties)
Returns: Number of relationships created """ start_time = time.time()
# Step 1: Partition nodes self.logger.info("Step 1: Partitioning nodes...") node_partitions = self._partition_nodes(relationships)
# Step 2: Assign partition codes self.logger.info("Step 2: Creating partition codes...") partition_codes = self._create_partition_codes(relationships, node_partitions)
# Step 3: Organize into batches self.logger.info("Step 3: Organizing batches...") batches = self._organize_batches(partition_codes, relationships)
# Step 4: Process batches self.logger.info(f"Step 4: Processing {len(batches)} batches...") total_created = self._process_batches_parallel(batches, relationships)
elapsed = time.time() - start_time rate = total_created / elapsed if elapsed > 0 else 0
self.logger.info(f"Created {total_created} relationships in {elapsed:.2f} seconds " f"({rate:.0f} relationships/second)")
return total_created
def _partition_nodes(self, relationships: List[Tuple]) -> Dict[str, int]: """ Assign each node to a partition using consistent hashing. """ node_partitions = {}
# Extract all unique nodes nodes = set() for source, target, _, _ in relationships: nodes.add(source) nodes.add(target)
# Assign partitions for node in nodes: if isinstance(node, (int, float)): # For numeric IDs, use modulo partition = int(node) % self.num_partitions else: # For string IDs, use consistent hashing hash_val = int(hashlib.md5(str(node).encode()).hexdigest(), 16) partition = hash_val % self.num_partitions
node_partitions[node] = partition
self.logger.info(f"Partitioned {len(nodes)} nodes into {self.num_partitions} partitions")
# Log partition distribution for monitoring partition_counts = {} for partition in node_partitions.values(): partition_counts[partition] = partition_counts.get(partition, 0) + 1
self.logger.debug(f"Partition distribution: {partition_counts}")
return node_partitions
def _create_partition_codes(self, relationships: List[Tuple], node_partitions: Dict[str, int]) -> Dict[int, str]: """ Create partition codes for each relationship. """ partition_codes = {}
for idx, (source, target, _, _) in enumerate(relationships): source_partition = node_partitions[source] target_partition = node_partitions[target]
# Create deterministic partition code partition_code = f"{source_partition}-{target_partition}" partition_codes[idx] = partition_code
# Log partition code distribution code_counts = {} for code in partition_codes.values(): code_counts[code] = code_counts.get(code, 0) + 1
self.logger.debug(f"Created {len(code_counts)} unique partition codes")
return partition_codes
def _organize_batches(self, partition_codes: Dict[int, str], relationships: List[Tuple]) -> List[List[int]]: """ Organize relationships into non-conflicting batches using the Mix and Batch algorithm. """ # Group relationships by partition code code_to_indices = {} for idx, code in partition_codes.items(): if code not in code_to_indices: code_to_indices[code] = [] code_to_indices[code].append(idx)
batches = []
# For bipartite graphs (source and target from different sets) if self._is_bipartite(relationships): batches = self._organize_bipartite_batches(code_to_indices) else: # For general graphs batches = self._organize_monopartite_batches(code_to_indices)
self.logger.info(f"Organized {len(relationships)} relationships into {len(batches)} batches")
return batches
def _organize_bipartite_batches(self, code_to_indices: Dict[str, List[int]]) -> List[List[int]]: """ Organize batches for bipartite graphs using diagonal pattern. """ batches = []
for offset in range(self.num_partitions): batch = []
for i in range(self.num_partitions): j = (i + offset) % self.num_partitions code = f"{i}-{j}"
if code in code_to_indices: batch.extend(code_to_indices[code])
if batch: batches.append(batch)
return batches
def _organize_monopartite_batches(self, code_to_indices: Dict[str, List[int]]) -> List[List[int]]: """ Organize batches for monopartite graphs with more complex patterns. """ batches = [] processed_codes = set()
# Process diagonal patterns for offset in range(self.num_partitions): batch = []
for i in range(self.num_partitions): j = (i + offset) % self.num_partitions
# Handle both directions for undirected relationships codes = [f"{i}-{j}", f"{j}-{i}"] if i == j: codes = [f"{i}-{j}"] # Self-loops only need one code
for code in codes: if code in code_to_indices and code not in processed_codes: batch.extend(code_to_indices[code]) processed_codes.add(code)
if batch: batches.append(batch)
# Handle any remaining relationships remaining = [] for code, indices in code_to_indices.items(): if code not in processed_codes: remaining.extend(indices)
if remaining: # Add remaining as final batch batches.append(remaining) self.logger.warning(f"Had {len(remaining)} relationships in overflow batch")
return batches
def _process_batches_parallel(self, batches: List[List[int]], relationships: List[Tuple]) -> int: """ Process batches sequentially, but within each batch use parallel processing. """ total_created = 0
for batch_idx, batch in enumerate(batches): self.logger.info(f"Processing batch {batch_idx + 1}/{len(batches)} " f"with {len(batch)} relationships")
# Split batch into chunks for parallel workers chunk_size = max(1, len(batch) // self.parallel_workers) chunks = [batch[i:i + chunk_size] for i in range(0, len(batch), chunk_size)]
# Process chunks in parallel with ThreadPoolExecutor(max_workers=self.parallel_workers) as executor: futures = []
for chunk in chunks: # Extract relationships for this chunk chunk_relationships = [relationships[idx] for idx in chunk]
future = executor.submit( self._process_relationship_chunk, chunk_relationships ) futures.append(future)
# Collect results for future in as_completed(futures): try: created = future.result() total_created += created except Exception as e: self.logger.error(f"Error processing chunk: {e}")
return total_created
def _process_relationship_chunk(self, chunk_relationships: List[Tuple]) -> int: """ Process a chunk of relationships in a single transaction. """ with self.driver.session() as session: try: # Prepare batch data batch_data = [] for source, target, rel_type, properties in chunk_relationships: batch_data.append({ 'source': source, 'target': target, 'type': rel_type, 'props': properties or {} })
# Execute batch creation result = session.run(""" UNWIND $batch AS rel MATCH (source {id: rel.source}) MATCH (target {id: rel.target}) CALL apoc.create.relationship(source, rel.type, rel.props, target) YIELD rel as created RETURN count(created) as count """, batch=batch_data)
return result.single()['count']
except Exception as e: self.logger.error(f"Failed to create relationships: {e}") return 0
Figure 4: The Mix and Batch four-phase process. Nodes are partitioned via consistent hashing. Relationships get classified by their source-target partition codes. Those codes are organized into non-conflicting batches using diagonal patterns. Each batch processes in full parallel, achieving maximum throughput with zero deadlocks.
When Mix and Batch Pays Off
The technique has a startup cost that makes it slower for small datasets but dramatically faster at scale:
| Dataset Size | Traditional Parallel (with retries) | Mix and Batch | Improvement |
|---|---|---|---|
| 100K relationships | 120 seconds | 140 seconds | Slower (overhead) |
| 1M relationships | 2,400 seconds | 450 seconds | 5.3x faster |
| 10M relationships | 28,000+ seconds | 1,800 seconds | 15.5x faster |
| Deadlock Rate | 15-30% | 0% | Eliminated |
At 100K relationships, the partitioning and organizing overhead makes Mix and Batch slightly slower. At 10 million relationships, what would take 8 hours with traditional parallel approaches takes 30 minutes with Mix and Batch. The crossover point sits around 500K relationships for most graph topologies.
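A rough sketch of how we gate on relationship count when choosing a loader; the default threshold reflects the crossover we observed, not a universal constant:

```python
def choose_relationship_loader(relationship_count, crossover=500_000):
    """Pick a loading strategy based on a measured crossover point.

    Below the crossover, graph-coloring groups (Technique 3) tend to win
    because Mix and Batch's partitioning overhead dominates; above it,
    Mix and Batch's fully parallel batches pull ahead. Measure your own
    graph topology before trusting the default.
    """
    return "relationship_grouping" if relationship_count < crossover else "mix_and_batch"


print(choose_relationship_loader(100_000))     # -> relationship_grouping
print(choose_relationship_loader(10_000_000))  # -> mix_and_batch
```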
Combining All Five: The Synergistic Effect
An Integrated Pipeline
Each technique targets a different bottleneck. Semantic chunking reduces the volume of work. Batch operations eliminate transaction overhead. Relationship grouping prevents deadlocks. LLM batching cuts API costs and latency. Mix and Batch unlocks true parallelism at scale. Together, they multiply rather than merely add.
class OptimizedGraphRAGPipeline: """ Complete GraphRAG pipeline with all optimizations integrated. """
def __init__(self, neo4j_driver, llm_client, vector_db): # Initialize all components with optimizations self.chunker = SemanticChunker() self.extractor = IntelligentLLMExtractor(llm_client) self.batch_processor = OptimizedNeo4jBatchProcessor(neo4j_driver) self.relationship_grouper = RelationshipGrouper() self.mix_batch_loader = MixAndBatchLoader(neo4j_driver) self.vector_db = vector_db
self.logger = logging.getLogger(__name__)
def process_documents(self, documents: List[str]) -> Dict[str, Any]: """ Process documents through the complete optimized pipeline. """ start_time = time.time() metrics = { 'documents': len(documents), 'chunks': 0, 'entities': 0, 'relationships': 0, 'processing_stages': {} }
# Stage 1: Semantic Chunking stage_start = time.time() all_chunks = [] for doc in documents: chunks = self.chunker.chunk_document(doc) all_chunks.extend(chunks) metrics['chunks'] = len(all_chunks) metrics['processing_stages']['chunking'] = time.time() - stage_start
self.logger.info(f"Stage 1: Created {len(all_chunks)} semantic chunks")
# Stage 2: Batch Extraction stage_start = time.time() extraction_result = self.extractor.extract_from_chunks(all_chunks) metrics['entities'] = len(extraction_result.entities) metrics['relationships'] = len(extraction_result.relationships) metrics['processing_stages']['extraction'] = time.time() - stage_start
self.logger.info(f"Stage 2: Extracted {len(extraction_result.entities)} entities " f"and {len(extraction_result.relationships)} relationships")
# Stage 3: Vector Embeddings (can be parallelized) stage_start = time.time() self._create_embeddings_batch(all_chunks) metrics['processing_stages']['embeddings'] = time.time() - stage_start
# Stage 4: Batch Entity Creation stage_start = time.time() entity_list = [ { 'id': entity_id, 'name': entity_data['name'], 'type': entity_data['type'] } for entity_id, entity_data in extraction_result.entities.items() ] entities_created = self.batch_processor.batch_create_nodes(entity_list) metrics['processing_stages']['entity_creation'] = time.time() - stage_start
self.logger.info(f"Stage 4: Created {entities_created} entities in graph")
# Stage 5: Optimized Relationship Creation stage_start = time.time()
# Prepare relationships relationships = [ (rel['source'], rel['target'], rel['type'], rel.get('properties', {})) for rel in extraction_result.relationships ]
# Decide strategy based on volume if len(relationships) < 10000: # Use relationship grouping for smaller datasets groups = self.relationship_grouper.group_relationships(relationships) rels_created = 0
for group_relationships in groups.values(): formatted_rels = [ { 'source_id': r[0], 'target_id': r[1], 'properties': r[3] } for r in group_relationships ] rels_created += self.batch_processor.batch_create_relationships( formatted_rels, group_relationships[0][2] ) else: # Use Mix and Batch for large datasets rels_created = self.mix_batch_loader.load_relationships_parallel(relationships)
metrics['processing_stages']['relationship_creation'] = time.time() - stage_start
self.logger.info(f"Stage 5: Created {rels_created} relationships")
# Calculate totals total_time = time.time() - start_time metrics['total_time'] = total_time metrics['throughput'] = { 'docs_per_second': len(documents) / total_time, 'chunks_per_second': len(all_chunks) / total_time, 'entities_per_second': len(extraction_result.entities) / total_time, 'relationships_per_second': len(extraction_result.relationships) / total_time }
self._log_performance_summary(metrics)
return metrics
def _log_performance_summary(self, metrics: Dict[str, Any]): """Log a comprehensive performance summary.""" self.logger.info("="*60) self.logger.info("PERFORMANCE SUMMARY") self.logger.info("="*60) self.logger.info(f"Documents processed: {metrics['documents']}") self.logger.info(f"Total chunks: {metrics['chunks']}") self.logger.info(f"Entities extracted: {metrics['entities']}") self.logger.info(f"Relationships extracted: {metrics['relationships']}") self.logger.info(f"Total time: {metrics['total_time']:.2f} seconds") self.logger.info("-"*60) self.logger.info("Stage breakdown:") for stage, duration in metrics['processing_stages'].items(): percentage = (duration / metrics['total_time']) * 100 self.logger.info(f" {stage}: {duration:.2f}s ({percentage:.1f}%)") self.logger.info("-"*60) self.logger.info("Throughput:") for metric, rate in metrics['throughput'].items(): self.logger.info(f" {metric}: {rate:.2f}") self.logger.info("="*60)

Production Benchmark Results
We ran the full optimized pipeline against three dataset sizes and compared to the unoptimized baseline:
Small dataset (100 documents, ~2,000 chunks)
- Baseline: 95.5 seconds
- Optimized: 59.3 seconds (38% improvement)
- At this scale, the optimization overhead partially offsets the gains
Medium dataset (1,000 documents, ~20,000 chunks)
- Baseline: 1,520 seconds (~25 minutes)
- Optimized: 215 seconds (~3.5 minutes), a 7x improvement
- All five techniques contributing measurably
Large dataset (10,000 documents, ~200,000 chunks)
- Baseline: Estimated 56+ hours (extrapolated because it was too slow to complete)
- Optimized: 4.1 hours, a 13x+ improvement
- Mix and Batch becomes the critical enabler at this scale
The pattern is clear: as data scales, the optimizations become not just helpful but essential. Without them, production GraphRAG is not viable.
Where We Deployed These Techniques
Technical Documentation at Scale
A software company needed to make 50,000+ documentation pages searchable through GraphRAG, spanning multiple programming languages and frameworks with complex interdependencies. We used semantic chunking to preserve code examples, domain-specific extraction for technical entities like functions, classes, and APIs, and Mix and Batch for the 12 million inter-component relationships. Initial processing dropped from 9 days to 18 hours. Daily incremental updates run in under 30 minutes. Query response time stays under 500ms for complex technical questions, with 89% accuracy in identifying cross-component dependencies.
Financial Knowledge Graph
A financial services firm implemented GraphRAG for risk assessment and compliance monitoring. Their graph had extreme density, averaging 50+ relationships per entity, with real-time market data flowing in continuously. Relationship grouping eliminated the constant deadlocks from their initial implementation. Extraction batching allowed near real-time processing of news and reports.
Healthcare Research Platform
A medical research organization connected published papers, clinical trial data, drug interaction databases, and patient outcome studies. Semantic chunking proved especially valuable here because medical documents contain complex tables, chemical formulas, and statistical data that must remain intact for accurate entity extraction.
A Practical Optimization Playbook
Profile Before You Optimize
Run your pipeline through a profiler on a representative sample before picking techniques. The bottleneck you assume is rarely the bottleneck that actually matters.
```python
import cProfile
import pstats


def profile_graphrag_pipeline(documents):
    """Profile your pipeline to identify bottlenecks."""
    profiler = cProfile.Profile()

    profiler.enable()
    # Run your pipeline
    pipeline = GraphRAGPipeline()
    pipeline.process_documents(documents)
    profiler.disable()

    # Analyze results
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative')
    stats.print_stats(20)  # Top 20 time-consuming functions
```

Match Techniques to Your Profile
Not every optimization makes sense for every use case:
- Small, simple documents: Start with extraction batching and batch database operations
- Large, complex documents: Semantic chunking becomes the highest-priority investment
- Dense graphs with many shared nodes: Relationship grouping is essential
- Massive scale (millions of relationships): Mix and Batch is non-negotiable
Our Biggest Mistake
We initially tried to apply all five techniques simultaneously on a pilot dataset of 200 documents. The overhead from Mix and Batch partitioning actually made things slower at that scale. We wasted two days debugging “performance regressions” that were really just optimization overhead exceeding optimization benefit. The lesson: add techniques incrementally, measure at each step, and only keep what improves your specific workload at your specific scale.
KEY INSIGHT: Profile first, then apply optimizations incrementally. The technique that dominates at 10 million relationships is dead weight at 10,000. Let your actual bottleneck data drive the decision.
What Comes Next
Agentic Optimization
We are exploring GraphRAG systems that automatically select and tune optimizations based on workload characteristics, with intelligent agents that analyze queries and delegate to specialized retrieval strategies.

Figure 5: Agentic GraphRAG architecture. A Decision Agent analyzes incoming queries and delegates to specialized agents for retrieval strategy, query decomposition, and self-reflection. The system self-optimizes, adapting to different query types and workloads automatically.
Hardware Acceleration and Distribution
GPU acceleration for vector operations and graph algorithms shows promising early results. For truly massive scale, distributed graph databases across multiple regions, federated learning for extraction models, and edge computing for local GraphRAG instances are all active areas of development.
From 56 Hours to 4
GraphRAG delivers contextual AI retrieval that traditional RAG cannot match. But the gap between a working prototype and a production system is enormous. We crossed that gap by systematically targeting each bottleneck: semantic chunking to reduce work volume, batch operations to eliminate transaction tax, relationship grouping to prevent deadlocks, LLM batching to cut costs and improve quality, and Mix and Batch to unlock parallel scale.
These five techniques compound. The 13x speedup we measured on our large dataset is not the sum of five independent improvements. It is the product of five techniques that each make the others more effective. Better chunks mean fewer, higher-quality extraction calls. Batch operations let relationship grouping process larger groups efficiently. Mix and Batch builds on the batching infrastructure to achieve true parallel scale.
If you are building GraphRAG for production, start with batch processing (the highest ROI), add semantic chunking (the best force multiplier), then layer in the parallel loading techniques as your data grows. Profile at every step. Let the numbers guide you.
In Part 3, we dive deep into the Mix and Batch technique that solved our toughest parallel loading challenge.
GraphRAG Series:
- Part 1: Building Bridges in the Knowledge Landscape
- Part 2: Five Essential Techniques for Production Performance (this article)
- Part 3: The Mix-and-Batch Technique for Parallel Relationship Loading
- Part 4: Benchmarking and Optimizing GraphRAG Systems