# Entry point for CLI version
import ast
import difflib
import fnmatch
import json
import os
import re
from pathlib import Path

import cohere
from dotenv import load_dotenv

from Prompts.system_prompts import (
    COHERE_CODING_AGENT,
    COHERE_CODE_REVIEWER,
    COHERE_ARCHITECT
)

load_dotenv()

cohere_api_key = os.getenv("COHERE_API_KEY")
# Show only a short prefix of the key; the mask pads to the key's real length.
print(
    cohere_api_key[0:5] + "*" * max(len(cohere_api_key) - 5, 0)
    if cohere_api_key
    else "No API key found"
)

co = cohere.ClientV2(cohere_api_key)


# ===== File System Manager =====
class FileSystemManager:
    """Manages file reading, writing, and analysis inside a workspace directory.

    Public methods never raise for ordinary failures; they return a dict that
    carries an ``"error"`` message instead. Successful results carry
    ``"success": True`` plus operation-specific keys.
    """

    def __init__(self, workspace_dir="workspace"):
        """Create the manager and make sure the workspace directory exists.

        Args:
            workspace_dir: Directory (relative to the CWD or absolute) that all
                file operations are confined to.
        """
        self.workspace_dir = os.path.abspath(workspace_dir)
        self.create_workspace()

        # Unsafe patterns for security.
        # NOTE: nothing in this file consults this list yet.
        self.unsafe_patterns = [
            r"os\.system\(",
            # Allow Popen with validation. A negative lookahead is required
            # here: a character class such as [^Popen] only excludes single
            # characters, not the word "Popen".
            r"subprocess\.(?!Popen\b)",
            r"eval\(",
            r"exec\(",
            r"__import__\(",
            r"rm -rf",
            r"shutil\.rmtree",
        ]

    def create_workspace(self):
        """Create workspace directory if it doesn't exist"""
        os.makedirs(self.workspace_dir, exist_ok=True)
        print(f"📁 Workspace: {self.workspace_dir}")

    def read_file(self, file_path):
        """Read a text file with safety checks.

        Args:
            file_path: Path relative to the workspace, or an absolute path.

        Returns:
            On success a dict with ``success``, ``path`` (absolute), ``content``,
            ``size`` (number of characters) and ``lines``; otherwise a dict
            with an ``error`` message.
        """
        try:
            # Convert to absolute path within workspace
            abs_path = self._resolve_path(file_path)

            # Security check
            if not self._is_safe_path(abs_path):
                return {"error": f"Access restricted: {file_path}"}

            if not os.path.exists(abs_path):
                return {"error": f"File not found: {file_path}"}

            if not os.path.isfile(abs_path):
                return {"error": f"Not a file: {file_path}"}

            # Undecodable bytes are dropped rather than failing the read.
            with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            return {
                "success": True,
                "path": abs_path,
                "content": content,
                "size": len(content),
                "lines": len(content.split('\n'))
            }
        except PermissionError:
            return {"error": f"Permission denied: {file_path}"}
        except Exception as e:
            return {"error": f"Error reading {file_path}: {str(e)}"}

    def write_file(self, file_path, content):
        """Write content to a file, creating parent directories as needed.

        Args:
            file_path: Path relative to the workspace, or an absolute path.
            content: Text to write; any existing file is overwritten.

        Returns:
            On success a dict with ``success``, ``path``, ``message`` and
            ``size``; otherwise a dict with an ``error`` message.
        """
        try:
            abs_path = self._resolve_path(file_path)

            # Security check
            if not self._is_safe_path(abs_path):
                return {"error": f"Write restricted: {file_path}"}

            # Create directory if needed
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)

            with open(abs_path, 'w', encoding='utf-8') as f:
                f.write(content)

            return {
                "success": True,
                "path": abs_path,
                "message": f"File written: {abs_path}",
                "size": len(content)
            }
        except Exception as e:
            return {"error": f"Error writing {file_path}: {str(e)}"}

    def create_file(self, file_path, content=""):
        """Create a new file with optional content"""
        return self.write_file(file_path, content)

    def list_files(self, directory=".", pattern="*"):
        """Recursively list non-hidden files in a workspace directory.

        Args:
            directory: Directory relative to the workspace, or absolute.
            pattern: Shell-style wildcard (``fnmatch``) applied to file names.
                The default ``"*"`` matches every file.

        Returns:
            On success a dict with ``success``, ``directory``, ``files`` (a list
            of ``name``/``path``/``size``/``modified`` dicts sorted by path, with
            ``path`` relative to the workspace) and ``count``; otherwise a dict
            with an ``error`` message.
        """
        try:
            dir_path = self._resolve_path(directory)

            # Security check: listing must not escape the workspace either.
            if not self._is_safe_path(dir_path):
                return {"error": f"Access restricted: {directory}"}

            if not os.path.exists(dir_path):
                return {"error": f"Directory not found: {directory}"}

            if not os.path.isdir(dir_path):
                return {"error": f"Not a directory: {directory}"}

            files = []
            for root, dirs, filenames in os.walk(dir_path):
                # Skip hidden directories (in-place so os.walk does not descend)
                dirs[:] = [d for d in dirs if not d.startswith('.')]

                for filename in filenames:
                    if filename.startswith('.'):  # Skip hidden files
                        continue
                    if not fnmatch.fnmatch(filename, pattern):
                        continue

                    full_path = os.path.join(root, filename)
                    try:
                        stat = os.stat(full_path)
                    except OSError:
                        # File vanished or is unreadable; leave it out.
                        continue

                    files.append({
                        "name": filename,
                        "path": os.path.relpath(full_path, self.workspace_dir),
                        "size": stat.st_size,
                        "modified": stat.st_mtime
                    })

            return {
                "success": True,
                "directory": dir_path,
                "files": sorted(files, key=lambda x: x["path"]),
                "count": len(files)
            }
        except Exception as e:
            return {"error": f"Error listing files: {str(e)}"}

    def analyze_file(self, file_path):
        """Analyze a Python file using AST.

        Returns:
            On success a dict with ``success``, ``path``, ``valid_python``,
            ``functions`` (name, line, positional arg names; includes
            ``async def``), ``classes``, ``imports`` and a summary ``analysis``
            string. A file that cannot be read yields the ``read_file`` error
            dict. A file that does not parse yields ``success: False``,
            ``valid_python: False`` and an ``error`` message.
        """
        read_result = self.read_file(file_path)
        if "error" in read_result:
            return read_result

        content = read_result["content"]

        try:
            tree = ast.parse(content)

            functions = []
            classes = []
            imports = []

            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    functions.append({
                        "name": node.name,
                        "line": node.lineno,
                        "args": [arg.arg for arg in node.args.args]
                    })
                elif isinstance(node, ast.ClassDef):
                    classes.append({
                        "name": node.name,
                        "line": node.lineno
                    })
                elif isinstance(node, ast.Import):
                    imports.extend(alias.name for alias in node.names)
                elif isinstance(node, ast.ImportFrom):
                    # node.module is None for "from . import x"
                    module = node.module or ""
                    imports.extend(
                        f"{module}.{alias.name}" for alias in node.names
                    )

            return {
                "success": True,
                "path": read_result["path"],
                "valid_python": True,
                "functions": functions,
                "classes": classes,
                "imports": imports,
                "analysis": f"Found {len(functions)} functions, {len(classes)} classes, {len(imports)} imports"
            }
        except SyntaxError as e:
            return {
                "success": False,
                "path": read_result["path"],
                "valid_python": False,
                "error": f"Syntax error at line {e.lineno}: {e.msg}"
            }
        except Exception as e:
            return {
                "success": False,
                "path": read_result["path"],
                "error": f"Analysis error: {str(e)}"
            }

    def _resolve_path(self, path):
        """Resolve path relative to workspace (absolute paths pass through)"""
        if os.path.isabs(path):
            return path
        return os.path.join(self.workspace_dir, path)

    def _is_safe_path(self, path):
        """Check if path is inside the workspace and not a sensitive file.

        Symlinks are resolved first so a link inside the workspace cannot
        point outside it. The sensitive-name check looks only at the part of
        the path below the workspace root, so the location of the workspace
        itself can never make every file look sensitive.
        """
        root = os.path.realpath(self.workspace_dir)
        target = os.path.realpath(path)

        # Must be within workspace. A plain startswith() would also accept
        # sibling directories such as "<workspace>_other".
        try:
            common = os.path.commonpath([root, target])
        except ValueError:
            # e.g. paths on different drives on Windows
            return False
        if os.path.normcase(common) != os.path.normcase(root):
            return False

        # Block sensitive files (substring match, case-insensitive)
        sensitive = ['.env', 'secret', 'password', 'key', 'token', '.git', '__pycache__']
        rel_path = os.path.relpath(target, root).lower()
        return not any(marker in rel_path for marker in sensitive)


# Initialize file system manager
fs = FileSystemManager()


# ===== Enhanced Agent Functions =====
def coding_agent(task, context="", persona="coder", file_context=None):
    """Enhanced coding agent with file context support.

    Args:
        task: The instruction or question for the model.
        context: Optional source code, sent in a fenced "Code Context" section.
        persona: ``'coder'``, ``'reviewer'`` or ``'architect'``; any other
            value falls back to the coder prompt.
        file_context: Optional free-form text appended as "File Context".

    Returns:
        The text of the first content item of the model's reply.
    """
    persona_prompts = {
        'coder': COHERE_CODING_AGENT,
        'reviewer': COHERE_CODE_REVIEWER,
        'architect': COHERE_ARCHITECT
    }

    system_prompt = persona_prompts.get(persona, COHERE_CODING_AGENT)

    full_prompt = f"{system_prompt}\n\nTask: {task}"

    if context:
        full_prompt += f"\n\nCode Context:\n```python\n{context}\n```"

    if file_context:
        full_prompt += f"\n\nFile Context:\n{file_context}"

    response = co.chat(
        model="command-a-03-2025",
        messages=[
            {
                "role": "user",
                "content": full_prompt,
            }
        ],
    )

    return response.message.content[0].text


def simple_agent():
    """Send one fixed prompt to the model and print the reply (smoke test)"""
    response = co.chat(
        model='command-a-03-2025',
        messages=[
            {
                'role': 'user',
                'content': "I'm joining a new startup called Co1t today. Could you help me write a one-sentence introduction message to my teammates"
            }
        ],
    )

    print(response.message.content[0].text)


# ===== CLI helpers =====
# A reply that is entirely wrapped in one Markdown code fence.
_CODE_FENCE_RE = re.compile(r"^\s*```[^\n]*\n(.*?)\n?```\s*$", re.DOTALL)

# A single token made only of characters that commonly appear in paths.
_PATH_TOKEN_RE = re.compile(r"[\w.\-/\\~]+")


def _strip_code_fences(text):
    """Return text without a single wrapping Markdown code fence, if present"""
    match = _CODE_FENCE_RE.match(text)
    return match.group(1) if match else text


def _count_changed_lines(old_text, new_text):
    """Count lines that differ between two texts (a replaced line counts once)"""
    matcher = difflib.SequenceMatcher(
        a=old_text.split('\n'), b=new_text.split('\n'), autojunk=False
    )
    return sum(
        max(i2 - i1, j2 - j1)
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != 'equal'
    )


def _exists_in_workspace(path):
    """Return True if path (workspace-relative or absolute) exists on disk"""
    # os.path.join discards the workspace prefix when path is absolute,
    # which mirrors how FileSystemManager resolves paths.
    return os.path.exists(os.path.join(fs.workspace_dir, path))


def _looks_like_file_reference(text):
    """Guess whether review input names a file rather than being inline code"""
    if not text:
        return False
    if os.path.isfile(os.path.join(fs.workspace_dir, text)):
        return True
    # Otherwise only a single path-like token counts; inline code normally
    # contains spaces, parentheses or operators.
    return bool(_PATH_TOKEN_RE.fullmatch(text)) and ('/' in text or '.' in text)


def _print_help():
    """Print the list of CLI commands"""
    print("\n📚 COMMANDS:")
    print("  help                     - Show this help")
    print("  quit                     - Exit")
    print("  test                     - Test agent")
    print("  read <file>              - Read a file")
    print("  write <file> <content>   - Write to file")
    print("  create <file> [content]  - Create new file")
    print("  list [dir]               - List files")
    print("  analyze <file>           - Analyze Python file")
    print("  review <code|file>       - Review code")
    print("  architect <task>         - Use architect")
    print("  edit <file> <instr>      - Edit file with AI")
    print("  Or ask any coding question!")


def _handle_test():
    """Run a fixed prompt through the coder persona"""
    print("\n🧪 Testing agent...")
    result = coding_agent(
        "Write Python function to check prime numbers",
        persona="coder"
    )
    print(f"\n{result}")


def _handle_read(args):
    """read <file>: print file metadata and the first 500 characters"""
    file_path = args
    if not file_path:
        print("❌ Please provide file path")
        return

    print(f"\n📖 Reading {file_path}...")
    result = fs.read_file(file_path)

    if "error" in result:
        print(f"❌ Error: {result['error']}")
        return

    print(f"✅ File: {result['path']}")
    print(f"Size: {result['size']} chars, Lines: {result['lines']}")
    print("-" * 60)
    # Show first 500 chars
    preview = result['content']
    if len(preview) > 500:
        preview = preview[:500] + "...\n[Truncated]"
    print(preview)
    print("-" * 60)


def _handle_write(args):
    """write <file> <content>: overwrite a file with the given text"""
    parts = args.split(' ', 1)
    if len(parts) < 2:
        print("❌ Usage: write <file> <content>")
        return

    file_path, content = parts
    print(f"\n✏️ Writing to {file_path}...")
    result = fs.write_file(file_path, content)

    if "error" in result:
        print(f"❌ Error: {result['error']}")
    else:
        print(f"✅ {result['message']}")


def _handle_create(args):
    """create <file> [content]: create a file, optionally with content"""
    if not args:
        print("❌ Usage: create <file> [content]")
        return

    file_path, _, content = args.partition(' ')

    print(f"\n📝 Creating {file_path}...")
    result = fs.create_file(file_path, content)

    if "error" in result:
        print(f"❌ Error: {result['error']}")
    else:
        print(f"✅ File created: {result['path']}")
        if content:
            print(f"  With {len(content)} characters of content")


def _handle_list(args):
    """list [dir]: show up to 20 files below a workspace directory"""
    dir_path = args or "."

    print(f"\n📁 Listing files in {dir_path}...")
    result = fs.list_files(dir_path)

    if "error" in result:
        print(f"❌ Error: {result['error']}")
        return

    print(f"✅ Directory: {result['directory']}")
    print(f"Found {result['count']} files:")
    print("-" * 60)
    for file_info in result['files'][:20]:  # Show first 20
        size_kb = file_info['size'] / 1024
        print(f"{file_info['path']:40} ({size_kb:.1f} KB)")

    if result['count'] > 20:
        print(f"... and {result['count'] - 20} more files")
    print("-" * 60)


def _handle_analyze(args):
    """analyze <file>: print the AST summary of a Python file"""
    file_path = args
    if not file_path:
        print("❌ Please provide file path")
        return

    print(f"\n🔍 Analyzing {file_path}...")
    result = fs.analyze_file(file_path)

    # A syntax-error result also carries "error", so it has to be tested
    # before the generic error branch to be reachable.
    if result.get("valid_python") is False:
        print(f"⚠️ {result['error']}")
        return
    if "error" in result:
        print(f"❌ Error: {result['error']}")
        return

    print(f"✅ Valid Python file: {result['path']}")
    print(f"📊 Analysis: {result['analysis']}")

    if result['functions']:
        print("\n📋 Functions:")
        for func in result['functions'][:5]:  # Show first 5
            arg_list = ', '.join(func['args'])
            print(f"  • {func['name']}({arg_list}) (line {func['line']})")

    if result['classes']:
        print("\n🏗️ Classes:")
        for cls in result['classes'][:5]:
            print(f"  • {cls['name']} (line {cls['line']})")

    if result['imports']:
        print("\n📦 Imports:")
        for imp in result['imports'][:10]:  # Show first 10
            print(f"  • {imp}")


def _handle_review(args):
    """review <code|file>: review a workspace file or inline code with AI"""
    target = args

    if _looks_like_file_reference(target):
        print(f"\n🔍 Reviewing file: {target}")
        read_result = fs.read_file(target)

        if "error" in read_result:
            print(f"❌ Error reading file: {read_result['error']}")
            return

        context = read_result['content']
        print(f"📄 Read {len(context)} characters from file")
    else:
        # It's inline code
        context = target
        if not context:
            print("❌ Please provide code or file path to review")
            return
        print(f"\n🔍 Reviewing code ({len(context)} chars)...")

    print("🤖 Analyzing with AI reviewer...")
    result = coding_agent(
        "Review this code for security issues, bugs, and improvements",
        context=context,
        persona="reviewer"
    )
    print(f"\n{result}")


def _handle_edit(args):
    """edit <file> <instructions>: let the AI rewrite a workspace file"""
    parts = args.split(' ', 1)
    if len(parts) < 2:
        print("❌ Usage: edit <file> <instructions>")
        return

    file_path, instructions = parts
    instructions = instructions.strip()

    print(f"\n✏️ Editing {file_path} with AI...")

    # Read current file
    read_result = fs.read_file(file_path)
    if "error" in read_result:
        print(f"❌ Error reading file: {read_result['error']}")
        return

    current_content = read_result['content']
    print(f"📄 Current file: {len(current_content)} characters")

    # The full file travels in the "Code Context" section added by
    # coding_agent; embedding a truncated copy here as well would invite the
    # model to return a truncated file.
    edit_prompt = f"""Edit this file according to these instructions:

File: {file_path}
Instructions: {instructions}

The current content of the file is given in full under "Code Context" below.

Return the COMPLETE new file content. Only output the code, no explanations."""

    print("🤖 AI is editing the file...")

    try:
        raw_reply = coding_agent(edit_prompt, context=current_content, persona="coder")
        # Models often wrap code in a Markdown fence; never write that to disk.
        new_content = _strip_code_fences(raw_reply)

        if not new_content.strip():
            print("❌ AI returned empty content; file left unchanged")
            return

        # Write edited content
        write_result = fs.write_file(file_path, new_content)

        if "error" in write_result:
            print(f"❌ Error saving: {write_result['error']}")
        else:
            print(f"✅ File updated: {write_result['path']}")
            print(f"  New size: {len(new_content)} characters")

            # Show diff summary
            changed = _count_changed_lines(current_content, new_content)
            print(f"  Lines changed: {changed}")

    except Exception as e:
        print(f"❌ AI editing failed: {str(e)}")


def _handle_architect(args):
    """architect <task>: answer with the architect persona"""
    task = args
    if not task:
        print("❌ Please provide a task for the architect")
        return

    print("\n🏗️ Architect thinking...")
    result = coding_agent(task, persona="architect")
    print(f"\n{result}")


def _handle_question(user_input):
    """Answer a free-form question, attaching a mentioned workspace file"""
    # Check if query mentions a file
    file_mentioned = None
    for word in user_input.split():
        if '.' not in word or word.startswith('.'):
            continue
        # Might be a file reference
        possible_file = word.strip('.,!?;:"\'')
        if not possible_file:
            continue
        if _exists_in_workspace(possible_file) or '/' in possible_file:
            file_mentioned = possible_file
            break

    if file_mentioned:
        # Try to read the file for context
        print(f"\n📄 Detected file reference: {file_mentioned}")
        read_result = fs.read_file(file_mentioned)

        if "error" not in read_result:
            print(f"✅ Adding file context ({len(read_result['content'])} chars)")
            file_context = f"File '{file_mentioned}' content:\n{read_result['content'][:2000]}"
            result = coding_agent(user_input, file_context=file_context, persona="coder")
        else:
            print(f"⚠️ Could not read file, proceeding without context")
            result = coding_agent(user_input, persona="coder")
    else:
        # General coding question
        result = coding_agent(user_input, persona="coder")

    print(f"\n🤖 Assistant:\n{result}")


# Commands that are only recognised when typed with no arguments.
_BARE_COMMANDS = {
    'help': _print_help,
    'test': _handle_test,
}

# Commands that receive the rest of the input line as their argument string.
_ARG_COMMANDS = {
    'read': _handle_read,
    'write': _handle_write,
    'create': _handle_create,
    'list': _handle_list,
    'analyze': _handle_analyze,
    'review': _handle_review,
    'edit': _handle_edit,
    'architect': _handle_architect,
}


def interactive_agent():
    """CLI for the agent with file operations.

    Reads lines until the user types ``quit`` (or sends EOF / Ctrl-C). The
    first word selects a command (case-insensitive); anything that is not a
    known command is sent to the coder persona as a free-form question.
    """
    print("🤖 AI Coding Agent (with File System)")
    print("Type 'quit' to exit, 'help' for commands\n")

    while True:
        try:
            user_input = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nGoodbye!")
            break

        if not user_input:
            # Nothing to do; don't spend an API call on an empty prompt.
            continue

        command, _, args = user_input.partition(' ')
        command = command.lower()
        args = args.strip()

        if command == 'quit' and not args:
            print("Goodbye!")
            break

        try:
            if not args and command in _BARE_COMMANDS:
                _BARE_COMMANDS[command]()
            elif command in _ARG_COMMANDS:
                _ARG_COMMANDS[command](args)
            else:
                _handle_question(user_input)
        except Exception as e:
            # Top-level REPL boundary: report the failure (e.g. an API or
            # network error) and keep the session alive.
            print(f"❌ Error: {str(e)}")


# ===== Example Files Creation =====
def setup_example_files():
    """Create example files in workspace.

    Files that already exist are left untouched so that edits made in an
    earlier session are not overwritten on the next start.
    """
    examples = {
        "math_operations.py": """def add(a, b):
    return a + b

def subtract(a, b):
    return a - b

def multiply(a, b):
    return a * b

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Test the functions
if __name__ == "__main__":
    print("Testing math operations:")
    print(f"2 + 3 = {add(2, 3)}")
    print(f"5 - 2 = {subtract(5, 2)}")
    print(f"4 * 6 = {multiply(4, 6)}")
    print(f"10 / 2 = {divide(10, 2)}")""",

        "data_processor.py": """import json
from typing import List, Dict, Any

class DataProcessor:
    def __init__(self):
        self.data = []

    def load_from_file(self, filepath: str):
        with open(filepath, 'r') as f:
            self.data = json.load(f)
        return self

    def filter_by_key(self, key: str, value: Any) -> List[Dict]:
        return [item for item in self.data if item.get(key) == value]

    def calculate_average(self, key: str) -> float:
        values = [item[key] for item in self.data if key in item]
        if not values:
            return 0.0
        return sum(values) / len(values)

    def save_results(self, filepath: str, results: List[Dict]):
        with open(filepath, 'w') as f:
            json.dump(results, f, indent=2)""",

        "README.md": """# AI Coding Agent Workspace

This workspace contains example files for the AI coding agent.

## Files:
- `math_operations.py`: Basic math functions
- `data_processor.py`: Data processing class
- `config.json`: Configuration file

## Usage:
Use the agent commands to read, write, and analyze these files.""",

        # Small default configuration file
        "config.json": '{"debug": true, "version": "1.0.0"}',
    }

    print("\n📁 Setting up example workspace...")

    for filename, content in examples.items():
        if os.path.exists(os.path.join(fs.workspace_dir, filename)):
            print(f"✅ {filename} already exists, leaving it unchanged")
            continue

        result = fs.create_file(filename, content)
        if "error" in result:
            print(f"❌ Failed to create {filename}: {result['error']}")
        else:
            print(f"✅ Created {filename}")

    print("\n🎉 Workspace ready! Try these commands:")
    print("  • list")
    print("  • read math_operations.py")
    print("  • analyze data_processor.py")
    print("  • review math_operations.py")
    print("  • edit math_operations.py 'add docstrings to all functions'")


if __name__ == '__main__':
    # Setup workspace
    setup_example_files()

    # Test basic functionality
    print("\n" + "=" * 60)
    simple_agent()

    # Start interactive mode
    print("\n" + "=" * 60)
    interactive_agent()