Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import base64 | |
| from typing import Dict, List, Any | |
| import requests | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from datasets import load_dataset | |
# --- Optional-dependency shims ------------------------------------------------
# `Document` is imported from whichever langchain package layout is installed;
# if none is available, a minimal stand-in with the same attributes
# (page_content / metadata) is defined so the rest of the module keeps working.
try:
    from langchain_core.documents import Document
except ImportError:
    try:
        from langchain.docstore.document import Document
    except ImportError:
        try:
            from langchain.schema import Document
        except ImportError:
            # Fallback: Create a simple Document class
            class Document:
                """Minimal stand-in for langchain's Document container."""

                def __init__(self, page_content: str, metadata: dict = None):
                    self.page_content = page_content
                    # A falsy/None metadata argument becomes a fresh dict,
                    # so every Document has a usable .metadata mapping.
                    self.metadata = metadata or {}

# Import RecursiveCharacterTextSplitter with fallback
RecursiveCharacterTextSplitter = None
try:
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    print("β Using langchain.text_splitter.RecursiveCharacterTextSplitter")
except ImportError:
    try:
        from langchain_text_splitters import RecursiveCharacterTextSplitter
        print("β Using langchain_text_splitters.RecursiveCharacterTextSplitter")
    except ImportError:
        print("β οΈ Using fallback RecursiveCharacterTextSplitter")

        # Fallback: naive fixed-width splitter, used only when no langchain
        # text-splitter package is importable.
        class RecursiveCharacterTextSplitter:
            def __init__(self, chunk_size=500, chunk_overlap=50, **kwargs):
                # langchain-only kwargs (add_start_index, separators, ...) are
                # accepted and ignored so call sites don't need to know which
                # implementation they got.
                self.chunk_size = chunk_size
                self.chunk_overlap = chunk_overlap
                print(f"π Initialized fallback text splitter with chunk_size={chunk_size}")

            def split_documents(self, documents):
                """Simple document splitting fallback"""
                print(f"π Splitting {len(documents)} documents using fallback method...")
                result = []
                # BUGFIX: guard against chunk_overlap >= chunk_size, which made
                # the original per-iteration step zero (range() raises
                # ValueError) or negative. Clamp the stride to at least 1.
                step = max(1, self.chunk_size - self.chunk_overlap)
                for doc in documents:
                    text = doc.page_content
                    # Simple chunking: fixed-size windows advancing by `step`,
                    # so consecutive chunks overlap by chunk_overlap chars.
                    for i in range(0, len(text), step):
                        chunk = text[i:i + self.chunk_size]
                        if chunk.strip():
                            # NOTE: metadata dict is shared (not copied) across
                            # chunks of the same source document.
                            result.append(Document(page_content=chunk, metadata=doc.metadata))
                print(f"β Split into {len(result)} chunks")
                return result
# BM25 (lexical) retriever backing the CVE knowledge-base search.
from langchain_community.retrievers import BM25Retriever

# Load environment variables from a local .env file (if present) into
# os.environ, so GITHUB_TOKEN can be supplied without exporting it manually.
load_dotenv()
| class GitHubMCPServer: | |
| """GitHub MCP Server for repository scanning, file access, and CVE retrieval""" | |
| def __init__(self): | |
| self.github_token = os.getenv("GITHUB_TOKEN") | |
| if not self.github_token: | |
| raise ValueError("GITHUB_TOKEN environment variable is required") | |
| self.headers = { | |
| "Authorization": f"token {self.github_token}", | |
| "Accept": "application/vnd.github.v3+json" | |
| } | |
| # Initialize CVE retriever | |
| self.cve_retriever = None | |
| self._initialize_cve_retriever() | |
    def _initialize_cve_retriever(self):
        """Initialize the CVE retriever with Hugging Face dataset.

        Loads the CIRCL/vulnerability dataset, keeps only CVE-prefixed
        entries, wraps them in Document objects, chunks them, and builds a
        BM25 retriever. On any failure, self.cve_retriever is left as None
        (the search methods check for that and report the error).
        """
        try:
            print("π Loading CVE dataset from Hugging Face...")
            # Load CVE dataset from Hugging Face
            # Login using `huggingface-cli login` to access this dataset
            knowledge_base = load_dataset("CIRCL/vulnerability", split="train")
            print(f"π Loaded {len(knowledge_base)} vulnerability records from Hugging Face")
            # Debug: Print first few records to understand dataset structure
            print("π Dataset structure analysis:")
            print(f"Dataset columns: {knowledge_base.column_names}")
            for i in range(min(2, len(knowledge_base))):
                print(f"Record {i}: {dict(knowledge_base[i])}")
            # Filter to include only CVE entries (not GHSA)
            print("π Filtering for CVE entries only...")
            cve_dataset = knowledge_base.filter(lambda row: str(row["id"]).startswith("CVE-"))
            print(f"π Filtered to {len(cve_dataset)} CVE records (excluded GHSA entries)")
            # Convert dataset entries to Document objects with metadata
            source_docs = []
            for record in cve_dataset:
                cve_id = record.get('id', '')
                description = record.get('description', '')
                # Skip records without essential information
                if not cve_id or not description:
                    continue
                # Create document content (the "Description:" line is later
                # parsed back out by the search methods as a fallback)
                content = f"CVE ID: {cve_id}\nDescription: {description}"
                # Create metadata
                metadata = {
                    'cve_id': str(cve_id),
                    'description': str(description)
                }
                source_docs.append(Document(page_content=content, metadata=metadata))
            print(f"π Created {len(source_docs)} CVE document objects")
            if not source_docs:
                print("β No valid CVE documents found in dataset")
                self.cve_retriever = None
                return
            # Split documents into smaller chunks for better retrieval
            print("π Initializing text splitter...")
            try:
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=500,  # Characters per chunk
                    chunk_overlap=50,  # Overlap between chunks to maintain context
                    add_start_index=True,
                    strip_whitespace=True,
                    separators=["\n\n", "\n", ".", " ", ""],  # Priority order for splitting
                )
                print("β Text splitter initialized successfully")
            except Exception as splitter_error:
                # The fallback splitter class (module top) ignores the extra
                # kwargs, so this path mainly covers version incompatibilities.
                print(f"β Text splitter initialization failed: {splitter_error}")
                # Use simple fallback
                text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
                print("β Using simple fallback text splitter")
            print("π Processing documents with text splitter...")
            try:
                docs_processed = text_splitter.split_documents(source_docs)
                print(f"π Knowledge base prepared with {len(docs_processed)} document chunks")
            except Exception as processing_error:
                print(f"β Document processing failed: {processing_error}")
                # Use original documents without splitting as fallback
                docs_processed = source_docs
                print(f"β Using original documents without splitting: {len(docs_processed)} documents")
            # Initialize BM25 retriever (k=3 default results per query;
            # simple_cve_search temporarily overrides k)
            print("π Initializing BM25 retriever...")
            try:
                self.cve_retriever = BM25Retriever.from_documents(
                    docs_processed,
                    k=3
                )
                print(f"β CVE Retriever initialized with {len(docs_processed)} document chunks")
            except Exception as retriever_error:
                print(f"β BM25 retriever initialization failed: {retriever_error}")
                self.cve_retriever = None
        except Exception as e:
            # Any failure (auth, network, schema change) degrades gracefully:
            # the server still runs, CVE search just reports unavailability.
            print(f"β Error initializing CVE retriever: {str(e)}")
            print("π‘ Make sure you have access to the Hugging Face dataset 'CIRCL/vulnerability'")
            print("π‘ You may need to login with: huggingface-cli login")
            print("π‘ Dataset columns should be: id, title, description, cpes")
            self.cve_retriever = None
| def get_repository_info(self, owner: str, repo: str) -> dict: | |
| """Get basic repository information""" | |
| try: | |
| url = f"https://api.github.com/repos/{owner}/{repo}" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return { | |
| "success": True, | |
| "repository_name": data["name"], | |
| "full_name": data["full_name"], | |
| "description": data.get("description", "No description available"), | |
| "primary_language": data.get("language", "Unknown"), | |
| "size_kb": data["size"], | |
| "stars": data["stargazers_count"], | |
| "forks": data["forks_count"], | |
| "default_branch": data["default_branch"], | |
| "created_date": data["created_at"][:10], | |
| "last_updated": data["updated_at"][:10], | |
| "is_private": data["private"], | |
| "clone_url": data["clone_url"] | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"Repository not found or inaccessible (HTTP {response.status_code})", | |
| "status_code": response.status_code | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Failed to fetch repository information: {str(e)}" | |
| } | |
| def get_file_content(self, owner: str, repo: str, path: str) -> str: | |
| """Get content of a specific file - returns just the file content as string""" | |
| try: | |
| url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if data["type"] == "file" and "content" in data: | |
| # Decode base64 content | |
| try: | |
| content = base64.b64decode(data["content"]).decode('utf-8') | |
| return content | |
| except UnicodeDecodeError: | |
| return f"ERROR: File '{path}' contains binary data that cannot be decoded as text" | |
| else: | |
| return f"ERROR: Path '{path}' is not a file or content is not available" | |
| else: | |
| return f"ERROR: File '{path}' not found or inaccessible (HTTP {response.status_code})" | |
| except Exception as e: | |
| return f"ERROR: Failed to fetch file content for '{path}': {str(e)}" | |
| def scan_repository(self, owner: str, repo: str, extensions: str = ".py,.js,.ts,.php,.java") -> list: | |
| """Scan repository for code files - returns simple list of file paths""" | |
| try: | |
| ext_list = [ext.strip() for ext in extensions.split(",") if ext.strip()] | |
| all_files = [] | |
| self._scan_directory_sync(owner, repo, "", ext_list, all_files) | |
| # Return simple list of file paths for easier processing by CodeAgent | |
| file_paths = [file_info.get('path', '') for file_info in all_files[:50]] # Limit to 50 files | |
| return file_paths | |
| except Exception as e: | |
| return [f"ERROR: Failed to scan repository: {str(e)}"] | |
| def _scan_directory_sync(self, owner: str, repo: str, path: str, extensions: List[str], all_files: List[Dict]): | |
| """Recursively scan directory for files""" | |
| try: | |
| url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| for item in data: | |
| if item["type"] == "file": | |
| if any(item["name"].endswith(ext) for ext in extensions): | |
| all_files.append({ | |
| "name": item["name"], | |
| "path": item["path"], | |
| "type": item["type"], | |
| "size": item.get("size", 0), | |
| "sha": item["sha"] | |
| }) | |
| elif item["type"] == "dir" and len(all_files) < 100: | |
| self._scan_directory_sync(owner, repo, item["path"], extensions, all_files) | |
| except Exception: | |
| pass | |
| def search_cve_database(self, query: str) -> str: | |
| """Search CVE database for relevant vulnerability information""" | |
| if not self.cve_retriever: | |
| return "β CVE retriever not properly initialized. Please check Hugging Face dataset access." | |
| try: | |
| # Retrieve relevant documents | |
| docs = self.cve_retriever.invoke(query) | |
| if not docs: | |
| return f"No relevant CVE information found for query: '{query}'" | |
| # Format the retrieved CVE information | |
| result = f"π **CVE Knowledge Base Results for: '{query}'**\n\n" | |
| for i, doc in enumerate(docs, 1): | |
| metadata = doc.metadata | |
| result += f"**Result {i}:**\n" | |
| result += f"- **CVE ID**: {metadata.get('cve_id', 'Unknown')}\n" | |
| # Extract description from content or metadata | |
| description = metadata.get('description', '') | |
| if not description: | |
| content_lines = doc.page_content.split('\n') | |
| desc_line = next((line for line in content_lines if line.startswith('Description:')), '') | |
| description = desc_line.replace('Description: ', '').strip() if desc_line else 'No description available' | |
| result += f"- **Description**: {description[:200]}{'...' if len(description) > 200 else ''}\n" | |
| result += "---\n" | |
| # Add summary of common patterns | |
| cve_ids = [doc.metadata.get('cve_id') for doc in docs if doc.metadata.get('cve_id')] | |
| result += f"\n**π Analysis Summary:**\n" | |
| result += f"- **CVE Examples**: {', '.join(cve_ids[:3])}{'...' if len(cve_ids) > 3 else ''}\n" | |
| result += f"- **Total Matches**: {len(docs)}\n" | |
| return result | |
| except Exception as e: | |
| return f"β Error retrieving CVE information: {str(e)}" | |
| def simple_cve_search(self, query: str, k: int = 3) -> str: | |
| """Simple CVE search that returns only CVE IDs and descriptions for multi-agent workflow""" | |
| if not self.cve_retriever: | |
| return "β CVE retriever not properly initialized. Please check Hugging Face dataset access." | |
| try: | |
| # Set retriever to return k results | |
| original_k = self.cve_retriever.k | |
| self.cve_retriever.k = k | |
| # Retrieve relevant documents | |
| docs = self.cve_retriever.invoke(query) | |
| # Restore original k | |
| self.cve_retriever.k = original_k | |
| if not docs: | |
| return f"No relevant CVE information found for query: '{query}'" | |
| # Format simple results - just CVE ID and description | |
| result = f"Top {len(docs)} CVE matches for '{query}':\n\n" | |
| for i, doc in enumerate(docs, 1): | |
| metadata = doc.metadata | |
| cve_id = metadata.get('cve_id', 'Unknown') | |
| # Extract description from metadata or content | |
| description = metadata.get('description', '') | |
| if not description: | |
| content_lines = doc.page_content.split('\n') | |
| desc_line = next((line for line in content_lines if line.startswith('Description:')), '') | |
| description = desc_line.replace('Description: ', '').strip() if desc_line else 'No description available' | |
| result += f"{i}. {cve_id}\n" | |
| result += f" {description[:150]}{'...' if len(description) > 150 else ''}\n\n" | |
| return result.strip() | |
| except Exception as e: | |
| return f"β Error retrieving CVE information: {str(e)}" | |
| def get_nvd_cve_details(self, cve_id: str) -> str: | |
| """ | |
| Fetches detailed CVE information from NVD (National Vulnerability Database). | |
| Args: | |
| cve_id: The CVE identifier (e.g., 'CVE-2019-16515') | |
| Returns: | |
| Formatted string containing detailed CVE information from NVD | |
| """ | |
| try: | |
| # Validate and clean CVE ID format | |
| cve_id = cve_id.strip().upper() | |
| if not cve_id.startswith('CVE-'): | |
| return f"β Invalid CVE ID format: '{cve_id}'\nCVE ID must start with 'CVE-' (e.g., CVE-2019-16515)" | |
| # NVD API endpoint | |
| nvd_api_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" | |
| nvd_web_url = f"https://nvd.nist.gov/vuln/detail/{cve_id}" | |
| # Make request to NVD API | |
| params = {"cveId": cve_id} | |
| headers = { | |
| "User-Agent": "VulnerabilityScanner/1.0 (GitHub Security Analysis Tool)" | |
| } | |
| print(f"π Fetching NVD details for {cve_id}...") | |
| response = requests.get(nvd_api_url, params=params, headers=headers, timeout=15) | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Check if CVE was found | |
| if data.get('resultsPerPage', 0) == 0: | |
| return f"β οΈ CVE not found in NVD database: {cve_id}\n\nπ **NVD URL**: {nvd_web_url}\n\nNote: The CVE may not yet be published in NVD or the ID might be incorrect." | |
| # Extract vulnerability data | |
| vuln = data['vulnerabilities'][0]['cve'] | |
| # Build formatted result | |
| result = f"π **NVD CVE Details: {cve_id}**\n\n" | |
| result += f"π **NVD URL**: {nvd_web_url}\n\n" | |
| # Status and dates | |
| result += f"**Status**: {vuln.get('vulnStatus', 'N/A')}\n" | |
| result += f"**Published**: {vuln.get('published', 'N/A')[:10]}\n" | |
| result += f"**Last Modified**: {vuln.get('lastModified', 'N/A')[:10]}\n\n" | |
| # Description | |
| descriptions = vuln.get('descriptions', []) | |
| for desc in descriptions: | |
| if desc.get('lang') == 'en': | |
| result += f"**π Description**:\n{desc.get('value', 'N/A')}\n\n" | |
| break | |
| # CVSS Scores | |
| metrics = vuln.get('metrics', {}) | |
| # CVSS v3.x (preferred) | |
| if 'cvssMetricV31' in metrics or 'cvssMetricV30' in metrics: | |
| cvss_key = 'cvssMetricV31' if 'cvssMetricV31' in metrics else 'cvssMetricV30' | |
| cvss_v3 = metrics[cvss_key][0]['cvssData'] | |
| result += f"**π― CVSS v3 Score**:\n" | |
| result += f"- **Base Score**: {cvss_v3.get('baseScore', 'N/A')} ({cvss_v3.get('baseSeverity', 'N/A')})\n" | |
| result += f"- **Vector String**: {cvss_v3.get('vectorString', 'N/A')}\n" | |
| result += f"- **Attack Vector**: {cvss_v3.get('attackVector', 'N/A')}\n" | |
| result += f"- **Attack Complexity**: {cvss_v3.get('attackComplexity', 'N/A')}\n" | |
| result += f"- **Privileges Required**: {cvss_v3.get('privilegesRequired', 'N/A')}\n" | |
| result += f"- **User Interaction**: {cvss_v3.get('userInteraction', 'N/A')}\n" | |
| result += f"- **Scope**: {cvss_v3.get('scope', 'N/A')}\n" | |
| result += f"- **Confidentiality Impact**: {cvss_v3.get('confidentialityImpact', 'N/A')}\n" | |
| result += f"- **Integrity Impact**: {cvss_v3.get('integrityImpact', 'N/A')}\n" | |
| result += f"- **Availability Impact**: {cvss_v3.get('availabilityImpact', 'N/A')}\n\n" | |
| # CVSS v2 (if available) | |
| if 'cvssMetricV2' in metrics: | |
| cvss_v2 = metrics['cvssMetricV2'][0]['cvssData'] | |
| result += f"**CVSS v2 Score**:\n" | |
| result += f"- **Base Score**: {cvss_v2.get('baseScore', 'N/A')} ({metrics['cvssMetricV2'][0].get('baseSeverity', 'N/A')})\n" | |
| result += f"- **Vector String**: {cvss_v2.get('vectorString', 'N/A')}\n\n" | |
| # CWE (Common Weakness Enumeration) | |
| weaknesses = vuln.get('weaknesses', []) | |
| if weaknesses: | |
| result += f"**π CWE (Common Weakness Enumeration)**:\n" | |
| cwe_list = [] | |
| for weakness in weaknesses: | |
| for desc in weakness.get('description', []): | |
| if desc.get('lang') == 'en': | |
| cwe_list.append(desc.get('value', 'N/A')) | |
| result += f"- {', '.join(set(cwe_list))}\n\n" | |
| # References | |
| references = vuln.get('references', []) | |
| if references: | |
| result += f"**π References** (showing first 5):\n" | |
| for i, ref in enumerate(references[:5], 1): | |
| result += f"{i}. [{ref.get('source', 'Source')}]({ref.get('url', '#')})\n" | |
| if len(references) > 5: | |
| result += f"\n... and {len(references) - 5} more references\n" | |
| result += "\n" | |
| result += f"---\n" | |
| result += f"π‘ **Tip**: Use this CVE information to cross-reference vulnerabilities found in code analysis.\n" | |
| return result | |
| elif response.status_code == 404: | |
| return f"β οΈ CVE not found: {cve_id}\n\nπ **NVD URL**: {nvd_web_url}\n\nThe CVE may not exist or may not yet be published in NVD." | |
| elif response.status_code == 403: | |
| return f"β Access denied to NVD API (HTTP 403)\n\nThis might be due to rate limiting. Please try again in a few moments.\n\nπ **NVD URL**: {nvd_web_url}" | |
| else: | |
| return f"β NVD API request failed with status {response.status_code}\n\nπ **NVD URL**: {nvd_web_url}\n\nYou can view the CVE details directly on the NVD website." | |
| except requests.exceptions.Timeout: | |
| return f"β±οΈ Request to NVD API timed out for {cve_id}\n\nPlease try again or visit: {nvd_web_url}" | |
| except requests.exceptions.RequestException as e: | |
| return f"β Network error while fetching CVE details: {str(e)}\n\nπ **NVD URL**: {nvd_web_url}" | |
| except Exception as e: | |
| return f"β Unexpected error fetching NVD details for {cve_id}: {str(e)}\n\nπ **NVD URL**: {nvd_web_url}" | |
| def search_and_fetch_cve_details(self, query: str, max_nvd_fetches: int = 5) -> str: | |
| """ | |
| Smart combined function: Searches CVE database and automatically fetches NVD details. | |
| This function: | |
| 1. Searches the CVE knowledge base (RAG) for relevant vulnerabilities | |
| 2. Automatically parses CVE IDs from the results | |
| 3. Fetches detailed NVD information for top CVEs | |
| 4. Returns combined results with both RAG data and NVD details | |
| Args: | |
| query: Vulnerability search query (e.g., "SQL injection", "XSS") | |
| max_nvd_fetches: Maximum number of CVEs to fetch NVD details for (default: 5) | |
| Returns: | |
| Formatted string with RAG results + detailed NVD information | |
| """ | |
| import re | |
| import time | |
| try: | |
| # Step 1: Search CVE database using RAG | |
| print(f"π Step 1: Searching CVE knowledge base for '{query}'...") | |
| rag_results = self.search_cve_database(query) | |
| if "β" in rag_results or "No relevant CVE information found" in rag_results: | |
| return rag_results | |
| # Step 2: Parse CVE IDs from RAG results | |
| print(f"π Step 2: Parsing CVE IDs from results...") | |
| cve_pattern = r'CVE-\d{4}-\d{4,7}' | |
| cve_ids = re.findall(cve_pattern, rag_results) | |
| # Remove duplicates and limit to max_nvd_fetches | |
| unique_cve_ids = list(dict.fromkeys(cve_ids))[:max_nvd_fetches] | |
| if not unique_cve_ids: | |
| return rag_results + "\n\nβ οΈ No CVE IDs found in results to fetch NVD details." | |
| print(f"β Found {len(unique_cve_ids)} unique CVE IDs: {', '.join(unique_cve_ids)}") | |
| # Step 3: Build combined result | |
| combined_result = "π¬ **COMPREHENSIVE CVE ANALYSIS**\n" | |
| combined_result += "=" * 80 + "\n\n" | |
| # Include RAG results first | |
| combined_result += "## π PART 1: CVE Knowledge Base Search Results\n\n" | |
| combined_result += rag_results | |
| combined_result += "\n\n" + "=" * 80 + "\n\n" | |
| # Step 4: Fetch NVD details for each CVE | |
| combined_result += f"## π PART 2: Detailed NVD Information (Top {len(unique_cve_ids)} CVEs)\n\n" | |
| combined_result += f"Fetching official NVD details for: {', '.join(unique_cve_ids)}\n\n" | |
| combined_result += "-" * 80 + "\n\n" | |
| for idx, cve_id in enumerate(unique_cve_ids, 1): | |
| print(f"π Step 3.{idx}: Fetching NVD details for {cve_id}...") | |
| # Fetch NVD details | |
| nvd_result = self.get_nvd_cve_details(cve_id) | |
| combined_result += nvd_result | |
| combined_result += "\n" + "=" * 80 + "\n\n" | |
| # Rate limiting: Add delay between requests (NVD recommends max 5 requests per 30 seconds) | |
| if idx < len(unique_cve_ids): | |
| time.sleep(6) # Wait 6 seconds between requests | |
| # Step 5: Add summary | |
| combined_result += "## π SUMMARY\n\n" | |
| combined_result += f"β **Total CVEs Analyzed**: {len(unique_cve_ids)}\n" | |
| combined_result += f"β **Search Query**: {query}\n" | |
| combined_result += f"β **RAG Results**: {len(cve_ids)} CVE references found\n" | |
| combined_result += f"β **NVD Details Fetched**: {len(unique_cve_ids)} CVEs\n\n" | |
| combined_result += "π‘ **Next Steps**: Use this information to:\n" | |
| combined_result += "- Cross-reference vulnerabilities in your code\n" | |
| combined_result += "- Understand CVSS severity scores\n" | |
| combined_result += "- Review CWE classifications\n" | |
| combined_result += "- Check official NVD references for remediation guidance\n" | |
| print(f"β Combined analysis complete!") | |
| return combined_result | |
| except Exception as e: | |
| return f"β Error in combined CVE analysis: {str(e)}\n\nPlease try using search_cve_database and get_nvd_cve_details separately." | |
# Initialize the GitHub MCP server
# Module-level singleton shared by every Gradio tab below.
# NOTE: constructing it here raises at import time if GITHUB_TOKEN is unset,
# and eagerly attempts to download the CVE dataset from Hugging Face.
github_server = GitHubMCPServer()
# Create Gradio interfaces for MCP
# Each gr.Interface becomes one MCP tool (identified by api_name) and one UI
# tab. The first list argument holds the interfaces, the second the tab titles
# in the same order - keep the two lists in sync when adding tools.
demo = gr.TabbedInterface(
    [
        # Tab 1: repository metadata lookup
        gr.Interface(
            fn=github_server.get_repository_info,
            inputs=[
                gr.Textbox(label="Repository Owner", placeholder="octocat"),
                gr.Textbox(label="Repository Name", placeholder="Hello-World")
            ],
            outputs=gr.Textbox(label="Repository Information", lines=15),
            title="Get Repository Information",
            description="Get basic information about a GitHub repository",
            api_name="get_repository_info"
        ),
        # Tab 2: single-file content retrieval
        gr.Interface(
            fn=github_server.get_file_content,
            inputs=[
                gr.Textbox(label="Repository Owner", placeholder="octocat"),
                gr.Textbox(label="Repository Name", placeholder="Hello-World"),
                gr.Textbox(label="File Path", placeholder="README.md")
            ],
            outputs=gr.Textbox(label="File Content", lines=20),
            title="Get File Content",
            description="Get the content of a specific file from a GitHub repository",
            api_name="get_file_content"
        ),
        # Tab 3: recursive scan for code files by extension
        gr.Interface(
            fn=github_server.scan_repository,
            inputs=[
                gr.Textbox(label="Repository Owner", placeholder="octocat"),
                gr.Textbox(label="Repository Name", placeholder="Hello-World"),
                gr.Textbox(label="File Extensions", value=".py,.js,.ts,.php,.java", placeholder=".py,.js,.ts,.php,.java")
            ],
            outputs=gr.Textbox(label="Scan Results", lines=20),
            title="Scan Repository for Code Files",
            description="Scan a GitHub repository for code files with specified extensions",
            api_name="scan_repository"
        ),
        # Tab 4: RAG search over the local CVE knowledge base
        gr.Interface(
            fn=github_server.search_cve_database,
            inputs=[
                gr.Textbox(label="Vulnerability Query", placeholder="SQL injection, XSS, command injection, etc.")
            ],
            outputs=gr.Textbox(label="CVE Search Results", lines=25),
            title="Search CVE Database",
            description="Search the CVE knowledge base for vulnerability patterns and CWE information",
            api_name="search_cve_database"
        ),
        # Tab 5: direct NVD lookup for a single CVE ID
        gr.Interface(
            fn=github_server.get_nvd_cve_details,
            inputs=[
                gr.Textbox(label="CVE ID", placeholder="CVE-2019-16515", value="CVE-2019-16515")
            ],
            outputs=gr.Textbox(label="NVD CVE Details", lines=30),
            title="Get NVD CVE Details",
            description="Fetch detailed CVE information from National Vulnerability Database (NVD)",
            api_name="get_nvd_cve_details"
        ),
        # Tab 6: combined RAG search + NVD detail fetch
        gr.Interface(
            fn=github_server.search_and_fetch_cve_details,
            inputs=[
                gr.Textbox(label="Vulnerability Query", placeholder="SQL injection, XSS, command injection, etc.", value="SQL injection"),
                gr.Slider(minimum=1, maximum=10, value=5, step=1, label="Max NVD Fetches", info="Number of CVEs to fetch NVD details for")
            ],
            outputs=gr.Textbox(label="Comprehensive CVE Analysis", lines=40),
            title="π¬ Smart CVE Analysis (RAG + NVD)",
            description="Automatically searches CVE database AND fetches detailed NVD information for top CVEs",
            api_name="search_and_fetch_cve_details"
        ),
        # Tab 7: terse CVE search intended for multi-agent pipelines
        gr.Interface(
            fn=github_server.simple_cve_search,
            inputs=[
                gr.Textbox(label="Vulnerability Query", placeholder="SQL injection, XSS, command injection, etc."),
                gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Number of Results", info="Number of CVE matches to return")
            ],
            outputs=gr.Textbox(label="Simple CVE Search Results", lines=15),
            title="π Simple CVE Search",
            description="Simple CVE search returning only CVE IDs and descriptions (for multi-agent workflow)",
            api_name="simple_cve_search"
        )
    ],
    [
        "Repository Info",
        "File Content",
        "Repository Scanner",
        "CVE Database",
        "NVD CVE Details",
        "π¬ Smart CVE Analysis",
        "π Simple CVE Search"
    ],
    title="π GitHub MCP Server with CVE Knowledge Base & NVD Integration"
)
if __name__ == "__main__":
    # Startup banner listing the MCP tools this server exposes.
    for line in (
        "π Starting GitHub MCP Server with CVE Knowledge Base & NVD Integration...",
        "π‘ Server will provide GitHub repository access, CVE search, and NVD details via MCP",
        "π οΈ Available tools:",
        " - get_repository_info: Get repository metadata",
        " - get_file_content: Retrieve file contents",
        " - scan_repository: Scan for code files",
        " - search_cve_database: Search CVE knowledge base",
        " - get_nvd_cve_details: Fetch detailed CVE info from NVD",
        " - π search_and_fetch_cve_details: Smart combined RAG + NVD analysis",
    ):
        print(line)
    # mcp_server=True exposes the interfaces as MCP tools in addition to the UI.
    demo.launch(mcp_server=True)