Spaces:
Sleeping
Sleeping
File size: 4,703 Bytes
1397957 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
"""Storage module for OpenCode API - In-memory with optional file persistence"""
from typing import TypeVar, Generic, Optional, Dict, Any, List, AsyncIterator
from pydantic import BaseModel
import json
import os
from pathlib import Path
import asyncio
from .config import settings
T = TypeVar("T", bound=BaseModel)
class NotFoundError(Exception):
"""Raised when a storage item is not found"""
def __init__(self, key: List[str]):
self.key = key
super().__init__(f"Not found: {'/'.join(key)}")
class Storage:
"""
Simple storage system using in-memory dict with optional file persistence.
Keys are lists of strings that form a path (e.g., ["session", "project1", "ses_123"])
"""
_data: Dict[str, Any] = {}
_lock = asyncio.Lock()
@classmethod
def _key_to_path(cls, key: List[str]) -> str:
"""Convert key list to storage path"""
return "/".join(key)
@classmethod
def _file_path(cls, key: List[str]) -> Path:
"""Get file path for persistent storage"""
return Path(settings.storage_path) / "/".join(key[:-1]) / f"{key[-1]}.json"
@classmethod
async def write(cls, key: List[str], data: BaseModel | Dict[str, Any]) -> None:
"""Write data to storage"""
path = cls._key_to_path(key)
if isinstance(data, BaseModel):
value = data.model_dump()
else:
value = data
async with cls._lock:
cls._data[path] = value
# Persist to file
file_path = cls._file_path(key)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(json.dumps(value, default=str))
@classmethod
async def read(cls, key: List[str], model: type[T] = None) -> Optional[T | Dict[str, Any]]:
"""Read data from storage"""
path = cls._key_to_path(key)
async with cls._lock:
# Check in-memory first
if path in cls._data:
data = cls._data[path]
if model:
return model(**data)
return data
# Check file
file_path = cls._file_path(key)
if file_path.exists():
data = json.loads(file_path.read_text())
cls._data[path] = data
if model:
return model(**data)
return data
return None
@classmethod
async def read_or_raise(cls, key: List[str], model: type[T] = None) -> T | Dict[str, Any]:
"""Read data from storage or raise NotFoundError"""
result = await cls.read(key, model)
if result is None:
raise NotFoundError(key)
return result
@classmethod
async def update(cls, key: List[str], updater: callable, model: type[T] = None) -> T | Dict[str, Any]:
"""Update data in storage using an updater function"""
data = await cls.read_or_raise(key, model)
if isinstance(data, BaseModel):
data_dict = data.model_dump()
updater(data_dict)
await cls.write(key, data_dict)
if model:
return model(**data_dict)
return data_dict
else:
updater(data)
await cls.write(key, data)
return data
@classmethod
async def remove(cls, key: List[str]) -> None:
"""Remove data from storage"""
path = cls._key_to_path(key)
async with cls._lock:
cls._data.pop(path, None)
file_path = cls._file_path(key)
if file_path.exists():
file_path.unlink()
@classmethod
async def list(cls, prefix: List[str]) -> List[List[str]]:
"""List all keys under a prefix"""
prefix_path = cls._key_to_path(prefix)
results = []
async with cls._lock:
# Check in-memory
for key in cls._data.keys():
if key.startswith(prefix_path + "/"):
results.append(key.split("/"))
# Check files
dir_path = Path(settings.storage_path) / "/".join(prefix)
if dir_path.exists():
for file_path in dir_path.glob("*.json"):
key = prefix + [file_path.stem]
if key not in results:
results.append(key)
return results
@classmethod
async def clear(cls) -> None:
"""Clear all storage"""
async with cls._lock:
cls._data.clear()
|