File size: 3,419 Bytes
92f3410
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Real-time progress tracking for multi-agent pipeline."""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class AgentStatus(str, Enum):
    """Status of an agent in the pipeline."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class AgentProgress:
    """Progress information for a single agent."""
    name: str
    status: AgentStatus
    progress: float  # 0.0 to 1.0
    message: str
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    error: Optional[str] = None


@dataclass
class PipelineProgress:
    """Overall pipeline progress tracking."""
    agents: List[AgentProgress] = field(default_factory=list)
    current_step: int = 0
    total_steps: int = 7
    overall_progress: float = 0.0
    status_message: str = "Initializing..."
    
    def update_agent(self, name: str, status: AgentStatus, message: str = "", progress: float = 0.0, error: Optional[str] = None):
        """Update progress for a specific agent."""
        import time
        
        agent = next((a for a in self.agents if a.name == name), None)
        if not agent:
            agent = AgentProgress(name=name, status=status, progress=progress, message=message, error=error)
            self.agents.append(agent)
            agent.start_time = time.time()
        else:
            agent.status = status
            agent.progress = progress
            agent.message = message
            agent.error = error
            if status in [AgentStatus.COMPLETED, AgentStatus.FAILED, AgentStatus.SKIPPED]:
                agent.end_time = time.time()
        
        self._update_overall()
    
    def _update_overall(self):
        """Update overall progress based on agent statuses."""
        if not self.agents:
            self.overall_progress = 0.0
            return
        
        completed = sum(1 for a in self.agents if a.status == AgentStatus.COMPLETED)
        total = len(self.agents)
        self.overall_progress = completed / total if total > 0 else 0.0
        
        # Update status message
        running = next((a for a in self.agents if a.status == AgentStatus.RUNNING), None)
        if running:
            self.status_message = f"Running {running.name}... ({running.message})"
        elif completed == total:
            self.status_message = "Pipeline completed successfully!"
        else:
            failed = [a for a in self.agents if a.status == AgentStatus.FAILED]
            if failed:
                self.status_message = f"Pipeline failed: {failed[0].name}"
            else:
                self.status_message = "Pipeline in progress..."
    
    def to_dict(self) -> Dict:
        """Convert to dictionary for JSON serialization."""
        return {
            "agents": [
                {
                    "name": a.name,
                    "status": a.status.value,
                    "progress": a.progress,
                    "message": a.message,
                    "error": a.error
                }
                for a in self.agents
            ],
            "current_step": self.current_step,
            "total_steps": self.total_steps,
            "overall_progress": self.overall_progress,
            "status_message": self.status_message
        }