Spaces:
Running
Running
File size: 4,229 Bytes
9ce984a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
from __future__ import annotations
import ast
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Optional, Tuple
# Name of the knowledge base directory; kb_root() joins it onto the project root.
KB_DIRNAME = "knowledge_base"
@dataclass(frozen=True)
class KBItem:
    """Immutable record describing one Python example in the knowledge base."""

    # Identifier assigned by the scanner; it begins with the category name.
    id: str
    # Name of the directory directly under the knowledge base that holds the file.
    category: str
    # Base name of the file.
    filename: str
    # Location of the file, relative to the project root when that is possible.
    path: str
    # Short description: the module docstring, or a fallback chosen by the scanner.
    summary: str
def project_root() -> Path:
    """Locate the project root directory.

    The root is the grandparent of this module's resolved file location; it is
    expected to be the folder containing 'mcp_server' and 'knowledge_base'.
    """
    module_file = Path(__file__).resolve()
    containing_dir = module_file.parent
    return containing_dir.parent
def kb_root() -> Path:
    """Build the absolute path of the knowledge_base directory under the project root."""
    root_dir = project_root()
    return root_dir.joinpath(KB_DIRNAME)
def extract_docstring(p: Path) -> str:
    """Summarize a Python file using its module docstring.

    Returns the docstring with every run of whitespace collapsed to a single
    space. When the module has no docstring, the first non-blank line of the
    file is returned instead (stripped, capped at 300 characters; comment
    lines are not skipped). A file that cannot be read or parsed yields "".
    """
    try:
        text = p.read_text(encoding="utf-8")
    except Exception:
        return ""

    try:
        docstring = ast.get_docstring(ast.parse(text))
        if docstring:
            return " ".join(docstring.split())
        # No docstring: fall back to the first line that is not blank.
        stripped_lines = (line.strip() for line in text.splitlines())
        first_line = next((line for line in stripped_lines if line), "")
        return first_line[:300]
    except Exception:
        return ""
def to_rel_path(p: Path) -> str:
    """Express *p* relative to the project root, for use in stable IDs.

    If the relative form cannot be computed, *p* is returned as given.
    """
    try:
        relative = p.relative_to(project_root())
    except Exception:
        return str(p)
    return str(relative)
def scan_knowledge_base() -> List[KBItem]:
    """Scan knowledge_base for Python examples and return them as KBItem records.

    Each immediate sub-directory of the knowledge base is a category, and every
    ``*.py`` file found anywhere beneath it becomes one item. Categories and
    files are visited in sorted order, so the result is deterministic.

    Returns:
        One KBItem per Python file found.

    Raises:
        FileNotFoundError: If the knowledge base directory is missing or is
            not a directory.
    """
    root = kb_root()
    # is_dir() rather than exists(): a plain file at this location cannot be
    # iterated and should be reported the same way as a missing folder.
    if not root.is_dir():
        raise FileNotFoundError(f"Knowledge base folder not found: {root}")

    items: List[KBItem] = []
    for category_dir in sorted(d for d in root.iterdir() if d.is_dir()):
        category = category_dir.name
        for py in sorted(category_dir.rglob("*.py")):
            # The glob also matches a directory that happens to end in ".py".
            if not py.is_file():
                continue
            filename = py.name
            # rglob descends into sub-folders, so the bare file name is not
            # unique within a category. Building the ID from the path below
            # the category keeps IDs distinct; for a file sitting directly in
            # the category it is still exactly "<category>/<filename>".
            sub_path = py.relative_to(category_dir).as_posix()
            items.append(
                KBItem(
                    id=f"{category}/{sub_path}",
                    category=category,
                    filename=filename,
                    path=to_rel_path(py),
                    summary=extract_docstring(py) or filename,
                )
            )
    return items
# Simple in-process cache of the scan result. It starts out as None and is
# filled lazily by the first accessor that finds it empty; nothing in the code
# visible here resets it afterwards.
_ITEMS_CACHE: Optional[List[KBItem]] = None
def list_items_dict() -> List[dict]:
    """Return every KB item converted to a plain dict, ready for JSON serialization.

    The knowledge base is scanned only when the module-level cache is still
    empty; otherwise the cached items are reused.
    """
    global _ITEMS_CACHE
    cached = _ITEMS_CACHE
    if cached is None:
        cached = scan_knowledge_base()
        _ITEMS_CACHE = cached
    return list(map(asdict, cached))
def get_embedding_text(item: KBItem) -> str:
    """Build the canonical text that represents a KB item for embedding.

    The result has the form "<category>/<filename>: <summary>", where the
    summary is stripped of surrounding whitespace and an empty or missing
    summary contributes nothing after the colon and space.
    """
    summary = item.summary.strip() if item.summary else ""
    label = f"{item.category}/{item.filename}"
    return f"{label}: {summary}"
def ensure_kb_path(path_str: str) -> Path:
    """Validate a path and resolve it to a Python file inside knowledge_base.

    Accepts either:
    - 'knowledge_base/<category>/<file>.py' (relative to the project root)
    - '<category>/<file>.py' (relative to the knowledge base)
    An absolute path is accepted too, provided it resolves inside the
    knowledge base.

    Args:
        path_str: The path to validate.

    Returns:
        The fully resolved path of the file.

    Raises:
        ValueError: If the resolved path lies outside the knowledge base.
        FileNotFoundError: If the path is not an existing regular file or does
            not have a ".py" suffix.
    """
    base = kb_root().resolve()
    candidate = Path(path_str)

    if candidate.is_absolute():
        p = candidate.resolve()
    else:
        # Try the project-root interpretation first, and keep it only when it
        # really lands inside the KB and exists. Containment is decided on path
        # components: a string-prefix test would also accept a sibling such as
        # "knowledge_base_old/".
        from_project = (project_root() / candidate).resolve()
        inside_kb = from_project == base or base in from_project.parents
        if inside_kb and from_project.exists():
            p = from_project
        else:
            p = (base / candidate).resolve()

    # Authoritative check, applied after resolution so that ".." segments and
    # symlinks cannot lead outside the knowledge base.
    try:
        p.relative_to(base)
    except ValueError:
        raise ValueError(f"Path is outside knowledge base: {path_str}") from None

    # is_file() is already False for a path that does not exist.
    if not p.is_file() or p.suffix != ".py":
        raise FileNotFoundError(f"KB file not found: {p}")
    return p
def read_code(path_str: str) -> str:
    """Return the complete source text of a knowledge base file.

    The path is first validated with ensure_kb_path, so any error it raises
    propagates to the caller. The file is decoded as UTF-8.
    """
    return ensure_kb_path(path_str).read_text(encoding="utf-8")
def get_items_for_embedding() -> List[Tuple[KBItem, str]]:
    """Pair each KB item with the text that should be embedded for it.

    The knowledge base is scanned only when the module-level cache is still
    empty; otherwise the cached items are reused.
    """
    global _ITEMS_CACHE
    if _ITEMS_CACHE is None:
        _ITEMS_CACHE = scan_knowledge_base()

    pairs: List[Tuple[KBItem, str]] = []
    for entry in _ITEMS_CACHE:
        pairs.append((entry, get_embedding_text(entry)))
    return pairs