import requests
import time
import os
from urllib.parse import urlparse
from treelib import Tree
from typing import Dict, List, Optional, Tuple
import logging
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from groq import Groq, GroqError
import gradio as gr
from tqdm.auto import tqdm
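
# Third-party packages assumed by the imports above (a plausible
# requirements.txt for this Space): requests, treelib, groq, gradio, tqdm.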

# --- Basic Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# --- Data Structures ---
@dataclass
class FileInfo:
    """Data class to store file information."""
    path: str
    name: str
    content: str
    explanation: str
    size: int
    file_type: str


# --- Core Application Logic ---
class GitHubRepositoryAnalyzer:
    """
    A class to analyze GitHub repositories by fetching file structures,
    downloading content, and using an LLM to explain the code.
    """
    def __init__(self, github_token: Optional[str] = None, groq_api_key: Optional[str] = None):
        self.github_token = github_token
        self.session = requests.Session()
        self.file_contents: Dict[str, FileInfo] = {}
        # Configure GitHub API access
        if self.github_token:
            logger.info("Using provided GitHub token for higher rate limits.")
            self.session.headers.update({'Authorization': f'token {self.github_token}'})
            # Authenticated GitHub API: 5000 requests/hour
            self.rate_limiter = RateLimiter(max_calls=4500, time_window=3600)
        else:
            logger.warning("No GitHub token. Using unauthenticated access with lower rate limits.")
            # Unauthenticated: 60 requests/hour
            self.rate_limiter = RateLimiter(max_calls=50, time_window=3600)
        # Configure Groq client
        if groq_api_key:
            self.groq_client = Groq(api_key=groq_api_key)
            logger.info("Groq client initialized.")
        else:
            self.groq_client = None
            logger.warning("Groq API key not provided. Code analysis will be skipped.")
        # File types to analyze
        self.analyzable_extensions = {
            '.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.cs', '.php',
            '.rb', '.go', '.rs', '.swift', '.kt', '.scala', '.sh', '.bash',
            '.sql', '.html', '.css', '.scss', '.less', '.json', '.xml', '.yaml', '.yml',
            '.md', '.txt', '.cfg', '.conf', '.ini', '.env', '.dockerfile', 'requirements.txt'
        }
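
    # Example (illustrative): extract_repo_info("https://github.com/google/generative-ai-python")
    # returns ("google", "generative-ai-python").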
    def extract_repo_info(self, repo_url: str) -> Tuple[str, str]:
        """Extract owner and repository name from a GitHub URL."""
        try:
            parsed_url = urlparse(repo_url)
            # Strip only a trailing '.git', not every occurrence in the path.
            path = parsed_url.path.strip('/').removesuffix('.git')
            parts = path.split('/')
            if len(parts) >= 2:
                return parts[0], parts[1]
            raise ValueError("Invalid repository URL format")
        except Exception as e:
            logger.error(f"Error parsing repository URL: {e}")
            raise
    def get_repository_structure(self, owner: str, repo: str, path: str = "") -> List[Dict]:
        """Recursively fetch the entire file structure of the repository."""
        all_files = []
        try:
            contents = self._fetch_contents(owner, repo, path)
            for item in contents:
                if item['type'] == 'dir':
                    all_files.extend(self.get_repository_structure(owner, repo, item['path']))
                else:
                    all_files.append(item)
        except Exception as e:
            logger.error(f"Failed to get repository structure for {path}: {e}")
        return all_files
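
    # Note: requests parses the HTTP Link header into response.links, so the loop
    # below simply follows any rel="next" page the GitHub API returns.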
    def _fetch_contents(self, owner: str, repo: str, path: str) -> List[Dict]:
        """Helper to fetch contents of a specific directory with pagination."""
        url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
        items = []
        while url:
            self.rate_limiter.wait_if_needed()
            response = self.session.get(url)
            response.raise_for_status()
            items.extend(response.json())
            url = response.links.get('next', {}).get('url')
        return items
    def map_reduce_analysis(self, owner: str, repo: str, files: List[Dict], progress: gr.Progress):
        """
        Analyzes files in parallel (Map phase) and aggregates results (Reduce phase).
        This method uses a ThreadPoolExecutor to perform I/O-bound tasks concurrently.
        """
        # --- MAP PHASE ---
        # Each file is processed independently in a separate thread.
        # This is efficient for tasks that wait for network responses (API calls).
        logger.info(f"Starting Map phase: Analyzing {len(files)} files in parallel.")
        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_file = {
                executor.submit(self._process_single_file, owner, repo, file_item): file_item
                for file_item in files
            }
            # tqdm progress tracking integrated with Gradio
            pbar = tqdm(as_completed(future_to_file), total=len(files), desc="Analyzing Files")
            for future in pbar:
                try:
                    file_info = future.result()
                    if file_info:
                        # Store the result of the map phase
                        self.file_contents[file_info.path] = file_info
                        pbar.set_description(f"Analyzed {file_info.name}")
                    # Update Gradio progress bar
                    progress(pbar.n / pbar.total, desc=pbar.desc)
                except Exception as e:
                    file_item = future_to_file[future]
                    logger.error(f"Error processing {file_item['path']}: {e}")
        # --- REDUCE PHASE ---
        # The reduce phase is the aggregation and structuring of the mapped results,
        # which happens after the loop when creating the tree and summary.
        logger.info("Reduce phase: Aggregating results.")
        tree = self._create_directory_tree(owner, repo)
        details = self._format_detailed_explanations()
        summary = self._format_summary(owner, repo)
        return tree, details, summary
    def _process_single_file(self, owner: str, repo: str, file_item: Dict) -> Optional[FileInfo]:
        """Processes a single file: download, check, and analyze."""
        file_path = file_item['path']
        file_size = file_item.get('size', 0)
        if not self._should_analyze_file(file_path, file_size):
            return None
        content = self._get_raw_file(owner, repo, file_path)
        if content is None:
            return None
        explanation = self._analyze_code_with_llm(content, file_path)
        return FileInfo(
            path=file_path, name=file_item['name'], content=content,
            explanation=explanation, size=file_size, file_type=os.path.splitext(file_path)[1]
        )
    def _should_analyze_file(self, file_path: str, file_size: int) -> bool:
        """Determine if a file should be analyzed based on extension and size."""
        if file_size > 1024 * 1024:
            return False  # Skip files > 1MB
        file_name = os.path.basename(file_path)
        _, file_ext = os.path.splitext(file_name)
        return file_ext.lower() in self.analyzable_extensions or file_name in self.analyzable_extensions
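
    # Note: the method below simply tries the two most common default branches in
    # order; the repo's actual default branch could instead be looked up via the
    # GitHub API (GET /repos/{owner}/{repo}, "default_branch" field).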
    def _get_raw_file(self, owner: str, repo: str, file_path: str) -> Optional[str]:
        """Fetch raw file content with fallback branches."""
        for branch in ['main', 'master']:
            url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
            try:
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    # Simple check for binary content
                    return response.text if '\x00' not in response.text else None
            except (requests.RequestException, UnicodeDecodeError) as e:
                logger.warning(f"Could not fetch or decode {file_path} from branch {branch}: {e}")
        return None
    def _analyze_code_with_llm(self, code: str, file_path: str) -> str:
        """Analyze code using the Groq LLM API."""
        if not self.groq_client:
            return "Analysis skipped: Groq API key not provided."
        max_code_length = 8000
        if len(code) > max_code_length:
            code = code[:max_code_length] + "\n... (truncated)"
        prompt = f"""Analyze the following code from file '{file_path}'. Provide a concise explanation of its functionality, purpose, and key components.
```
{code}
```
Structure your analysis with these points:
1. **Main Purpose**: What is the primary goal of this file?
2. **Key Functions/Classes**: What are the main components and what do they do?
3. **Overall Role**: How does this file fit into the larger project?
"""
        try:
            chat_completion = self.groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama3-8b-8192",
                temperature=0.2,
                max_tokens=1024
            )
            return chat_completion.choices[0].message.content.strip()
        except GroqError as e:
            logger.error(f"Groq API error for {file_path}: {e}")
            return f"Error: Groq API request failed - {e}"
        except Exception as e:
            logger.error(f"Error calling Groq API for {file_path}: {e}")
            return f"Error: {e}"
    def _create_directory_tree(self, owner: str, repo: str) -> str:
        """Creates a string representation of the directory tree."""
        tree = Tree()
        root_id = f"{owner}/{repo}"
        tree.create_node(f"🌳 {root_id}", root_id)
        created_nodes = {root_id}
        for file_path in sorted(self.file_contents.keys()):
            path_parts = file_path.split('/')
            parent_id = root_id
            for i, part in enumerate(path_parts[:-1]):
                node_id = f"{root_id}/{'/'.join(path_parts[:i+1])}"
                if node_id not in created_nodes:
                    tree.create_node(f"📁 {part}", node_id, parent=parent_id)
                    created_nodes.add(node_id)
                parent_id = node_id
            file_name = path_parts[-1]
            file_type = self.file_contents[file_path].file_type
            emoji = self.get_file_emoji(file_type)
            tree.create_node(f"{emoji} {file_name}", f"{root_id}/{file_path}", parent=parent_id)
        return f"```\n{tree.show(line_type='ascii-ex')}\n```"
    def _format_detailed_explanations(self) -> str:
        """Formats all file explanations into a single Markdown string."""
        if not self.file_contents:
            return "No files were analyzed."
        output = []
        for path, info in sorted(self.file_contents.items()):
            output.append(f"### 📄 `{path}`")
            output.append(f"**Size**: {info.size:,} bytes")
            output.append("---")
            output.append(info.explanation)
            output.append("\n---\n")
        return "\n".join(output)
    def _format_summary(self, owner: str, repo: str) -> str:
        """Creates a summary of the analysis."""
        total_files = len(self.file_contents)
        total_size = sum(info.size for info in self.file_contents.values())
        return (
            f"## Analysis Summary for `{owner}/{repo}`\n"
            f"- **Total Files Analyzed**: {total_files}\n"
            f"- **Total Code Size Analyzed**: {total_size:,} bytes"
        )
    @staticmethod
    def get_file_emoji(file_type: str) -> str:
        """Returns an emoji for a given file type."""
        emoji_map = {
            '.py': '🐍', '.js': '🟨', '.ts': '🔷', '.java': '☕', '.html': '🌐',
            '.css': '🎨', '.json': '📋', '.md': '📝', '.sh': '🐚', '.yml': '⚙️',
            '.yaml': '⚙️', '.dockerfile': '🐳', '.sql': '🗄️', 'requirements.txt': '📦'
        }
        return emoji_map.get(file_type.lower(), '📄')


class RateLimiter:
    """Simple rate limiter to avoid exceeding API limits."""
    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []

    def wait_if_needed(self):
        now = time.time()
        self.calls = [t for t in self.calls if now - t < self.time_window]
        if len(self.calls) >= self.max_calls:
            sleep_time = self.time_window - (now - self.calls[0])
            if sleep_time > 0:
                logger.info(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds.")
                time.sleep(sleep_time)
        self.calls.append(time.time())
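
# Example (illustrative): RateLimiter(max_calls=2, time_window=60) lets two calls
# through wait_if_needed() immediately; the third call sleeps until the oldest
# recorded call is more than 60 seconds old, keeping at most 2 calls per window.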


# --- Gradio Interface ---
def analyze_repo_gradio(repo_url: str, github_token: str, groq_key: str, progress=gr.Progress(track_tqdm=True)):
    """The main function executed by the Gradio interface."""
    if not repo_url:
        return "Please enter a GitHub repository URL.", "", ""
    if not groq_key:
        return "Please enter your Groq API Key.", "", ""
    try:
        analyzer = GitHubRepositoryAnalyzer(github_token=github_token, groq_api_key=groq_key)
        progress(0, desc="Extracting repo info...")
        owner, repo = analyzer.extract_repo_info(repo_url)
        progress(0.1, desc="Fetching repository file structure...")
        all_files = analyzer.get_repository_structure(owner, repo)
        if not all_files:
            return "Could not retrieve repository structure. Check URL or token.", "", ""
        tree, details, summary = analyzer.map_reduce_analysis(owner, repo, all_files, progress)
        return tree, details, summary
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        return f"An error occurred: {e}", "", ""


def create_gradio_interface():
    """Builds and returns the Gradio web interface."""
    with gr.Blocks(theme=gr.themes.Soft(), title="GitHub Repo Analyzer") as demo:
        gr.Markdown("# 🤖 AI-Powered GitHub Repository Analyzer")
        gr.Markdown("Enter a GitHub repository URL to generate a file tree, get AI-powered explanations for each file, and see a summary.")
        with gr.Row():
            with gr.Column(scale=2):
                repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="e.g., https://github.com/google/generative-ai-python")
                with gr.Accordion("API Keys (Optional but Recommended)", open=False):
                    github_token = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token for a higher rate limit", type="password")
                    groq_key = gr.Textbox(label="Groq API Key", placeholder="Enter your Groq API key for code analysis", type="password")
                analyze_btn = gr.Button("Analyze Repository", variant="primary")
        with gr.Tabs():
            with gr.TabItem("📊 Summary"):
                summary_output = gr.Markdown()
            with gr.TabItem("🌳 File Tree"):
                tree_output = gr.Markdown()
            with gr.TabItem("📋 Detailed Analysis"):
                details_output = gr.Markdown()
        analyze_btn.click(
            fn=analyze_repo_gradio,
            inputs=[repo_url, github_token, groq_key],
            outputs=[tree_output, details_output, summary_output]
        )
    return demo


if __name__ == "__main__":
    app = create_gradio_interface()
    app.launch(debug=True)
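
# When hosted as a Hugging Face Space this script is typically saved as app.py and
# launched automatically; to run it locally, execute the file with Python and open
# the local URL that Gradio prints.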