"""Codebase analysis agent to detect framework, dependencies, and structure."""

from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

from schemas import ReadinessRequest


class CodebaseAnalyzer:
    """Analyzes codebase to extract framework, dependencies, and deployment info."""

    def analyze_folder(self, folder_path: str) -> Dict[str, Any]:
        """Analyze a local folder."""
        path = Path(folder_path)
        if not path.exists():
            return {"error": f"Folder not found: {folder_path}"}

        analysis = {
            "framework": None,
            "platform": None,
            "dependencies": [],
            "package_manager": None,
            "has_docker": False,
            "has_docker_compose": False,
            "has_k8s": False,
            "config_files": [],
            "readme_path": None,
            "code_summary": "",
            "detected_files": [],
        }

        # Map well-known manifest files to their package managers.
        package_files = {
            "package.json": "npm",
            "requirements.txt": "pip",
            "Pipfile": "pipenv",
            "poetry.lock": "poetry",
            "go.mod": "go",
            "Cargo.toml": "rust",
            "pom.xml": "maven",
            "build.gradle": "gradle",
        }

        for file_name, manager in package_files.items():
            file_path = path / file_name
            if file_path.exists():
                analysis["package_manager"] = manager
                analysis["config_files"].append(file_name)
                analysis["detected_files"].append(file_name)

                if file_name == "package.json":
                    deps = self._parse_package_json(file_path)
                    # package.json "dependencies" is a name -> version mapping;
                    # keep just the names to match the declared List[str] shape.
                    analysis["dependencies"] = list(deps.get("dependencies", {}))
                    analysis["framework"] = self._detect_framework_from_package_json(deps)
                elif file_name == "requirements.txt":
                    analysis["dependencies"] = self._parse_requirements_txt(file_path)
                    analysis["framework"] = self._detect_framework_from_requirements(analysis["dependencies"])

        # Detect containerization and orchestration artifacts.
        dockerfile = path / "Dockerfile"
        if dockerfile.exists():
            analysis["has_docker"] = True
            analysis["detected_files"].append("Dockerfile")
        for compose_name in ("docker-compose.yml", "docker-compose.yaml"):
            if (path / compose_name).exists():
                analysis["has_docker_compose"] = True
                analysis["detected_files"].append(compose_name)
                break
        # Either conventional directory name counts as Kubernetes config; a
        # Path object is always truthy, so each candidate is tested explicitly.
        if (path / "k8s").is_dir() or (path / "kubernetes").is_dir():
            analysis["has_k8s"] = True
            analysis["detected_files"].append("k8s/")

        # Locate a README, preferring the conventional casing.
        for readme_name in ["README.md", "readme.md", "README.txt"]:
            readme_path = path / readme_name
            if readme_path.exists():
                analysis["readme_path"] = str(readme_path)
                break

        # Platform-specific config files identify the deployment target.
        vercel_json = path / "vercel.json"
        netlify_toml = path / "netlify.toml"
        if vercel_json.exists():
            analysis["platform"] = "vercel"
            analysis["detected_files"].append("vercel.json")
        elif netlify_toml.exists():
            analysis["platform"] = "netlify"
            analysis["detected_files"].append("netlify.toml")

        analysis["code_summary"] = self._generate_code_summary(analysis)

        return analysis

    def analyze_github_repo(self, repo_url: str) -> Dict[str, Any]:
        """Analyze a GitHub repository (placeholder - would use GitHub API)."""
        match = re.search(r"github\.com[:/]([\w\-]+)/([\w\-.]+)", repo_url)
        if not match:
            return {"error": "Invalid GitHub URL"}

        owner, repo = match.groups()
        repo = repo.removesuffix(".git")  # tolerate clone URLs

        # Placeholder response until real API integration is wired up.
        return {
            "repo": f"{owner}/{repo}",
            "url": repo_url,
            "framework": None,
            "platform": None,
            "dependencies": [],
            "message": "GitHub repo analysis requires API integration",
        }
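
    # A minimal sketch (an assumption, not part of the original module) of the
    # API integration the placeholder above anticipates: list top-level file
    # names via GitHub's public repository-contents endpoint so the same
    # manifest-based detection as analyze_folder could run remotely.
    # Assumes a public repository and unauthenticated access (rate-limited).
    def _list_repo_root_files(self, owner: str, repo: str) -> List[str]:
        """Return top-level file names of a public GitHub repo (sketch)."""
        import urllib.request

        url = f"https://api.github.com/repos/{owner}/{repo}/contents/"
        try:
            with urllib.request.urlopen(url, timeout=10) as resp:
                entries = json.load(resp)
            return [entry["name"] for entry in entries if entry.get("type") == "file"]
        except Exception:
            return []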

    def _parse_package_json(self, file_path: Path) -> Dict[str, Any]:
        """Parse package.json file."""
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            return {}

    def _parse_requirements_txt(self, file_path: Path) -> List[str]:
        """Parse requirements.txt into bare package names."""
        deps = []
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith("#"):
                        # Strip version specifiers, extras, and environment
                        # markers (e.g. "requests>=2.0", "uvicorn[standard]").
                        name = re.split(r"[<>=!~\[;@ ]", line, maxsplit=1)[0].strip()
                        if name:
                            deps.append(name)
        except OSError:
            pass
        return deps

    def _detect_framework_from_package_json(self, package_json: Dict) -> Optional[str]:
        """Detect framework from package.json dependencies."""
        deps = {**package_json.get("dependencies", {}), **package_json.get("devDependencies", {})}
        deps_lower = {k.lower(): v for k, v in deps.items()}

        # Order matters: meta-frameworks are checked before the libraries
        # they are built on (e.g. Next.js before React).
        if "next" in deps_lower:
            return "next.js"
        elif "react" in deps_lower and "react-dom" in deps_lower:
            return "react"
        elif "vue" in deps_lower:
            return "vue"
        elif "angular" in deps_lower or "@angular/core" in deps_lower:
            return "angular"
        elif "svelte" in deps_lower:
            return "svelte"
        elif "express" in deps_lower:
            return "express"
        elif "@nestjs/core" in deps_lower:
            return "nestjs"
        elif "remix" in deps_lower:
            return "remix"

        return None

    def _detect_framework_from_requirements(self, deps: List[str]) -> Optional[str]:
        """Detect framework from Python requirements."""
        deps_lower = [d.lower() for d in deps]

        if "django" in deps_lower:
            return "django"
        elif "fastapi" in deps_lower:
            return "fastapi"
        elif "flask" in deps_lower:
            return "flask"
        elif "starlette" in deps_lower:
            return "starlette"

        return None

    def _generate_code_summary(self, analysis: Dict[str, Any]) -> str:
        """Generate code summary from analysis."""
        parts = []

        if analysis["framework"]:
            parts.append(f"Framework: {analysis['framework']}")

        if analysis["package_manager"]:
            parts.append(f"Package manager: {analysis['package_manager']}")

        if analysis["dependencies"]:
            parts.append(f"Dependencies: {len(analysis['dependencies'])} packages")

        if analysis["has_docker"]:
            parts.append("Docker configuration detected")

        if analysis["has_k8s"]:
            parts.append("Kubernetes configuration detected")

        return ". ".join(parts) if parts else "Codebase analysis complete"

    def update_readme(self, readme_path: str, deployment_info: Dict[str, Any]) -> str:
        """Update README with deployment information."""
        try:
            with open(readme_path, "r", encoding="utf-8") as f:
                content = f.read()
        except OSError:
            content = "# Deployment\n\n"

        # Ensure a "## Deployment" heading exists so the substitution below
        # always has a section to replace.
        if "## Deployment" not in content:
            content += "\n\n## Deployment\n\n"

        deployment_section = f"""
### Deployment Status

- **Platform**: {deployment_info.get('platform', 'Not configured')}
- **Framework**: {deployment_info.get('framework', 'Unknown')}
- **Status**: {deployment_info.get('status', 'Ready for deployment')}

### Quick Deploy

{deployment_info.get('deployment_instructions', 'Configure deployment in the Deployment Readiness Copilot')}

---
*Last updated by Deployment Readiness Copilot*
"""

        # Replace everything from the heading up to the next top-level section
        # (or end of file). A callable replacement keeps re.sub from
        # interpreting backslashes in the generated text.
        pattern = r"## Deployment.*?(?=\n##|\Z)"
        content = re.sub(
            pattern,
            lambda _: f"## Deployment{deployment_section}",
            content,
            count=1,
            flags=re.DOTALL,
        )

        try:
            with open(readme_path, "w", encoding="utf-8") as f:
                f.write(content)
            return "README updated successfully"
        except OSError as e:
            return f"Failed to update README: {e}"

    def generate_architecture_diagram(self, analysis: Dict[str, Any]) -> str:
        """Generate Mermaid.js architecture diagram from analysis."""
        # The keys may be present but None, so `or` supplies the fallback label.
        framework = analysis.get("framework") or "Unknown"
        platform = analysis.get("platform") or "Unknown"
        deps = [d.lower() for d in analysis.get("dependencies", [])]
        has_docker = analysis.get("has_docker")

        mermaid = "graph TD\n"
        mermaid += f"    User((User)) --> LB[Load Balancer / {platform}]\n"
        mermaid += f"    LB --> App[{framework} App]\n"

        if has_docker:
            # Re-declaring App inside the subgraph places the node in the
            # container box.
            mermaid += "    subgraph Container [Docker Container]\n"
            mermaid += "        App\n"
            mermaid += "    end\n"

        # At most one primary datastore; Redis is checked independently since
        # it usually runs alongside a primary database.
        if "postgresql" in deps or "psycopg2" in deps or "pg" in deps:
            mermaid += "    App --> DB[(PostgreSQL)]\n"
        elif "mysql" in deps or "mysql2" in deps:
            mermaid += "    App --> DB[(MySQL)]\n"
        elif "mongo" in deps or "pymongo" in deps or "mongoose" in deps:
            mermaid += "    App --> DB[(MongoDB)]\n"
        if "redis" in deps:
            mermaid += "    App --> Cache[(Redis)]\n"

        return mermaid

    def identify_fixes(self, analysis: Dict[str, Any]) -> List[Dict[str, str]]:
        """Identify potential fixes for the codebase."""
        fixes = []

        # "pip" is only ever set when requirements.txt itself is present, so
        # the useful case to flag is a Pipenv/Poetry project that lacks a
        # plain requirements.txt for pip-only deployment targets.
        if analysis.get("package_manager") in ("pipenv", "poetry") and "requirements.txt" not in analysis.get("config_files", []):
            fixes.append({
                "id": "missing_requirements",
                "title": "Missing requirements.txt",
                "description": "Python project detected but no requirements.txt found.",
                "fix_action": "create_file",
                "file": "requirements.txt",
                "content": "# Generated requirements\nflask\nrequests\n",
            })

        if not analysis.get("has_docker"):
            fixes.append({
                "id": "missing_dockerfile",
                "title": "Missing Dockerfile",
                "description": "Containerization is recommended for deployment.",
                "fix_action": "create_file",
                "file": "Dockerfile",
                # Default template assumes a Python app; adjust for other stacks.
                "content": "FROM python:3.9-slim\nWORKDIR /app\nCOPY . .\nRUN pip install -r requirements.txt\nCMD [\"python\", \"app.py\"]",
            })

        if not analysis.get("readme_path"):
            fixes.append({
                "id": "missing_readme",
                "title": "Missing README",
                "description": "Documentation is essential for collaboration.",
                "fix_action": "create_file",
                "file": "README.md",
                "content": f"# {analysis.get('framework') or 'Project'}\n\nGenerated by Deploy Ready Copilot.",
            })

        return fixes
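

# Minimal usage sketch (an illustration, not part of the original module);
# the "." argument assumes the analyzer runs from the project root.
if __name__ == "__main__":
    analyzer = CodebaseAnalyzer()
    result = analyzer.analyze_folder(".")
    if "error" in result:
        print(result["error"])
    else:
        print(result["code_summary"])
        print(analyzer.generate_architecture_diagram(result))
        for fix in analyzer.identify_fixes(result):
            print(f"- {fix['title']}: {fix['description']}")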