File size: 4,968 Bytes
1397957
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Event bus for OpenCode API - Pub/Sub system for real-time events"""

from typing import TypeVar, Generic, Callable, Dict, List, Any, Optional, Awaitable
from pydantic import BaseModel
import asyncio
from dataclasses import dataclass, field
import uuid


T = TypeVar("T", bound=BaseModel)


@dataclass
class Event(Generic[T]):
    """Declaration of an event kind: a type name plus its payload model class.

    Attributes are documented inline. ``payload_type`` is stored only; this
    class does not validate payloads against it.
    """

    # Event type name, e.g. "session.created".
    type: str
    # Model class associated with this event's payload.
    payload_type: type[T]

    def create(self, payload: T) -> "EventInstance":
        """Build an ``EventInstance`` of this event's type from ``payload``.

        A ``BaseModel`` payload is converted with ``model_dump()``; any other
        value is placed on the instance unchanged.
        """
        if isinstance(payload, BaseModel):
            data = payload.model_dump()
        else:
            data = payload
        return EventInstance(type=self.type, payload=data)


@dataclass
class EventInstance:
    """A concrete event occurrence: an event type name plus its payload data.

    This is the object returned by ``Event.create`` and the single argument
    that ``Bus.publish`` passes to every subscriber callback.
    """
    # Event type name (the same string subscribers register under).
    type: str
    # Payload data; the producers in this module convert BaseModel payloads
    # with model_dump() before storing them here.
    payload: Dict[str, Any]


class Bus:
    """
    Simple pub/sub event bus for real-time updates.
    Supports both sync and async subscribers.

    All state is stored on the class and every method is a classmethod, so
    there is a single shared bus per process; the class is not meant to be
    instantiated.

    NOTE(review): ``publish`` keeps ``_lock`` held while it awaits the
    subscribers, and ``asyncio.Lock`` is not reentrant. A subscriber that
    awaits ``Bus.publish`` or ``Bus.clear`` from inside its callback will
    therefore wait on the lock forever. ``subscribe``/``subscribe_all`` and
    the returned unsubscribe functions do not take the lock and are safe to
    call from a callback.
    """

    # Maps an event type name to the callbacks registered for that type.
    _subscribers: Dict[str, List[Callable]] = {}
    # Callbacks that receive every published event regardless of type.
    _all_subscribers: List[Callable] = []
    # Serialises publish() and clear() against each other.
    _lock = asyncio.Lock()

    @classmethod
    async def _dispatch(
        cls, callbacks: List[Callable], instance: "EventInstance", label: str
    ) -> None:
        """Call each callback with ``instance``, awaiting coroutine results.

        An exception raised by one callback is reported with ``print`` and
        does not prevent the remaining callbacks from running. ``label`` is
        only used in that report.
        """
        for callback in callbacks:
            try:
                result = callback(instance)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as e:
                print(f"Error in {label} subscriber: {e}")

    @classmethod
    async def publish(
        cls, event: "Event | str", payload: "BaseModel | Dict[str, Any]"
    ) -> None:
        """Publish an event to all subscribers. Event can be Event object or string type.

        Args:
            event: An ``Event`` definition (its ``type`` is used) or the event
                type name itself.
            payload: A ``BaseModel`` (converted with ``model_dump()``) or a
                value that is forwarded unchanged.

        Type-specific subscribers are notified first, then the all-event
        subscribers. Subscriber exceptions are printed, never propagated.
        """
        if isinstance(payload, BaseModel):
            payload_dict = payload.model_dump()
        else:
            payload_dict = payload

        event_type = event.type if isinstance(event, Event) else event
        instance = EventInstance(type=event_type, payload=payload_dict)

        async with cls._lock:
            # Iterate over snapshots: a callback may subscribe or unsubscribe
            # while we dispatch, and mutating a list during iteration would
            # silently skip the next subscriber.
            typed = list(cls._subscribers.get(event_type, ()))
            await cls._dispatch(typed, instance, "event")

            # Snapshot taken only now so that registrations made by the
            # type-specific callbacks above are still honoured for this event.
            everyone = list(cls._all_subscribers)
            await cls._dispatch(everyone, instance, "all-event")

    @classmethod
    def subscribe(cls, event_type: str, callback: Callable) -> Callable[[], None]:
        """Subscribe to a specific event type. Returns unsubscribe function.

        The returned function is idempotent: calling it again, or calling it
        after ``clear()``, is a no-op instead of raising.
        """
        cls._subscribers.setdefault(event_type, []).append(callback)

        def unsubscribe() -> None:
            # Look the list up at call time: clear() may have dropped it.
            registered = cls._subscribers.get(event_type)
            if registered is not None and callback in registered:
                registered.remove(callback)

        return unsubscribe

    @classmethod
    def subscribe_all(cls, callback: Callable) -> Callable[[], None]:
        """Subscribe to all events. Returns unsubscribe function.

        The returned function is idempotent: calling it again, or calling it
        after ``clear()``, is a no-op instead of raising.
        """
        cls._all_subscribers.append(callback)

        def unsubscribe() -> None:
            if callback in cls._all_subscribers:
                cls._all_subscribers.remove(callback)

        return unsubscribe

    @classmethod
    async def clear(cls) -> None:
        """Clear all subscribers (type-specific and all-event)."""
        async with cls._lock:
            cls._subscribers.clear()
            cls._all_subscribers.clear()


# Pre-defined events (matching TypeScript opencode events)
class SessionPayload(BaseModel):
    """Payload for session events.

    Used as the payload model of the ``session.created``,
    ``session.updated`` and ``session.deleted`` event definitions below.
    """
    id: str  # required
    title: Optional[str] = None  # optional; defaults to None when omitted
    
class MessagePayload(BaseModel):
    """Payload for message events.

    Used as the payload model of the ``message.updated`` and
    ``message.removed`` event definitions below. Both fields are required.
    """
    session_id: str  # presumably the id of the owning session; verify against callers
    message_id: str
    
class PartPayload(BaseModel):
    """Payload for message part events.

    Used as the payload model of the ``part.updated`` and ``part.removed``
    event definitions below.
    """
    session_id: str
    message_id: str
    part_id: str
    # Optional; defaults to None.
    # NOTE(review): presumably the incremental content of an update; confirm with the publisher.
    delta: Optional[str] = None

class StepPayload(BaseModel):
    """Payload for agentic loop step events.

    Used as the payload model of the ``step.started`` and ``step.finished``
    event definitions below. All fields are required.
    """
    session_id: str
    step: int  # NOTE(review): whether this is 0- or 1-based is not visible here; confirm
    max_steps: int

class ToolStatePayload(BaseModel):
    """Payload for tool state change events.

    Used as the payload model of the ``tool.state.changed`` event definition
    below.
    """
    session_id: str
    message_id: str
    part_id: str
    tool_name: str
    # Plain string, not validated against the values listed here.
    status: str  # "pending", "running", "completed", "error"
    # Optional timestamps; default to None.
    # NOTE(review): string format (e.g. ISO 8601) is not visible here; confirm.
    time_start: Optional[str] = None
    time_end: Optional[str] = None


# Event definitions
# Session events; payload model: SessionPayload.
SESSION_CREATED = Event(type="session.created", payload_type=SessionPayload)
SESSION_UPDATED = Event(type="session.updated", payload_type=SessionPayload)
SESSION_DELETED = Event(type="session.deleted", payload_type=SessionPayload)

# Message events; payload model: MessagePayload.
MESSAGE_UPDATED = Event(type="message.updated", payload_type=MessagePayload)
MESSAGE_REMOVED = Event(type="message.removed", payload_type=MessagePayload)

# Message part events; payload model: PartPayload.
PART_UPDATED = Event(type="part.updated", payload_type=PartPayload)
PART_REMOVED = Event(type="part.removed", payload_type=PartPayload)

# Agentic loop step events; payload model: StepPayload.
STEP_STARTED = Event(type="step.started", payload_type=StepPayload)
STEP_FINISHED = Event(type="step.finished", payload_type=StepPayload)

# Tool state change event; payload model: ToolStatePayload.
TOOL_STATE_CHANGED = Event(type="tool.state.changed", payload_type=ToolStatePayload)