Spaces:
Sleeping
Sleeping
| """Storage module for OpenCode API - In-memory with optional file persistence""" | |
| from typing import TypeVar, Generic, Optional, Dict, Any, List, AsyncIterator | |
| from pydantic import BaseModel | |
| import json | |
| import os | |
| from pathlib import Path | |
| import asyncio | |
| from .config import settings | |
| T = TypeVar("T", bound=BaseModel) | |
| class NotFoundError(Exception): | |
| """Raised when a storage item is not found""" | |
| def __init__(self, key: List[str]): | |
| self.key = key | |
| super().__init__(f"Not found: {'/'.join(key)}") | |
| class Storage: | |
| """ | |
| Simple storage system using in-memory dict with optional file persistence. | |
| Keys are lists of strings that form a path (e.g., ["session", "project1", "ses_123"]) | |
| """ | |
| _data: Dict[str, Any] = {} | |
| _lock = asyncio.Lock() | |
| def _key_to_path(cls, key: List[str]) -> str: | |
| """Convert key list to storage path""" | |
| return "/".join(key) | |
| def _file_path(cls, key: List[str]) -> Path: | |
| """Get file path for persistent storage""" | |
| return Path(settings.storage_path) / "/".join(key[:-1]) / f"{key[-1]}.json" | |
| async def write(cls, key: List[str], data: BaseModel | Dict[str, Any]) -> None: | |
| """Write data to storage""" | |
| path = cls._key_to_path(key) | |
| if isinstance(data, BaseModel): | |
| value = data.model_dump() | |
| else: | |
| value = data | |
| async with cls._lock: | |
| cls._data[path] = value | |
| # Persist to file | |
| file_path = cls._file_path(key) | |
| file_path.parent.mkdir(parents=True, exist_ok=True) | |
| file_path.write_text(json.dumps(value, default=str)) | |
| async def read(cls, key: List[str], model: type[T] = None) -> Optional[T | Dict[str, Any]]: | |
| """Read data from storage""" | |
| path = cls._key_to_path(key) | |
| async with cls._lock: | |
| # Check in-memory first | |
| if path in cls._data: | |
| data = cls._data[path] | |
| if model: | |
| return model(**data) | |
| return data | |
| # Check file | |
| file_path = cls._file_path(key) | |
| if file_path.exists(): | |
| data = json.loads(file_path.read_text()) | |
| cls._data[path] = data | |
| if model: | |
| return model(**data) | |
| return data | |
| return None | |
| async def read_or_raise(cls, key: List[str], model: type[T] = None) -> T | Dict[str, Any]: | |
| """Read data from storage or raise NotFoundError""" | |
| result = await cls.read(key, model) | |
| if result is None: | |
| raise NotFoundError(key) | |
| return result | |
| async def update(cls, key: List[str], updater: callable, model: type[T] = None) -> T | Dict[str, Any]: | |
| """Update data in storage using an updater function""" | |
| data = await cls.read_or_raise(key, model) | |
| if isinstance(data, BaseModel): | |
| data_dict = data.model_dump() | |
| updater(data_dict) | |
| await cls.write(key, data_dict) | |
| if model: | |
| return model(**data_dict) | |
| return data_dict | |
| else: | |
| updater(data) | |
| await cls.write(key, data) | |
| return data | |
| async def remove(cls, key: List[str]) -> None: | |
| """Remove data from storage""" | |
| path = cls._key_to_path(key) | |
| async with cls._lock: | |
| cls._data.pop(path, None) | |
| file_path = cls._file_path(key) | |
| if file_path.exists(): | |
| file_path.unlink() | |
| async def list(cls, prefix: List[str]) -> List[List[str]]: | |
| """List all keys under a prefix""" | |
| prefix_path = cls._key_to_path(prefix) | |
| results = [] | |
| async with cls._lock: | |
| # Check in-memory | |
| for key in cls._data.keys(): | |
| if key.startswith(prefix_path + "/"): | |
| results.append(key.split("/")) | |
| # Check files | |
| dir_path = Path(settings.storage_path) / "/".join(prefix) | |
| if dir_path.exists(): | |
| for file_path in dir_path.glob("*.json"): | |
| key = prefix + [file_path.stem] | |
| if key not in results: | |
| results.append(key) | |
| return results | |
| async def clear(cls) -> None: | |
| """Clear all storage""" | |
| async with cls._lock: | |
| cls._data.clear() | |