Spaces:
Sleeping
Sleeping
File size: 4,968 Bytes
1397957 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
"""Event bus for OpenCode API - Pub/Sub system for real-time events"""
from typing import TypeVar, Generic, Callable, Dict, List, Any, Optional, Awaitable
from pydantic import BaseModel
import asyncio
from dataclasses import dataclass, field
import uuid
T = TypeVar("T", bound=BaseModel)
@dataclass
class Event(Generic[T]):
"""Event definition with type and payload schema"""
type: str
payload_type: type[T]
def create(self, payload: T) -> "EventInstance":
"""Create an event instance"""
return EventInstance(
type=self.type,
payload=payload.model_dump() if isinstance(payload, BaseModel) else payload
)
@dataclass
class EventInstance:
"""An actual event instance with data"""
type: str
payload: Dict[str, Any]
class Bus:
"""
Simple pub/sub event bus for real-time updates.
Supports both sync and async subscribers.
"""
_subscribers: Dict[str, List[Callable]] = {}
_all_subscribers: List[Callable] = []
_lock = asyncio.Lock()
@classmethod
async def publish(cls, event: Event | str, payload: BaseModel | Dict[str, Any]) -> None:
"""Publish an event to all subscribers. Event can be Event object or string type."""
if isinstance(payload, BaseModel):
payload_dict = payload.model_dump()
else:
payload_dict = payload
event_type = event.type if isinstance(event, Event) else event
instance = EventInstance(type=event_type, payload=payload_dict)
async with cls._lock:
# Notify type-specific subscribers
for callback in cls._subscribers.get(event_type, []):
try:
result = callback(instance)
if asyncio.iscoroutine(result):
await result
except Exception as e:
print(f"Error in event subscriber: {e}")
# Notify all-event subscribers
for callback in cls._all_subscribers:
try:
result = callback(instance)
if asyncio.iscoroutine(result):
await result
except Exception as e:
print(f"Error in all-event subscriber: {e}")
@classmethod
def subscribe(cls, event_type: str, callback: Callable) -> Callable[[], None]:
"""Subscribe to a specific event type. Returns unsubscribe function."""
if event_type not in cls._subscribers:
cls._subscribers[event_type] = []
cls._subscribers[event_type].append(callback)
def unsubscribe():
cls._subscribers[event_type].remove(callback)
return unsubscribe
@classmethod
def subscribe_all(cls, callback: Callable) -> Callable[[], None]:
"""Subscribe to all events. Returns unsubscribe function."""
cls._all_subscribers.append(callback)
def unsubscribe():
cls._all_subscribers.remove(callback)
return unsubscribe
@classmethod
async def clear(cls) -> None:
"""Clear all subscribers"""
async with cls._lock:
cls._subscribers.clear()
cls._all_subscribers.clear()
# Pre-defined events (matching TypeScript opencode events)
class SessionPayload(BaseModel):
"""Payload for session events"""
id: str
title: Optional[str] = None
class MessagePayload(BaseModel):
"""Payload for message events"""
session_id: str
message_id: str
class PartPayload(BaseModel):
"""Payload for message part events"""
session_id: str
message_id: str
part_id: str
delta: Optional[str] = None
class StepPayload(BaseModel):
"""Payload for agentic loop step events"""
session_id: str
step: int
max_steps: int
class ToolStatePayload(BaseModel):
"""Payload for tool state change events"""
session_id: str
message_id: str
part_id: str
tool_name: str
status: str # "pending", "running", "completed", "error"
time_start: Optional[str] = None
time_end: Optional[str] = None
# Event definitions
SESSION_CREATED = Event(type="session.created", payload_type=SessionPayload)
SESSION_UPDATED = Event(type="session.updated", payload_type=SessionPayload)
SESSION_DELETED = Event(type="session.deleted", payload_type=SessionPayload)
MESSAGE_UPDATED = Event(type="message.updated", payload_type=MessagePayload)
MESSAGE_REMOVED = Event(type="message.removed", payload_type=MessagePayload)
PART_UPDATED = Event(type="part.updated", payload_type=PartPayload)
PART_REMOVED = Event(type="part.removed", payload_type=PartPayload)
STEP_STARTED = Event(type="step.started", payload_type=StepPayload)
STEP_FINISHED = Event(type="step.finished", payload_type=StepPayload)
TOOL_STATE_CHANGED = Event(type="tool.state.changed", payload_type=ToolStatePayload)
|