# Graph Orchestration Architecture

## Overview

DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.

## Conversation History

DeepCritical supports multi-turn conversations through Pydantic AI's native message history format. The system maintains two types of history:

1. **User Conversation History**: Multi-turn user interactions (from Gradio chat interface) stored as `list[ModelMessage]`
2. **Research Iteration History**: Internal research process state (existing `Conversation` model)

### Message History Flow

```
Gradio Chat History → convert_gradio_to_message_history() → GraphOrchestrator.run(message_history)
    ↓
GraphExecutionContext (stores message_history)
    ↓
Agent Nodes (receive message_history via agent.run())
    ↓
WorkflowState (persists user_message_history)
```

### Usage

Message history is automatically converted from Gradio format and passed through the orchestrator:

```python
# In app.py - automatic conversion
message_history = convert_gradio_to_message_history(history) if history else None
async for event in orchestrator.run(query, message_history=message_history):
    yield event
```

Agents receive message history through their `run()` methods:

```python
# In agent execution: pass the history only when the caller supplied one
if message_history:
    result = await agent.run(input_data, message_history=message_history)
else:
    result = await agent.run(input_data)
```

## Graph Patterns

### Iterative Research Graph

The iterative research graph follows this pattern:

```
[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                              ↓ No          ↓ Yes
                                    [Tool Selector]    [Writer]
                                              ↓
                                    [Execute Tools] → [Loop Back]
```

**Node IDs**: `thinking` → `knowledge_gap` → `continue_decision` → `tool_selector`/`writer` → `execute_tools` → (loop back to `thinking`)

**Special Node Handling**:
- `execute_tools`: State node that uses `search_handler` to execute searches and add evidence to workflow state
- `continue_decision`: Decision node that routes based on `research_complete` flag from `KnowledgeGapOutput`

### Deep Research Graph

The deep research graph follows this pattern:

```
[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
                                        ↓         ↓         ↓
                                     [Loop1]  [Loop2]  [Loop3]
```

**Node IDs**: `planner` → `store_plan` → `parallel_loops` → `collect_drafts` → `synthesizer`

**Special Node Handling**:
- `planner`: Agent node that creates `ReportPlan` with report outline
- `store_plan`: State node that stores `ReportPlan` in context for parallel loops
- `parallel_loops`: Parallel node that executes `IterativeResearchFlow` instances for each section
- `collect_drafts`: State node that collects section drafts from parallel loops
- `synthesizer`: Agent node that calls `LongWriterAgent.write_report()` directly with `ReportDraft`

### Deep Research

```mermaid

sequenceDiagram
    actor User
    participant GraphOrchestrator
    participant InputParser
    participant GraphBuilder
    participant GraphExecutor
    participant Agent
    participant BudgetTracker
    participant WorkflowState

    User->>GraphOrchestrator: run(query)
    GraphOrchestrator->>InputParser: detect_research_mode(query)
    InputParser-->>GraphOrchestrator: mode (iterative/deep)
    GraphOrchestrator->>GraphBuilder: build_graph(mode)
    GraphBuilder-->>GraphOrchestrator: ResearchGraph
    GraphOrchestrator->>WorkflowState: init_workflow_state()
    GraphOrchestrator->>BudgetTracker: create_budget()
    GraphOrchestrator->>GraphExecutor: _execute_graph(graph)
    
    loop For each node in graph
        GraphExecutor->>Agent: execute_node(agent_node)
        Agent->>Agent: process_input
        Agent-->>GraphExecutor: result
        GraphExecutor->>WorkflowState: update_state(result)
        GraphExecutor->>BudgetTracker: add_tokens(used)
        GraphExecutor->>BudgetTracker: check_budget()
        alt Budget exceeded
            GraphExecutor->>GraphOrchestrator: emit(error_event)
        else Continue
            GraphExecutor->>GraphOrchestrator: emit(progress_event)
        end
    end
    
    GraphOrchestrator->>User: AsyncGenerator[AgentEvent]

```

### Iterative Research

```mermaid
sequenceDiagram
    participant IterativeFlow
    participant ThinkingAgent
    participant KnowledgeGapAgent
    participant ToolSelector
    participant ToolExecutor
    participant JudgeHandler
    participant WriterAgent

    IterativeFlow->>IterativeFlow: run(query)
    
    loop Until complete or max_iterations
        IterativeFlow->>ThinkingAgent: generate_observations()
        ThinkingAgent-->>IterativeFlow: observations
        
        IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
        KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
        
        alt Research complete
            IterativeFlow->>WriterAgent: create_final_report()
            WriterAgent-->>IterativeFlow: final_report
        else Gaps remain
            IterativeFlow->>ToolSelector: select_agents(gap)
            ToolSelector-->>IterativeFlow: AgentSelectionPlan
            
            IterativeFlow->>ToolExecutor: execute_tool_tasks()
            ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
            
            IterativeFlow->>JudgeHandler: assess_evidence()
            JudgeHandler-->>IterativeFlow: should_continue
        end
    end
```


## Graph Structure

### Nodes

Graph nodes represent different stages in the research workflow:

1. **Agent Nodes**: Execute Pydantic AI agents
   - Input: Prompt/query
   - Output: Structured or unstructured response
   - Examples: `KnowledgeGapAgent`, `ToolSelectorAgent`, `ThinkingAgent`

2. **State Nodes**: Update or read workflow state
   - Input: Current state
   - Output: Updated state
   - Examples: Update evidence, update conversation history

3. **Decision Nodes**: Make routing decisions based on conditions
   - Input: Current state/results
   - Output: Next node ID
   - Examples: Continue research vs. complete research

4. **Parallel Nodes**: Execute multiple nodes concurrently
   - Input: List of node IDs
   - Output: Aggregated results
   - Examples: Parallel iterative research loops

### Edges

Edges define transitions between nodes:

1. **Sequential Edges**: Always traversed (no condition)
   - From: Source node
   - To: Target node
   - Condition: None (always True)

2. **Conditional Edges**: Traversed based on condition
   - From: Source node
   - To: Target node
   - Condition: Callable that returns bool
   - Example: If research complete → go to writer, else → continue loop

3. **Parallel Edges**: Used for parallel execution branches
   - From: Parallel node
   - To: Multiple target nodes
   - Execution: All targets run concurrently
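
The edge kinds above can be modelled with one record whose condition is optional. This is a minimal sketch for illustration only: `Edge` and `next_nodes` are hypothetical names, not DeepCritical's actual classes, and node results are assumed to be plain dictionaries.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Edge:
    """Hypothetical edge record; a missing condition means 'always traverse'."""

    source: str
    target: str
    condition: Optional[Callable[[Any], bool]] = None

    def is_open(self, result: Any) -> bool:
        # Sequential edges have no condition and are always open.
        return self.condition is None or bool(self.condition(result))


def next_nodes(edges: list[Edge], current: str, result: Any) -> list[str]:
    """Return the targets of every edge leaving `current` whose condition holds."""
    return [e.target for e in edges if e.source == current and e.is_open(result)]


edges = [
    Edge("knowledge_gap", "continue_decision"),  # sequential edge
    Edge("continue_decision", "writer", lambda r: r["research_complete"]),
    Edge("continue_decision", "tool_selector", lambda r: not r["research_complete"]),
]
```

Returning a list rather than a single ID lets the same helper serve parallel edges, where several targets are open at once.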


## State Management

State is managed via `WorkflowState`, which is held in a `ContextVar` so that each thread or asyncio task sees its own isolated state:

- **Evidence**: Collected evidence from searches
- **Conversation**: Iteration history (gaps, tool calls, findings, thoughts)
- **Embedding Service**: For semantic search

State transitions occur at state nodes, which update the workflow state of the current context.
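
The isolation that `ContextVar` provides can be seen in a small sketch. `SketchState`, `init_state`, and `get_state` are hypothetical stand-ins, not the project's `WorkflowState` API; the point is only that two concurrently running workflows never see each other's evidence.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field


@dataclass
class SketchState:
    """Hypothetical stand-in for WorkflowState."""

    evidence: list[str] = field(default_factory=list)


_state_var = ContextVar("workflow_state", default=None)


def init_state() -> SketchState:
    state = SketchState()
    _state_var.set(state)  # visible only in the current context
    return state


def get_state() -> SketchState:
    state = _state_var.get()
    if state is None:
        raise RuntimeError("workflow state not initialised in this context")
    return state


async def run_workflow(name: str) -> list[str]:
    init_state()
    await asyncio.sleep(0)  # yield so the two workflows interleave
    get_state().evidence.append(f"evidence from {name}")
    return get_state().evidence


async def main() -> list[list[str]]:
    # gather() wraps each coroutine in a Task; every Task runs in its own
    # copy of the current context, so each one gets a separate state.
    return await asyncio.gather(run_workflow("a"), run_workflow("b"))
```

Running `asyncio.run(main())` yields one evidence list per workflow, each containing only its own entry.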

## Execution Flow

1. **Graph Construction**: Build graph from nodes and edges using `create_iterative_graph()` or `create_deep_graph()`
2. **Graph Validation**: Ensure graph is valid (no cycles, all nodes reachable) via `ResearchGraph.validate_structure()`
3. **Graph Execution**: Traverse graph from entry node using `GraphOrchestrator._execute_graph()`
4. **Node Execution**: Execute each node based on type:
   - **Agent Nodes**: Call `agent.run()` with transformed input
   - **State Nodes**: Update workflow state via `state_updater` function
   - **Decision Nodes**: Evaluate `decision_function` to get next node ID
   - **Parallel Nodes**: Execute all parallel nodes concurrently via `asyncio.gather()`
5. **Edge Evaluation**: Determine next node(s) based on edges and conditions
6. **Parallel Execution**: Use `asyncio.gather()` for parallel nodes
7. **State Updates**: Update state at state nodes via `GraphExecutionContext.update_state()`
8. **Event Streaming**: Yield `AgentEvent` objects during execution so the UI can show progress
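
As an illustration of the validation step, the reachability half of the check can be written as a breadth-first search from the entry node. This is a sketch under assumptions: `unreachable_nodes` is a hypothetical helper, edges are simplified to `(source, target)` pairs, and it does not reproduce what `ResearchGraph.validate_structure()` actually does.

```python
from collections import deque


def unreachable_nodes(
    nodes: set[str], edges: list[tuple[str, str]], entry: str
) -> set[str]:
    """Breadth-first search from `entry`; return the nodes that are never reached."""
    adjacency: dict[str, list[str]] = {n: [] for n in nodes}
    for source, target in edges:
        adjacency[source].append(target)

    seen = {entry}
    queue = deque([entry])
    while queue:
        for target in adjacency[queue.popleft()]:
            if target not in seen:
                seen.add(target)
                queue.append(target)
    return nodes - seen


# Node IDs of the deep research graph, wired sequentially.
deep_nodes = {"planner", "store_plan", "parallel_loops", "collect_drafts", "synthesizer"}
deep_edges = [
    ("planner", "store_plan"),
    ("store_plan", "parallel_loops"),
    ("parallel_loops", "collect_drafts"),
    ("collect_drafts", "synthesizer"),
]
```

An empty result means every node can be reached from the entry node; any returned IDs point at nodes that were added but never wired in.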

### GraphExecutionContext

The `GraphExecutionContext` class manages execution state during graph traversal:

- **State**: Current `WorkflowState` instance
- **Budget Tracker**: `BudgetTracker` instance for budget enforcement
- **Node Results**: Dictionary storing results from each node execution
- **Visited Nodes**: Set of node IDs that have been executed
- **Current Node**: ID of the node currently being executed

Methods:
- `set_node_result(node_id, result)`: Store result from node execution
- `get_node_result(node_id)`: Retrieve stored result
- `has_visited(node_id)`: Check if node was visited
- `mark_visited(node_id)`: Mark node as visited
- `update_state(updater, data)`: Update workflow state
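
The fields and methods listed above fit naturally into a dataclass. The sketch below mirrors the documented names, but the bodies are assumptions; in particular, `update_state` is assumed to call `updater(state, data)` and keep the returned value, which may differ from the real implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class SketchExecutionContext:
    """Hypothetical stand-in for GraphExecutionContext."""

    state: Any
    budget_tracker: Any = None
    node_results: dict[str, Any] = field(default_factory=dict)
    visited_nodes: set[str] = field(default_factory=set)
    current_node: Optional[str] = None

    def set_node_result(self, node_id: str, result: Any) -> None:
        self.node_results[node_id] = result

    def get_node_result(self, node_id: str) -> Any:
        # Returns None for nodes that have not produced a result yet.
        return self.node_results.get(node_id)

    def has_visited(self, node_id: str) -> bool:
        return node_id in self.visited_nodes

    def mark_visited(self, node_id: str) -> None:
        self.visited_nodes.add(node_id)

    def update_state(self, updater: Callable[[Any, Any], Any], data: Any) -> None:
        # Assumption: the updater returns the new state instead of mutating in place.
        self.state = updater(self.state, data)
```

A state node would then call `ctx.update_state(add_evidence, new_items)`, where `add_evidence` is whatever updater function the node was built with.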

## Conditional Routing

Decision nodes evaluate conditions and return next node IDs:

- **Knowledge Gap Decision**: If `research_complete` → writer, else → tool selector
- **Budget Decision**: If budget exceeded → exit, else → continue
- **Iteration Decision**: If max iterations → exit, else → continue
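
The three decisions can be combined into a single decision function that returns the next node ID. This is a hedged sketch: `GapResult` is a hypothetical stand-in for `KnowledgeGapOutput`, the `"exit"` node ID is assumed, and checking the budget and iteration limits before the knowledge-gap flag is a design choice made for the example, not a documented ordering.

```python
from dataclasses import dataclass


@dataclass
class GapResult:
    """Hypothetical stand-in for KnowledgeGapOutput; only the routing flag is kept."""

    research_complete: bool


def route(
    gap: GapResult, budget_exceeded: bool, iteration: int, max_iterations: int
) -> str:
    """Return the ID of the next node to execute."""
    # Hard limits win over the research outcome (assumed priority).
    if budget_exceeded or iteration >= max_iterations:
        return "exit"
    return "writer" if gap.research_complete else "tool_selector"
```

For example, `route(GapResult(False), budget_exceeded=False, iteration=1, max_iterations=5)` sends execution to `tool_selector` for another research round.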

## Parallel Execution

Parallel nodes execute multiple nodes concurrently:

- Each parallel branch runs independently
- Results are aggregated after all branches complete
- State is synchronized after parallel execution
- Errors in one branch don't stop other branches
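
One way to obtain this behaviour with `asyncio.gather()` is to pass `return_exceptions=True` and separate failures from results afterwards. The sketch below assumes a hypothetical `research_section` coroutine standing in for a research loop; it is not the project's parallel node implementation.

```python
import asyncio


async def research_section(title: str) -> str:
    """Hypothetical branch; fails for one title to show error isolation."""
    await asyncio.sleep(0)
    if title == "broken":
        raise ValueError(f"search failed for {title!r}")
    return f"draft: {title}"


async def run_parallel(titles: list[str]) -> tuple[list[str], list[Exception]]:
    # return_exceptions=True hands failures back as values, so the results
    # of the successful branches are still collected, in input order.
    results = await asyncio.gather(
        *(research_section(t) for t in titles), return_exceptions=True
    )
    drafts = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return drafts, errors
```

Calling `asyncio.run(run_parallel(["intro", "broken", "methods"]))` returns the two successful drafts alongside a single `ValueError`.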

## Budget Enforcement

Budget constraints are enforced at decision nodes:

- **Token Budget**: Track LLM token usage
- **Time Budget**: Track elapsed time
- **Iteration Budget**: Track iteration count

If any budget is exceeded, execution routes to the exit node.
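
A tracker covering all three budgets might look like the following. `SketchBudget` and its method names other than `add_tokens` are assumptions made for illustration; the real `BudgetTracker` interface may differ.

```python
import time
from dataclasses import dataclass, field


@dataclass
class SketchBudget:
    """Hypothetical tracker for token, time, and iteration budgets."""

    max_tokens: int
    max_seconds: float
    max_iterations: int
    tokens_used: int = 0
    iterations: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def add_tokens(self, count: int) -> None:
        self.tokens_used += count

    def next_iteration(self) -> None:
        self.iterations += 1

    def exceeded(self) -> list[str]:
        """Return the names of every budget that has been exceeded."""
        reasons = []
        if self.tokens_used > self.max_tokens:
            reasons.append("tokens")
        if time.monotonic() - self.started_at > self.max_seconds:
            reasons.append("time")
        if self.iterations > self.max_iterations:
            reasons.append("iterations")
        return reasons
```

A decision node can then route to the exit node whenever `exceeded()` returns a non-empty list, and include the reasons in the event it emits.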

## Error Handling

Errors are handled at multiple levels:

1. **Node Level**: Catch errors in individual node execution
2. **Graph Level**: Handle errors during graph traversal
3. **State Level**: Rollback state changes on error

Errors are logged and surfaced to the UI as error events.
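
Node-level handling with state rollback can be sketched by snapshotting the state before a node runs. Everything here is an assumption for illustration: `run_node_safely` is a hypothetical helper, the state is a plain dictionary, and events are dictionaries rather than `AgentEvent` objects.

```python
import copy
import logging
from typing import Any, Callable

logger = logging.getLogger("graph_sketch")


def run_node_safely(node_id: str, node: Callable[[dict], Any], state: dict) -> dict:
    """Run one node; on failure restore the state and return an error event."""
    snapshot = copy.deepcopy(state)
    try:
        result = node(state)
    except Exception as exc:
        # Roll back any partial changes the node made before failing.
        state.clear()
        state.update(snapshot)
        logger.exception("node %s failed", node_id)
        return {"type": "error", "node": node_id, "message": str(exc)}
    return {"type": "progress", "node": node_id, "result": result}
```

Deep-copying the whole state is simple but can be costly for large evidence sets; a real implementation might snapshot only the fields a node is allowed to change.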

## Backward Compatibility

Graph execution is optional via feature flag:

- `USE_GRAPH_EXECUTION=true`: Use graph-based execution
- `USE_GRAPH_EXECUTION=false`: Use agent chain execution (existing)

This allows gradual migration and fallback if needed.
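
Reading the flag could be as simple as the sketch below. The helper name `use_graph_execution` is hypothetical, and both the case-insensitive comparison and the default of `false` when the variable is unset are assumptions, not documented behaviour.

```python
import os
from typing import Mapping, Optional


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when USE_GRAPH_EXECUTION is set to 'true' (any letter case)."""
    env = os.environ if env is None else env
    # Assumption: an unset flag falls back to the existing agent chain.
    return env.get("USE_GRAPH_EXECUTION", "false").strip().lower() == "true"
```

Accepting the environment as a parameter keeps the helper easy to test without touching the real process environment.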

## See Also

- [Orchestrators](orchestrators.md) - Overview of all orchestrator patterns
- [Workflow Diagrams](workflow-diagrams.md) - Detailed workflow diagrams
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation