"""FastMCP server exposing vault and indexing tools.""" from __future__ import annotations import logging import os import time from typing import Any, Dict, List, Optional from dotenv import load_dotenv from fastmcp import FastMCP from fastmcp.tools.tool import ToolResult from mcp.types import TextContent from pydantic import Field # Load environment variables from .env file load_dotenv() from ..services import IndexerService, VaultNote, VaultService from ..services.auth import AuthError, AuthService from ..services.config import get_config, PROJECT_ROOT try: from fastmcp.server.http import _current_http_request # type: ignore except ImportError: # pragma: no cover _current_http_request = None logger = logging.getLogger(__name__) mcp = FastMCP( "obsidian-docs-viewer", instructions=( "Multi-tenant vault tools. STDIO uses user_id 'local-dev'; HTTP mode must validate each " "request with JWT.sub. Note paths must be relative '.md' files under 256 chars without '..' or '\\'. " "Frontmatter is YAML: tags are string arrays and 'version' is reserved. Notes must be <=1 MiB; " "writes refresh created/updated timestamps and synchronously update the search index; deletes " "clear index rows and backlinks. Wikilinks use [[...]] slug matching (prefer same folder, else " "lexicographic). Search ranking = bm25(title*3, body*1) + recency bonus (+1 if <=7d, +0.5 if <=30d)." ), ) vault_service = VaultService() indexer_service = IndexerService() auth_service = AuthService() @mcp.resource("ui://widget/note.html", mime_type="text/html+skybridge") def widget_resource() -> str: """Return the widget HTML bundle.""" # Locate widget.html relative to project root # In Docker: /app/frontend/dist/widget.html # Local: frontend/dist/widget.html # We use PROJECT_ROOT from config widget_path = PROJECT_ROOT / "frontend" / "dist" / "widget.html" logger.info(f"Reading widget from: {widget_path}") if not widget_path.exists(): logger.error(f"Widget path does not exist: {widget_path}") return "Widget build not found. Please run 'npm run build' in frontend directory." try: html_content = widget_path.read_text(encoding="utf-8") logger.info(f"Widget content length: {len(html_content)}") if not html_content.strip(): logger.error("Widget file is empty!") return "Widget build file is empty." # Replace relative asset paths with absolute URLs for ChatGPT iframe config = get_config() base_url = config.hf_space_url.rstrip("/") logger.info(f"Injecting base URL: {base_url}") # Inject API_BASE_URL global for the widget to use html_content = html_content.replace( '', f'' ) # Vite builds usually output /assets/... html_content = html_content.replace('src="/assets/', f'src="{base_url}/assets/') html_content = html_content.replace('href="/assets/', f'href="{base_url}/assets/') return html_content except Exception as e: logger.exception(f"Failed to read widget file: {e}") return f"Server error reading widget: {e}" def _current_user_id() -> str: """Resolve the acting user ID (local mode defaults to local-dev).""" # HTTP transport (hosted) uses Authorization headers if _current_http_request is not None: try: request = _current_http_request.get() # type: ignore[call-arg] except LookupError: request = None if request is not None: header = request.headers.get("Authorization") # Check for No-Auth mode if header is missing if not header: config = get_config() if config.enable_noauth_mcp: return "demo-user" raise PermissionError("Authorization header required") scheme, _, token = header.partition(" ") if scheme.lower() != "bearer" or not token: raise PermissionError("Authorization header must be 'Bearer '") try: payload = auth_service.validate_jwt(token) except AuthError as exc: raise PermissionError(exc.message) from exc os.environ.setdefault("LOCAL_USER_ID", payload.sub) return payload.sub # STDIO / local fall back return os.getenv("LOCAL_USER_ID", "local-dev") def _note_to_response(note: VaultNote) -> Dict[str, Any]: return { "path": note["path"], "title": note["title"], "metadata": dict(note.get("metadata") or {}), "body": note.get("body", ""), } @mcp.tool( name="list_notes", description="List notes in the vault (optionally scoped to a folder).", ) def list_notes( folder: Optional[str] = Field( default=None, description="Optional relative folder (trim '/' ; no '..' or '\\').", ), ) -> List[Dict[str, Any]]: start_time = time.time() user_id = _current_user_id() notes = vault_service.list_notes(user_id, folder=folder) duration_ms = (time.time() - start_time) * 1000 logger.info( "MCP tool called", extra={ "tool_name": "list_notes", "user_id": user_id, "folder": folder or "(root)", "result_count": len(notes), "duration_ms": f"{duration_ms:.2f}", }, ) return [ { "path": entry["path"], "title": entry["title"], "last_modified": entry["last_modified"].isoformat(), } for entry in notes ] @mcp.tool(name="read_note", description="Read a Markdown note with metadata and body.") def read_note( path: str = Field( ..., description="Relative '.md' path ≤256 chars (no '..' or '\\')." ), ) -> dict: start_time = time.time() user_id = _current_user_id() note = vault_service.read_note(user_id, path) duration_ms = (time.time() - start_time) * 1000 logger.info( "MCP tool called", extra={ "tool_name": "read_note", "user_id": user_id, "note_path": path, "duration_ms": f"{duration_ms:.2f}", }, ) structured_note = { "title": note["title"], "note_path": note["path"], "body": note["body"], "metadata": note["metadata"], "updated": note["modified"].isoformat(), } return ToolResult( content=[TextContent(type="text", text=f"Read note: {note['title']}\n\n{note['body']}")], structured_content={"note": structured_note}, meta={ "openai/outputTemplate": "ui://widget/note.html", "openai/resultCanProduceWidget": True, "openai/toolInvocation/invoking": f"Opening {note['title']}...", "openai/toolInvocation/invoked": f"Loaded {note['title']}" } ) @mcp.tool( name="write_note", description="Create or update a note. Automatically updates frontmatter timestamps and search index.", ) def write_note( path: str = Field( ..., description="Relative '.md' path ≤256 chars (no '..' or '\\')." ), body: str = Field(..., description="Markdown body ≤1 MiB."), title: Optional[str] = Field( default=None, description="Optional title override; otherwise frontmatter/H1/filename is used.", ), metadata: Optional[Dict[str, Any]] = Field( default=None, description="Optional frontmatter dict (tags arrays of strings; 'version' reserved).", ), ) -> dict: start_time = time.time() user_id = _current_user_id() note = vault_service.write_note( user_id, path, title=title, metadata=metadata, body=body, ) indexer_service.index_note(user_id, note) duration_ms = (time.time() - start_time) * 1000 logger.info( "MCP tool called", extra={ "tool_name": "write_note", "user_id": user_id, "note_path": path, "duration_ms": f"{duration_ms:.2f}", }, ) structured_note = { "title": note["title"], "note_path": note["path"], "body": note["body"], "metadata": note["metadata"], "updated": note["modified"].isoformat(), } return ToolResult( content=[TextContent(type="text", text=f"Successfully saved note: {path}")], structured_content={"note": structured_note}, meta={ "openai/outputTemplate": "ui://widget/note.html", "openai/resultCanProduceWidget": True, "openai/toolInvocation/invoking": f"Saving {path}...", "openai/toolInvocation/invoked": f"Saved {path}" } ) @mcp.tool(name="delete_note", description="Delete a note and remove it from the index.") def delete_note( path: str = Field( ..., description="Relative '.md' path ≤256 chars (no '..' or '\\')." ), ) -> Dict[str, str]: start_time = time.time() user_id = _current_user_id() vault_service.delete_note(user_id, path) indexer_service.delete_note_index(user_id, path) duration_ms = (time.time() - start_time) * 1000 logger.info( "MCP tool called", extra={ "tool_name": "delete_note", "user_id": user_id, "note_path": path, "duration_ms": f"{duration_ms:.2f}", }, ) return {"status": "ok"} @mcp.tool( name="search_notes", description="Full-text search with snippets and recency-aware scoring.", meta={ "openai/outputTemplate": "ui://widget/note.html", "openai/toolInvocation/invoking": "Searching...", "openai/toolInvocation/invoked": "Search complete." } ) def search_notes( query: str = Field(..., description="Non-empty search query (bm25 + recency)."), limit: int = Field(50, ge=1, le=100, description="Result cap between 1 and 100."), ) -> ToolResult: start_time = time.time() user_id = _current_user_id() results = indexer_service.search_notes(user_id, query, limit=limit) duration_ms = (time.time() - start_time) * 1000 logger.info( "MCP tool called", extra={ "tool_name": "search_notes", "user_id": user_id, "query": query, "limit": limit, "result_count": len(results), "duration_ms": f"{duration_ms:.2f}", }, ) # Structure results for the widget structured_results = [] for r in results: structured_results.append({ "title": r["title"], "note_path": r["path"], "snippet": r["snippet"], "score": r["score"], "updated": r["updated"] if isinstance(r["updated"], str) else r["updated"].isoformat() }) return ToolResult( content=[TextContent(type="text", text=f"Found {len(results)} notes matching '{query}'.")], structured_content={"results": structured_results}, meta={ "openai/outputTemplate": "ui://widget/note.html", "openai/resultCanProduceWidget": True, "openai/toolInvocation/invoking": f"Searching for '{query}'...", "openai/toolInvocation/invoked": f"Found {len(results)} results." } ) @mcp.tool( name="get_backlinks", description="List notes that reference the target note." ) def get_backlinks( path: str = Field( ..., description="Relative '.md' path ≤256 chars (no '..' or '\\')." ), ) -> List[Dict[str, Any]]: user_id = _current_user_id() backlinks = indexer_service.get_backlinks(user_id, path) return backlinks @mcp.tool(name="get_tags", description="List tags and associated note counts.") def get_tags() -> List[Dict[str, Any]]: user_id = _current_user_id() return indexer_service.get_tags(user_id) if __name__ == "__main__": transport = os.getenv("MCP_TRANSPORT", "stdio").strip().lower() or "stdio" # Configure HTTP transport with custom port if specified if transport == "http": port = int(os.getenv("MCP_PORT", "8001")) host = os.getenv("MCP_HOST", "127.0.0.1") logger.info( "Starting MCP server", extra={"transport": transport, "host": host, "port": port}, ) mcp.run(transport=transport, host=host, port=port) else: logger.info("Starting MCP server", extra={"transport": transport}) mcp.run(transport=transport)