File size: 3,840 Bytes
026ee5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Middleware Architecture

DeepCritical uses middleware for state management, budget tracking, and workflow coordination.

## State Management

### WorkflowState

**File**: `src/middleware/state_machine.py`

**Purpose**: Thread-safe state management for research workflows

**Implementation**: Uses `ContextVar` for thread-safe isolation

**State Components**:
- `evidence: list[Evidence]`: Collected evidence from searches
- `conversation: Conversation`: Iteration history (gaps, tool calls, findings, thoughts)
- `embedding_service: Any`: Embedding service for semantic search

**Methods**:
- `add_evidence(evidence: Evidence)`: Adds evidence with URL-based deduplication
- `async search_related(query: str, top_k: int = 5) -> list[Evidence]`: Semantic search

**Initialization**:
```python
from src.middleware.state_machine import init_workflow_state

init_workflow_state(embedding_service)
```

**Access**:
```python
from src.middleware.state_machine import get_workflow_state

state = get_workflow_state()  # Auto-initializes if missing
```

## Workflow Manager

**File**: `src/middleware/workflow_manager.py`

**Purpose**: Coordinates parallel research loops

**Methods**:
- `add_loop(loop: ResearchLoop)`: Add a research loop to manage
- `async run_loops_parallel() -> list[ResearchLoop]`: Run all loops in parallel
- `update_loop_status(loop_id: str, status: str)`: Update loop status
- `sync_loop_evidence_to_state()`: Synchronize evidence from loops to global state

**Features**:
- Uses `asyncio.gather()` for parallel execution
- Handles errors per loop (doesn't fail all if one fails)
- Tracks loop status: `pending`, `running`, `completed`, `failed`, `cancelled`
- Evidence deduplication across parallel loops

**Usage**:
```python
from src.middleware.workflow_manager import WorkflowManager

manager = WorkflowManager()
manager.add_loop(loop1)
manager.add_loop(loop2)
completed_loops = await manager.run_loops_parallel()
```

## Budget Tracker

**File**: `src/middleware/budget_tracker.py`

**Purpose**: Tracks and enforces resource limits

**Budget Components**:
- **Tokens**: LLM token usage
- **Time**: Elapsed time in seconds
- **Iterations**: Number of iterations

**Methods**:
- `create_budget(token_limit, time_limit_seconds, iterations_limit) -> BudgetStatus`
- `add_tokens(tokens: int)`: Add token usage
- `start_timer()`: Start time tracking
- `update_timer()`: Update elapsed time
- `increment_iteration()`: Increment iteration count
- `check_budget() -> BudgetStatus`: Check current budget status
- `can_continue() -> bool`: Check if research can continue

**Token Estimation**:
- `estimate_tokens(text: str) -> int`: ~4 chars per token
- `estimate_llm_call_tokens(prompt: str, response: str) -> int`: Estimate LLM call tokens

**Usage**:
```python
from src.middleware.budget_tracker import BudgetTracker

tracker = BudgetTracker()
budget = tracker.create_budget(
    token_limit=100000,
    time_limit_seconds=600,
    iterations_limit=10
)
tracker.start_timer()
# ... research operations ...
if not tracker.can_continue():
    # Budget exceeded, stop research
    pass
```

## Models

All middleware models are defined in `src/utils/models.py`:

- `IterationData`: Data for a single iteration
- `Conversation`: Conversation history with iterations
- `ResearchLoop`: Research loop state and configuration
- `BudgetStatus`: Current budget status

## Thread Safety

All middleware components use `ContextVar` for thread-safe isolation:

- Each request/thread has its own workflow state
- No global mutable state
- Safe for concurrent requests

## See Also

- [Orchestrators](orchestrators.md) - How middleware is used in orchestration
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation
- [Contributing - Code Style](../contributing/code-style.md) - Development guidelines