# Graph Orchestration Architecture

## Overview

Phase 4 implements a graph-based orchestration system for research workflows, using Pydantic AI agents as nodes. Compared with simple agent chains, this enables better parallel execution, conditional routing, and state management.

## Graph Structure

### Nodes

Graph nodes represent different stages in the research workflow:

1. **Agent Nodes**: Execute Pydantic AI agents
   - Input: Prompt/query
   - Output: Structured or unstructured response
   - Examples: `KnowledgeGapAgent`, `ToolSelectorAgent`, `ThinkingAgent`

2. **State Nodes**: Update or read workflow state
   - Input: Current state
   - Output: Updated state
   - Examples: Update evidence, update conversation history

3. **Decision Nodes**: Make routing decisions based on conditions
   - Input: Current state/results
   - Output: Next node ID
   - Examples: Continue research vs. complete research

4. **Parallel Nodes**: Execute multiple nodes concurrently
   - Input: List of node IDs
   - Output: Aggregated results
   - Examples: Parallel iterative research loops
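
The four node types could be modeled as small dataclasses sharing a common base. This is only a minimal sketch: the class and field names (`GraphNode`, `node_id`, `decide`, `branch_ids`, and so on) are hypothetical and are not taken from the project's code.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class GraphNode:
    """Hypothetical base class: every node carries a unique ID."""
    node_id: str


@dataclass
class AgentNode(GraphNode):
    # Placeholder for a Pydantic AI agent; typed as Any to keep the sketch dependency-free.
    agent: Any


@dataclass
class StateNode(GraphNode):
    # Receives the current state and returns the updated state.
    update: Callable[[Any], Any]


@dataclass
class DecisionNode(GraphNode):
    # Receives the current state/results and returns the next node ID.
    decide: Callable[[Any], str]


@dataclass
class ParallelNode(GraphNode):
    # IDs of the nodes to execute concurrently.
    branch_ids: list[str]


# Example: a decision node that routes on a hypothetical "research_complete" key.
check_complete = DecisionNode(
    node_id="check_complete",
    decide=lambda state: "writer" if state["research_complete"] else "tool_selector",
)
```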

### Edges

Edges define transitions between nodes:

1. **Sequential Edges**: Always traversed (no condition)
   - From: Source node
   - To: Target node
   - Condition: None (always True)

2. **Conditional Edges**: Traversed based on condition
   - From: Source node
   - To: Target node
   - Condition: Callable that returns bool
   - Example: If research complete → go to writer, else → continue loop

3. **Parallel Edges**: Used for parallel execution branches
   - From: Parallel node
   - To: Multiple target nodes
   - Execution: All targets run concurrently
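
One way to represent sequential and conditional edges is a single `Edge` type whose condition is optional. The sketch below is illustrative; `Edge`, `is_traversable`, and `next_targets` are assumed names, not the project's API.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Edge:
    source: str
    target: str
    # None models a sequential edge, which is always traversed.
    condition: Optional[Callable[[Any], bool]] = None

    def is_traversable(self, state: Any) -> bool:
        return True if self.condition is None else bool(self.condition(state))


def next_targets(edges: list[Edge], source: str, state: Any) -> list[str]:
    """Return the targets of every edge leaving `source` whose condition holds."""
    return [e.target for e in edges if e.source == source and e.is_traversable(state)]


edges = [
    Edge("thinking", "knowledge_gap"),  # sequential
    Edge("knowledge_gap", "writer", lambda s: s["research_complete"]),
    Edge("knowledge_gap", "tool_selector", lambda s: not s["research_complete"]),
]
```

A parallel edge would simply be several edges leaving the same parallel node, in which case `next_targets` returns more than one ID.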

## Graph Patterns

### Iterative Research Graph

```
[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                              ↓ No          ↓ Yes
                                    [Tool Selector]    [Writer]
                                              ↓
                                    [Execute Tools] → [Loop Back]
```

### Deep Research Graph

```
[Input] → [Planner] → [Parallel Iterative Loops] → [Synthesizer]
                           ↓         ↓         ↓
                        [Loop1]  [Loop2]  [Loop3]
```

## State Management

State is managed via `WorkflowState` using `ContextVar` for thread-safe isolation:

- **Evidence**: Collected evidence from searches
- **Conversation**: Iteration history (gaps, tool calls, findings, thoughts)
- **Embedding Service**: For semantic search

State transitions occur at state nodes, which update the workflow state held in the current context.
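
The `ContextVar` pattern can be sketched as follows. Only the name `WorkflowState` comes from this document; its fields and the helper functions `init_workflow_state` and `get_workflow_state` are assumptions made for illustration.

```python
from contextvars import ContextVar, copy_context
from dataclasses import dataclass, field
from typing import Any


@dataclass
class WorkflowState:
    # Field names are assumed; they mirror the three items listed above.
    evidence: list[str] = field(default_factory=list)
    conversation: list[dict[str, Any]] = field(default_factory=list)
    embedding_service: Any = None


_workflow_state: ContextVar[WorkflowState] = ContextVar("workflow_state")


def init_workflow_state() -> WorkflowState:
    """Create a fresh state and bind it to the current context."""
    state = WorkflowState()
    _workflow_state.set(state)
    return state


def get_workflow_state() -> WorkflowState:
    """Return the state for the current context, creating it on first use."""
    try:
        return _workflow_state.get()
    except LookupError:
        return init_workflow_state()


def _run_in_other_context() -> list[str]:
    # A state bound inside a copied context does not leak back to the caller.
    init_workflow_state().evidence.append("from other context")
    return get_workflow_state().evidence


init_workflow_state().evidence.append("from main context")
other_evidence = copy_context().run(_run_in_other_context)
```

After this runs, the main context still sees only its own evidence. Each asyncio task likewise copies the current context when it is created, so a state bound inside one task is not visible to its siblings.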

## Execution Flow

1. **Graph Construction**: Build graph from nodes and edges
2. **Graph Validation**: Ensure graph is valid (no cycles, all nodes reachable)
3. **Graph Execution**: Traverse graph from entry node
4. **Node Execution**: Execute each node based on type
5. **Edge Evaluation**: Determine next node(s) based on edges
6. **Parallel Execution**: Use `asyncio.gather()` for parallel nodes
7. **State Updates**: Update state at state nodes
8. **Event Streaming**: Yield events during execution for the UI
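
The steps above can be sketched as a small async generator that validates the graph, runs one node at a time, evaluates the outgoing edges, and yields events. All names here (`run_graph`, `validate_graph`, the event dictionaries) are hypothetical. The validation only checks that referenced nodes exist, and parallel nodes are left out for brevity.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Optional

State = dict[str, Any]
NodeFn = Callable[[State], Awaitable[None]]
# Each outgoing edge is (condition, target); a condition of None is a sequential edge.
EdgeList = list[tuple[Optional[Callable[[State], bool]], str]]


def validate_graph(nodes: dict[str, NodeFn], edges: dict[str, EdgeList], entry: str) -> None:
    """Simplified validation: the entry node and every edge endpoint must exist."""
    if entry not in nodes:
        raise ValueError(f"unknown entry node: {entry}")
    for source, outgoing in edges.items():
        for _, target in outgoing:
            if source not in nodes or target not in nodes:
                raise ValueError(f"edge {source} -> {target} references an unknown node")


async def run_graph(
    nodes: dict[str, NodeFn],
    edges: dict[str, EdgeList],
    entry: str,
    state: State,
    max_steps: int = 50,
) -> AsyncIterator[dict[str, Any]]:
    validate_graph(nodes, edges, entry)
    current: Optional[str] = entry
    steps = 0
    while current is not None and steps < max_steps:
        yield {"type": "node_started", "node": current}
        await nodes[current](state)
        yield {"type": "node_completed", "node": current}
        # Follow the first outgoing edge whose condition holds; stop if there is none.
        current = next(
            (target for cond, target in edges.get(current, []) if cond is None or cond(state)),
            None,
        )
        steps += 1
    yield {"type": "complete", "steps": steps}


async def demo() -> tuple[State, list[dict[str, Any]]]:
    async def think(state: State) -> None:
        state["iterations"] = state.get("iterations", 0) + 1

    async def write(state: State) -> None:
        state["report"] = "done"

    nodes = {"thinking": think, "writer": write}
    edges = {"thinking": [(lambda s: s["iterations"] >= 2, "writer"), (None, "thinking")]}
    state: State = {}
    events = [event async for event in run_graph(nodes, edges, "thinking", state)]
    return state, events
```

Calling `asyncio.run(demo())` runs the thinking node twice, then the writer, and collects the streamed events in order.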

## Conditional Routing

Decision nodes evaluate conditions and return next node IDs:

- **Knowledge Gap Decision**: If `research_complete` → writer, else → tool selector
- **Budget Decision**: If budget exceeded → exit, else → continue
- **Iteration Decision**: If max iterations → exit, else → continue
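
As a rough illustration, the three decisions could be combined into one routing function that returns the next node ID. The `LoopStatus` fields, the node IDs, and the order in which the checks run are all assumptions of this sketch.

```python
from dataclasses import dataclass


@dataclass
class LoopStatus:
    research_complete: bool
    budget_exceeded: bool
    iteration: int
    max_iterations: int


def route(status: LoopStatus) -> str:
    """Return the ID of the next node (hypothetical IDs)."""
    # Assumed precedence: hard limits are checked before the completion flag.
    if status.budget_exceeded:
        return "exit"
    if status.iteration >= status.max_iterations:
        return "exit"
    if status.research_complete:
        return "writer"
    return "tool_selector"
```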

## Parallel Execution

Parallel nodes execute multiple nodes concurrently:

- Each parallel branch runs independently
- Results are aggregated after all branches complete
- State is synchronized after parallel execution
- Errors in one branch don't stop other branches
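
A minimal sketch of this behavior uses `asyncio.gather()` with `return_exceptions=True`, so that a failing branch is reported as a result instead of cancelling the others. The function name `run_parallel` and the shape of the aggregated result are assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_parallel(
    branches: dict[str, Callable[[], Awaitable[Any]]],
) -> dict[str, dict[str, Any]]:
    """Run every branch concurrently and aggregate results per branch name."""
    names = list(branches)
    # return_exceptions=True makes gather return exceptions as values
    # instead of raising the first one.
    results = await asyncio.gather(
        *(branches[name]() for name in names), return_exceptions=True
    )
    aggregated: dict[str, dict[str, Any]] = {}
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            aggregated[name] = {"ok": False, "error": repr(result)}
        else:
            aggregated[name] = {"ok": True, "result": result}
    return aggregated


async def _loop_ok() -> str:
    await asyncio.sleep(0)
    return "findings"


async def _loop_fails() -> str:
    raise ValueError("search failed")


outcome = asyncio.run(run_parallel({"loop1": _loop_ok, "loop2": _loop_fails}))
```

Here `outcome["loop1"]` carries the successful result while `outcome["loop2"]` records the error, and neither branch blocks the other.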

## Budget Enforcement

Budget constraints are enforced at decision nodes:

- **Token Budget**: Track LLM token usage
- **Time Budget**: Track elapsed time
- **Iteration Budget**: Track iteration count

If any budget is exceeded, execution routes to the exit node.
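
A budget tracker along these lines could back such a decision node. The `Budget` class, its field names, and the choice to treat reaching a limit as exceeding it are assumptions of this sketch.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Budget:
    max_tokens: int
    max_seconds: float
    max_iterations: int
    tokens_used: int = 0
    iterations: int = 0
    # Monotonic clock avoids jumps if the system time changes.
    started_at: float = field(default_factory=time.monotonic)

    def exceeded(self) -> Optional[str]:
        """Return the name of the first exhausted budget, or None."""
        # Assumption: reaching a limit counts as exceeding it.
        if self.tokens_used >= self.max_tokens:
            return "tokens"
        if time.monotonic() - self.started_at >= self.max_seconds:
            return "time"
        if self.iterations >= self.max_iterations:
            return "iterations"
        return None


def budget_decision(budget: Budget) -> str:
    """Decision-node logic: route to 'exit' once any budget is used up."""
    return "exit" if budget.exceeded() else "continue"
```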

## Error Handling

Errors are handled at multiple levels:

1. **Node Level**: Catch errors in individual node execution
2. **Graph Level**: Handle errors during graph traversal
3. **State Level**: Roll back state changes on error

Errors are logged and emitted as error events for the UI.
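
Node-level handling with state rollback might look like the sketch below, which snapshots the state before running a node and restores it if the node raises. The function name, the dictionary-based state, and the event shapes are hypothetical.

```python
import copy
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def run_node_with_rollback(
    node_id: str,
    node_fn: Callable[[dict[str, Any]], None],
    state: dict[str, Any],
) -> dict[str, Any]:
    """Run a node; on failure restore the previous state and return an error event."""
    snapshot = copy.deepcopy(state)
    try:
        node_fn(state)
        return {"type": "node_completed", "node": node_id}
    except Exception as exc:
        # Restore in place so other references to `state` see the rollback.
        state.clear()
        state.update(snapshot)
        logger.exception("Node %s failed", node_id)
        return {"type": "error", "node": node_id, "message": str(exc)}


def failing_node(state: dict[str, Any]) -> None:
    state["evidence"].append("partial result")
    raise RuntimeError("tool call failed")


state = {"evidence": ["kept"]}
event = run_node_with_rollback("execute_tools", failing_node, state)
```

Deep-copying the whole state is the simplest option for a sketch; a larger state may call for a cheaper snapshot strategy.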

## Backward Compatibility

Graph execution is optional and controlled by a feature flag:

- `USE_GRAPH_EXECUTION=true`: Use graph-based execution
- `USE_GRAPH_EXECUTION=false`: Use agent chain execution (existing)

This allows gradual migration and fallback if needed.
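
Reading the flag could be as simple as the sketch below. The helper name `use_graph_execution` and the default of `false` when the variable is unset are assumptions; only the variable name and its two values come from this document.

```python
import os
from typing import Mapping, Optional


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True only when USE_GRAPH_EXECUTION is set to 'true' (case-insensitive)."""
    env = os.environ if env is None else env
    # Assumed default: fall back to the existing agent chain when the flag is unset.
    return env.get("USE_GRAPH_EXECUTION", "false").strip().lower() == "true"
```

Passing a mapping explicitly keeps the helper easy to test without touching the real environment.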

## See Also

- [Orchestrators](orchestrators.md) - Overview of all orchestrator patterns
- [Workflows](workflows.md) - Workflow diagrams and patterns
- [Workflow Diagrams](workflow-diagrams.md) - Detailed workflow diagrams
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation