# Graph Orchestration Architecture
## Overview
DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.
## Conversation History
DeepCritical supports multi-turn conversations through Pydantic AI's native message history format. The system maintains two types of history:
1. **User Conversation History**: Multi-turn user interactions (from the Gradio chat interface) stored as `list[ModelMessage]`
2. **Research Iteration History**: Internal research process state (existing `Conversation` model)
### Message History Flow
```
Gradio Chat History → convert_gradio_to_message_history() → GraphOrchestrator.run(message_history)
                                                            ↓
                                                            GraphExecutionContext (stores message_history)
                                                            ↓
                                                            Agent Nodes (receive message_history via agent.run())
                                                            ↓
                                                            WorkflowState (persists user_message_history)
```
### Usage
Message history is automatically converted from Gradio format and passed through the orchestrator:
```python
# In app.py - automatic conversion
message_history = convert_gradio_to_message_history(history) if history else None
async for event in orchestrator.run(query, message_history=message_history):
    yield event
```
Agents receive message history through their `run()` methods:
```python
# In agent execution
if message_history:
    result = await agent.run(input_data, message_history=message_history)
else:
    result = await agent.run(input_data)
```
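Gradio chat components configured with `type="messages"` pass history as a list of `{"role": ..., "content": ...}` dicts. The following is a minimal sketch of what the conversion step does, assuming that format. `UserTurn` and `AssistantTurn` are hypothetical stand-ins so the example runs without dependencies; in Pydantic AI itself a user turn would be a `ModelRequest` holding a `UserPromptPart`, and an assistant turn a `ModelResponse` holding a `TextPart`. The real `convert_gradio_to_message_history()` may differ.

```python
from dataclasses import dataclass
from typing import Any, Union


# Hypothetical stand-ins for Pydantic AI's ModelRequest / ModelResponse,
# used only so that this sketch runs without external dependencies.
@dataclass
class UserTurn:
    content: str


@dataclass
class AssistantTurn:
    content: str


Turn = Union[UserTurn, AssistantTurn]


def convert_gradio_to_message_history(history: list[dict[str, Any]]) -> list[Turn]:
    """Convert Gradio "messages"-format chat history into typed turns (sketch)."""
    turns: list[Turn] = []
    for entry in history:
        content = entry.get("content")
        # Skip non-text content (e.g. uploaded files) and empty messages.
        if not isinstance(content, str) or not content.strip():
            continue
        if entry.get("role") == "user":
            turns.append(UserTurn(content))
        elif entry.get("role") == "assistant":
            turns.append(AssistantTurn(content))
    return turns
```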
## Graph Patterns
### Iterative Research Graph
The iterative research graph follows this pattern:
```
[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                           ↓ No             ↓ Yes
                                     [Tool Selector]      [Writer]
                                           ↓
                                     [Execute Tools] → [Loop Back]
```
**Node IDs**: `thinking` → `knowledge_gap` → `continue_decision` → `tool_selector` → `execute_tools` → (loop back to `thinking`); once research is complete, `continue_decision` routes to `writer` instead.
**Special Node Handling**:
- `execute_tools`: State node that uses `search_handler` to execute searches and add evidence to workflow state
- `continue_decision`: Decision node that routes based on `research_complete` flag from `KnowledgeGapOutput`
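The routing above can be traced with a small successor table. This is a sketch under stated assumptions: `ITERATIVE_EDGES` and `trace()` are hypothetical helpers, and `KnowledgeGapOutput` is reduced to the single `research_complete` field the decision reads.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class KnowledgeGapOutput:
    # Stand-in: only the field the router reads.
    research_complete: bool


# Hypothetical successor table for the iterative graph. The successor of
# "continue_decision" is computed at runtime; "writer" has none (terminal).
ITERATIVE_EDGES: dict[str, str] = {
    "thinking": "knowledge_gap",
    "knowledge_gap": "continue_decision",
    "tool_selector": "execute_tools",
    "execute_tools": "thinking",  # loop back for the next iteration
}


def continue_decision(gap: KnowledgeGapOutput) -> str:
    """Route to the writer when research is complete, otherwise select tools."""
    return "writer" if gap.research_complete else "tool_selector"


def trace(gaps: Iterator[KnowledgeGapOutput], entry: str = "thinking") -> list[str]:
    """Return the node IDs visited, consuming one gap assessment per decision."""
    path: list[str] = []
    node: Optional[str] = entry
    while node is not None:
        path.append(node)
        if node == "continue_decision":
            node = continue_decision(next(gaps))
        else:
            node = ITERATIVE_EDGES.get(node)
    return path
```

With one incomplete assessment followed by a complete one, the trace passes through the tool loop once and then ends at `writer`.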
### Deep Research Graph
The deep research graph follows this pattern:
```
[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
                                       ↓       ↓       ↓
                                    [Loop1] [Loop2] [Loop3]
```
**Node IDs**: `planner` → `store_plan` → `parallel_loops` → `collect_drafts` → `synthesizer`
**Special Node Handling**:
- `planner`: Agent node that creates `ReportPlan` with report outline
- `store_plan`: State node that stores `ReportPlan` in context for parallel loops
- `parallel_loops`: Parallel node that executes `IterativeResearchFlow` instances for each section
- `collect_drafts`: State node that collects section drafts from parallel loops
- `synthesizer`: Agent node that calls `LongWriterAgent.write_report()` directly with `ReportDraft`
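The fan-out/fan-in shape of `parallel_loops` → `collect_drafts` → `synthesizer` can be sketched with `asyncio.gather()`. This is illustrative only: `run_section_loop()` is a hypothetical placeholder for one `IterativeResearchFlow` run, `ReportPlan` is a reduced stand-in, and the string join stands in for `LongWriterAgent.write_report()`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ReportPlan:
    # Stand-in: the real ReportPlan carries more fields than this.
    title: str
    sections: list[str]


async def run_section_loop(section: str) -> str:
    """Hypothetical placeholder for one IterativeResearchFlow run."""
    await asyncio.sleep(0)  # where agent and tool calls would happen
    return f"## {section}\n(draft for {section})"


async def deep_research(plan: ReportPlan) -> str:
    # parallel_loops: one iterative loop per planned section, run concurrently.
    drafts = await asyncio.gather(*(run_section_loop(s) for s in plan.sections))
    # collect_drafts + synthesizer: gather() returns results in input order,
    # so drafts line up with plan.sections regardless of completion order.
    return f"# {plan.title}\n\n" + "\n\n".join(drafts)
```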
### Deep Research
```mermaid
sequenceDiagram
actor User
participant GraphOrchestrator
participant InputParser
participant GraphBuilder
participant GraphExecutor
participant Agent
participant BudgetTracker
participant WorkflowState
User->>GraphOrchestrator: run(query)
GraphOrchestrator->>InputParser: detect_research_mode(query)
InputParser-->>GraphOrchestrator: mode (iterative/deep)
GraphOrchestrator->>GraphBuilder: build_graph(mode)
GraphBuilder-->>GraphOrchestrator: ResearchGraph
GraphOrchestrator->>WorkflowState: init_workflow_state()
GraphOrchestrator->>BudgetTracker: create_budget()
GraphOrchestrator->>GraphExecutor: _execute_graph(graph)
loop For each node in graph
GraphExecutor->>Agent: execute_node(agent_node)
Agent->>Agent: process_input
Agent-->>GraphExecutor: result
GraphExecutor->>WorkflowState: update_state(result)
GraphExecutor->>BudgetTracker: add_tokens(used)
GraphExecutor->>BudgetTracker: check_budget()
alt Budget exceeded
GraphExecutor->>GraphOrchestrator: emit(error_event)
else Continue
GraphExecutor->>GraphOrchestrator: emit(progress_event)
end
end
GraphOrchestrator->>User: AsyncGenerator[AgentEvent]
```
### Iterative Research
```mermaid
sequenceDiagram
participant IterativeFlow
participant ThinkingAgent
participant KnowledgeGapAgent
participant ToolSelector
participant ToolExecutor
participant JudgeHandler
participant WriterAgent
IterativeFlow->>IterativeFlow: run(query)
loop Until complete or max_iterations
IterativeFlow->>ThinkingAgent: generate_observations()
ThinkingAgent-->>IterativeFlow: observations
IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
alt Research complete
IterativeFlow->>WriterAgent: create_final_report()
WriterAgent-->>IterativeFlow: final_report
else Gaps remain
IterativeFlow->>ToolSelector: select_agents(gap)
ToolSelector-->>IterativeFlow: AgentSelectionPlan
IterativeFlow->>ToolExecutor: execute_tool_tasks()
ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
IterativeFlow->>JudgeHandler: assess_evidence()
JudgeHandler-->>IterativeFlow: should_continue
end
end
```
## Graph Structure
### Nodes
Graph nodes represent different stages in the research workflow:
1. **Agent Nodes**: Execute Pydantic AI agents
- Input: Prompt/query
- Output: Structured or unstructured response
- Examples: `KnowledgeGapAgent`, `ToolSelectorAgent`, `ThinkingAgent`
2. **State Nodes**: Update or read workflow state
- Input: Current state
- Output: Updated state
- Examples: Update evidence, update conversation history
3. **Decision Nodes**: Make routing decisions based on conditions
- Input: Current state/results
- Output: Next node ID
- Examples: Continue research vs. complete research
4. **Parallel Nodes**: Execute multiple nodes concurrently
- Input: List of node IDs
- Output: Aggregated results
- Examples: Parallel iterative research loops
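The four node kinds can be modelled as plain dataclasses. This sketch borrows `state_updater` and `decision_function` from the Execution Flow description; the field names `agent_run` and `parallel_nodes` are hypothetical and not taken from the codebase.

```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Union


@dataclass
class AgentNode:
    node_id: str
    agent_run: Callable[[Any], Awaitable[Any]]  # e.g. a bound agent.run


@dataclass
class StateNode:
    node_id: str
    state_updater: Callable[[Any, Any], Any]  # (state, data) -> new state


@dataclass
class DecisionNode:
    node_id: str
    decision_function: Callable[[Any], str]  # result -> next node ID


@dataclass
class ParallelNode:
    node_id: str
    parallel_nodes: list[str] = field(default_factory=list)  # IDs run concurrently


GraphNode = Union[AgentNode, StateNode, DecisionNode, ParallelNode]
```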
### Edges
Edges define transitions between nodes:
1. **Sequential Edges**: Always traversed (no condition)
- From: Source node
- To: Target node
- Condition: None (always True)
2. **Conditional Edges**: Traversed based on condition
- From: Source node
- To: Target node
- Condition: Callable that returns bool
- Example: If research is complete → go to writer, else → continue loop
3. **Parallel Edges**: Used for parallel execution branches
- From: Parallel node
- To: Multiple target nodes
- Execution: All targets run concurrently
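All three edge kinds fit one representation: an optional condition, where `None` means "always traverse". In this sketch (the `Edge` class and `next_nodes()` helper are hypothetical), several matching targets from one node correspond to parallel branches.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Edge:
    from_node: str
    to_node: str
    # None marks a sequential edge, which is always traversed.
    condition: Optional[Callable[[Any], bool]] = None

    def should_traverse(self, result: Any) -> bool:
        return self.condition is None or self.condition(result)


def next_nodes(edges: list[Edge], current: str, result: Any) -> list[str]:
    """Targets of every outgoing edge whose condition holds.

    More than one target corresponds to parallel branches.
    """
    return [e.to_node for e in edges if e.from_node == current and e.should_traverse(result)]
```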
## State Management
State is managed via `WorkflowState` using `ContextVar` for per-context isolation, so concurrent requests (whether async tasks or threads) do not see each other's state:
- **Evidence**: Collected evidence from searches
- **Conversation**: Iteration history (gaps, tool calls, findings, thoughts)
- **Embedding Service**: For semantic search
State transitions occur at state nodes, which update the workflow state bound to the current context.
## Execution Flow
1. **Graph Construction**: Build graph from nodes and edges using `create_iterative_graph()` or `create_deep_graph()`
2. **Graph Validation**: Check the graph structure (for example, that every node is reachable from the entry node) via `ResearchGraph.validate_structure()`
3. **Graph Execution**: Traverse graph from entry node using `GraphOrchestrator._execute_graph()`
4. **Node Execution**: Execute each node based on type:
- **Agent Nodes**: Call `agent.run()` with transformed input
- **State Nodes**: Update workflow state via `state_updater` function
- **Decision Nodes**: Evaluate `decision_function` to get next node ID
- **Parallel Nodes**: Execute all parallel nodes concurrently via `asyncio.gather()`
5. **Edge Evaluation**: Determine next node(s) based on edges and conditions
6. **Parallel Execution**: Use `asyncio.gather()` for parallel nodes
7. **State Updates**: Update state at state nodes via `GraphExecutionContext.update_state()`
8. **Event Streaming**: Yield `AgentEvent` objects during execution for UI
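Steps 3 to 8 can be condensed into a small async generator. This is a hedged sketch, not `GraphOrchestrator._execute_graph()` itself: `Node`, its `kind`/`fn`/`next_id` fields, and the `max_steps` guard are hypothetical, `AgentEvent` is a stand-in, and parallel nodes are omitted for brevity.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable, Optional


@dataclass
class Node:
    node_id: str
    kind: str  # "agent", "state" or "decision"
    fn: Callable[[Any], Any]  # agent: async callable; state/decision: sync callable
    next_id: Optional[str] = None  # static successor for agent and state nodes


@dataclass
class AgentEvent:
    # Stand-in for the real AgentEvent type.
    type: str
    node_id: str
    data: Any = None


async def execute_graph(
    nodes: dict[str, Node], entry: str, data: Any, max_steps: int = 50
) -> AsyncIterator[AgentEvent]:
    """Walk the graph from `entry`, yielding one progress event per node."""
    current: Optional[str] = entry
    steps = 0
    while current is not None:
        if steps >= max_steps:  # hard stop guards against runaway loops
            yield AgentEvent("error", current, "max_steps reached")
            return
        steps += 1
        node = nodes[current]
        if node.kind == "agent":
            data = await node.fn(data)
            following = node.next_id
        elif node.kind == "state":
            data = node.fn(data)
            following = node.next_id
        elif node.kind == "decision":
            following = node.fn(data)  # decision nodes return the next node ID
        else:
            raise ValueError(f"unknown node kind: {node.kind}")
        yield AgentEvent("progress", node.node_id, data)
        current = following
    yield AgentEvent("complete", "", data)
```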
### GraphExecutionContext
The `GraphExecutionContext` class manages execution state during graph traversal:
- **State**: Current `WorkflowState` instance
- **Budget Tracker**: `BudgetTracker` instance for budget enforcement
- **Node Results**: Dictionary storing results from each node execution
- **Visited Nodes**: Set of node IDs that have been executed
- **Current Node**: ID of the node currently being executed
Methods:
- `set_node_result(node_id, result)`: Store result from node execution
- `get_node_result(node_id)`: Retrieve stored result
- `has_visited(node_id)`: Check if node was visited
- `mark_visited(node_id)`: Mark node as visited
- `update_state(updater, data)`: Update workflow state
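A minimal sketch of a context object with the fields and methods listed above. The attribute names and the `update_state()` contract (the updater returns the new state) are assumptions; `state` and `budget_tracker` are typed as `Any` here in place of `WorkflowState` and `BudgetTracker`.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class GraphExecutionContext:
    state: Any  # WorkflowState in the real system
    budget_tracker: Any = None  # BudgetTracker in the real system
    node_results: dict[str, Any] = field(default_factory=dict)
    visited_nodes: set[str] = field(default_factory=set)
    current_node: Optional[str] = None

    def set_node_result(self, node_id: str, result: Any) -> None:
        self.node_results[node_id] = result

    def get_node_result(self, node_id: str) -> Any:
        # Returns None for nodes that have not produced a result yet.
        return self.node_results.get(node_id)

    def has_visited(self, node_id: str) -> bool:
        return node_id in self.visited_nodes

    def mark_visited(self, node_id: str) -> None:
        self.visited_nodes.add(node_id)

    def update_state(self, updater: Callable[[Any, Any], Any], data: Any) -> None:
        # Assumed contract: the updater returns the new state.
        self.state = updater(self.state, data)
```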
## Conditional Routing
Decision nodes evaluate conditions and return next node IDs:
- **Knowledge Gap Decision**: If `research_complete` → writer, else → tool selector
- **Budget Decision**: If budget exceeded → exit, else → continue
- **Iteration Decision**: If max iterations reached → exit, else → continue
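The three decisions can be folded into one routing function. The ordering here is an assumption: a completed assessment always reaches the writer, and only an incomplete one can be stopped by the budget or iteration cap. The node ID `"exit"` and the function `route()` are hypothetical.

```python
def route(research_complete: bool, budget_exceeded: bool, iteration: int, max_iterations: int) -> str:
    """Combine the knowledge-gap, budget and iteration decisions (assumed ordering)."""
    if research_complete:
        return "writer"
    if budget_exceeded or iteration >= max_iterations:
        return "exit"
    return "tool_selector"
```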
## Parallel Execution
Parallel nodes execute multiple nodes concurrently:
- Each parallel branch runs independently
- Results are aggregated after all branches complete
- State is synchronized after parallel execution
- Errors in one branch don't stop other branches
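One way to get per-branch error isolation is `asyncio.gather(..., return_exceptions=True)`, which returns each branch's exception in place of its result instead of propagating the first failure. A sketch with a hypothetical `run_parallel()` helper:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_parallel(
    branches: dict[str, Callable[[], Awaitable[Any]]],
) -> tuple[dict[str, Any], dict[str, BaseException]]:
    """Run every branch concurrently and report successes and failures per branch."""
    ids = list(branches)
    # return_exceptions=True collects exceptions as results, so one failing
    # branch does not hide the outcomes of the others.
    outcomes = await asyncio.gather(*(branches[i]() for i in ids), return_exceptions=True)
    results: dict[str, Any] = {}
    errors: dict[str, BaseException] = {}
    for node_id, outcome in zip(ids, outcomes):
        if isinstance(outcome, BaseException):
            errors[node_id] = outcome
        else:
            results[node_id] = outcome
    return results, errors
```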
## Budget Enforcement
Budget constraints are enforced at decision nodes:
- **Token Budget**: Track LLM token usage
- **Time Budget**: Track elapsed time
- **Iteration Budget**: Track iteration count
If any budget is exceeded, execution routes to exit node.
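A sketch of a tracker covering the three budgets. `add_tokens()` and `check_budget()` echo the calls in the sequence diagram, while the constructor fields, `increment_iteration()`, and the `(exceeded, reason)` return shape are assumptions.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BudgetTracker:
    max_tokens: int
    max_seconds: float
    max_iterations: int
    tokens_used: int = 0
    iterations: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def add_tokens(self, n: int) -> None:
        self.tokens_used += n

    def increment_iteration(self) -> None:
        self.iterations += 1

    def check_budget(self) -> tuple[bool, str]:
        """Return (exceeded, reason); the decision node routes to exit when exceeded."""
        if self.tokens_used > self.max_tokens:
            return True, "token budget exceeded"
        # monotonic() is unaffected by wall-clock adjustments.
        if time.monotonic() - self.started_at > self.max_seconds:
            return True, "time budget exceeded"
        if self.iterations >= self.max_iterations:
            return True, "iteration budget exceeded"
        return False, ""
```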
## Error Handling
Errors are handled at multiple levels:
1. **Node Level**: Catch errors in individual node execution
2. **Graph Level**: Handle errors during graph traversal
3. **State Level**: Rollback state changes on error
Errors are logged and emitted as error events for the UI.
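State-level rollback can be approximated by applying an update to a deep copy and committing it only on success. In this sketch, `run_state_node()` and the error-event dict shape are hypothetical; the real system emits `AgentEvent` objects.

```python
import copy
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


def run_state_node(
    state: dict[str, Any], updater: Callable[[dict[str, Any]], None]
) -> tuple[dict[str, Any], Optional[dict[str, str]]]:
    """Apply `updater` to a copy of `state`; on failure keep the old state.

    Returns the state to continue with and an error event (or None).
    """
    working = copy.deepcopy(state)
    try:
        updater(working)
    except Exception as exc:
        logger.exception("state node failed")
        # The original state was never touched, so this is the rollback.
        return state, {"type": "error", "message": str(exc)}
    return working, None
```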
## Backward Compatibility
Graph execution is optional and controlled by a feature flag:
- `USE_GRAPH_EXECUTION=true`: Use graph-based execution
- `USE_GRAPH_EXECUTION=false`: Use agent chain execution (existing)
This allows gradual migration and fallback if needed.
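Reading the flag might look like this. The default of `"false"` when the variable is unset and the accepted truthy spellings are assumptions, not documented behaviour.

```python
import os
from typing import Mapping, Optional


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """True when USE_GRAPH_EXECUTION is set to a truthy value (assumed spellings)."""
    env = os.environ if env is None else env
    return env.get("USE_GRAPH_EXECUTION", "false").strip().lower() in {"1", "true", "yes"}
```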
## See Also
- [Orchestrators](orchestrators.md) - Overview of all orchestrator patterns
- [Workflow Diagrams](workflow-diagrams.md) - Detailed workflow diagrams
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation