Spaces:
Running
Running
| import warnings | |
| warnings.filterwarnings("ignore") | |
| import gradio as gr | |
| import time | |
| import sys | |
| import subprocess | |
| import os | |
| import pty | |
| import select | |
| import signal | |
| import fcntl | |
| import struct | |
| import termios | |
| import threading | |
| import re | |
| import ast | |
| import json | |
| import tempfile | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download | |
| from typing import List, Dict | |
| from functools import lru_cache | |
# Set for this process (and inherited by every child it spawns).
# NOTE(review): presumably silences the HF tokenizers fork warning — confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Hugging Face repo id and file name handed to hf_hub_download() in load_model().
MODEL_REPO = "AIencoder/Qwen2.5CMR-Q4_K_M-GGUF"
MODEL_FILE = "qwen2.5cmr-q4_k_m.gguf"
# X11 display name the Xvfb server is started on and GUI children are pointed at.
DISPLAY_NUM = ":99"
# Virtual screen size in pixels, used for the Xvfb "-screen" argument.
SCREEN_W, SCREEN_H = 800, 600
# JSON file the SnippetLibrary loads from and saves to.
SNIPPETS_FILE = "/tmp/axon_snippets.json"
| # ═══════════════════════════════════════ | |
| # Virtual Display (Optimized for pygame/tkinter/turtle) | |
| # ═══════════════════════════════════════ | |
class VirtualDisplay:
    """Own a headless Xvfb X server and take screenshots of its root window.

    Constructing an instance immediately tries to start Xvfb on ``DISPLAY_NUM``
    and, when that succeeds, exports ``DISPLAY`` into ``os.environ``.
    """

    def __init__(self):
        self.xvfb_proc = None        # Popen handle for Xvfb, or None when not started
        self.display = DISPLAY_NUM
        self._last_capture = None    # path of the most recent screenshot file
        self._start_xvfb()

    def _launch(self, extra_args, settle):
        """Start Xvfb with *extra_args*, wait *settle* seconds, report if it is still alive."""
        self.xvfb_proc = subprocess.Popen(
            ["Xvfb", self.display,
             "-screen", "0", f"{SCREEN_W}x{SCREEN_H}x24",
             *extra_args],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(settle)
        return self.xvfb_proc.poll() is None

    def _start_xvfb(self):
        """Kill any stale Xvfb on this display, then start a fresh one (with a simpler fallback)."""
        # Best effort only: a missing or slow pkill must not prevent Xvfb from starting.
        try:
            subprocess.run(["pkill", "-f", f"Xvfb {self.display}"],
                           capture_output=True, timeout=5)
            time.sleep(0.3)
        except (OSError, subprocess.SubprocessError):
            pass
        try:
            full_args = [
                "-ac",                    # No access control
                "-nolisten", "tcp",       # Security
                "+extension", "GLX",      # OpenGL support (pygame)
                "+extension", "RENDER",   # Anti-aliased rendering
            ]
            if self._launch(full_args, 0.8):
                os.environ["DISPLAY"] = self.display
                print(f"[Xvfb] Running on {self.display} ({SCREEN_W}x{SCREEN_H})")
            elif self._launch(["-ac"], 0.5):
                # Fallback: simpler Xvfb without extensions
                os.environ["DISPLAY"] = self.display
                print(f"[Xvfb] Running (fallback mode)")
            else:
                self.xvfb_proc = None
                print("[Xvfb] Failed to start")
        except Exception as e:
            print(f"[Xvfb] {e}")
            self.xvfb_proc = None

    @staticmethod
    def _looks_like_image(path):
        """Return True when *path* exists and is larger than 100 bytes."""
        return os.path.exists(path) and os.path.getsize(path) > 100

    def _grab_xwd(self, path, env):
        """Screenshot via ``xwd`` piped into ImageMagick ``convert``."""
        xwd = subprocess.run(
            ["xwd", "-root", "-display", self.display, "-silent"],
            capture_output=True, timeout=3, env=env)
        if xwd.returncode != 0 or not xwd.stdout:
            return False
        conv = subprocess.run(
            ["convert", "xwd:-", path],
            input=xwd.stdout, capture_output=True, timeout=3)
        return conv.returncode == 0 and os.path.getsize(path) > 100

    def _grab_scrot(self, path, env):
        """Screenshot via ``scrot``."""
        r = subprocess.run(["scrot", path], capture_output=True, timeout=3, env=env)
        return r.returncode == 0 and self._looks_like_image(path)

    def _grab_import(self, path, env):
        """Screenshot via ImageMagick ``import``."""
        # Argument list instead of a shell string: nothing is re-parsed by a shell.
        r = subprocess.run(
            ["import", "-window", "root", "-display", self.display, path],
            capture_output=True, timeout=3, env=env)
        return r.returncode == 0 and self._looks_like_image(path)

    def _discard_last_capture(self):
        """Delete the previous screenshot file, if any, so /tmp does not fill up."""
        if self._last_capture:
            try:
                os.unlink(self._last_capture)
            except OSError:
                pass
            self._last_capture = None

    def capture(self):
        """Capture the virtual display. Returns filepath or None."""
        if not self.is_running:
            return None
        self._discard_last_capture()
        tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        tmp.close()
        cap_env = {**os.environ, "DISPLAY": self.display}
        # Try each tool in turn; a missing binary or a timeout just moves on to the next.
        for grab in (self._grab_xwd, self._grab_scrot, self._grab_import):
            try:
                ok = grab(tmp.name, cap_env)
            except (OSError, subprocess.SubprocessError):
                ok = False
            if ok:
                self._last_capture = tmp.name
                return tmp.name
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        return None

    @property
    def is_running(self):
        """True while the Xvfb process exists and has not exited.

        A property because every caller in this file reads it as an attribute.
        """
        return self.xvfb_proc is not None and self.xvfb_proc.poll() is None

    def cleanup(self):
        """Remove the last screenshot and terminate Xvfb (kill it if it ignores SIGTERM)."""
        self._discard_last_capture()
        if self.xvfb_proc:
            self.xvfb_proc.terminate()
            try:
                self.xvfb_proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self.xvfb_proc.kill()
# Module-level singleton; constructing it tries to start Xvfb as an import side effect.
vdisplay = VirtualDisplay()
| # ═══════════════════════════════════════ | |
| # GUI Process Manager (pygame/tkinter/turtle) | |
| # ═══════════════════════════════════════ | |
# File that receives stdout/stderr of the launched GUI script.
GUI_LOG = "/tmp/_axon_gui.log"


class GUIProcessManager:
    """Run one user GUI script at a time as a child process on the virtual display."""

    def __init__(self):
        self.process = None  # Popen handle of the current GUI script, or None

    def launch(self, code):
        """Write *code* to a temp script and start it; return a status message.

        Any previously launched process is stopped first. The message reports an
        immediate exit (with the log contents), a successful launch, or an error.
        """
        self.stop()
        tmp = "/tmp/_axon_gui_run.py"
        with open(tmp, "w") as f:
            f.write(code)
        # Full environment for GUI frameworks
        env = os.environ.copy()
        env.update({
            "DISPLAY": DISPLAY_NUM,
            # SDL / pygame
            "SDL_VIDEODRIVER": "x11",
            "SDL_AUDIODRIVER": "dummy",          # No audio device in headless
            "PYGAME_HIDE_SUPPORT_PROMPT": "1",   # Suppress pygame welcome
            # Tkinter / general X11
            "XDG_RUNTIME_DIR": "/tmp",
            "MESA_GL_VERSION_OVERRIDE": "3.3",   # OpenGL compat
            "LIBGL_ALWAYS_SOFTWARE": "1",        # Software rendering (no GPU)
            # Python
            "PYTHONUNBUFFERED": "1",
            "PYTHONPATH": "/tmp/axon_workspace:" + env.get("PYTHONPATH", ""),
        })
        try:
            # Log file instead of PIPE — PIPE deadlocks pygame's output.
            # The child inherits its own copy of the descriptor, so the parent's
            # handle is closed as soon as Popen returns instead of leaking.
            with open(GUI_LOG, "w") as log_f:
                self.process = subprocess.Popen(
                    [sys.executable, "-u", tmp],  # -u = unbuffered
                    stdout=log_f, stderr=log_f,
                    env=env,
                    start_new_session=True,  # own session/group so stop() can killpg it
                    cwd="/tmp/axon_workspace")
            time.sleep(1.0)  # Give pygame time to create the window
            if self.process.poll() is not None:
                try:
                    with open(GUI_LOG) as f:
                        msg = f.read().strip()
                except OSError:
                    msg = ""
                return f"[Exited immediately]\n{msg}" if msg else "[Exited — no output]"
            return f"[GUI launched — PID {self.process.pid}] Auto-refresh enabled."
        except Exception as e:
            return f"[Error] {e}"

    def get_log(self):
        """Return the last 2000 characters of the GUI log, or "" if it cannot be read."""
        try:
            with open(GUI_LOG) as f:
                return f.read()[-2000:]
        except OSError:
            return ""

    def stop(self):
        """Terminate the running GUI process group, if any, and return a status message."""
        if self.process and self.process.poll() is None:
            pid = self.process.pid
            try:
                os.killpg(os.getpgid(pid), signal.SIGTERM)
                self.process.wait(timeout=3)
            except (OSError, subprocess.TimeoutExpired):
                # Did not exit in time (or the group lookup failed): escalate.
                try:
                    os.killpg(os.getpgid(pid), signal.SIGKILL)
                except OSError:
                    pass
            self.process = None
            return f"[Stopped PID {pid}]"
        self.process = None
        return "[No process]"

    @property
    def is_running(self):
        """True while a launched process exists and has not exited.

        A property because ``get_status`` reads it as an attribute.
        """
        return self.process is not None and self.process.poll() is None

    def get_status(self):
        """Return a short status marker: "● PID <n>" when running, else "○ idle"."""
        return f"● PID {self.process.pid}" if self.is_running else "○ idle"
# Module-level singleton; holds at most one running GUI script.
gui_mgr = GUIProcessManager()
| # ═══════════════════════════════════════ | |
| # File System | |
| # ═══════════════════════════════════════ | |
class FileSystem:
    """In-memory map of file name -> content, mirrored to a workspace directory on disk."""

    def __init__(self):
        self.files: Dict[str, str] = {
            "main.py": '# Start coding here\nfrom utils import add\n\nresult = add(3, 7)\nprint(f"3 + 7 = {result}")',
            "utils.py": "def add(a, b):\n return a + b\n\ndef multiply(a, b):\n return a * b",
        }
        self.current_file = "main.py"
        self._sync_dir = "/tmp/axon_workspace"
        self._sync_to_disk()

    def _write(self, name, content):
        """Write *content* to *name* inside the workspace, creating parent directories.

        Names such as "pkg/mod.py" would otherwise fail because "pkg" does not exist.
        """
        path = os.path.join(self._sync_dir, name)
        os.makedirs(os.path.dirname(path) or self._sync_dir, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

    def _sync_to_disk(self):
        """Write every in-memory file to the workspace directory."""
        os.makedirs(self._sync_dir, exist_ok=True)
        for name, content in self.files.items():
            self._write(name, content)

    def save_file(self, content):
        """Store *content* as the current file, in memory and on disk."""
        if self.current_file:
            self.files[self.current_file] = content
            self._write(self.current_file, content)

    def get_current_file_content(self):
        """Return the current file's content, or "" if it is not in the map."""
        return self.files.get(self.current_file, "")

    def set_current_file(self, f):
        """Make *f* the current file; unknown names are ignored."""
        if f in self.files:
            self.current_file = f

    def create_file(self, f, c=""):
        """Add a new file *f* with content *c* and select it; no-op if it exists or is empty."""
        if f and f not in self.files:
            self.files[f] = c
            self.current_file = f
            self._sync_to_disk()

    def delete_file(self, f):
        """Delete *f* and select the first remaining file.

        Returns True on success, False if *f* is unknown or is the last file left.
        """
        if f in self.files and len(self.files) > 1:
            del self.files[f]
            try:
                os.unlink(os.path.join(self._sync_dir, f))
            except FileNotFoundError:
                pass  # already gone from disk
            self.current_file = next(iter(self.files))
            return True
        return False

    def get_all_files(self):
        """Return the file names in insertion order."""
        return list(self.files)
# Module-level singleton; constructing it writes the starter files to the workspace directory.
fs = FileSystem()
| # ═══════════════════════════════════════ | |
| # Snippet Library | |
| # ═══════════════════════════════════════ | |
class SnippetLibrary:
    """Named code snippets persisted as a JSON object in ``SNIPPETS_FILE``."""

    def __init__(self):
        self.snippets = {}  # name -> {"code": ..., "tags": ..., "created": ...}
        self._load()

    def _load(self):
        """Load snippets from disk; a missing, unreadable or malformed file yields {}."""
        try:
            with open(SNIPPETS_FILE, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            data = {}
        # Valid JSON that is not an object (e.g. a list) would break every later lookup.
        self.snippets = data if isinstance(data, dict) else {}

    def _save(self):
        """Write all snippets to disk; failures are reported but never raised."""
        try:
            with open(SNIPPETS_FILE, "w", encoding="utf-8") as f:
                json.dump(self.snippets, f)
        except (OSError, TypeError, ValueError) as e:
            print(f"[Snippets] save failed: {e}")

    def add(self, name, code, tags=""):
        """Store *code* under the stripped *name*; return a status message."""
        if not name or not name.strip():
            return "Error: name required"
        self.snippets[name.strip()] = {
            "code": code,
            "tags": tags,
            "created": time.strftime("%Y-%m-%d"),
        }
        self._save()
        return f"Saved: {name}"

    def get(self, name):
        """Return the code stored under *name*, or "" when there is none."""
        entry = self.snippets.get(name)
        return entry.get("code", "") if isinstance(entry, dict) else ""

    def delete(self, name):
        """Remove *name* and persist; return a status message."""
        if name in self.snippets:
            del self.snippets[name]
            self._save()
            return f"Deleted: {name}"
        return "Not found"

    def get_names(self):
        """Return the snippet names in insertion order."""
        return list(self.snippets)
# Module-level singleton; constructing it loads any snippets saved in SNIPPETS_FILE.
snippets = SnippetLibrary()
| # ═══════════════════════════════════════ | |
| # Code Analyzer & FindReplace | |
| # ═══════════════════════════════════════ | |
class CodeAnalyzer:
    """Produce a short textual outline (imports, functions, classes) of Python source."""

    @staticmethod
    def analyze(code: str) -> str:
        """Return an outline of *code*, one entry per line.

        Returns "Empty file" for blank input and a one-line syntax-error message
        when *code* does not parse. Declared static because the method takes no
        ``self`` yet is called on an instance throughout the file.
        """
        if not code or not code.strip():
            return "Empty file"
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return f"⚠ Syntax error line {e.lineno}: {e.msg}"
        lines = [f"📊 {len(code.splitlines())} lines"]
        imports = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                imports.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for "from . import x"; show the dots instead of "None".
                imports.append(f"from {'.' * node.level}{node.module or ''}")
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                args = ", ".join(a.arg for a in node.args.args)
                lines.append(f" L{node.lineno} ⚡ def {node.name}({args})")
            elif isinstance(node, ast.ClassDef):
                lines.append(f" L{node.lineno} 🏗 class {node.name}")
        if imports:
            # Imports go directly under the line-count header.
            lines.insert(1, f"📦 {', '.join(imports)}")
        return "\n".join(lines)


analyzer = CodeAnalyzer()
class FindReplace:
    """Plain-text find / replace helpers for the editor.

    All methods are static: none takes ``self``, yet they are called on the
    module-level ``finder`` instance.
    """

    @staticmethod
    def find_in_file(code, query, case):
        """List the lines of *code* containing *query*; *case* selects case-sensitive matching."""
        if not query:
            return ""
        needle = query if case else query.lower()
        res = [
            f" L{i}: {line.strip()}"
            for i, line in enumerate(code.splitlines(), 1)
            if needle in (line if case else line.lower())
        ]
        return f"Found {len(res)} match(es):\n" + "\n".join(res) if res else "No matches."

    @staticmethod
    def find_all_files(files, query):
        """Case-insensitively search every file in *files* (name -> content) for *query*."""
        if not query:
            return ""
        needle = query.lower()
        res = [
            f" {fname}:L{i}: {line.strip()}"
            for fname, content in files.items()
            for i, line in enumerate(content.splitlines(), 1)
            if needle in line.lower()
        ]
        return f"Found {len(res)} across all files:\n" + "\n".join(res) if res else "No matches."

    @staticmethod
    def replace_in_file(code, find, replace, case):
        """Replace every literal occurrence of *find* in *code* with *replace*.

        Returns ``(new_code, status_message)``.
        """
        if not find:
            return code, "Empty find."
        flags = 0 if case else re.IGNORECASE
        # A callable replacement is inserted verbatim; a plain string would have its
        # backslashes and group references interpreted by the regex engine.
        new_code, count = re.subn(re.escape(find), lambda _m: replace, code, flags=flags)
        return new_code, f"Replaced {count} occurrence(s)."


finder = FindReplace()
| # ═══════════════════════════════════════ | |
| # PTY Terminal (fixed) | |
| # ═══════════════════════════════════════ | |
class PTYTerminal:
    """Interactive bash shell on a pseudo-terminal, with a bounded text log.

    Constructing an instance forks a ``/bin/bash`` child attached to a new PTY.
    Commands are written to the PTY and their output is appended to ``log_lines``.
    """

    STRIP_ANSI = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\].*?\x07|\x1b\[.*?[@-~]|\r')
    # Commands that get the long (180 s) timeout and the larger idle allowance.
    LONG_CMDS = frozenset([
        "pip", "pip3", "npm", "npx", "apt-get", "apt", "git",
        "wget", "curl", "make", "cmake", "cargo", "yarn", "conda",
    ])

    def __init__(self):
        self.master_fd = None   # parent side of the PTY, None when no shell is attached
        self.pid = None         # pid of the bash child
        self.lock = threading.Lock()
        self.log_lines: List[str] = []
        self.max_lines = 800
        self.cmd_history: List[str] = []
        self._spawn()
        for banner in (
            "AXON TERMINAL v4.0",
            "══════════════════════════════════════",
            "Full PTY shell · Cross-file imports",
            " pip/npm/apt-get/git/curl/make",
            " GUI apps → Display tab",
            "══════════════════════════════════════",
        ):
            self._append(banner)

    @staticmethod
    def _exec_shell(master_fd, slave_fd):
        """Child side of the fork: attach to the PTY slave and exec bash. Never returns."""
        try:
            os.close(master_fd)
            os.setsid()
            fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
            os.dup2(slave_fd, 0)
            os.dup2(slave_fd, 1)
            os.dup2(slave_fd, 2)
            if slave_fd > 2:
                os.close(slave_fd)
            env = os.environ.copy()
            env.update({"TERM": "dumb", "PS1": "$ ",
                        "DEBIAN_FRONTEND": "noninteractive", "DISPLAY": DISPLAY_NUM})
            os.execvpe("/bin/bash", ["/bin/bash", "--norc", "--noprofile", "-i"], env)
        finally:
            # Reached only if setup or exec failed. Exit hard so the forked copy
            # never falls back into the parent's code path and keeps running the app.
            os._exit(127)

    def _spawn(self):
        """Fork the shell and configure the master side of the PTY."""
        master_fd = None
        try:
            master_fd, slave_fd = pty.openpty()
            self.pid = os.fork()
            if self.pid == 0:
                self._exec_shell(master_fd, slave_fd)
            # Parent — hold the master fd
            os.close(slave_fd)
            self.master_fd = master_fd
            # Set non-blocking properly
            flags = fcntl.fcntl(self.master_fd, fcntl.F_GETFL)
            fcntl.fcntl(self.master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            # Set terminal size
            fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ,
                        struct.pack("HHHH", 40, 120, 0, 0))
            time.sleep(0.3)
            self._read_raw(0.2)  # Drain initial output
        except Exception as e:
            self._append(f"[Shell Error] {e}")
            if master_fd is not None:
                try:
                    os.close(master_fd)
                except OSError:
                    pass
            self.master_fd = None

    def _read_raw(self, timeout=0.1) -> str:
        """Read whatever the PTY produces within *timeout* seconds, decoded as UTF-8.

        Each successful read extends the deadline by 0.15 s; the loop stops at the
        first quiet poll after data has arrived, on EOF, or on an OS error.
        """
        if self.master_fd is None:  # explicit None test: 0 is a valid descriptor
            return ""
        out = []
        deadline = time.time() + timeout
        while True:
            rem = deadline - time.time()
            if rem <= 0:
                break
            try:
                ready, _, _ = select.select([self.master_fd], [], [], min(rem, 0.05))
                if ready:
                    data = os.read(self.master_fd, 4096)
                    if not data:
                        break
                    out.append(data.decode("utf-8", errors="replace"))
                    deadline = time.time() + 0.15
                elif out:
                    break
            except OSError:
                break
        return "".join(out)

    def _clean(self, text):
        """Strip ANSI escape codes, carriage returns and other control characters."""
        c = self.STRIP_ANSI.sub("", text)
        return "".join(ch for ch in c if ch in "\n\t" or ord(ch) >= 32)

    def _append(self, text):
        """Append one log entry, dropping the oldest entries beyond ``max_lines``."""
        self.log_lines.append(text)
        overflow = len(self.log_lines) - self.max_lines
        if overflow > 0:
            del self.log_lines[:overflow]

    def get_log(self):
        """Return the whole log as one newline-joined string."""
        return "\n".join(self.log_lines)

    def _remember(self, cmd):
        """Record *cmd* in the history (skipping immediate repeats, capped at 100)."""
        if not self.cmd_history or self.cmd_history[-1] != cmd:
            self.cmd_history.append(cmd)
            if len(self.cmd_history) > 100:
                self.cmd_history.pop(0)

    def _collect_output(self, cmd):
        """Read the shell's output for *cmd* until a prompt, idle limit or timeout; return it cleaned."""
        parts = cmd.split()
        base = parts[0].lower() if parts else ""
        is_long = base in self.LONG_CMDS
        wait = 180 if is_long else (60 if base in ("python", "python3", "node") else 15)
        idle_limit = 10 if is_long else 3
        chunks = []
        start = time.time()
        idle = 0
        while time.time() - start < wait:
            c = self._read_raw(0.3)
            if c:
                idle = 0
                chunks.append(c)
                # Output ending in the "$" prompt is taken as "command finished".
                if "".join(chunks).rstrip().endswith("$"):
                    break
            else:
                idle += 1
                if idle >= idle_limit:
                    break
        return self._clean("".join(chunks))

    @staticmethod
    def _strip_echo(raw, cmd):
        """Drop the first echoed copy of *cmd* and bare prompt lines from *raw*."""
        filtered = []
        skip_echo = True
        for line in raw.split("\n"):
            s = line.strip()
            if skip_echo and s == cmd:
                skip_echo = False
                continue
            if s == "$":
                continue
            filtered.append(line)
        return "\n".join(filtered).strip()

    def run_command(self, cmd):
        """Run *cmd* in the shell and return the updated log text.

        "clear" empties the log and returns ""; "history" lists the last 20
        commands; anything else is written to the PTY.
        """
        cmd = (cmd or "").strip()
        if not cmd:
            return self.get_log()
        with self.lock:
            # History is updated under the lock so concurrent callers cannot interleave.
            self._remember(cmd)
            if cmd.lower() == "clear":
                self.log_lines = []
                return ""
            if cmd.lower() == "history":
                self._append("Command history:")
                for i, c in enumerate(self.cmd_history[-20:], 1):
                    self._append(f" {i:3d} {c}")
                return self.get_log()
            if self.master_fd is None:
                self._append(f"$ {cmd}")
                self._append("[Error] No shell.")
                return self.get_log()
            # Drain stale output
            self._read_raw(0.05)
            self._append(f"$ {cmd}")
            try:
                os.write(self.master_fd, (cmd + "\n").encode())
            except OSError as e:
                self._append(f"[Write Error] {e}")
                return self.get_log()
            result = self._strip_echo(self._collect_output(cmd), cmd)
            if result:
                self._append(result)
            return self.get_log()

    def get_history(self) -> List[str]:
        """Return up to the 30 most recent commands, newest first."""
        return list(reversed(self.cmd_history[-30:]))

    def cleanup(self):
        """Signal the shell, close the PTY and reap the child if it has already exited."""
        if self.pid and self.pid > 0:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                pass
        if self.master_fd is not None:
            try:
                os.close(self.master_fd)
            except OSError:
                pass
            self.master_fd = None
        if self.pid and self.pid > 0:
            try:
                os.waitpid(self.pid, os.WNOHANG)  # non-blocking reap to avoid a zombie
            except OSError:
                pass
            self.pid = None
# Module-level singleton; constructing it forks the bash child as an import side effect.
terminal = PTYTerminal()
| # ═══════════════════════════════════════ | |
| # AI Model — Qwen2.5CMR Q4_K_M GGUF | |
| # ═══════════════════════════════════════ | |
# Cached Llama instance (None until the first successful load) and the lock guarding its creation.
_llm_instance = None
_llm_lock = threading.Lock()


def load_model():
    """Return the shared Llama instance, downloading and loading it on first use.

    Returns None when the download or load fails; nothing is cached in that
    case, so the next call tries again.
    """
    global _llm_instance
    if _llm_instance is None:
        with _llm_lock:
            # Re-check under the lock: another thread may have finished loading meanwhile.
            if _llm_instance is None:
                print(f"Downloading {MODEL_REPO}/{MODEL_FILE}...")
                started = time.time()
                try:
                    model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
                    print(f"Downloaded in {time.time()-started:.1f}s, loading...")
                    from llama_cpp import Llama
                    _llm_instance = Llama(
                        model_path=model_path,
                        n_ctx=4096,
                        n_threads=os.cpu_count() or 4,
                        n_gpu_layers=0,  # CPU only
                        verbose=False,
                    )
                    print(f"Model ready in {time.time()-started:.1f}s total")
                except Exception as e:
                    print(f"Model error: {e}")
                    return None
    return _llm_instance
def ai_gen(system_prompt, code, max_tokens=300):
    """Run one chat completion with *system_prompt* and *code* as the user message.

    Returns the stripped model reply, "(No output from model)" when the reply is
    empty, or a string starting with "Error:" when the model is unavailable or
    the completion raises.
    """
    llm = load_model()
    if not llm:
        return "Error: model failed to load"
    try:
        response = llm.create_chat_completion(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": code},
            ],
            max_tokens=max_tokens,
            temperature=0.3,
            top_p=0.9,
            repeat_penalty=1.1,
        )
        reply = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        return f"Error: {e}"
    return reply or "(No output from model)"
| # ═══════════════════════════════════════ | |
| # GUI code detection | |
| # ═══════════════════════════════════════ | |
# Substrings whose presence marks a script as a GUI program.
GUI_HINTS = [
    "pygame",
    "tkinter",
    "turtle",
    "pyglet",
    "arcade",
    "kivy",
    "display.set_mode",
    "mainloop()",
    "Tk()",
    "Canvas(",
    "pygame.init",
    "pygame.display",
    "screen.fill",
    "from turtle import",
    "import turtle",
]


def is_gui_code(c):
    """Return True if the source text *c* contains any of the ``GUI_HINTS`` substrings."""
    for hint in GUI_HINTS:
        if hint in c:
            return True
    return False
| # ═══════════════════════════════════════ | |
| # Theme + CSS | |
| # ═══════════════════════════════════════ | |
# Dark Gradio theme: cyan primary hue on near-black backgrounds, IBM Plex Mono font.
theme = gr.themes.Default(
    primary_hue="cyan", neutral_hue="gray",
    font=[gr.themes.GoogleFont("IBM Plex Mono"), "monospace"]
).set(
    body_background_fill="#080808", body_text_color="#e0e0e0",
    button_primary_background_fill="#0ff", button_primary_background_fill_hover="#0dd",
    button_primary_text_color="#000",
    button_secondary_background_fill="#1a1a1a", button_secondary_background_fill_hover="#2a2a2a",
    button_secondary_text_color="#0ff", button_secondary_border_color="#0ff4",
    block_background_fill="#0d0d0d", block_border_color="#1a1a1a",
    block_label_text_color="#0f8",
    input_background_fill="#0a0a0a", input_border_color="#1a3a2a", input_placeholder_color="#0f83",
)
# Custom stylesheet. Its selectors target the elem_classes used by the layout
# (axon-header, term-box, term-input, code-editor, tb, struct-box, find-box,
# status-bar, stop-btn) plus Gradio's accordion and tab markup.
css = """
* { border-radius: 0 !important; }
.gradio-container { max-width: 100% !important; }
/* Header */
.axon-header {
background: linear-gradient(90deg, #080808, #0a1a1a, #080808);
border-bottom: 1px solid #0ff3; padding: 12px 20px !important; margin-bottom: 8px;
}
.axon-header h1 {
font-family: 'IBM Plex Mono', monospace !important;
background: linear-gradient(90deg, #0ff, #0f8);
-webkit-background-clip: text; -webkit-text-fill-color: transparent;
font-size: 1.4em !important; letter-spacing: 3px; margin: 0 !important;
}
.axon-header p { color: #0f84 !important; font-size: 0.75em; letter-spacing: 1px; }
/* Terminal */
.term-box textarea {
background: #050505 !important; color: #00ff41 !important;
font-family: 'IBM Plex Mono', monospace !important;
font-size: 12px !important; line-height: 1.5 !important;
border: 1px solid #0f31 !important; text-shadow: 0 0 4px #0f32;
}
.term-input input {
background: #050505 !important; color: #00ff41 !important;
font-family: 'IBM Plex Mono', monospace !important;
border: 1px solid #0f32 !important; font-size: 12px !important;
}
.term-input input::placeholder { color: #0f83 !important; }
.term-input input:focus { border-color: #0ff !important; box-shadow: 0 0 8px #0ff2 !important; }
/* Editor */
.code-editor textarea, .code-editor .cm-editor {
font-family: 'IBM Plex Mono', monospace !important; font-size: 13px !important;
}
/* Buttons */
.tb {
font-size: 0.8em !important; padding: 6px 12px !important; letter-spacing: 0.5px;
text-transform: uppercase; font-weight: 600 !important;
border: 1px solid transparent !important; transition: all 0.2s ease !important;
}
.tb:hover { border-color: #0ff !important; box-shadow: 0 0 12px #0ff2 !important; }
/* Accordion */
.gr-accordion { border: 1px solid #1a2a2a !important; background: #0a0a0a !important; }
.gr-accordion > .label-wrap { background: #0d0d0d !important; border-bottom: 1px solid #1a2a2a !important; }
.gr-accordion > .label-wrap:hover { background: #121a1a !important; }
.gr-accordion > .label-wrap span {
color: #0ff !important; font-family: 'IBM Plex Mono', monospace !important;
letter-spacing: 1px; font-size: 0.85em;
}
/* Tabs */
.tabs > .tab-nav > button {
font-family: 'IBM Plex Mono', monospace !important;
letter-spacing: 1px; font-size: 0.8em; text-transform: uppercase;
color: #888 !important; border-bottom: 2px solid transparent !important;
}
.tabs > .tab-nav > button.selected {
color: #0ff !important; border-bottom-color: #0ff !important; text-shadow: 0 0 8px #0ff4;
}
/* Structure + find */
.struct-box textarea { background: #050505 !important; color: #0ff !important;
font-family: 'IBM Plex Mono', monospace !important; font-size: 11px !important; }
.find-box textarea { background: #050505 !important; color: #ffab40 !important;
font-family: 'IBM Plex Mono', monospace !important; font-size: 11px !important; }
/* Status bar */
.status-bar {
background: #0a0a0a !important; border-top: 1px solid #1a2a2a;
padding: 4px 12px !important; font-size: 11px !important; color: #0f84 !important;
}
.status-bar strong { color: #0ff !important; }
/* Misc */
::-webkit-scrollbar { width: 6px; height: 6px; }
::-webkit-scrollbar-track { background: #0a0a0a; }
::-webkit-scrollbar-thumb { background: #0f83; }
input, textarea, select { color: #0f8 !important; }
.stop-btn { background: #a00 !important; color: #fff !important; border: none !important; }
.stop-btn:hover { background: #c00 !important; }
"""
| # ═══════════════════════════════════════ | |
| # UI | |
| # ═══════════════════════════════════════ | |
| with gr.Blocks(title="Axon Pro") as demo: | |
| with gr.Row(elem_classes="axon-header"): | |
| gr.Markdown("# ⬡ AXON PRO\n\nPYTHON AI IDE — v4.0") | |
| with gr.Row(equal_height=False): | |
| # ══ LEFT SIDEBAR ══ | |
| with gr.Column(scale=1, min_width=200): | |
| with gr.Accordion("📁 EXPLORER", open=True): | |
| file_list = gr.Dropdown(choices=fs.get_all_files(), value=fs.current_file, | |
| label="Files", interactive=True) | |
| new_file_txt = gr.Textbox(placeholder="new_file.py", show_label=False) | |
| with gr.Row(): | |
| new_btn = gr.Button("NEW", size="sm", elem_classes="tb") | |
| save_btn = gr.Button("SAVE", size="sm", elem_classes="tb") | |
| del_btn = gr.Button("DEL", size="sm", elem_classes="tb stop-btn") | |
| with gr.Accordion("🗺 STRUCTURE", open=True): | |
| structure_view = gr.Textbox( | |
| value=analyzer.analyze(fs.get_current_file_content()), | |
| label="", lines=10, interactive=False, elem_classes="struct-box", | |
| show_label=False) | |
| with gr.Accordion("📋 SNIPPETS", open=False): | |
| snip_list = gr.Dropdown(choices=snippets.get_names(), label="Saved", | |
| interactive=True) | |
| snip_name = gr.Textbox(placeholder="Snippet name", show_label=False) | |
| snip_tags = gr.Textbox(placeholder="Tags: api, util, loop", show_label=False) | |
| with gr.Row(): | |
| snip_save = gr.Button("SAVE", size="sm", elem_classes="tb") | |
| snip_insert = gr.Button("INSERT", size="sm", elem_classes="tb") | |
| snip_del = gr.Button("DEL", size="sm", elem_classes="tb stop-btn") | |
| snip_status = gr.Markdown("") | |
| # ══ CENTER ══ | |
| with gr.Column(scale=4): | |
| editor = gr.Code(value=fs.get_current_file_content(), | |
| label=f" {fs.current_file}", language="python", | |
| lines=20, interactive=True, elem_classes="code-editor") | |
| with gr.Row(): | |
| run_btn = gr.Button("▶ RUN", variant="primary", elem_classes="tb") | |
| stop_btn = gr.Button("■ STOP", size="sm", elem_classes="tb stop-btn") | |
| ai_complete = gr.Button("✦ COMPLETE", elem_classes="tb") | |
| ai_explain = gr.Button("◈ EXPLAIN", elem_classes="tb") | |
| ai_refactor = gr.Button("⟲ REFACTOR", elem_classes="tb") | |
| with gr.Tabs() as bottom_tabs: | |
| with gr.Tab("▶ OUTPUT", id="output-tab"): | |
| run_output = gr.Textbox(value="Run your code to see output here.", | |
| lines=12, max_lines=25, interactive=False, | |
| elem_classes="term-box", label="", show_label=False) | |
| with gr.Tab("⌘ TERMINAL", id="term-tab"): | |
| term_out = gr.Textbox(value=terminal.get_log(), lines=12, max_lines=25, | |
| interactive=False, elem_classes="term-box", | |
| label="", show_label=False) | |
| with gr.Row(): | |
| term_in = gr.Textbox(placeholder="$ pip install, git clone, ls ...", | |
| show_label=False, elem_classes="term-input") | |
| with gr.Row(): | |
| clear_btn = gr.Button("CLEAR", size="sm", elem_classes="tb") | |
| hist_dd = gr.Dropdown(choices=[], label="", interactive=True, | |
| scale=3, container=False) | |
| hist_btn = gr.Button("↻ HISTORY", size="sm", elem_classes="tb") | |
| with gr.Tab("🔍 FIND", id="find-tab"): | |
| with gr.Row(): | |
| find_query = gr.Textbox(label="Find", scale=4) | |
| replace_query = gr.Textbox(label="Replace", scale=4) | |
| find_case = gr.Checkbox(label="Case", value=False) | |
| with gr.Row(): | |
| find_btn = gr.Button("FIND", size="sm", elem_classes="tb") | |
| find_all_btn = gr.Button("FIND ALL FILES", size="sm", elem_classes="tb") | |
| replace_btn = gr.Button("REPLACE ALL", size="sm", elem_classes="tb") | |
| find_results = gr.Textbox(value="", lines=6, interactive=False, | |
| elem_classes="find-box", label="", show_label=False) | |
| with gr.Tab("💬 AI CHAT", id="chat-tab"): | |
| chat_history = gr.Chatbot(label="", height=250) | |
| with gr.Row(): | |
| chat_input = gr.Textbox(placeholder="Describe code you want...", | |
| scale=8, container=False) | |
| send_btn = gr.Button("GEN", variant="primary", scale=1, elem_classes="tb") | |
| with gr.Tab("⇄ DIFF", id="diff-tab"): | |
| diff_view = gr.Code(label="AI Changes", language="python", | |
| interactive=False, lines=15) | |
| with gr.Row(): | |
| apply_btn = gr.Button("✓ APPLY", variant="primary", elem_classes="tb") | |
| discard_btn = gr.Button("✗ DISCARD", elem_classes="tb") | |
| # ══ RIGHT: DISPLAY ══ | |
| with gr.Column(scale=2, min_width=250): | |
| with gr.Accordion("🖥 DISPLAY OUTPUT", open=True): | |
| display_image = gr.Image(label="", type="filepath", interactive=False, height=420) | |
| with gr.Row(): | |
| capture_btn = gr.Button("📸 CAPTURE", size="sm", elem_classes="tb") | |
| auto_capture = gr.Checkbox(label="Auto Refresh", value=False) | |
| refresh_speed = gr.Dropdown( | |
| choices=["0.5s", "1s", "2s", "5s"], value="1s", | |
| label="", container=False, scale=1, interactive=True) | |
| gui_status = gr.Markdown( | |
| f"<small>Xvfb: {'● ON' if vdisplay.is_running else '○ OFF'} " | |
| f"| {SCREEN_W}x{SCREEN_H} | SDL: x11 + dummy audio</small>") | |
| status_bar = gr.Markdown( | |
| f"**AXON PRO v4.0** │ Python {sys.version.split()[0]} │ CPU │ " | |
| f"Qwen2.5CMR Q4_K_M │ PTY + Xvfb │ Snippets + Structure + Find", | |
| elem_classes="status-bar") | |
| # State | |
| diff_original = gr.State("") | |
| diff_modified = gr.State("") | |
| auto_timer = gr.Timer(1, active=False) | |
| # ═══════════════════════════════════════ | |
| # HANDLERS | |
| # ═══════════════════════════════════════ | |
| # --- File ops --- | |
def on_editor_change(content):
    """Persist the editor text to the current file and return its refreshed outline."""
    fs.save_file(content)
    outline = analyzer.analyze(content)
    return outline
def on_file_select(filename):
    """Switch to *filename*; return (content, editor label update, outline)."""
    fs.set_current_file(filename)
    text = fs.get_current_file_content()
    label_update = gr.update(label=f" {filename}")
    return text, label_update, analyzer.analyze(text)
def on_new_file(name):
    """Create (or select) the file *name*.

    Returns (file dropdown update, editor content, editor label update, outline);
    a blank name leaves all four outputs unchanged.
    """
    if not name or not name.strip():
        return gr.update(), gr.update(), gr.update(), gr.update()
    name = name.strip()
    fs.create_file(name)
    # create_file() is a no-op for an existing name. Select it explicitly so the
    # returned content matches the name shown in the dropdown and editor label.
    fs.set_current_file(name)
    c = fs.get_current_file_content()
    return (gr.update(choices=fs.get_all_files(), value=name),
            c, gr.update(label=f" {name}"), analyzer.analyze(c))
def on_save(code):
    """Save *code* to the current file and return the status-bar text confirming it."""
    fs.save_file(code)
    segments = [
        "**AXON PRO v4.0**",
        f"Python {sys.version.split()[0]}",
        "CPU",
        "Qwen2.5CMR Q4_K_M",
        f"✓ Saved {fs.current_file}",
    ]
    return " │ ".join(segments)
def on_delete():
    """Delete the current file.

    Returns (file dropdown update, editor content, editor label update, outline);
    all four are no-op updates when the file could not be deleted.
    """
    target = fs.current_file
    if not fs.delete_file(target):
        return gr.update(), gr.update(), gr.update(), gr.update()
    content = fs.get_current_file_content()
    dropdown = gr.update(choices=fs.get_all_files(), value=fs.current_file)
    label = gr.update(label=f" {fs.current_file}")
    return dropdown, content, label, analyzer.analyze(content)
| # --- Run (smart: auto-detects GUI vs CLI) --- | |
def on_run(code):
    """Save and run *code*, choosing GUI or plain-script execution automatically.

    Returns (output text, screenshot, status markdown, auto-refresh checkbox
    update). For a plain script the last three are no-op updates.
    """
    fs.save_file(code)

    if is_gui_code(code):
        # GUI app — launch, take first screenshot, auto-enable refresh
        launch_msg = gui_mgr.launch(code)
        time.sleep(0.5)  # Extra time for first frame
        screenshot = vdisplay.capture()
        status = f"<small>Xvfb: ● ON | {gui_mgr.get_status()}</small>"
        return launch_msg, screenshot, status, gr.update(value=True)

    # Normal script — run with subprocess, output goes to OUTPUT tab
    script_path = "/tmp/_axon_run.py"
    with open(script_path, "w") as f:
        f.write(code)
    try:
        run_env = os.environ.copy()
        run_env["DISPLAY"] = DISPLAY_NUM
        run_env["PYTHONPATH"] = fs._sync_dir + ":" + run_env.get("PYTHONPATH", "")
        proc = subprocess.run([sys.executable, script_path], capture_output=True,
                              text=True, timeout=30, env=run_env, cwd=fs._sync_dir)
        # stdout first, then stderr; streams that are only whitespace are skipped.
        streams = [s.rstrip() for s in (proc.stdout, proc.stderr) if s.strip()]
        output = "\n".join(streams) or "(No output)"
    except subprocess.TimeoutExpired:
        output = "[Timed out after 30s]"
    except Exception as e:
        output = f"[Error] {e}"
    return output, gr.update(), gr.update(), gr.update()
def on_stop():
    """Stop the GUI process and report the display status.

    Returns:
        A 3-tuple for (run output, GUI status HTML, auto-capture checkbox);
        the checkbox update switches auto-refresh off.
    """
    msg = gui_mgr.stop()
    xvfb_state = "● ON" if vdisplay.is_running else "○ OFF"
    status = f"<small>Xvfb: {xvfb_state} | ○ idle</small>"
    auto_refresh_off = gr.update(value=False)
    return msg, status, auto_refresh_off
| # --- Terminal --- | |
def on_term_cmd(cmd):
    """Run a terminal command and clear the input box.

    Args:
        cmd: Text submitted from the terminal input.

    Returns:
        ``(result of terminal.run_command, "")``; the empty string resets
        the input field.
    """
    terminal_output = terminal.run_command(cmd)
    return terminal_output, ""
def on_clear():
    """Reset the terminal log and return an empty string for the output pane."""
    terminal.log_lines = []
    return ""
def on_refresh_hist():
    """Return an update that reloads the history dropdown choices."""
    history_entries = terminal.get_history()
    return gr.update(choices=history_entries)
def on_select_hist(cmd):
    """Copy the chosen history entry into the terminal input.

    Args:
        cmd: Selected dropdown value; may be empty or ``None``.

    Returns:
        ``cmd`` when it is truthy, otherwise a no-op ``gr.update()`` so the
        input is left as it is.
    """
    if not cmd:
        return gr.update()
    return cmd
| # --- AI --- | |
def on_complete(code):
    """Ask the model to continue the code and append its answer.

    Args:
        code: Current editor text.

    Returns:
        ``(new_code, structure)`` where ``new_code`` is the original text,
        a newline, and the model output, and ``structure`` is
        ``analyzer.analyze(new_code)``.
    """
    instruction = "Complete this Python code. Only output the completion, no explanations."
    completion = ai_gen(instruction, code)
    extended = code + "\n" + completion
    return extended, analyzer.analyze(extended)
def on_explain(code):
    """Generate an explanation of the code and add it to the terminal log.

    Args:
        code: Current editor text.

    Returns:
        The terminal log as returned by ``terminal.get_log()`` after the
        explanation has been appended.
    """
    answer = ai_gen("Explain this Python code concisely.", code, 512)
    terminal._append(f"[AI Explanation]\n{answer}")
    return terminal.get_log()
def on_refactor(code):
    """Request a refactored version of the code and build a unified diff.

    The current code is saved before the diff is computed; the editor itself
    is not modified here.

    Args:
        code: Current editor text.

    Returns:
        ``(diff_text, original_code, refactored_code)``.
    """
    import difflib

    instruction = "Refactor this Python code for PEP 8 and best practices. Output only code."
    refactored = ai_gen(instruction, code, 512)
    fs.save_file(code)

    before_lines = code.splitlines()
    after_lines = refactored.splitlines()
    diff_lines = difflib.unified_diff(before_lines, after_lines, lineterm="")
    return "\n".join(diff_lines), code, refactored
def on_generate(prompt_text, history):
    """Generate code from a chat prompt and stage it in the diff view.

    Args:
        prompt_text: User request from the chat input.
        history: Existing chat messages (list of role/content dicts); a
            missing history (``None``) is treated as empty.

    Returns:
        A 5-tuple for (diff view, diff original, diff modified, chat
        history, chat input). For a blank prompt all five are no-op
        ``gr.update()`` values and the model is not called.
    """
    import difflib

    # Same blank-input guard as on_new_file: an empty prompt would otherwise
    # trigger a full 512-token generation for nothing.
    if not prompt_text or not prompt_text.strip():
        return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

    generated = ai_gen(f"Write Python code for: {prompt_text}", "", 512)
    # Diff against an empty original so every generated line shows as added.
    diff = "\n".join(difflib.unified_diff([], generated.splitlines(), lineterm=""))
    new_h = list(history or []) + [
        {"role": "user", "content": prompt_text},
        {"role": "assistant", "content": "Code generated → check DIFF tab"},
    ]
    return diff, "", generated, new_h, ""
def on_apply(modified):
    """Accept the staged code: return it with its analyzed structure.

    Args:
        modified: Code held in the diff's "modified" slot.

    Returns:
        ``(modified, analyzer.analyze(modified))``.
    """
    structure = analyzer.analyze(modified)
    return modified, structure
| # --- Snippets --- | |
def on_snip_save(name, tags, code):
    """Store the editor contents as a snippet and refresh the snippet list.

    Args:
        name: Snippet name.
        tags: Snippet tags, passed through to ``snippets.add``.
        code: Code to store.

    Returns:
        ``(status message, dropdown update with the current snippet names)``.
    """
    status = snippets.add(name, code, tags)
    names_update = gr.update(choices=snippets.get_names())
    return status, names_update
def on_snip_insert(name, code):
    """Append the named snippet to the end of the editor contents.

    Args:
        name: Snippet to look up via ``snippets.get``.
        code: Current editor text.

    Returns:
        ``code`` unchanged when the lookup yields nothing; the snippet alone
        when the editor is blank; otherwise the editor text, a blank line,
        and the snippet.
    """
    snip_code = snippets.get(name)
    if not snip_code:
        return code
    if code.strip():
        # Non-empty editor: separate existing code and snippet by a blank line.
        return code + "\n\n" + snip_code
    return snip_code
def on_snip_del(name):
    """Delete a snippet and refresh the snippet list.

    Args:
        name: Snippet name passed to ``snippets.delete``.

    Returns:
        ``(status message, dropdown update with the remaining snippet names)``.
    """
    status = snippets.delete(name)
    names_update = gr.update(choices=snippets.get_names())
    return status, names_update
| # --- Find --- | |
def on_find(code, query, case):
    """Search the editor contents; delegates to ``finder.find_in_file``.

    Args:
        code: Text to search.
        query: Search term.
        case: Case flag from the UI checkbox, passed through unchanged.
    """
    matches = finder.find_in_file(code, query, case)
    return matches
def on_find_all(query):
    """Search every file in ``fs.files``; delegates to ``finder.find_all_files``.

    Args:
        query: Search term.
    """
    matches = finder.find_all_files(fs.files, query)
    return matches
def on_replace(code, fq, rq, case):
    """Run find-and-replace on the editor contents and save the result.

    Args:
        code: Current editor text.
        fq: Text to find.
        rq: Replacement text.
        case: Case flag from the UI checkbox, passed through unchanged.

    Returns:
        ``(new_code, message, structure)`` where ``structure`` is
        ``analyzer.analyze(new_code)``.
    """
    replaced = finder.replace_in_file(code, fq, rq, case)
    new_code, msg = replaced
    fs.save_file(new_code)
    structure = analyzer.analyze(new_code)
    return new_code, msg, structure
| # --- Display --- | |
def on_capture():
    """Take a screenshot of the virtual display via ``vdisplay.capture``."""
    screenshot = vdisplay.capture()
    return screenshot
def on_auto_tick():
    """Timer callback: refresh the display image with a new screenshot.

    Returns:
        The captured screenshot, or a no-op ``gr.update()`` when the capture
        is falsy so the previous image stays on screen.

    Note:
        The earlier version checked ``gui_mgr.is_running`` but returned the
        same value on both paths, so the check is removed. Auto-disabling
        the timer when the GUI process dies would need an extra output wired
        to the timer or checkbox.
    """
    ss = vdisplay.capture()
    return ss if ss else gr.update()
def on_auto_toggle(checked):
    """Build the refresh timer for the auto-capture checkbox state.

    Args:
        checked: Checkbox value; used as the timer's ``active`` flag.

    Returns:
        A ``gr.Timer`` with a 1 second interval.
    """
    timer = gr.Timer(1, active=checked)
    return timer
def on_speed_change(speed, auto_on):
    """Rebuild the refresh timer for the selected speed.

    Args:
        speed: One of the labels "0.5s", "1s", "2s", "5s"; any other value
            falls back to a 1 second interval.
        auto_on: Whether auto-capture is enabled; used as the ``active`` flag.

    Returns:
        A ``gr.Timer`` configured with the chosen interval.
    """
    seconds_by_label = {"0.5s": 0.5, "1s": 1, "2s": 2, "5s": 5}
    interval = seconds_by_label[speed] if speed in seconds_by_label else 1
    return gr.Timer(interval, active=auto_on)
| # ═══════════════════════════════════════ | |
| # WIRING | |
| # ═══════════════════════════════════════ | |
| # Files | |
def _show_tab(tab_id):
    """Return a no-argument callback that selects the bottom tab *tab_id*."""
    return lambda: gr.Tabs(selected=tab_id)


# Files. `editor` is listed twice on purpose: the handlers return one update
# for its value and one for its label.
editor.change(fn=on_editor_change, inputs=editor, outputs=structure_view)
file_list.change(fn=on_file_select, inputs=file_list,
                 outputs=[editor, editor, structure_view])
new_btn.click(fn=on_new_file, inputs=new_file_txt,
              outputs=[file_list, editor, editor, structure_view])
save_btn.click(fn=on_save, inputs=editor, outputs=status_bar)
del_btn.click(fn=on_delete, inputs=None,
              outputs=[file_list, editor, editor, structure_view])

# Run: after the run finishes, bring the output tab to the front.
run_event = run_btn.click(
    fn=on_run, inputs=editor,
    outputs=[run_output, display_image, gui_status, auto_capture],
)
run_event.then(_show_tab("output-tab"), None, bottom_tabs)
stop_btn.click(fn=on_stop, inputs=None,
               outputs=[run_output, gui_status, auto_capture])

# Terminal
term_in.submit(fn=on_term_cmd, inputs=term_in, outputs=[term_out, term_in])
clear_btn.click(fn=on_clear, inputs=None, outputs=term_out)
hist_btn.click(fn=on_refresh_hist, inputs=None, outputs=hist_dd)
hist_dd.change(fn=on_select_hist, inputs=hist_dd, outputs=term_in)

# AI
ai_complete.click(fn=on_complete, inputs=editor,
                  outputs=[editor, structure_view])
ai_explain.click(fn=on_explain, inputs=editor, outputs=term_out)
refactor_event = ai_refactor.click(
    fn=on_refactor, inputs=editor,
    outputs=[diff_view, diff_original, diff_modified],
)
refactor_event.then(_show_tab("diff-tab"), None, bottom_tabs)

# Chat: pressing Enter and clicking Send share the same handler and outputs.
for chat_trigger in (chat_input.submit, send_btn.click):
    chat_event = chat_trigger(
        fn=on_generate,
        inputs=[chat_input, chat_history],
        outputs=[diff_view, diff_original, diff_modified,
                 chat_history, chat_input],
    )
    chat_event.then(_show_tab("diff-tab"), None, bottom_tabs)

# Diff: both Apply and Discard return to the terminal tab afterwards.
apply_event = apply_btn.click(fn=on_apply, inputs=diff_modified,
                              outputs=[editor, structure_view])
apply_event.then(_show_tab("term-tab"), None, bottom_tabs)
discard_event = discard_btn.click(
    fn=lambda: (gr.update(), gr.update()),  # no-op: keep the editor as it is
    inputs=None,
    outputs=[editor, structure_view],
)
discard_event.then(_show_tab("term-tab"), None, bottom_tabs)

# Snippets
snip_save.click(fn=on_snip_save, inputs=[snip_name, snip_tags, editor],
                outputs=[snip_status, snip_list])
snip_insert.click(fn=on_snip_insert, inputs=[snip_list, editor], outputs=editor)
snip_del.click(fn=on_snip_del, inputs=snip_list,
               outputs=[snip_status, snip_list])

# Find
find_btn.click(fn=on_find, inputs=[editor, find_query, find_case],
               outputs=find_results)
find_all_btn.click(fn=on_find_all, inputs=find_query, outputs=find_results)
replace_btn.click(
    fn=on_replace,
    inputs=[editor, find_query, replace_query, find_case],
    outputs=[editor, find_results, structure_view],
)

# Display
capture_btn.click(fn=on_capture, inputs=None, outputs=display_image)
auto_capture.change(fn=on_auto_toggle, inputs=auto_capture, outputs=auto_timer)
refresh_speed.change(fn=on_speed_change,
                     inputs=[refresh_speed, auto_capture], outputs=auto_timer)
auto_timer.tick(fn=on_auto_tick, inputs=None, outputs=display_image)
if __name__ == "__main__":
    import atexit

    # atexit runs handlers in reverse registration order (last in, first out).
    # Register the display first so it is torn down last: the terminal and the
    # GUI process are stopped while the X server they may use is still alive.
    atexit.register(vdisplay.cleanup)
    atexit.register(gui_mgr.stop)
    atexit.register(terminal.cleanup)

    # Bind on all interfaces at port 7860; /tmp is allowed so files written
    # there can be served to the browser.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860,
                        theme=theme, css=css, ssr_mode=False, allowed_paths=["/tmp"])