ali4566544's picture
Upload 13 files
c0fd849 verified
# Entry point for CLI version
import cohere
from dotenv import load_dotenv
import os
import json
from pathlib import Path
import ast
import re
from Prompts.system_prompts import (
COHERE_CODING_AGENT,
COHERE_CODE_REVIEWER,
COHERE_ARCHITECT
)
load_dotenv()
cohere_api_key = os.getenv("COHERE_API_KEY")
print(cohere_api_key[0:5] + "*" * len(cohere_api_key) if cohere_api_key else "No API key found")
co = cohere.ClientV2(cohere_api_key)
# ===== File System Manager =====
class FileSystemManager:
"""Manages file reading, writing, and analysis"""
def __init__(self, workspace_dir="workspace"):
self.workspace_dir = os.path.abspath(workspace_dir)
self.create_workspace()
# Unsafe patterns for security
self.unsafe_patterns = [
r"os\.system\(",
r"subprocess\.[^Popen]", # Allow Popen with validation
r"eval\(",
r"exec\(",
r"__import__\(",
r"rm -rf",
r"shutil\.rmtree",
]
def create_workspace(self):
"""Create workspace directory if it doesn't exist"""
os.makedirs(self.workspace_dir, exist_ok=True)
print(f"📁 Workspace: {self.workspace_dir}")
def read_file(self, file_path):
"""Read a file with safety checks"""
try:
# Convert to absolute path within workspace
abs_path = self._resolve_path(file_path)
# Security check
if not self._is_safe_path(abs_path):
return {"error": f"Access restricted: {file_path}"}
if not os.path.exists(abs_path):
return {"error": f"File not found: {file_path}"}
if not os.path.isfile(abs_path):
return {"error": f"Not a file: {file_path}"}
with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return {
"success": True,
"path": abs_path,
"content": content,
"size": len(content),
"lines": len(content.split('\n'))
}
except PermissionError:
return {"error": f"Permission denied: {file_path}"}
except Exception as e:
return {"error": f"Error reading {file_path}: {str(e)}"}
def write_file(self, file_path, content):
"""Write content to a file"""
try:
abs_path = self._resolve_path(file_path)
# Security check
if not self._is_safe_path(abs_path):
return {"error": f"Write restricted: {file_path}"}
# Create directory if needed
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, 'w', encoding='utf-8') as f:
f.write(content)
return {
"success": True,
"path": abs_path,
"message": f"File written: {abs_path}",
"size": len(content)
}
except Exception as e:
return {"error": f"Error writing {file_path}: {str(e)}"}
def create_file(self, file_path, content=""):
"""Create a new file with optional content"""
return self.write_file(file_path, content)
def list_files(self, directory=".", pattern="*"):
"""List files in a directory"""
try:
dir_path = self._resolve_path(directory)
if not os.path.exists(dir_path):
return {"error": f"Directory not found: {directory}"}
if not os.path.isdir(dir_path):
return {"error": f"Not a directory: {directory}"}
files = []
for root, dirs, filenames in os.walk(dir_path):
# Skip hidden directories
dirs[:] = [d for d in dirs if not d.startswith('.')]
for filename in filenames:
if not filename.startswith('.'): # Skip hidden files
full_path = os.path.join(root, filename)
rel_path = os.path.relpath(full_path, self.workspace_dir)
try:
stat = os.stat(full_path)
files.append({
"name": filename,
"path": rel_path,
"size": stat.st_size,
"modified": stat.st_mtime
})
except:
continue
return {
"success": True,
"directory": dir_path,
"files": sorted(files, key=lambda x: x["path"]),
"count": len(files)
}
except Exception as e:
return {"error": f"Error listing files: {str(e)}"}
def analyze_file(self, file_path):
"""Analyze a Python file using AST"""
read_result = self.read_file(file_path)
if "error" in read_result:
return read_result
content = read_result["content"]
try:
tree = ast.parse(content)
functions = []
classes = []
imports = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
functions.append({
"name": node.name,
"line": node.lineno,
"args": [arg.arg for arg in node.args.args]
})
elif isinstance(node, ast.ClassDef):
classes.append({
"name": node.name,
"line": node.lineno
})
elif isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name)
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
for alias in node.names:
imports.append(f"{module}.{alias.name}")
return {
"success": True,
"path": read_result["path"],
"valid_python": True,
"functions": functions,
"classes": classes,
"imports": imports,
"analysis": f"Found {len(functions)} functions, {len(classes)} classes, {len(imports)} imports"
}
except SyntaxError as e:
return {
"success": False,
"path": read_result["path"],
"valid_python": False,
"error": f"Syntax error at line {e.lineno}: {e.msg}"
}
except Exception as e:
return {
"success": False,
"path": read_result["path"],
"error": f"Analysis error: {str(e)}"
}
def _resolve_path(self, path):
"""Resolve path relative to workspace"""
if os.path.isabs(path):
return path
return os.path.join(self.workspace_dir, path)
def _is_safe_path(self, path):
"""Check if path is safe to access"""
abs_path = os.path.abspath(path)
# Must be within workspace
if not abs_path.startswith(self.workspace_dir):
return False
# Block sensitive files
sensitive = ['.env', 'secret', 'password', 'key', 'token', '.git', '__pycache__']
for pattern in sensitive:
if pattern in abs_path.lower():
return False
return True
# Initialize file system manager
fs = FileSystemManager()
# ===== Enhanced Agent Functions =====
def coding_agent(task, context="", persona="coder", file_context=None):
"""Enhanced coding agent with file context support"""
persona_prompts = {
'coder': COHERE_CODING_AGENT,
'reviewer': COHERE_CODE_REVIEWER,
'architect': COHERE_ARCHITECT
}
system_prompt = persona_prompts.get(persona, COHERE_CODING_AGENT)
full_prompt = f"{system_prompt}\n\nTask: {task}"
if context:
full_prompt += f"\n\nCode Context:\n```python\n{context}\n```"
if file_context:
full_prompt += f"\n\nFile Context:\n{file_context}"
response = co.chat(
model="command-a-03-2025",
messages=[
{
"role": "user",
"content": full_prompt,
}
],
)
return response.message.content[0].text
def simple_agent():
"""Test the function"""
response = co.chat(
model='command-a-03-2025',
messages=[
{
'role': 'user',
'content': "I'm joining a new startup called Co1t today. Could you help me write a one-sentence introduction message to my teammates"
}
],
)
print(response.message.content[0].text)
def interactive_agent():
"""CLI for the agent with file operations"""
print("🤖 AI Coding Agent (with File System)")
print("Type 'quit' to exit, 'help' for commands\n")
while True:
user_input = input("\n> ").strip()
if user_input.lower() == 'quit':
print("Goodbye!")
break
elif user_input.lower() == 'help':
print("\n📚 COMMANDS:")
print(" help - Show this help")
print(" quit - Exit")
print(" test - Test agent")
print(" read <file> - Read a file")
print(" write <file> <content> - Write to file")
print(" create <file> [content] - Create new file")
print(" list [dir] - List files")
print(" analyze <file> - Analyze Python file")
print(" review <file/code> - Review code")
print(" architect <task> - Use architect")
print(" edit <file> - Edit file with AI")
print(" Or ask any coding question!")
continue
elif user_input.lower() == 'test':
print("\n🧪 Testing agent...")
result = coding_agent(
"Write Python function to check prime numbers",
persona="coder"
)
print(f"\n{result}")
# ===== FILE OPERATIONS =====
elif user_input.lower().startswith('read '):
file_path = user_input[5:].strip()
if not file_path:
print("❌ Please provide file path")
continue
print(f"\n📖 Reading {file_path}...")
result = fs.read_file(file_path)
if "error" in result:
print(f"❌ Error: {result['error']}")
else:
print(f"✅ File: {result['path']}")
print(f"Size: {result['size']} chars, Lines: {result['lines']}")
print("-" * 60)
# Show first 500 chars
preview = result['content']
if len(preview) > 500:
preview = preview[:500] + "...\n[Truncated]"
print(preview)
print("-" * 60)
elif user_input.lower().startswith('write '):
parts = user_input[6:].strip().split(' ', 1)
if len(parts) < 2:
print("❌ Usage: write <file> <content>")
continue
file_path, content = parts
print(f"\n✏️ Writing to {file_path}...")
result = fs.write_file(file_path, content)
if "error" in result:
print(f"❌ Error: {result['error']}")
else:
print(f"✅ {result['message']}")
elif user_input.lower().startswith('create '):
parts = user_input[7:].strip().split(' ', 1)
file_path = parts[0]
content = parts[1] if len(parts) > 1 else ""
print(f"\n📝 Creating {file_path}...")
result = fs.create_file(file_path, content)
if "error" in result:
print(f"❌ Error: {result['error']}")
else:
print(f"✅ File created: {result['path']}")
if content:
print(f" With {len(content)} characters of content")
elif user_input.lower().startswith('list'):
dir_path = user_input[5:].strip() or "."
print(f"\n📁 Listing files in {dir_path}...")
result = fs.list_files(dir_path)
if "error" in result:
print(f"❌ Error: {result['error']}")
else:
print(f"✅ Directory: {result['directory']}")
print(f"Found {result['count']} files:")
print("-" * 60)
for file_info in result['files'][:20]: # Show first 20
size_kb = file_info['size'] / 1024
print(f"{file_info['path']:40} ({size_kb:.1f} KB)")
if result['count'] > 20:
print(f"... and {result['count'] - 20} more files")
print("-" * 60)
elif user_input.lower().startswith('analyze '):
file_path = user_input[8:].strip()
if not file_path:
print("❌ Please provide file path")
continue
print(f"\n🔍 Analyzing {file_path}...")
result = fs.analyze_file(file_path)
if "error" in result:
print(f"❌ Error: {result['error']}")
elif not result.get("valid_python", True):
print(f"⚠️ {result['error']}")
else:
print(f"✅ Valid Python file: {result['path']}")
print(f"📊 Analysis: {result['analysis']}")
if result['functions']:
print("\n📋 Functions:")
for func in result['functions'][:5]: # Show first 5
args = ', '.join(func['args'])
print(f" • {func['name']}({args}) (line {func['line']})")
if result['classes']:
print("\n🏗️ Classes:")
for cls in result['classes'][:5]:
print(f" • {cls['name']} (line {cls['line']})")
if result['imports']:
print("\n📦 Imports:")
for imp in result['imports'][:10]: # Show first 10
print(f" • {imp}")
elif user_input.lower().startswith('review'):
# Check if it's a file or inline code
target = user_input[7:].strip()
if os.path.exists(target) or '/' in target or '.' in target:
# It's probably a file path
print(f"\n🔍 Reviewing file: {target}")
read_result = fs.read_file(target)
if "error" in read_result:
print(f"❌ Error reading file: {read_result['error']}")
continue
context = read_result['content']
print(f"📄 Read {len(context)} characters from file")
else:
# It's inline code
context = target
if not context:
print("❌ Please provide code or file path to review")
continue
print(f"\n🔍 Reviewing code ({len(context)} chars)...")
print("🤖 Analyzing with AI reviewer...")
result = coding_agent(
"Review this code for security issues, bugs, and improvements",
context=context,
persona="reviewer"
)
print(f"\n{result}")
elif user_input.lower().startswith('edit '):
# AI-powered file editing
parts = user_input[5:].strip().split(' ', 1)
if len(parts) < 2:
print("❌ Usage: edit <file> <instructions>")
continue
file_path, instructions = parts
print(f"\n✏️ Editing {file_path} with AI...")
# Read current file
read_result = fs.read_file(file_path)
if "error" in read_result:
print(f"❌ Error reading file: {read_result['error']}")
continue
current_content = read_result['content']
print(f"📄 Current file: {len(current_content)} characters")
# Ask AI to edit
edit_prompt = f"""Edit this file according to these instructions:
File: {file_path}
Instructions: {instructions}
Current content:
```python
{current_content[:1000]} # First 1000 chars
```
Return the COMPLETE new file content. Only output the code, no explanations."""
print("🤖 AI is editing the file...")
try:
new_content = coding_agent(edit_prompt, context=current_content, persona="coder")
# Write edited content
write_result = fs.write_file(file_path, new_content)
if "error" in write_result:
print(f"❌ Error saving: {write_result['error']}")
else:
print(f"✅ File updated: {write_result['path']}")
print(f" New size: {len(new_content)} characters")
# Show diff summary
old_lines = current_content.split('\n')
new_lines = new_content.split('\n')
print(f" Lines changed: {abs(len(new_lines) - len(old_lines))}")
except Exception as e:
print(f"❌ AI editing failed: {str(e)}")
elif user_input.lower().startswith('architect'):
task = user_input[10:].strip()
if not task:
print("❌ Please provide a task for the architect")
continue
print("\n🏗️ Architect thinking...")
result = coding_agent(task, persona="architect")
print(f"\n{result}")
else:
# Check if query mentions a file
file_mentioned = None
for word in user_input.split():
if '.' in word and not word.startswith('.'):
# Might be a file reference
possible_file = word.strip('.,!?;:"\'')
if os.path.exists(possible_file) or '/' in possible_file:
file_mentioned = possible_file
break
if file_mentioned:
# Try to read the file for context
print(f"\n📄 Detected file reference: {file_mentioned}")
read_result = fs.read_file(file_mentioned)
if "error" not in read_result:
print(f"✅ Adding file context ({len(read_result['content'])} chars)")
file_context = f"File '{file_mentioned}' content:\n{read_result['content'][:2000]}"
result = coding_agent(user_input, file_context=file_context, persona="coder")
else:
print(f"⚠️ Could not read file, proceeding without context")
result = coding_agent(user_input, persona="coder")
else:
# General coding question
result = coding_agent(user_input, persona="coder")
print(f"\n🤖 Assistant:\n{result}")
# ===== Example Files Creation =====
def setup_example_files():
"""Create example files in workspace"""
examples = {
"math_operations.py": """def add(a, b):
return a + b
def subtract(a, b):
return a - b
def multiply(a, b):
return a * b
def divide(a, b):
if b == 0:
raise ValueError("Cannot divide by zero")
return a / b
# Test the functions
if __name__ == "__main__":
print("Testing math operations:")
print(f"2 + 3 = {add(2, 3)}")
print(f"5 - 2 = {subtract(5, 2)}")
print(f"4 * 6 = {multiply(4, 6)}")
print(f"10 / 2 = {divide(10, 2)}")""",
"data_processor.py": """import json
from typing import List, Dict, Any
class DataProcessor:
def __init__(self):
self.data = []
def load_from_file(self, filepath: str):
with open(filepath, 'r') as f:
self.data = json.load(f)
return self
def filter_by_key(self, key: str, value: Any) -> List[Dict]:
return [item for item in self.data if item.get(key) == value]
def calculate_average(self, key: str) -> float:
values = [item[key] for item in self.data if key in item]
if not values:
return 0.0
return sum(values) / len(values)
def save_results(self, filepath: str, results: List[Dict]):
with open(filepath, 'w') as f:
json.dump(results, f, indent=2)""",
"README.md": """# AI Coding Agent Workspace
This workspace contains example files for the AI coding agent.
## Files:
- `math_operations.py`: Basic math functions
- `data_processor.py`: Data processing class
- `config.json`: Configuration file
## Usage:
Use the agent commands to read, write, and analyze these files."""
}
print("\n📁 Setting up example workspace...")
for filename, content in examples.items():
result = fs.create_file(filename, content)
if "error" in result:
print(f"❌ Failed to create {filename}: {result['error']}")
else:
print(f"✅ Created {filename}")
# Create empty config file
fs.create_file("config.json", '{"debug": true, "version": "1.0.0"}')
print("\n🎉 Workspace ready! Try these commands:")
print(" • list")
print(" • read math_operations.py")
print(" • analyze data_processor.py")
print(" • review math_operations.py")
print(" • edit math_operations.py 'add docstrings to all functions'")
if __name__ == '__main__':
# Setup workspace
setup_example_files()
# Test basic functionality
print("\n" + "="*60)
simple_agent()
# Start interactive mode
print("\n" + "="*60)
interactive_agent()