File size: 3,419 Bytes
92f3410 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
"""Real-time progress tracking for multi-agent pipeline."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional
class AgentStatus(str, Enum):
"""Status of an agent in the pipeline."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class AgentProgress:
"""Progress information for a single agent."""
name: str
status: AgentStatus
progress: float # 0.0 to 1.0
message: str
start_time: Optional[float] = None
end_time: Optional[float] = None
error: Optional[str] = None
@dataclass
class PipelineProgress:
"""Overall pipeline progress tracking."""
agents: List[AgentProgress] = field(default_factory=list)
current_step: int = 0
total_steps: int = 7
overall_progress: float = 0.0
status_message: str = "Initializing..."
def update_agent(self, name: str, status: AgentStatus, message: str = "", progress: float = 0.0, error: Optional[str] = None):
"""Update progress for a specific agent."""
import time
agent = next((a for a in self.agents if a.name == name), None)
if not agent:
agent = AgentProgress(name=name, status=status, progress=progress, message=message, error=error)
self.agents.append(agent)
agent.start_time = time.time()
else:
agent.status = status
agent.progress = progress
agent.message = message
agent.error = error
if status in [AgentStatus.COMPLETED, AgentStatus.FAILED, AgentStatus.SKIPPED]:
agent.end_time = time.time()
self._update_overall()
def _update_overall(self):
"""Update overall progress based on agent statuses."""
if not self.agents:
self.overall_progress = 0.0
return
completed = sum(1 for a in self.agents if a.status == AgentStatus.COMPLETED)
total = len(self.agents)
self.overall_progress = completed / total if total > 0 else 0.0
# Update status message
running = next((a for a in self.agents if a.status == AgentStatus.RUNNING), None)
if running:
self.status_message = f"Running {running.name}... ({running.message})"
elif completed == total:
self.status_message = "Pipeline completed successfully!"
else:
failed = [a for a in self.agents if a.status == AgentStatus.FAILED]
if failed:
self.status_message = f"Pipeline failed: {failed[0].name}"
else:
self.status_message = "Pipeline in progress..."
def to_dict(self) -> Dict:
"""Convert to dictionary for JSON serialization."""
return {
"agents": [
{
"name": a.name,
"status": a.status.value,
"progress": a.progress,
"message": a.message,
"error": a.error
}
for a in self.agents
],
"current_step": self.current_step,
"total_steps": self.total_steps,
"overall_progress": self.overall_progress,
"status_message": self.status_message
}
|