"""Real-time progress tracking for multi-agent pipeline.""" from __future__ import annotations from dataclasses import dataclass, field from enum import Enum from typing import Dict, List, Optional class AgentStatus(str, Enum): """Status of an agent in the pipeline.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" @dataclass class AgentProgress: """Progress information for a single agent.""" name: str status: AgentStatus progress: float # 0.0 to 1.0 message: str start_time: Optional[float] = None end_time: Optional[float] = None error: Optional[str] = None @dataclass class PipelineProgress: """Overall pipeline progress tracking.""" agents: List[AgentProgress] = field(default_factory=list) current_step: int = 0 total_steps: int = 7 overall_progress: float = 0.0 status_message: str = "Initializing..." def update_agent(self, name: str, status: AgentStatus, message: str = "", progress: float = 0.0, error: Optional[str] = None): """Update progress for a specific agent.""" import time agent = next((a for a in self.agents if a.name == name), None) if not agent: agent = AgentProgress(name=name, status=status, progress=progress, message=message, error=error) self.agents.append(agent) agent.start_time = time.time() else: agent.status = status agent.progress = progress agent.message = message agent.error = error if status in [AgentStatus.COMPLETED, AgentStatus.FAILED, AgentStatus.SKIPPED]: agent.end_time = time.time() self._update_overall() def _update_overall(self): """Update overall progress based on agent statuses.""" if not self.agents: self.overall_progress = 0.0 return completed = sum(1 for a in self.agents if a.status == AgentStatus.COMPLETED) total = len(self.agents) self.overall_progress = completed / total if total > 0 else 0.0 # Update status message running = next((a for a in self.agents if a.status == AgentStatus.RUNNING), None) if running: self.status_message = f"Running {running.name}... ({running.message})" elif completed == total: self.status_message = "Pipeline completed successfully!" else: failed = [a for a in self.agents if a.status == AgentStatus.FAILED] if failed: self.status_message = f"Pipeline failed: {failed[0].name}" else: self.status_message = "Pipeline in progress..." def to_dict(self) -> Dict: """Convert to dictionary for JSON serialization.""" return { "agents": [ { "name": a.name, "status": a.status.value, "progress": a.progress, "message": a.message, "error": a.error } for a in self.agents ], "current_step": self.current_step, "total_steps": self.total_steps, "overall_progress": self.overall_progress, "status_message": self.status_message }