import asyncio
import json
import logging
import os
from typing import Any, Dict, List, Optional

import asyncpg


class EnhancedDatabaseManager:
    """Enhanced database manager that stores everything in PostgreSQL + Vercel Blob."""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool: Optional[asyncpg.Pool] = None
        self.logger = logging.getLogger(__name__)

    async def connect(self):
        """Initialize the database connection pool."""
        try:
            self.pool = await asyncpg.create_pool(
                self.database_url,
                min_size=2,
                max_size=20,
                command_timeout=60,
            )
            self.logger.info("Enhanced database connection pool created successfully")

            await self._create_all_tables()

        except Exception as e:
            self.logger.error(f"Database connection failed: {e}")
            raise

    async def _create_all_tables(self):
        """Create all tables for comprehensive storage."""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                -- RAG instances metadata
                -- (gen_random_uuid() is built in on PostgreSQL 13+; older versions need pgcrypto)
                CREATE TABLE IF NOT EXISTS rag_instances (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    ai_type VARCHAR(50) NOT NULL,
                    user_id VARCHAR(100),
                    ai_id VARCHAR(100),
                    name VARCHAR(255) NOT NULL,
                    description TEXT,

                    -- Storage references
                    blob_url TEXT,
                    config_json JSONB,

                    -- Statistics
                    total_chunks INTEGER DEFAULT 0,
                    total_tokens INTEGER DEFAULT 0,
                    file_count INTEGER DEFAULT 0,

                    -- Timestamps
                    created_at TIMESTAMP DEFAULT NOW(),
                    updated_at TIMESTAMP DEFAULT NOW(),
                    last_accessed_at TIMESTAMP DEFAULT NOW(),

                    -- Status
                    status VARCHAR(20) DEFAULT 'active',

                    -- NOTE: PostgreSQL treats NULLs as distinct in unique constraints,
                    -- so rows with a NULL user_id or ai_id will not trigger the
                    -- ON CONFLICT upsert path used below.
                    UNIQUE(ai_type, user_id, ai_id)
                );

                -- Knowledge files metadata
                CREATE TABLE IF NOT EXISTS knowledge_files (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
                    filename VARCHAR(255) NOT NULL,
                    original_filename VARCHAR(255),
                    file_type VARCHAR(50),
                    file_size INTEGER,

                    -- Content storage
                    content_text TEXT,
                    content_blob BYTEA,

                    -- Processing info
                    processed_at TIMESTAMP DEFAULT NOW(),
                    processing_status VARCHAR(20) DEFAULT 'pending',
                    token_count INTEGER DEFAULT 0,

                    -- Timestamps
                    created_at TIMESTAMP DEFAULT NOW(),
                    updated_at TIMESTAMP DEFAULT NOW()
                );

                -- RAG graph data (large graphs are stored in chunks)
                CREATE TABLE IF NOT EXISTS rag_graph_data (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
                    data_type VARCHAR(20) NOT NULL, -- 'nodes', 'edges', 'attrs'
                    chunk_index INTEGER DEFAULT 0,
                    chunk_data JSONB,
                    created_at TIMESTAMP DEFAULT NOW()
                );

                -- RAG vector data (large embedding sets are stored in chunks)
                CREATE TABLE IF NOT EXISTS rag_vector_data (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
                    data_type VARCHAR(20) NOT NULL, -- 'embeddings', 'metadata'
                    chunk_index INTEGER DEFAULT 0,
                    chunk_data JSONB,
                    created_at TIMESTAMP DEFAULT NOW()
                );

                -- User conversations
                CREATE TABLE IF NOT EXISTS conversations (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    user_id VARCHAR(100) NOT NULL,
                    rag_instance_id UUID REFERENCES rag_instances(id) ON DELETE CASCADE,
                    title VARCHAR(255),
                    created_at TIMESTAMP DEFAULT NOW(),
                    updated_at TIMESTAMP DEFAULT NOW(),
                    is_active BOOLEAN DEFAULT TRUE
                );

                -- Conversation messages
                CREATE TABLE IF NOT EXISTS conversation_messages (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
                    role VARCHAR(20) NOT NULL, -- 'user', 'assistant'
                    content TEXT NOT NULL,
                    metadata JSONB DEFAULT '{}',
                    created_at TIMESTAMP DEFAULT NOW()
                );

                -- System statistics
                CREATE TABLE IF NOT EXISTS system_stats (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    stat_date DATE DEFAULT CURRENT_DATE,
                    total_rag_instances INTEGER DEFAULT 0,
                    total_conversations INTEGER DEFAULT 0,
                    total_messages INTEGER DEFAULT 0,
                    total_knowledge_files INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT NOW(),
                    UNIQUE(stat_date)
                );

                -- Indexes for common lookups
                CREATE INDEX IF NOT EXISTS idx_rag_instances_lookup ON rag_instances(ai_type, user_id, ai_id);
                CREATE INDEX IF NOT EXISTS idx_rag_instances_status ON rag_instances(status);
                CREATE INDEX IF NOT EXISTS idx_rag_instances_user ON rag_instances(user_id);
                CREATE INDEX IF NOT EXISTS idx_knowledge_files_rag ON knowledge_files(rag_instance_id);
                CREATE INDEX IF NOT EXISTS idx_conversations_user ON conversations(user_id);
                CREATE INDEX IF NOT EXISTS idx_conversation_messages_conv ON conversation_messages(conversation_id);
                CREATE INDEX IF NOT EXISTS idx_rag_graph_data_rag ON rag_graph_data(rag_instance_id);
                CREATE INDEX IF NOT EXISTS idx_rag_vector_data_rag ON rag_vector_data(rag_instance_id);
            """)

            self.logger.info("Enhanced database tables created/verified successfully")

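    # The save/load methods below assume a rag_state dict of roughly this shape.
    # This is inferred from how the fields are read; treat it as a sketch, not a
    # formal schema:
    #
    #     rag_state = {
    #         "config": {...},                      # arbitrary JSON-serializable config
    #         "graph": {
    #             "nodes": [...],                   # JSON-serializable node records
    #             "edges": [...],                   # JSON-serializable edge records
    #             "graph_attrs": {...},             # graph-level attributes
    #         },
    #         "vectors": {
    #             "embeddings": [[0.1, ...], ...],  # list of float vectors
    #             "metadata": [...],                # per-embedding metadata
    #         },
    #     }
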
    async def save_complete_rag_instance(
        self,
        ai_type: str,
        user_id: Optional[str],
        ai_id: Optional[str],
        name: str,
        description: Optional[str],
        rag_state: Dict[str, Any],
        blob_url: Optional[str] = None
    ) -> str:
        """Save a complete RAG instance with all of its data to the database."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Upsert the instance metadata row
                rag_instance_id = await conn.fetchval("""
                    INSERT INTO rag_instances (
                        ai_type, user_id, ai_id, name, description, blob_url,
                        config_json, total_chunks, total_tokens, file_count
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    ON CONFLICT (ai_type, user_id, ai_id) DO UPDATE SET
                        name = EXCLUDED.name,
                        description = EXCLUDED.description,
                        blob_url = EXCLUDED.blob_url,
                        config_json = EXCLUDED.config_json,
                        total_chunks = EXCLUDED.total_chunks,
                        total_tokens = EXCLUDED.total_tokens,
                        file_count = EXCLUDED.file_count,
                        updated_at = NOW()
                    RETURNING id;
                """,
                    ai_type, user_id, ai_id, name, description, blob_url,
                    json.dumps(rag_state.get('config', {})),
                    len(rag_state.get('vectors', {}).get('embeddings', [])),
                    self._estimate_tokens(rag_state),
                    0  # file_count (not computed here)
                )

                # Clear previously stored chunks before re-saving
                await conn.execute("""
                    DELETE FROM rag_graph_data WHERE rag_instance_id = $1
                """, rag_instance_id)

                await conn.execute("""
                    DELETE FROM rag_vector_data WHERE rag_instance_id = $1
                """, rag_instance_id)

                # Save graph data in chunks
                graph_data = rag_state.get('graph', {})
                await self._save_graph_data(conn, rag_instance_id, graph_data)

                # Save vector data in chunks
                vector_data = rag_state.get('vectors', {})
                await self._save_vector_data(conn, rag_instance_id, vector_data)

                return str(rag_instance_id)

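    # Example call (a sketch; the identifiers and my_rag_state are hypothetical):
    #
    #     rag_id = await db.save_complete_rag_instance(
    #         ai_type="assistant",
    #         user_id="user-123",
    #         ai_id="ai-1",
    #         name="Product docs RAG",
    #         description="Knowledge base for product documentation",
    #         rag_state=my_rag_state,
    #     )
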
    async def _save_graph_data(self, conn, rag_instance_id: str, graph_data: Dict[str, Any]):
        """Save graph data in chunks to avoid per-row size limits."""
        # Nodes, 1000 per chunk
        nodes = graph_data.get('nodes', [])
        if nodes:
            chunk_size = 1000
            for i in range(0, len(nodes), chunk_size):
                chunk = nodes[i:i + chunk_size]
                await conn.execute("""
                    INSERT INTO rag_graph_data (rag_instance_id, data_type, chunk_index, chunk_data)
                    VALUES ($1, $2, $3, $4)
                """, rag_instance_id, 'nodes', i // chunk_size, json.dumps(chunk))

        # Edges, 1000 per chunk
        edges = graph_data.get('edges', [])
        if edges:
            chunk_size = 1000
            for i in range(0, len(edges), chunk_size):
                chunk = edges[i:i + chunk_size]
                await conn.execute("""
                    INSERT INTO rag_graph_data (rag_instance_id, data_type, chunk_index, chunk_data)
                    VALUES ($1, $2, $3, $4)
                """, rag_instance_id, 'edges', i // chunk_size, json.dumps(chunk))

        # Graph-level attributes fit in a single chunk
        graph_attrs = graph_data.get('graph_attrs', {})
        if graph_attrs:
            await conn.execute("""
                INSERT INTO rag_graph_data (rag_instance_id, data_type, chunk_index, chunk_data)
                VALUES ($1, $2, $3, $4)
            """, rag_instance_id, 'attrs', 0, json.dumps(graph_attrs))

    async def _save_vector_data(self, conn, rag_instance_id: str, vector_data: Dict[str, Any]):
        """Save vector data in chunks to avoid per-row size limits."""
        # Embeddings are larger per item than graph records, so use smaller chunks
        embeddings = vector_data.get('embeddings', [])
        if embeddings:
            chunk_size = 100
            for i in range(0, len(embeddings), chunk_size):
                chunk = embeddings[i:i + chunk_size]
                await conn.execute("""
                    INSERT INTO rag_vector_data (rag_instance_id, data_type, chunk_index, chunk_data)
                    VALUES ($1, $2, $3, $4)
                """, rag_instance_id, 'embeddings', i // chunk_size, json.dumps(chunk))

        # Per-embedding metadata fits in a single chunk
        metadata = vector_data.get('metadata', [])
        if metadata:
            await conn.execute("""
                INSERT INTO rag_vector_data (rag_instance_id, data_type, chunk_index, chunk_data)
                VALUES ($1, $2, $3, $4)
            """, rag_instance_id, 'metadata', 0, json.dumps(metadata))

    async def load_complete_rag_instance(
        self,
        ai_type: str,
        user_id: Optional[str] = None,
        ai_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Load a complete RAG instance from the database."""
        async with self.pool.acquire() as conn:
            rag_instance = await conn.fetchrow("""
                SELECT id, ai_type, user_id, ai_id, name, description, blob_url,
                       config_json, total_chunks, total_tokens, file_count,
                       created_at, updated_at, last_accessed_at, status
                FROM rag_instances
                WHERE ai_type = $1 AND user_id = $2 AND ai_id = $3 AND status = 'active'
            """, ai_type, user_id, ai_id)

            if not rag_instance:
                return None

            # Track access time
            await conn.execute("""
                UPDATE rag_instances SET last_accessed_at = NOW() WHERE id = $1
            """, rag_instance['id'])

            # Reassemble chunked graph and vector data
            graph_data = await self._load_graph_data(conn, rag_instance['id'])
            vector_data = await self._load_vector_data(conn, rag_instance['id'])

            # asyncpg returns JSONB columns as text unless a codec is registered,
            # so decode defensively
            config = rag_instance['config_json']
            if isinstance(config, str):
                config = json.loads(config)

            return {
                "metadata": dict(rag_instance),
                "rag_state": {
                    "graph": graph_data,
                    "vectors": vector_data,
                    "config": config or {},
                    "version": "1.0"
                }
            }

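    # Example usage (a sketch; the identifiers are hypothetical):
    #
    #     loaded = await db.load_complete_rag_instance("assistant", "user-123", "ai-1")
    #     if loaded:
    #         embeddings = loaded["rag_state"]["vectors"]["embeddings"]
    #         nodes = loaded["rag_state"]["graph"]["nodes"]
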
    async def _load_graph_data(self, conn, rag_instance_id: str) -> Dict[str, Any]:
        """Load graph data from its chunks and reassemble it."""
        def decode(value):
            # asyncpg returns JSONB as text unless a codec is registered
            return json.loads(value) if isinstance(value, str) else value

        # Nodes
        nodes_chunks = await conn.fetch("""
            SELECT chunk_index, chunk_data FROM rag_graph_data
            WHERE rag_instance_id = $1 AND data_type = 'nodes'
            ORDER BY chunk_index
        """, rag_instance_id)

        nodes = []
        for chunk_row in nodes_chunks:
            nodes.extend(decode(chunk_row['chunk_data']))

        # Edges
        edges_chunks = await conn.fetch("""
            SELECT chunk_index, chunk_data FROM rag_graph_data
            WHERE rag_instance_id = $1 AND data_type = 'edges'
            ORDER BY chunk_index
        """, rag_instance_id)

        edges = []
        for chunk_row in edges_chunks:
            edges.extend(decode(chunk_row['chunk_data']))

        # Graph-level attributes
        attrs_row = await conn.fetchrow("""
            SELECT chunk_data FROM rag_graph_data
            WHERE rag_instance_id = $1 AND data_type = 'attrs'
        """, rag_instance_id)

        graph_attrs = decode(attrs_row['chunk_data']) if attrs_row else {}

        return {
            "nodes": nodes,
            "edges": edges,
            "graph_attrs": graph_attrs
        }

    async def _load_vector_data(self, conn, rag_instance_id: str) -> Dict[str, Any]:
        """Load vector data from its chunks and reassemble it."""
        def decode(value):
            # asyncpg returns JSONB as text unless a codec is registered
            return json.loads(value) if isinstance(value, str) else value

        # Embeddings
        embeddings_chunks = await conn.fetch("""
            SELECT chunk_index, chunk_data FROM rag_vector_data
            WHERE rag_instance_id = $1 AND data_type = 'embeddings'
            ORDER BY chunk_index
        """, rag_instance_id)

        embeddings = []
        for chunk_row in embeddings_chunks:
            embeddings.extend(decode(chunk_row['chunk_data']))

        # Per-embedding metadata
        metadata_row = await conn.fetchrow("""
            SELECT chunk_data FROM rag_vector_data
            WHERE rag_instance_id = $1 AND data_type = 'metadata'
        """, rag_instance_id)

        metadata = decode(metadata_row['chunk_data']) if metadata_row else []

        return {
            "embeddings": embeddings,
            "metadata": metadata,
            "dimension": 1024  # fixed embedding dimension
        }

    async def save_knowledge_file(
        self,
        rag_instance_id: str,
        filename: str,
        original_filename: str,
        file_type: str,
        file_size: int,
        content_text: str,
        content_blob: Optional[bytes] = None
    ) -> str:
        """Save a knowledge file to the database."""
        async with self.pool.acquire() as conn:
            file_id = await conn.fetchval("""
                INSERT INTO knowledge_files (
                    rag_instance_id, filename, original_filename, file_type,
                    file_size, content_text, content_blob, processing_status
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                RETURNING id
            """, rag_instance_id, filename, original_filename, file_type,
                file_size, content_text, content_blob, 'processed')

            return str(file_id)

    async def get_knowledge_files(self, rag_instance_id: str) -> List[Dict[str, Any]]:
        """Get all knowledge files for a RAG instance."""
        async with self.pool.acquire() as conn:
            files = await conn.fetch("""
                SELECT id, filename, original_filename, file_type, file_size,
                       content_text, processing_status, token_count,
                       created_at, updated_at
                FROM knowledge_files
                WHERE rag_instance_id = $1
                ORDER BY created_at DESC
            """, rag_instance_id)

            return [dict(row) for row in files]

    async def list_user_rag_instances(self, user_id: str) -> List[Dict[str, Any]]:
        """List all active RAG instances for a user."""
        async with self.pool.acquire() as conn:
            results = await conn.fetch("""
                SELECT id, ai_type, ai_id, name, description, total_chunks,
                       total_tokens, file_count, created_at, updated_at,
                       last_accessed_at, status
                FROM rag_instances
                WHERE user_id = $1 AND status = 'active'
                ORDER BY created_at DESC
            """, user_id)

            return [dict(row) for row in results]

    async def save_conversation(
        self,
        user_id: str,
        rag_instance_id: str,
        title: Optional[str] = None
    ) -> str:
        """Save a conversation to the database."""
        async with self.pool.acquire() as conn:
            conversation_id = await conn.fetchval("""
                INSERT INTO conversations (user_id, rag_instance_id, title)
                VALUES ($1, $2, $3)
                RETURNING id
            """, user_id, rag_instance_id, title)

            return str(conversation_id)

    async def save_conversation_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """Save a conversation message to the database."""
        async with self.pool.acquire() as conn:
            message_id = await conn.fetchval("""
                INSERT INTO conversation_messages (conversation_id, role, content, metadata)
                VALUES ($1, $2, $3, $4)
                RETURNING id
            """, conversation_id, role, content, json.dumps(metadata or {}))

            # Touch the parent conversation so it sorts to the top of listings
            await conn.execute("""
                UPDATE conversations SET updated_at = NOW() WHERE id = $1
            """, conversation_id)

            return str(message_id)

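    # Example chat turn (a sketch; the identifiers are hypothetical):
    #
    #     conv_id = await db.save_conversation("user-123", rag_id, title="First chat")
    #     await db.save_conversation_message(conv_id, "user", "Hello!")
    #     await db.save_conversation_message(conv_id, "assistant", "Hi! How can I help?")
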
    async def get_conversation_messages(
        self,
        conversation_id: str,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Get the most recent conversation messages, in chronological order."""
        async with self.pool.acquire() as conn:
            messages = await conn.fetch("""
                SELECT id, role, content, metadata, created_at
                FROM conversation_messages
                WHERE conversation_id = $1
                ORDER BY created_at DESC
                LIMIT $2
            """, conversation_id, limit)

            # Fetch newest-first for the LIMIT, then reverse into chronological order
            return [dict(msg) for msg in reversed(messages)]

    async def get_user_conversations(self, user_id: str) -> List[Dict[str, Any]]:
        """Get all active conversations for a user, newest first."""
        async with self.pool.acquire() as conn:
            conversations = await conn.fetch("""
                SELECT c.id, c.title, c.created_at, c.updated_at,
                       r.name as ai_name, r.ai_type,
                       (SELECT content FROM conversation_messages
                        WHERE conversation_id = c.id
                        ORDER BY created_at DESC LIMIT 1) as last_message
                FROM conversations c
                JOIN rag_instances r ON c.rag_instance_id = r.id
                WHERE c.user_id = $1 AND c.is_active = TRUE
                ORDER BY c.updated_at DESC
            """, user_id)

            return [dict(conv) for conv in conversations]

    async def update_system_stats(self):
        """Recompute and upsert today's system statistics."""
        async with self.pool.acquire() as conn:
            stats = await conn.fetchrow("""
                SELECT
                    (SELECT COUNT(*) FROM rag_instances WHERE status = 'active') as rag_count,
                    (SELECT COUNT(*) FROM conversations WHERE is_active = TRUE) as conv_count,
                    (SELECT COUNT(*) FROM conversation_messages) as msg_count,
                    (SELECT COUNT(*) FROM knowledge_files) as file_count
            """)

            await conn.execute("""
                INSERT INTO system_stats (
                    stat_date, total_rag_instances, total_conversations,
                    total_messages, total_knowledge_files
                ) VALUES (CURRENT_DATE, $1, $2, $3, $4)
                ON CONFLICT (stat_date) DO UPDATE SET
                    total_rag_instances = EXCLUDED.total_rag_instances,
                    total_conversations = EXCLUDED.total_conversations,
                    total_messages = EXCLUDED.total_messages,
                    total_knowledge_files = EXCLUDED.total_knowledge_files
            """, stats['rag_count'], stats['conv_count'], stats['msg_count'], stats['file_count'])

    async def get_system_stats(self) -> Dict[str, Any]:
        """Get the most recent system statistics."""
        async with self.pool.acquire() as conn:
            stats = await conn.fetchrow("""
                SELECT * FROM system_stats
                ORDER BY stat_date DESC
                LIMIT 1
            """)

            return dict(stats) if stats else {}

    async def delete_rag_instance(self, rag_instance_id: str):
        """Soft-delete a RAG instance."""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE rag_instances
                SET status = 'deleted', updated_at = NOW()
                WHERE id = $1
            """, rag_instance_id)

    async def cleanup_old_data(self, days_old: int = 30):
        """Clean up old data from the database."""
        async with self.pool.acquire() as conn:
            # Purge soft-deleted instances older than the cutoff. The interval is
            # built with make_interval() because asyncpg does not substitute
            # parameters inside string literals like INTERVAL '%s days'.
            await conn.execute("""
                DELETE FROM rag_instances
                WHERE status = 'deleted' AND updated_at < NOW() - make_interval(days => $1)
            """, days_old)

            # Keep only the last 90 days of statistics
            await conn.execute("""
                DELETE FROM system_stats
                WHERE stat_date < CURRENT_DATE - INTERVAL '90 days'
            """)

    def _estimate_tokens(self, rag_state: Dict[str, Any]) -> int:
        """Roughly estimate the token count of a RAG state (~4 characters per token)."""
        try:
            content_size = len(json.dumps(rag_state))
            return content_size // 4
        except (TypeError, ValueError):
            # Unserializable or circular state: fall back to zero
            return 0

    async def get_database_size(self) -> Dict[str, Any]:
        """Get database size and row-count information."""
        async with self.pool.acquire() as conn:
            size_info = await conn.fetchrow("""
                SELECT
                    pg_size_pretty(pg_database_size(current_database())) as total_size,
                    (SELECT COUNT(*) FROM rag_instances) as rag_instances,
                    (SELECT COUNT(*) FROM knowledge_files) as knowledge_files,
                    (SELECT COUNT(*) FROM conversations) as conversations,
                    (SELECT COUNT(*) FROM conversation_messages) as messages,
                    (SELECT COUNT(*) FROM rag_graph_data) as graph_chunks,
                    (SELECT COUNT(*) FROM rag_vector_data) as vector_chunks
            """)

            return dict(size_info)

    async def test_connection(self) -> bool:
        """Test the database connection."""
        try:
            if not self.pool:
                return False
            async with self.pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            return True
        except Exception as e:
            self.logger.error(f"Database connection test failed: {e}")
            return False

    async def close(self):
        """Close the database connection pool."""
        if self.pool:
            await self.pool.close()
            self.logger.info("Database connection pool closed")