Joseph Pollack
# Graph Orchestration Architecture
## Overview
DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.
## Conversation History
DeepCritical supports multi-turn conversations through Pydantic AI's native message history format. The system maintains two types of history:
1. **User Conversation History**: Multi-turn user interactions (from the Gradio chat interface) stored as `list[ModelMessage]`
2. **Research Iteration History**: Internal research process state (existing `Conversation` model)
### Message History Flow
```
Gradio Chat History → convert_gradio_to_message_history() → GraphOrchestrator.run(message_history)
        ↓
GraphExecutionContext (stores message_history)
        ↓
Agent Nodes (receive message_history via agent.run())
        ↓
WorkflowState (persists user_message_history)
```
### Usage
Message history is automatically converted from Gradio format and passed through the orchestrator:
```python
# In app.py - automatic conversion
message_history = convert_gradio_to_message_history(history) if history else None
async for event in orchestrator.run(query, message_history=message_history):
    yield event
```
Agents receive message history through their `run()` methods:
```python
# In agent execution: pass history only when there is any
if message_history:
    result = await agent.run(input_data, message_history=message_history)
else:
    result = await agent.run(input_data)
```
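The body of `convert_gradio_to_message_history()` is not shown in this document. As a minimal sketch, assuming Gradio's "messages" history format (a list of `{"role": ..., "content": ...}` dicts), the conversion could keep only non-empty user and assistant text turns. `UserRequest` and `AssistantResponse` are hypothetical stand-ins for Pydantic AI's `ModelRequest` (carrying a `UserPromptPart`) and `ModelResponse` (carrying a `TextPart`), used so the example runs without dependencies.

```python
from dataclasses import dataclass
from typing import Union


# Hypothetical stand-ins for pydantic_ai.messages.ModelRequest / ModelResponse.
@dataclass
class UserRequest:
    content: str


@dataclass
class AssistantResponse:
    content: str


Message = Union[UserRequest, AssistantResponse]


def convert_gradio_to_message_history(history: list[dict]) -> list[Message]:
    """Sketch: convert Gradio "messages"-format history into typed turns.

    Entries with empty or non-string content (e.g. file uploads) and roles
    other than "user"/"assistant" are skipped.
    """
    messages: list[Message] = []
    for entry in history:
        content = entry.get("content")
        if not isinstance(content, str) or not content.strip():
            continue
        role = entry.get("role")
        if role == "user":
            messages.append(UserRequest(content))
        elif role == "assistant":
            messages.append(AssistantResponse(content))
    return messages
```

The resulting list plays the role of the `message_history` value that the orchestrator forwards to `agent.run()`.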
## Graph Patterns
### Iterative Research Graph
The iterative research graph follows this pattern:
```
[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                          ↓ No             ↓ Yes
                                   [Tool Selector]       [Writer]
                                          ↓
                                   [Execute Tools] → [Loop Back]
```
**Node IDs**: `thinking` → `knowledge_gap` → `continue_decision` → `tool_selector` → `execute_tools` → (loop back to `thinking`); once research is complete, `continue_decision` routes to `writer` instead.
**Special Node Handling**:
- `execute_tools`: State node that uses `search_handler` to execute searches and add evidence to workflow state
- `continue_decision`: Decision node that routes based on `research_complete` flag from `KnowledgeGapOutput`
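The routing above can be illustrated with a small, dependency-free sketch. `KnowledgeGapOutput` here is a simplified stand-in carrying only `research_complete`, and `trace_iterative_path` is a hypothetical driver that pretends the knowledge-gap agent reports completion on a chosen iteration; it records the node IDs a run would visit.

```python
from dataclasses import dataclass


@dataclass
class KnowledgeGapOutput:
    # Simplified stand-in: only the flag that continue_decision reads.
    research_complete: bool


def continue_decision(gap_output: KnowledgeGapOutput) -> str:
    """Route to the writer once research is complete, otherwise keep researching."""
    return "writer" if gap_output.research_complete else "tool_selector"


def trace_iterative_path(complete_on_iteration: int, max_iterations: int = 5) -> list[str]:
    """Hypothetical driver: list the node IDs visited, assuming the gap check
    reports completion on `complete_on_iteration`."""
    path: list[str] = []
    for iteration in range(1, max_iterations + 1):
        path += ["thinking", "knowledge_gap", "continue_decision"]
        next_node = continue_decision(
            KnowledgeGapOutput(research_complete=iteration >= complete_on_iteration)
        )
        path.append(next_node)
        if next_node == "writer":
            break
        path.append("execute_tools")  # evidence is added, then loop back to "thinking"
    return path
```

With `complete_on_iteration=2`, the trace passes through `tool_selector` and `execute_tools` once before the second `continue_decision` routes to `writer`.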
### Deep Research Graph
The deep research graph follows this pattern:
```
[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
                                       ↓       ↓       ↓
                                    [Loop1] [Loop2] [Loop3]
```
**Node IDs**: `planner` β†’ `store_plan` β†’ `parallel_loops` β†’ `collect_drafts` β†’ `synthesizer`
**Special Node Handling**:
- `planner`: Agent node that creates `ReportPlan` with report outline
- `store_plan`: State node that stores `ReportPlan` in context for parallel loops
- `parallel_loops`: Parallel node that executes `IterativeResearchFlow` instances for each section
- `collect_drafts`: State node that collects section drafts from parallel loops
- `synthesizer`: Agent node that calls `LongWriterAgent.write_report()` directly with `ReportDraft`
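The data flow through these five nodes can be sketched with `asyncio.gather()`. This is a minimal illustration under stated assumptions: `ReportPlan` and `ReportDraft` are simplified stand-ins, the section names are hardcoded in place of a real planner agent, `run_section_loop` is a hypothetical placeholder for an `IterativeResearchFlow` run, and the final join stands in for `LongWriterAgent.write_report()`.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ReportPlan:
    # Simplified stand-in: a title plus one entry per report section.
    title: str
    sections: list[str]


@dataclass
class ReportDraft:
    sections: dict[str, str] = field(default_factory=dict)


async def run_section_loop(section: str) -> str:
    """Hypothetical stand-in for one IterativeResearchFlow run on a section."""
    await asyncio.sleep(0)  # yield control, as real LLM/search calls would
    return f"Draft for {section}"


async def deep_research(query: str) -> str:
    # planner: a real planner agent would produce this outline
    plan = ReportPlan(title=query, sections=["Background", "Mechanisms", "Outlook"])
    # store_plan: keep the plan where the parallel loops can read it
    context = {"report_plan": plan}
    # parallel_loops: one loop per section, run concurrently
    drafts = await asyncio.gather(
        *(run_section_loop(s) for s in context["report_plan"].sections)
    )
    # collect_drafts: gather() returns results in input order
    draft = ReportDraft(sections=dict(zip(plan.sections, drafts)))
    # synthesizer: the real node calls LongWriterAgent.write_report(); we just join
    body = "\n\n".join(f"## {name}\n{text}" for name, text in draft.sections.items())
    return f"# {plan.title}\n\n{body}"
```

Because `asyncio.gather()` preserves input order, the drafts can be zipped back onto the plan's sections even though the loops finish in any order.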
### Deep Research
```mermaid
sequenceDiagram
    actor User
    participant GraphOrchestrator
    participant InputParser
    participant GraphBuilder
    participant GraphExecutor
    participant Agent
    participant BudgetTracker
    participant WorkflowState

    User->>GraphOrchestrator: run(query)
    GraphOrchestrator->>InputParser: detect_research_mode(query)
    InputParser-->>GraphOrchestrator: mode (iterative/deep)
    GraphOrchestrator->>GraphBuilder: build_graph(mode)
    GraphBuilder-->>GraphOrchestrator: ResearchGraph
    GraphOrchestrator->>WorkflowState: init_workflow_state()
    GraphOrchestrator->>BudgetTracker: create_budget()
    GraphOrchestrator->>GraphExecutor: _execute_graph(graph)

    loop For each node in graph
        GraphExecutor->>Agent: execute_node(agent_node)
        Agent->>Agent: process_input
        Agent-->>GraphExecutor: result
        GraphExecutor->>WorkflowState: update_state(result)
        GraphExecutor->>BudgetTracker: add_tokens(used)
        GraphExecutor->>BudgetTracker: check_budget()
        alt Budget exceeded
            GraphExecutor->>GraphOrchestrator: emit(error_event)
        else Continue
            GraphExecutor->>GraphOrchestrator: emit(progress_event)
        end
    end

    GraphOrchestrator->>User: AsyncGenerator[AgentEvent]
```
### Iterative Research
```mermaid
sequenceDiagram
    participant IterativeFlow
    participant ThinkingAgent
    participant KnowledgeGapAgent
    participant ToolSelector
    participant ToolExecutor
    participant JudgeHandler
    participant WriterAgent

    IterativeFlow->>IterativeFlow: run(query)

    loop Until complete or max_iterations
        IterativeFlow->>ThinkingAgent: generate_observations()
        ThinkingAgent-->>IterativeFlow: observations
        IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
        KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
        alt Research complete
            IterativeFlow->>WriterAgent: create_final_report()
            WriterAgent-->>IterativeFlow: final_report
        else Gaps remain
            IterativeFlow->>ToolSelector: select_agents(gap)
            ToolSelector-->>IterativeFlow: AgentSelectionPlan
            IterativeFlow->>ToolExecutor: execute_tool_tasks()
            ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
            IterativeFlow->>JudgeHandler: assess_evidence()
            JudgeHandler-->>IterativeFlow: should_continue
        end
    end
```
## Graph Structure
### Nodes
Graph nodes represent different stages in the research workflow:
1. **Agent Nodes**: Execute Pydantic AI agents
   - Input: Prompt/query
   - Output: Structured or unstructured response
   - Examples: `KnowledgeGapAgent`, `ToolSelectorAgent`, `ThinkingAgent`
2. **State Nodes**: Update or read workflow state
   - Input: Current state
   - Output: Updated state
   - Examples: Update evidence, update conversation history
3. **Decision Nodes**: Make routing decisions based on conditions
   - Input: Current state/results
   - Output: Next node ID
   - Examples: Continue research vs. complete research
4. **Parallel Nodes**: Execute multiple nodes concurrently
   - Input: List of node IDs
   - Output: Aggregated results
   - Examples: Parallel iterative research loops
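The four node types can be modeled as plain dataclasses with a single dispatch function. This is a minimal sketch, not the project's classes: the `state_updater` and `decision_function` field names follow the Execution Flow description, while `agent_run`, `parallel_nodes`, `execute_node`, `registry`, and `echo_agent` are hypothetical names chosen for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Union


@dataclass
class AgentNode:
    node_id: str
    agent_run: Callable[[Any], Awaitable[Any]]  # stand-in for agent.run()


@dataclass
class StateNode:
    node_id: str
    state_updater: Callable[[dict, Any], dict]  # returns the updated state


@dataclass
class DecisionNode:
    node_id: str
    decision_function: Callable[[Any], str]  # returns the next node ID


@dataclass
class ParallelNode:
    node_id: str
    parallel_nodes: list[str]  # IDs of nodes to run concurrently


Node = Union[AgentNode, StateNode, DecisionNode, ParallelNode]


async def execute_node(node: Node, data: Any, state: dict, registry: dict[str, Node]) -> Any:
    """Dispatch on node type and return the node's output."""
    if isinstance(node, AgentNode):
        return await node.agent_run(data)
    if isinstance(node, StateNode):
        return node.state_updater(state, data)
    if isinstance(node, DecisionNode):
        return node.decision_function(data)
    if isinstance(node, ParallelNode):
        branches = (execute_node(registry[nid], data, state, registry) for nid in node.parallel_nodes)
        return await asyncio.gather(*branches)  # aggregated, in declaration order
    raise TypeError(f"Unknown node type: {type(node).__name__}")


async def echo_agent(prompt: Any) -> str:
    """Demo agent used in the usage example."""
    return f"agent:{prompt}"
```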
### Edges
Edges define transitions between nodes:
1. **Sequential Edges**: Always traversed (no condition)
   - From: Source node
   - To: Target node
   - Condition: None (always True)
2. **Conditional Edges**: Traversed based on condition
   - From: Source node
   - To: Target node
   - Condition: Callable that returns bool
   - Example: If research complete → go to writer, else → continue loop
3. **Parallel Edges**: Used for parallel execution branches
   - From: Parallel node
   - To: Multiple target nodes
   - Execution: All targets run concurrently
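All three edge kinds fit one small structure: an edge with no condition is sequential, an edge with a condition is conditional, and several unconditional edges leaving the same node form parallel branches. The sketch below assumes that representation; `Edge`, `is_traversable`, `next_nodes`, and the `loop_1`/`loop_2` node IDs are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Edge:
    from_node: str
    to_node: str
    # None means a sequential edge that is always traversed.
    condition: Optional[Callable[[Any], bool]] = None

    def is_traversable(self, result: Any) -> bool:
        return self.condition is None or self.condition(result)


def next_nodes(edges: list[Edge], current: str, result: Any) -> list[str]:
    """Return every target reachable from `current` given that node's result.
    More than one target means the branches can run in parallel."""
    return [e.to_node for e in edges if e.from_node == current and e.is_traversable(result)]


edges = [
    Edge("thinking", "knowledge_gap"),
    Edge("knowledge_gap", "writer", condition=lambda r: r["research_complete"]),
    Edge("knowledge_gap", "tool_selector", condition=lambda r: not r["research_complete"]),
    Edge("parallel_loops", "loop_1"),
    Edge("parallel_loops", "loop_2"),
]
```

Filtering on `from_node` first means a condition is only evaluated for edges that actually leave the current node.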
## State Management
State is managed via `WorkflowState`, stored in a `ContextVar` so that concurrent runs (separate async tasks or threads) each work with their own state:
- **Evidence**: Collected evidence from searches
- **Conversation**: Iteration history (gaps, tool calls, findings, thoughts)
- **Embedding Service**: For semantic search
State transitions occur at state nodes, which update the workflow state bound to the current context.
## Execution Flow
1. **Graph Construction**: Build graph from nodes and edges using `create_iterative_graph()` or `create_deep_graph()`
2. **Graph Validation**: Check that the graph is structurally valid (e.g., all nodes reachable from the entry node) via `ResearchGraph.validate_structure()`
3. **Graph Execution**: Traverse graph from entry node using `GraphOrchestrator._execute_graph()`
4. **Node Execution**: Execute each node based on type:
   - **Agent Nodes**: Call `agent.run()` with transformed input
   - **State Nodes**: Update workflow state via `state_updater` function
   - **Decision Nodes**: Evaluate `decision_function` to get next node ID
   - **Parallel Nodes**: Execute all parallel nodes concurrently via `asyncio.gather()`
5. **Edge Evaluation**: Determine next node(s) based on edges and conditions
6. **Parallel Execution**: Use `asyncio.gather()` for parallel nodes
7. **State Updates**: Update state at state nodes via `GraphExecutionContext.update_state()`
8. **Event Streaming**: Yield `AgentEvent` objects during execution for UI
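Steps 3 to 8 can be condensed into one async generator that walks the graph and yields events as it goes. This is a simplified sketch, not `GraphOrchestrator._execute_graph()`: each node is assumed to be an async callable returning `(result, next_node_id)`, state is a plain dict rather than `WorkflowState`, and `AgentEvent`, `NodeFn`, `max_steps`, and the demo nodes are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Awaitable, Callable, Optional


@dataclass
class AgentEvent:
    # Simplified stand-in for the events streamed to the UI.
    type: str
    message: str
    data: Any = None


# Hypothetical node shape: (input, state) -> (result, next node ID or None to stop).
NodeFn = Callable[[Any, dict], Awaitable[tuple[Any, Optional[str]]]]


async def execute_graph(
    nodes: dict[str, NodeFn], entry: str, query: str, max_steps: int = 50
) -> AsyncIterator[AgentEvent]:
    state: dict = {"evidence": []}
    data: Any = query
    current: Optional[str] = entry
    steps = 0
    while current is not None:
        if steps >= max_steps:  # guard against runaway loops
            yield AgentEvent("error", f"Stopped after {max_steps} steps")
            return
        yield AgentEvent("progress", f"Running node {current}")
        data, current = await nodes[current](data, state)
        steps += 1
    yield AgentEvent("complete", "Research complete", data)


# Demo nodes: loop thinking -> execute_tools until two evidence items exist.
async def thinking(data: Any, state: dict) -> tuple[Any, Optional[str]]:
    return f"plan for {data}", "execute_tools"


async def execute_tools(data: Any, state: dict) -> tuple[Any, Optional[str]]:
    state["evidence"].append(data)
    return data, "writer" if len(state["evidence"]) >= 2 else "thinking"


async def writer(data: Any, state: dict) -> tuple[Any, Optional[str]]:
    return f"report from {len(state['evidence'])} evidence items", None


async def collect(events: AsyncIterator[AgentEvent]) -> list[AgentEvent]:
    return [event async for event in events]
```

Streaming events from a generator lets the UI render progress while the graph is still running, instead of waiting for the final report.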
### GraphExecutionContext
The `GraphExecutionContext` class manages execution state during graph traversal:
- **State**: Current `WorkflowState` instance
- **Budget Tracker**: `BudgetTracker` instance for budget enforcement
- **Node Results**: Dictionary storing results from each node execution
- **Visited Nodes**: Set of node IDs that have been executed
- **Current Node**: ID of the node currently being executed
Methods:
- `set_node_result(node_id, result)`: Store result from node execution
- `get_node_result(node_id)`: Retrieve stored result
- `has_visited(node_id)`: Check if node was visited
- `mark_visited(node_id)`: Mark node as visited
- `update_state(updater, data)`: Update workflow state
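A minimal sketch of a context class with the attributes and methods listed above. The method names come from this section; their bodies are assumptions, as is the choice that `updater` receives `(state, data)` and returns the new state. `state` is a plain dict standing in for `WorkflowState`, and `budget_tracker` is left untyped.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class GraphExecutionContext:
    state: dict  # stand-in for WorkflowState
    budget_tracker: Any = None  # stand-in for BudgetTracker
    node_results: dict[str, Any] = field(default_factory=dict)
    visited_nodes: set[str] = field(default_factory=set)
    current_node: Optional[str] = None

    def set_node_result(self, node_id: str, result: Any) -> None:
        self.node_results[node_id] = result

    def get_node_result(self, node_id: str) -> Any:
        return self.node_results.get(node_id)  # None if the node has not run

    def has_visited(self, node_id: str) -> bool:
        return node_id in self.visited_nodes

    def mark_visited(self, node_id: str) -> None:
        self.visited_nodes.add(node_id)

    def update_state(self, updater: Callable[[dict, Any], dict], data: Any) -> dict:
        # Assumed contract: the updater returns the new state, which replaces the old one.
        self.state = updater(self.state, data)
        return self.state
```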
## Conditional Routing
Decision nodes evaluate conditions and return next node IDs:
- **Knowledge Gap Decision**: If `research_complete` → writer, else → tool selector
- **Budget Decision**: If budget exceeded → exit, else → continue
- **Iteration Decision**: If max iterations reached → exit, else → continue
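The three decisions can be folded into one routing function. This sketch assumes a particular priority order (completion first, so finished research is still written up, then budget, then iterations); the document does not state the order, and `RoutingInputs`, `route_after_knowledge_gap`, and the `"exit"` node ID are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class RoutingInputs:
    # Hypothetical bundle of the values the three decisions read.
    research_complete: bool
    budget_exceeded: bool
    iteration: int
    max_iterations: int


def route_after_knowledge_gap(inputs: RoutingInputs) -> str:
    """Return the next node ID; the check order is an assumption."""
    if inputs.research_complete:
        return "writer"
    if inputs.budget_exceeded:
        return "exit"
    if inputs.iteration >= inputs.max_iterations:
        return "exit"
    return "tool_selector"
```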
## Parallel Execution
Parallel nodes execute multiple nodes concurrently:
- Each parallel branch runs independently
- Results are aggregated after all branches complete
- State is synchronized after parallel execution
- Errors in one branch don't stop other branches
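One way to get "errors in one branch don't stop other branches" with the standard library is `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as results instead of raising the first one. The sketch below assumes that approach and splits outcomes into successes and failures after all branches finish; `run_parallel` and `section_loop` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable


async def run_parallel(branches: dict[str, Awaitable[Any]]) -> tuple[dict[str, Any], dict[str, BaseException]]:
    """Run branches concurrently; a failing branch is recorded, not propagated."""
    outcomes = await asyncio.gather(*branches.values(), return_exceptions=True)
    results: dict[str, Any] = {}
    errors: dict[str, BaseException] = {}
    for name, outcome in zip(branches, outcomes):  # gather keeps input order
        if isinstance(outcome, BaseException):
            errors[name] = outcome
        else:
            results[name] = outcome
    return results, errors


async def section_loop(name: str, fail: bool = False) -> str:
    """Demo branch standing in for one research loop."""
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"draft:{name}"
```

Merging `results` into shared state only after `gather()` returns is what keeps the synchronization step in one place.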
## Budget Enforcement
Budget constraints are enforced at decision nodes:
- **Token Budget**: Track LLM token usage
- **Time Budget**: Track elapsed time
- **Iteration Budget**: Track iteration count
If any budget is exceeded, execution routes to exit node.
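A budget tracker covering the three limits might look like the sketch below. `add_tokens()` and `check_budget()` echo the calls in the sequence diagram, but their bodies, the other fields, the injectable `clock`, and the rule that reaching `max_iterations` counts as exhausted are all assumptions for illustration.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class BudgetTracker:
    max_tokens: int
    max_seconds: float
    max_iterations: int
    clock: Callable[[], float] = time.monotonic  # injectable for tests
    tokens_used: int = 0
    iterations: int = 0
    started_at: Optional[float] = None

    def __post_init__(self) -> None:
        if self.started_at is None:
            self.started_at = self.clock()

    def add_tokens(self, used: int) -> None:
        self.tokens_used += used

    def add_iteration(self) -> None:
        self.iterations += 1

    def check_budget(self) -> Optional[str]:
        """Return the name of the first exhausted budget, or None to continue."""
        if self.tokens_used > self.max_tokens:
            return "tokens"
        if self.clock() - self.started_at > self.max_seconds:
            return "time"
        if self.iterations >= self.max_iterations:
            return "iterations"
        return None
```

A decision node would route to the exit node whenever `check_budget()` returns anything other than `None`.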
## Error Handling
Errors are handled at multiple levels:
1. **Node Level**: Catch errors in individual node execution
2. **Graph Level**: Handle errors during graph traversal
3. **State Level**: Rollback state changes on error
Errors are logged and surfaced to the UI as error events.
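Node-level catching combined with state-level rollback can be sketched as a wrapper that snapshots state before a node runs and restores it if the node raises. This assumes state is a deep-copyable dict and that events are collected in a list; `run_node_with_rollback` and `flaky_search` are hypothetical, and graph-level handling is not shown.

```python
import copy
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


def run_node_with_rollback(
    node_id: str, node_fn: Callable[[dict], Any], state: dict, events: list[dict]
) -> Optional[Any]:
    """Run one node; on failure restore the pre-node state and emit an error event."""
    snapshot = copy.deepcopy(state)  # taken before the node can mutate state
    try:
        result = node_fn(state)
    except Exception as exc:  # node-level catch
        state.clear()
        state.update(snapshot)  # state-level rollback, in place
        logger.error("Node %s failed: %s", node_id, exc)
        events.append({"type": "error", "node": node_id, "message": str(exc)})
        return None
    events.append({"type": "progress", "node": node_id})
    return result


def flaky_search(state: dict) -> list[str]:
    """Demo node that mutates state and then fails."""
    state["evidence"].append("partial result")
    raise TimeoutError("search timed out")
```

Restoring the dict in place, rather than rebinding it, means any other holder of a reference to `state` also sees the rolled-back contents.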
## Backward Compatibility
Graph execution is optional via feature flag:
- `USE_GRAPH_EXECUTION=true`: Use graph-based execution
- `USE_GRAPH_EXECUTION=false`: Use agent chain execution (existing)
This allows gradual migration and fallback if needed.
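Reading the flag could be as simple as the sketch below. Which spellings count as true, and treating an unset variable as `false`, are assumptions (the document does not state a default); `use_graph_execution` and `select_execution_mode` are hypothetical helpers. Accepting an explicit mapping keeps the check testable without touching `os.environ`.

```python
import os
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """Read USE_GRAPH_EXECUTION; an unset or unrecognised value means False."""
    source = os.environ if env is None else env
    return source.get("USE_GRAPH_EXECUTION", "").strip().lower() in _TRUTHY


def select_execution_mode(env: Optional[Mapping[str, str]] = None) -> str:
    return "graph" if use_graph_execution(env) else "agent_chain"
```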
## See Also
- [Orchestrators](orchestrators.md) - Overview of all orchestrator patterns
- [Workflow Diagrams](workflow-diagrams.md) - Detailed workflow diagrams
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation