File size: 4,229 Bytes
9ce984a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from __future__ import annotations

import ast
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Optional, Tuple

KB_DIRNAME = "knowledge_base"

@dataclass(frozen=True)
class KBItem:
    id: str
    category: str
    filename: str
    path: str
    summary: str

def project_root() -> Path:
    """Return project root (folder containing 'mcp_server' and 'knowledge_base')."""
    return Path(__file__).resolve().parent.parent

def kb_root() -> Path:
    """Return absolute path to knowledge_base directory."""
    return project_root() / KB_DIRNAME

def extract_docstring(p: Path) -> str:
    """Extract module docstring; fallback to first non-empty line if absent."""
    try:
        src = p.read_text(encoding="utf-8")
    except Exception:
        return ""
    try:
        module = ast.parse(src)
        doc = ast.get_docstring(module) or ""
        if doc:
            return " ".join(doc.strip().split())
        # fallback: first non-empty non-comment line
        for ln in src.splitlines():
            s = ln.strip()
            if s:
                return s[:300]
        return ""
    except Exception:
        return ""

def to_rel_path(p: Path) -> str:
    """Return path relative to project root for stable IDs."""
    try:
        return str(p.relative_to(project_root()))
    except Exception:
        return str(p)

def scan_knowledge_base() -> List[KBItem]:
    """Scan knowledge_base for Python examples and return KBItem list."""
    root = kb_root()
    if not root.exists():
        raise FileNotFoundError(f"Knowledge base folder not found: {root}")
    items: List[KBItem] = []
    for category_dir in sorted([d for d in root.iterdir() if d.is_dir()]):
        category = category_dir.name
        for py in sorted(category_dir.rglob("*.py")):
            filename = py.name
            rel_path = to_rel_path(py)
            summary = extract_docstring(py) or filename
            item_id = f"{category}/{filename}"
            items.append(
                KBItem(
                    id=item_id,
                    category=category,
                    filename=filename,
                    path=rel_path,
                    summary=summary,
                )
            )
    return items

# Simple in-process cache. Re-scan lazily on first access.
_ITEMS_CACHE: Optional[List[KBItem]] = None

def list_items_dict() -> List[dict]:
    """Return KB items as plain dicts suitable for JSON serialization."""
    global _ITEMS_CACHE
    if _ITEMS_CACHE is None:
        _ITEMS_CACHE = scan_knowledge_base()
    return [asdict(item) for item in _ITEMS_CACHE]

def get_embedding_text(item: KBItem) -> str:
    """Canonical string used for embedding a KB item."""
    head = (item.summary or "").strip()
    return f"{item.category}/{item.filename}: {head}"

def ensure_kb_path(path_str: str) -> Path:
    """
    Validate and resolve a path under knowledge_base.
    Accepts either:
      - 'knowledge_base/<category>/<file>.py'
      - '<category>/<file>.py'
    Raises if outside KB or not a Python file.
    """
    base = kb_root().resolve()
    # Normalize input
    candidate = Path(path_str)
    if not candidate.is_absolute():
        # Allow either direct rel to project root or category/file
        p1 = (project_root() / candidate).resolve()
        p2 = (base / candidate).resolve()
        # Prefer inside KB
        p = p1 if str(p1).startswith(str(base)) and p1.exists() else p2
    else:
        p = candidate.resolve()
    try:
        p.relative_to(base)
    except Exception:
        raise ValueError(f"Path is outside knowledge base: {path_str}")
    if not p.exists() or not p.is_file() or p.suffix != ".py":
        raise FileNotFoundError(f"KB file not found: {p}")
    return p

def read_code(path_str: str) -> str:
    """Read and return full source code of a KB file."""
    p = ensure_kb_path(path_str)
    return p.read_text(encoding="utf-8")

def get_items_for_embedding() -> List[Tuple[KBItem, str]]:
    """Return list of (KBItem, text) pairs for embedding."""
    global _ITEMS_CACHE
    if _ITEMS_CACHE is None:
        _ITEMS_CACHE = scan_knowledge_base()
    return [(it, get_embedding_text(it)) for it in _ITEMS_CACHE]