File size: 4,703 Bytes
1397957
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Storage module for OpenCode API - In-memory with optional file persistence"""

from typing import TypeVar, Generic, Optional, Dict, Any, List, AsyncIterator
from pydantic import BaseModel
import json
import os
from pathlib import Path
import asyncio
from .config import settings

T = TypeVar("T", bound=BaseModel)


class NotFoundError(Exception):
    """Raised when a storage item is not found"""
    def __init__(self, key: List[str]):
        self.key = key
        super().__init__(f"Not found: {'/'.join(key)}")


class Storage:
    """
    Simple storage system using in-memory dict with optional file persistence.
    Keys are lists of strings that form a path (e.g., ["session", "project1", "ses_123"])
    """
    
    _data: Dict[str, Any] = {}
    _lock = asyncio.Lock()
    
    @classmethod
    def _key_to_path(cls, key: List[str]) -> str:
        """Convert key list to storage path"""
        return "/".join(key)
    
    @classmethod
    def _file_path(cls, key: List[str]) -> Path:
        """Get file path for persistent storage"""
        return Path(settings.storage_path) / "/".join(key[:-1]) / f"{key[-1]}.json"
    
    @classmethod
    async def write(cls, key: List[str], data: BaseModel | Dict[str, Any]) -> None:
        """Write data to storage"""
        path = cls._key_to_path(key)
        
        if isinstance(data, BaseModel):
            value = data.model_dump()
        else:
            value = data
        
        async with cls._lock:
            cls._data[path] = value
            
            # Persist to file
            file_path = cls._file_path(key)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_text(json.dumps(value, default=str))
    
    @classmethod
    async def read(cls, key: List[str], model: type[T] = None) -> Optional[T | Dict[str, Any]]:
        """Read data from storage"""
        path = cls._key_to_path(key)
        
        async with cls._lock:
            # Check in-memory first
            if path in cls._data:
                data = cls._data[path]
                if model:
                    return model(**data)
                return data
            
            # Check file
            file_path = cls._file_path(key)
            if file_path.exists():
                data = json.loads(file_path.read_text())
                cls._data[path] = data
                if model:
                    return model(**data)
                return data
        
        return None
    
    @classmethod
    async def read_or_raise(cls, key: List[str], model: type[T] = None) -> T | Dict[str, Any]:
        """Read data from storage or raise NotFoundError"""
        result = await cls.read(key, model)
        if result is None:
            raise NotFoundError(key)
        return result
    
    @classmethod
    async def update(cls, key: List[str], updater: callable, model: type[T] = None) -> T | Dict[str, Any]:
        """Update data in storage using an updater function"""
        data = await cls.read_or_raise(key, model)
        
        if isinstance(data, BaseModel):
            data_dict = data.model_dump()
            updater(data_dict)
            await cls.write(key, data_dict)
            if model:
                return model(**data_dict)
            return data_dict
        else:
            updater(data)
            await cls.write(key, data)
            return data
    
    @classmethod
    async def remove(cls, key: List[str]) -> None:
        """Remove data from storage"""
        path = cls._key_to_path(key)
        
        async with cls._lock:
            cls._data.pop(path, None)
            
            file_path = cls._file_path(key)
            if file_path.exists():
                file_path.unlink()
    
    @classmethod
    async def list(cls, prefix: List[str]) -> List[List[str]]:
        """List all keys under a prefix"""
        prefix_path = cls._key_to_path(prefix)
        results = []
        
        async with cls._lock:
            # Check in-memory
            for key in cls._data.keys():
                if key.startswith(prefix_path + "/"):
                    results.append(key.split("/"))
            
            # Check files
            dir_path = Path(settings.storage_path) / "/".join(prefix)
            if dir_path.exists():
                for file_path in dir_path.glob("*.json"):
                    key = prefix + [file_path.stem]
                    if key not in results:
                        results.append(key)
        
        return results
    
    @classmethod
    async def clear(cls) -> None:
        """Clear all storage"""
        async with cls._lock:
            cls._data.clear()