""" OpenCode Terminal Portal ------------------------- Manages OpenCode terminal TUI as a provider. Supports free models: Kimi K2.5 Free, MiniMax M2.5 Free, Big Pickle, GLM 4.7 ANONYMOUS MODE: No credentials stored, fresh device identity each session """ import asyncio import logging import subprocess import os import json import random import string from typing import Optional, Dict, Any, Callable from dataclasses import dataclass from datetime import datetime import threading import queue import shutil logger = logging.getLogger("kai_api.terminal_portal") @dataclass class TerminalConfig: """Configuration for OpenCode terminal.""" name: str model: str # e.g., "kimi-k2.5-free", "minimax-m2.5-free" project_dir: str = "." config_path: str = ".opencode/config.json" def generate_anonymous_identity(): """Generate random device identifiers to appear as different device each time.""" return { "device_id": ''.join(random.choices(string.hexdigits.lower(), k=32)), "session_id": ''.join(random.choices(string.hexdigits.lower(), k=16)), "fingerprint": ''.join(random.choices(string.hexdigits.lower(), k=24)), } # Free models configuration OPENCODE_MODELS = { "kimi-k2.5-free": { "name": "Kimi K2.5 Free", "provider": "opencode-zen", "description": "Moonshot AI's Kimi K2.5 - Free tier" }, "minimax-m2.5-free": { "name": "MiniMax M2.5 Free", "provider": "opencode-zen", "description": "MiniMax M2.5 - Free tier" }, "big-pickle": { "name": "Big Pickle", "provider": "opencode-zen", "description": "Stealth model - Free" }, "glm-4.7": { "name": "GLM 4.7", "provider": "opencode-zen", "description": "GLM 4.7 - Free tier" } } class OpenCodeTerminalPortal: """Manages OpenCode terminal TUI session with DISPOSABLE mode.""" # Disposable mode settings MAX_MESSAGES_BEFORE_RESET = 20 # Auto-reset after 20 messages AUTO_NEW_CHAT_BETWEEN_MESSAGES = True # Start new chat between each message def __init__(self, config: TerminalConfig): self.config = config self.process: Optional[subprocess.Popen] = None self.is_initialized = False self.output_queue = queue.Queue() self.input_queue = queue.Queue() self.output_thread: Optional[threading.Thread] = None self.on_output_callback: Optional[Callable] = None self.screenshot_path = f"/tmp/opencode_{config.model}.png" self.last_activity = None self._keyboard_active = False self.message_count = 0 # Track messages for auto-reset self.current_identity = None # Track current session identity self.session_dir = None # Track current session directory async def initialize(self): """Initialize OpenCode terminal session in ANONYMOUS/DISPOSABLE mode.""" if self.is_initialized: return try: logger.info(f"๐Ÿš€ Starting OpenCode terminal with model: {self.config.model}") logger.info("๐Ÿ”’ Anonymous mode: No credentials stored, fresh device identity") logger.info("๐Ÿ—‘๏ธ Disposable mode: Auto-reset after 20 messages, new chat between messages") # Generate fresh anonymous identity self.current_identity = generate_anonymous_identity() identity = self.current_identity # Create isolated config directory for this session self.session_dir = f"/tmp/opencode_session_{identity['session_id'][:8]}" config_path = f"{self.session_dir}/config.json" os.makedirs(self.session_dir, exist_ok=True) # Remove any existing auth files to ensure anonymous mode auth_paths = [ os.path.expanduser("~/.local/share/opencode/auth.json"), os.path.expanduser("~/.local/share/opencode"), ".opencode/auth.json", "/tmp/opencode_auth.json" ] for path in auth_paths: if os.path.exists(path): try: if os.path.isfile(path): os.remove(path) logger.info(f"๐Ÿ—‘๏ธ Removed auth file: {path}") elif os.path.isdir(path): shutil.rmtree(path) logger.info(f"๐Ÿ—‘๏ธ Removed auth directory: {path}") except Exception as e: logger.warning(f"Could not remove {path}: {e}") # Create anonymous config - NO login required config_data = { "$schema": "https://opencode.ai/config.json", "theme": "opencode", "provider": { "opencode-zen": { "npm": "@ai-sdk/openai-compatible", "options": { "baseURL": "https://opencode.ai/zen/v1" }, "models": { model: {"name": info["name"]} for model, info in OPENCODE_MODELS.items() } } }, "model": f"opencode-zen/{self.config.model}", "autoshare": False, "autoupdate": True, # Anonymous mode settings "anonymous": True, "deviceId": identity["device_id"], "sessionId": identity["session_id"] } with open(config_path, 'w') as f: json.dump(config_data, f, indent=2) logger.info(f"โœ… Config created at: {config_path}") # Start OpenCode process with custom environment env = os.environ.copy() env['OPENCODE_CONFIG'] = os.path.abspath(config_path) # Set random terminal identifier env['TERM_SESSION_ID'] = identity["session_id"] # Prevent any auth persistence env['OPENCODE_NO_AUTH'] = '1' self.process = subprocess.Popen( ['npx', '-y', 'opencode-ai'], cwd=self.config.project_dir, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Merge stderr into stdout for easier reading text=True, bufsize=1 ) # Start output reading thread self.output_thread = threading.Thread(target=self._read_output) self.output_thread.daemon = True self.output_thread.start() self.is_initialized = True logger.info(f"โœ… OpenCode terminal ready (Anonymous mode)") logger.info(f"๐Ÿ†” Session ID: {identity['session_id'][:16]}...") # Wait for startup and send initial commands to select free model await asyncio.sleep(3) # Auto-select the free model (Ctrl+X then M) await self.send_key('ctrl+x') await asyncio.sleep(0.5) await self.send_input('m') await asyncio.sleep(1) # Select model based on config model_map = { "kimi-k2.5-free": "kimi", "minimax-m2.5-free": "minimax", "big-pickle": "big pickle", "glm-4.7": "glm" } model_keyword = model_map.get(self.config.model, "kimi") # Send enter to select default (usually Kimi) await self.send_key('Enter') await asyncio.sleep(0.5) await self.take_screenshot() except Exception as e: logger.error(f"Failed to initialize OpenCode terminal: {e}") raise def _read_output(self): """Read output from OpenCode process in background thread.""" try: while self.process and self.process.poll() is None: # Read line from stdout line = self.process.stdout.readline() if line: self.output_queue.put(('stdout', line)) if self.on_output_callback: asyncio.create_task(self.on_output_callback('stdout', line)) # Check stderr import select if self.process.stderr in select.select([self.process.stderr], [], [], 0)[0]: err_line = self.process.stderr.readline() if err_line: self.output_queue.put(('stderr', err_line)) except Exception as e: logger.error(f"Error reading output: {e}") async def send_input(self, text: str, is_message: bool = True): """Send text input to OpenCode with disposable mode handling.""" if not self.process or not self.is_initialized: return False try: # If this is a user message (not a command), handle disposable mode if is_message and self.AUTO_NEW_CHAT_BETWEEN_MESSAGES: # Start a new chat before sending the message await self._start_new_chat() await asyncio.sleep(0.5) # Send the actual message self.process.stdin.write(text + '\n') self.process.stdin.flush() self.last_activity = datetime.now() # Track message count for auto-reset if is_message: self.message_count += 1 logger.info(f"๐Ÿ“จ Message {self.message_count}/{self.MAX_MESSAGES_BEFORE_RESET}") # Check if we need to auto-reset if self.message_count >= self.MAX_MESSAGES_BEFORE_RESET: logger.info("๐Ÿ”„ Auto-reset triggered after 20 messages!") await self._full_reset() return True except Exception as e: logger.error(f"Error sending input: {e}") return False async def _start_new_chat(self): """Start a new chat to avoid context carryover.""" try: # Send Ctrl+N for new chat (or equivalent command) logger.info("๐Ÿ†• Starting new chat...") self.process.stdin.write('\x0e') # Ctrl+N self.process.stdin.flush() await asyncio.sleep(0.3) except Exception as e: logger.warning(f"Could not start new chat: {e}") async def _full_reset(self): """Complete reset: wipe everything and start fresh.""" logger.info("๐Ÿงน PERFORMING FULL DISPOSABLE RESET...") # 1. Close current session await self.close() # 2. Aggressive cleanup of ALL traces await self._complete_cleanup() # 3. Reset counters self.message_count = 0 self.current_identity = None self.session_dir = None # 4. Wait a moment to ensure cleanup await asyncio.sleep(2) # 5. Reinitialize with fresh identity logger.info("๐Ÿ”„ Starting fresh session...") await self.initialize() logger.info("โœ… Full reset complete - OpenCode sees a completely new device!") async def _complete_cleanup(self): """Complete cleanup of ALL OpenCode traces.""" cleanup_paths = [ # Config directories "/tmp/opencode_session_*", os.path.expanduser("~/.local/share/opencode"), os.path.expanduser("~/.config/opencode"), os.path.expanduser("~/.opencode"), ".opencode", # Cache and temp files os.path.expanduser("~/.cache/opencode"), "/tmp/opencode*", "/tmp/.opencode*", # Node/npm cache that might have identifiers os.path.expanduser("~/.npm/_npx/*opencode*"), os.path.expanduser("~/.npm/_logs/*opencode*"), # Any auth files os.path.expanduser("~/.local/share/opencode/auth.json"), "/tmp/opencode_auth.json", "/tmp/kai-opencode-*", ] for path_pattern in cleanup_paths: try: import glob matching_paths = glob.glob(path_pattern) for path in matching_paths: if os.path.exists(path): if os.path.isfile(path): os.remove(path) logger.info(f"๐Ÿ—‘๏ธ Deleted file: {path}") elif os.path.isdir(path): shutil.rmtree(path, ignore_errors=True) logger.info(f"๐Ÿ—‘๏ธ Deleted directory: {path}") except Exception as e: logger.warning(f"Cleanup warning for {path_pattern}: {e}") # Clear npm/npx cache try: subprocess.run(["npm", "cache", "clean", "--force"], capture_output=True, timeout=10) logger.info("๐Ÿงน NPM cache cleared") except Exception as e: logger.warning(f"Could not clear NPM cache: {e}") # Clear any system-level temporary identifiers try: # Clear /tmp of any opencode related files import glob for f in glob.glob("/tmp/*opencode*"): try: if os.path.isfile(f): os.remove(f) elif os.path.isdir(f): shutil.rmtree(f, ignore_errors=True) except: pass except: pass logger.info("๐Ÿงน Complete cleanup finished - No traces left!") async def send_key(self, key: str): """Send a special key to OpenCode (e.g., 'ctrl+c', 'enter', 'tab').""" if not self.process or not self.is_initialized: return False try: # Map common keys key_map = { 'Enter': '\n', 'Tab': '\t', 'Escape': '\x1b', 'Backspace': '\x7f', 'Delete': '\x1b[3~', 'ArrowUp': '\x1b[A', 'ArrowDown': '\x1b[B', 'ArrowLeft': '\x1b[D', 'ArrowRight': '\x1b[C', } char = key_map.get(key, key) self.process.stdin.write(char) self.process.stdin.flush() return True except Exception as e: logger.error(f"Error sending key: {e}") return False async def execute_command(self, command: str): """Execute an OpenCode command (e.g., '/models', '/connect').""" return await self.send_input(command) async def take_screenshot(self) -> str: """Take a screenshot of the terminal (if supported by terminal emulator).""" # For now, we'll create a text-based representation # In production, you could use a terminal emulator that supports screenshots try: # Get recent output output_lines = [] while not self.output_queue.empty() and len(output_lines) < 50: try: stream, line = self.output_queue.get_nowait() output_lines.append(line) except queue.Empty: break # Create a simple text screenshot screenshot_text = "\n".join(output_lines[-25:]) if output_lines else "Terminal ready..." # Save to file with open(self.screenshot_path, 'w') as f: f.write(screenshot_text) return self.screenshot_path except Exception as e: logger.error(f"Screenshot failed: {e}") return "" def get_output(self, max_lines: int = 100) -> list: """Get recent output lines.""" lines = [] temp_queue = queue.Queue() # Get lines from queue without removing them permanently while not self.output_queue.empty() and len(lines) < max_lines: try: item = self.output_queue.get_nowait() lines.append(item) temp_queue.put(item) except queue.Empty: break # Put them back while not temp_queue.empty(): self.output_queue.put(temp_queue.get()) return lines def set_keyboard_active(self, active: bool): """Enable/disable keyboard input capture.""" self._keyboard_active = active logger.info(f"Keyboard {'activated' if active else 'deactivated'} for OpenCode") def is_keyboard_active(self) -> bool: """Check if keyboard is active.""" return self._keyboard_active def is_running(self) -> bool: """Check if OpenCode is running.""" if not self.process: return False return self.process.poll() is None async def close(self): """Close the OpenCode terminal and cleanup anonymous session.""" try: if self.process: # Send exit command try: self.process.stdin.write('/exit\n') self.process.stdin.flush() await asyncio.sleep(1) except: pass # Kill if still running if self.process.poll() is None: self.process.terminate() await asyncio.sleep(2) if self.process.poll() is None: self.process.kill() self.process = None self.is_initialized = False # Cleanup session directory to remove any traces if self.session_dir and os.path.exists(self.session_dir): try: shutil.rmtree(self.session_dir, ignore_errors=True) logger.info(f"๐Ÿงน Cleaned up session directory: {self.session_dir}") except Exception as e: logger.warning(f"Session cleanup warning: {e}") # Reset message counter self.message_count = 0 logger.info("OpenCode terminal closed (Anonymous session cleaned)") except Exception as e: logger.error(f"Error closing OpenCode: {e}") async def sync_auth(self): """DEPRECATED: Anonymous mode - no auth to sync.""" logger.warning("sync_auth() called but anonymous mode is active - no credentials stored") return False async def manual_reset(self): """Manually trigger a full disposable reset.""" logger.info("๐Ÿ”„ Manual reset requested") await self._full_reset() return True def get_disposable_status(self) -> dict: """Get current disposable mode status.""" return { "message_count": self.message_count, "max_messages": self.MAX_MESSAGES_BEFORE_RESET, "messages_remaining": max(0, self.MAX_MESSAGES_BEFORE_RESET - self.message_count), "auto_reset_enabled": True, "new_chat_between_messages": self.AUTO_NEW_CHAT_BETWEEN_MESSAGES, "is_running": self.is_running(), "anonymous_mode": True, "session_dir": self.session_dir, "device_id": self.current_identity["device_id"][:16] + "..." if self.current_identity else None, } class TerminalPortalManager: """Manages multiple OpenCode terminal portals.""" def __init__(self): self.portals: Dict[str, OpenCodeTerminalPortal] = {} def get_portal(self, model: str) -> OpenCodeTerminalPortal: """Get or create a portal for a specific model.""" if model not in self.portals: if model not in OPENCODE_MODELS: raise ValueError(f"Unknown OpenCode model: {model}") config = TerminalConfig( name=OPENCODE_MODELS[model]["name"], model=model, config_path=f".opencode/config_{model}.json" ) self.portals[model] = OpenCodeTerminalPortal(config) return self.portals[model] def get_available_models(self) -> Dict[str, Dict]: """Get all available OpenCode models.""" return OPENCODE_MODELS.copy() async def close_all(self): """Close all terminal portals.""" for portal in self.portals.values(): await portal.close() # Global instance _terminal_manager = TerminalPortalManager() def get_terminal_manager() -> TerminalPortalManager: """Get the global terminal manager.""" return _terminal_manager