import warnings
warnings.filterwarnings("ignore")
import gradio as gr
import time
import sys
import subprocess
import os
import pty
import select
import signal
import fcntl
import struct
import termios
import threading
import re
import ast
import json
import tempfile
from pathlib import Path
from huggingface_hub import hf_hub_download
from typing import List, Dict
from functools import lru_cache

os.environ["TOKENIZERS_PARALLELISM"] = "false"

MODEL_REPO = "AIencoder/Qwen2.5CMR-Q4_K_M-GGUF"
MODEL_FILE = "qwen2.5cmr-q4_k_m.gguf"
DISPLAY_NUM = ":99"
SCREEN_W, SCREEN_H = 800, 600
SNIPPETS_FILE = "/tmp/axon_snippets.json"


# ═══════════════════════════════════════
# Virtual Display (Optimized for pygame/tkinter/turtle)
# ═══════════════════════════════════════
class VirtualDisplay:
    """Run an Xvfb server on ``DISPLAY_NUM`` and take screenshots of it.

    Attributes:
        xvfb_proc: ``subprocess.Popen`` handle of the Xvfb server, or ``None``
            when it could not be started.
        display: X display string the server listens on.
    """

    # Errors a capture/cleanup helper can legitimately hit: missing binary,
    # unreadable file, timeout of the external tool.
    _TOOL_ERRORS = (OSError, subprocess.SubprocessError)

    def __init__(self):
        self.xvfb_proc = None
        self.display = DISPLAY_NUM
        self._last_capture = None
        self._start_xvfb()

    def _spawn_xvfb(self, extra_args):
        """Start Xvfb with the base arguments plus ``extra_args``."""
        return subprocess.Popen(
            ["Xvfb", self.display,
             "-screen", "0", f"{SCREEN_W}x{SCREEN_H}x24",
             "-ac",  # No access control
             *extra_args],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _start_xvfb(self):
        """Start Xvfb, retrying without optional extensions if the first try dies."""
        # Kill a stale server on the same display. This is best-effort: a
        # missing ``pkill`` must not prevent Xvfb itself from starting.
        try:
            subprocess.run(["pkill", "-f", f"Xvfb {self.display}"],
                           capture_output=True, timeout=5)
        except self._TOOL_ERRORS as e:
            print(f"[Xvfb] pkill skipped: {e}")

        try:
            time.sleep(0.3)
            # Full-featured Xvfb: GLX for OpenGL, RENDER for anti-aliasing,
            # 24-bit color, no access control
            self.xvfb_proc = self._spawn_xvfb([
                "-nolisten", "tcp",      # Security
                "+extension", "GLX",     # OpenGL support (pygame)
                "+extension", "RENDER",  # Anti-aliased rendering
            ])
            time.sleep(0.8)
            if self.xvfb_proc.poll() is None:
                os.environ["DISPLAY"] = self.display
                print(f"[Xvfb] Running on {self.display} ({SCREEN_W}x{SCREEN_H})")
                return

            # Fallback: simpler Xvfb without extensions
            self.xvfb_proc = self._spawn_xvfb([])
            time.sleep(0.5)
            if self.xvfb_proc.poll() is None:
                os.environ["DISPLAY"] = self.display
                print(f"[Xvfb] Running (fallback mode)")
            else:
                self.xvfb_proc = None
                print("[Xvfb] Failed to start")
        except Exception as e:
            # Startup boundary: the app must still come up without a display.
            print(f"[Xvfb] {e}")
            self.xvfb_proc = None

    @staticmethod
    def _looks_like_image(path):
        """Return True when ``path`` exists and is larger than 100 bytes."""
        return os.path.exists(path) and os.path.getsize(path) > 100

    def _discard_last_capture(self):
        """Delete the previous screenshot file, if any."""
        if self._last_capture:
            try:
                os.unlink(self._last_capture)
            except OSError:
                pass
            self._last_capture = None

    def _grab_xwd(self, path, env):
        """Method 1: xwd + convert (most reliable for Xvfb)."""
        xwd = subprocess.run(
            ["xwd", "-root", "-display", self.display, "-silent"],
            capture_output=True, timeout=3, env=env)
        if xwd.returncode != 0 or not xwd.stdout:
            return False
        conv = subprocess.run(
            ["convert", "xwd:-", path],
            input=xwd.stdout, capture_output=True, timeout=3)
        return conv.returncode == 0 and self._looks_like_image(path)

    def _grab_scrot(self, path, env):
        """Method 2: scrot."""
        r = subprocess.run(["scrot", path], capture_output=True, timeout=3, env=env)
        return r.returncode == 0 and self._looks_like_image(path)

    def _grab_import(self, path, env):
        """Method 3: import (ImageMagick)."""
        # Argument list instead of a shell string: nothing is re-parsed by a shell.
        r = subprocess.run(
            ["import", "-window", "root", "-display", self.display, path],
            capture_output=True, timeout=3, env=env)
        return r.returncode == 0 and self._looks_like_image(path)

    def capture(self):
        """Capture the virtual display. Returns filepath or None."""
        if not self.is_running:
            return None

        # Clean up previous capture to avoid /tmp filling up
        self._discard_last_capture()

        tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        tmp.close()
        cap_env = {**os.environ, "DISPLAY": self.display}

        for grab in (self._grab_xwd, self._grab_scrot, self._grab_import):
            try:
                if grab(tmp.name, cap_env):
                    self._last_capture = tmp.name
                    return tmp.name
            except self._TOOL_ERRORS:
                # Tool missing, timed out or failed: try the next method.
                continue

        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        return None

    @property
    def is_running(self):
        """True while the Xvfb process exists and has not exited."""
        return self.xvfb_proc is not None and self.xvfb_proc.poll() is None

    def cleanup(self):
        """Remove the last screenshot and stop Xvfb (terminate, then kill)."""
        self._discard_last_capture()
        if self.xvfb_proc:
            self.xvfb_proc.terminate()
            try:
                self.xvfb_proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self.xvfb_proc.kill()


vdisplay = VirtualDisplay()

# ═══════════════════════════════════════
# GUI Process Manager (pygame/tkinter/turtle)
# ═══════════════════════════════════════
GUI_LOG = "/tmp/_axon_gui.log"
class GUIProcessManager:
    """Launch and stop a single user GUI script on the virtual display.

    Attributes:
        process: ``subprocess.Popen`` of the running script, or ``None``.
    """

    def __init__(self):
        self.process = None

    def launch(self, code):
        """Write ``code`` to a temp script and run it against ``DISPLAY_NUM``.

        Any previously launched script is stopped first. Returns a status
        string for the UI; errors are reported in the string, not raised.
        """
        self.stop()
        tmp = "/tmp/_axon_gui_run.py"
        with open(tmp, "w") as f:
            f.write(code)

        # Full environment for GUI frameworks
        env = os.environ.copy()
        env.update({
            "DISPLAY": DISPLAY_NUM,
            # SDL / pygame
            "SDL_VIDEODRIVER": "x11",
            "SDL_AUDIODRIVER": "dummy",          # No audio device in headless
            "PYGAME_HIDE_SUPPORT_PROMPT": "1",   # Suppress pygame welcome
            # Tkinter / general X11
            "XDG_RUNTIME_DIR": "/tmp",
            "MESA_GL_VERSION_OVERRIDE": "3.3",   # OpenGL compat
            "LIBGL_ALWAYS_SOFTWARE": "1",        # Software rendering (no GPU)
            # Python
            "PYTHONUNBUFFERED": "1",
            "PYTHONPATH": "/tmp/axon_workspace:" + env.get("PYTHONPATH", ""),
        })

        try:
            # Log file instead of PIPE — PIPE deadlocks pygame's output.
            # The child inherits the descriptor, so the parent's handle is
            # closed as soon as Popen returns instead of leaking per launch.
            with open(GUI_LOG, "w") as log_f:
                self.process = subprocess.Popen(
                    [sys.executable, "-u", tmp],  # -u = unbuffered
                    stdout=log_f, stderr=log_f,
                    env=env,
                    start_new_session=True,  # own process group, so stop() can killpg
                    cwd="/tmp/axon_workspace")
            time.sleep(1.0)  # Give pygame time to create the window
            if self.process.poll() is not None:
                try:
                    with open(GUI_LOG) as f:
                        msg = f.read().strip()
                except OSError:
                    msg = ""
                return f"[Exited immediately]\n{msg}" if msg else "[Exited — no output]"
            return f"[GUI launched — PID {self.process.pid}] Auto-refresh enabled."
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            return f"[Error] {e}"

    def get_log(self):
        """Return the last 2000 characters of the GUI log, or "" if unreadable."""
        try:
            with open(GUI_LOG) as f:
                return f.read()[-2000:]
        except OSError:
            return ""

    def stop(self):
        """Terminate the running script's process group; return a status string."""
        proc, self.process = self.process, None
        if proc is None or proc.poll() is not None:
            return "[No process]"
        pid = proc.pid
        try:
            os.killpg(os.getpgid(pid), signal.SIGTERM)
            proc.wait(timeout=3)
        except (OSError, subprocess.TimeoutExpired):
            try:
                os.killpg(os.getpgid(pid), signal.SIGKILL)
                proc.wait(timeout=3)  # reap, so no zombie is left behind
            except (OSError, subprocess.TimeoutExpired):
                pass
        return f"[Stopped PID {pid}]"

    @property
    def is_running(self):
        """True while a launched script exists and has not exited."""
        return self.process is not None and self.process.poll() is None

    def get_status(self):
        """Return a short status label for the UI."""
        return f"● PID {self.process.pid}" if self.is_running else "○ idle"


gui_mgr = GUIProcessManager()


# ═══════════════════════════════════════
# File System
# ═══════════════════════════════════════
class FileSystem:
    """In-memory project files mirrored into a workspace directory on disk.

    Attributes:
        files: mapping of file name to its text content.
        current_file: name of the file shown in the editor.
    """

    def __init__(self):
        self.files: Dict[str, str] = {
            "main.py": '# Start coding here\nfrom utils import add\n\nresult = add(3, 7)\nprint(f"3 + 7 = {result}")',
            "utils.py": "def add(a, b):\n return a + b\n\ndef multiply(a, b):\n return a * b",
        }
        self.current_file = "main.py"
        self._sync_dir = "/tmp/axon_workspace"
        self._sync_to_disk()

    def _write(self, name, content):
        """Write one file under the workspace, creating parent directories."""
        # NOTE(review): names come from the UI and are joined unchecked, so
        # "../x" or an absolute path escapes the workspace — confirm whether
        # that should be rejected.
        path = os.path.join(self._sync_dir, name)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            f.write(content)

    def _sync_to_disk(self):
        """Write every in-memory file to the workspace directory."""
        os.makedirs(self._sync_dir, exist_ok=True)
        for name, content in self.files.items():
            # Goes through _write so nested names like "pkg/mod.py" work,
            # matching save_file.
            self._write(name, content)

    def save_file(self, content):
        """Store ``content`` as the current file, in memory and on disk."""
        if self.current_file:
            self.files[self.current_file] = content
            self._write(self.current_file, content)

    def get_current_file_content(self):
        """Return the current file's content, or "" if it is unknown."""
        return self.files.get(self.current_file, "")

    def set_current_file(self, f):
        """Switch the current file; unknown names are ignored."""
        if f in self.files:
            self.current_file = f

    def create_file(self, f, c=""):
        """Create file ``f`` with content ``c`` and make it current.

        Does nothing when ``f`` is empty or already exists.
        """
        if f and f not in self.files:
            self.files[f] = c
            self.current_file = f
            self._sync_to_disk()

    def delete_file(self, f):
        """Delete ``f`` unless it is the last file. Returns True on success."""
        if f in self.files and len(self.files) > 1:
            del self.files[f]
            path = os.path.join(self._sync_dir, f)
            if os.path.exists(path):
                os.unlink(path)
            self.current_file = next(iter(self.files))
            return True
        return False

    def get_all_files(self):
        """Return the list of file names."""
        return list(self.files)


fs = FileSystem()


# ═══════════════════════════════════════
# Snippet Library
# ═══════════════════════════════════════
class SnippetLibrary:
    """Named code snippets persisted as JSON in ``SNIPPETS_FILE``."""

    def __init__(self):
        self.snippets = {}
        self._load()

    def _load(self):
        """Load snippets from disk; unreadable or malformed data yields {}."""
        if not os.path.exists(SNIPPETS_FILE):
            return
        try:
            with open(SNIPPETS_FILE) as f:
                data = json.load(f)
        except (OSError, ValueError):  # json.JSONDecodeError is a ValueError
            data = {}
        # Anything but a mapping would break get()/delete() later.
        self.snippets = data if isinstance(data, dict) else {}

    def _save(self):
        """Persist snippets; failure to write is deliberately non-fatal."""
        try:
            with open(SNIPPETS_FILE, "w") as f:
                json.dump(self.snippets, f)
        except (OSError, TypeError, ValueError):
            pass

    def add(self, name, code, tags=""):
        """Save ``code`` under ``name`` (stripped). Returns a status message."""
        if not name or not name.strip():
            return "Error: name required"
        self.snippets[name.strip()] = {
            "code": code,
            "tags": tags,
            "created": time.strftime("%Y-%m-%d"),
        }
        self._save()
        return f"Saved: {name}"

    def get(self, name):
        """Return the code stored under ``name``, or "" if absent."""
        return self.snippets.get(name, {}).get("code", "")

    def delete(self, name):
        """Remove snippet ``name``. Returns a status message."""
        if name in self.snippets:
            del self.snippets[name]
            self._save()
            return f"Deleted: {name}"
        return "Not found"

    def get_names(self):
        """Return the list of snippet names."""
        return list(self.snippets)


snippets = SnippetLibrary()


# ═══════════════════════════════════════
# Code Analyzer & FindReplace
# ═══════════════════════════════════════
class CodeAnalyzer:
    """Produce a short textual outline of Python source."""

    @staticmethod
    def analyze(code: str) -> str:
        """Return line count, imports, functions and classes found in ``code``.

        Returns "Empty file" for blank input and a one-line warning when the
        source cannot be parsed.
        """
        if not code or not code.strip():
            return "Empty file"
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return f"⚠ Syntax error line {e.lineno}: {e.msg}"
        except ValueError as e:
            # Older Pythons raise ValueError for source containing NUL bytes.
            return f"⚠ Invalid source: {e}"

        lines = [f"📊 {len(code.splitlines())} lines"]
        imports = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                imports.extend(a.name for a in node.names)
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for "from . import x"; show the dots
                # instead of the literal text "None".
                imports.append(f"from {'.' * node.level}{node.module or ''}")
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                args = ", ".join(a.arg for a in node.args.args)
                lines.append(f" L{node.lineno} ⚡ def {node.name}({args})")
            elif isinstance(node, ast.ClassDef):
                lines.append(f" L{node.lineno} 🏗 class {node.name}")
        if imports:
            lines.insert(1, f"📦 {', '.join(imports)}")
        return "\n".join(lines)


analyzer = CodeAnalyzer()


class FindReplace:
    """Plain-text find and replace helpers for the editor."""

    @staticmethod
    def find_in_file(code, query, case):
        """List lines of ``code`` containing ``query``.

        ``case`` selects case-sensitive matching. Returns "" for an empty
        query and "No matches." when nothing is found.
        """
        if not query:
            return ""
        needle = query if case else query.lower()
        res = [
            f" L{i}: {line.strip()}"
            for i, line in enumerate(code.splitlines(), 1)
            if needle in (line if case else line.lower())
        ]
        if not res:
            return "No matches."
        return f"Found {len(res)} match(es):\n" + "\n".join(res)

    @staticmethod
    def find_all_files(files, query):
        """Case-insensitively list matching lines across all ``files``."""
        if not query:
            return ""
        needle = query.lower()
        res = [
            f" {fname}:L{i}: {line.strip()}"
            for fname, content in files.items()
            for i, line in enumerate(content.splitlines(), 1)
            if needle in line.lower()
        ]
        if not res:
            return "No matches."
        return f"Found {len(res)} across all files:\n" + "\n".join(res)

    @staticmethod
    def replace_in_file(code, find, replace, case):
        """Replace every literal occurrence of ``find`` with ``replace``.

        Returns ``(new_code, message)``. ``case`` selects case-sensitive
        matching.
        """
        if not find:
            return code, "Empty find."
        flags = 0 if case else re.IGNORECASE
        # A callable replacement is inserted verbatim. A plain string would be
        # treated as a template, so user text such as "\d" raised re.error and
        # "\1" / "\n" were interpreted instead of inserted.
        new_code, count = re.subn(re.escape(find), lambda _m: replace, code, flags=flags)
        return new_code, f"Replaced {count} occurrence(s)."
finder = FindReplace()


# ═══════════════════════════════════════
# PTY Terminal (fixed)
# ═══════════════════════════════════════
class PTYTerminal:
    """Interactive bash session behind a pseudo-terminal, with a text log.

    Attributes:
        master_fd: parent-side PTY descriptor, or ``None`` when no shell runs.
        pid: process id of the forked shell.
        log_lines: lines shown in the terminal view (bounded by ``max_lines``).
        cmd_history: most recent commands, oldest first (at most 100).
    """

    STRIP_ANSI = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\].*?\x07|\x1b\[.*?[@-~]|\r')

    # Commands that may legitimately stay silent for a long time.
    LONG_CMDS = ("pip", "pip3", "npm", "npx", "apt-get", "apt", "git",
                 "wget", "curl", "make", "cmake", "cargo", "yarn", "conda")

    def __init__(self):
        self.master_fd = None
        self.pid = None
        self.lock = threading.Lock()
        self.log_lines: List[str] = []
        self.max_lines = 800
        self.cmd_history: List[str] = []
        self._spawn()
        self._append("AXON TERMINAL v4.0")
        self._append("══════════════════════════════════════")
        self._append("Full PTY shell · Cross-file imports")
        self._append(" pip/npm/apt-get/git/curl/make")
        self._append(" GUI apps → Display tab")
        self._append("══════════════════════════════════════")

    def _spawn(self):
        """Fork a bash process attached to a new PTY."""
        try:
            master_fd, slave_fd = pty.openpty()
            self.pid = os.fork()
        except OSError as e:
            self._append(f"[Shell Error] {e}")
            self.master_fd = None
            return

        if self.pid == 0:
            # Child — become the shell. On success exec never returns; on any
            # failure the child must exit here, otherwise it would fall back
            # into the parent's code and run a second copy of the app.
            try:
                os.close(master_fd)
                os.setsid()
                fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
                os.dup2(slave_fd, 0)
                os.dup2(slave_fd, 1)
                os.dup2(slave_fd, 2)
                if slave_fd > 2:
                    os.close(slave_fd)
                env = os.environ.copy()
                env.update({"TERM": "dumb", "PS1": "$ ",
                            "DEBIAN_FRONTEND": "noninteractive",
                            "DISPLAY": DISPLAY_NUM})
                os.execvpe("/bin/bash", ["/bin/bash", "--norc", "--noprofile", "-i"], env)
            finally:
                os._exit(127)

        # Parent — hold the master fd
        try:
            os.close(slave_fd)
            self.master_fd = master_fd
            # Set non-blocking properly
            flags = fcntl.fcntl(self.master_fd, fcntl.F_GETFL)
            fcntl.fcntl(self.master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            # Set terminal size
            fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ,
                        struct.pack("HHHH", 40, 120, 0, 0))
            time.sleep(0.3)
            self._read_raw(0.2)  # Drain initial output
        except OSError as e:
            self._append(f"[Shell Error] {e}")
            self.master_fd = None

    def _read_raw(self, timeout=0.1) -> str:
        """Read raw bytes from PTY.

        Waits up to ``timeout`` seconds for the first data; each received
        chunk extends the deadline by 0.15s. Returns the decoded text.
        """
        if self.master_fd is None:  # "is None": fd 0 is a valid descriptor
            return ""
        out = []
        deadline = time.time() + timeout
        while True:
            rem = deadline - time.time()
            if rem <= 0:
                break
            try:
                ready, _, _ = select.select([self.master_fd], [], [], min(rem, 0.05))
                if ready:
                    chunk = os.read(self.master_fd, 4096)
                    if not chunk:
                        break
                    out.append(chunk)
                    deadline = time.time() + 0.15
                elif out:
                    break
            except OSError:
                break
        # Decode once: a multibyte character split across two reads would be
        # mangled if every chunk were decoded on its own.
        return b"".join(out).decode("utf-8", errors="replace")

    def _clean(self, text):
        """Strip ANSI escape codes."""
        c = self.STRIP_ANSI.sub("", text)
        return "".join(ch for ch in c if ch in "\n\t" or ord(ch) >= 32)

    def _append(self, text):
        """Add ``text`` to the log, keeping at most ``max_lines`` lines."""
        # Split so a multi-line entry counts as several lines; get_log() joins
        # with "\n", so the rendered text is unchanged.
        self.log_lines.extend(text.split("\n"))
        overflow = len(self.log_lines) - self.max_lines
        if overflow > 0:
            del self.log_lines[:overflow]

    def get_log(self):
        """Return the whole log as one string."""
        return "\n".join(self.log_lines)

    def run_command(self, cmd):
        """Send ``cmd`` to the shell, collect its output, return the full log.

        "clear" and "history" are handled locally and never reach the shell.
        """
        cmd = cmd.strip()
        if not cmd:
            return self.get_log()

        with self.lock:
            # Save history
            if not self.cmd_history or self.cmd_history[-1] != cmd:
                self.cmd_history.append(cmd)
                if len(self.cmd_history) > 100:
                    self.cmd_history.pop(0)

            if cmd.lower() == "clear":
                self.log_lines = []
                return ""
            if cmd.lower() == "history":
                self._append("Command history:")
                for i, c in enumerate(self.cmd_history[-20:], 1):
                    self._append(f" {i:3d} {c}")
                return self.get_log()
            if self.master_fd is None:
                self._append(f"$ {cmd}")
                self._append("[Error] No shell.")
                return self.get_log()

            # Drain stale output
            self._read_raw(0.05)
            self._append(f"$ {cmd}")
            try:
                os.write(self.master_fd, (cmd + "\n").encode())
            except OSError as e:
                self._append(f"[Write Error] {e}")
                return self.get_log()

            # Determine timeout based on command type
            parts = cmd.split()
            base = parts[0].lower() if parts else ""
            is_long = base in self.LONG_CMDS
            if is_long:
                wait = 180
            elif base in ("python", "python3", "node"):
                wait = 60
            else:
                wait = 15
            max_idle = 10 if is_long else 3  # silent polls before giving up

            # Collect output
            chunks = []
            start = time.time()
            idle = 0
            while time.time() - start < wait:
                c = self._read_raw(0.3)
                if c:
                    idle = 0
                    chunks.append(c)
                    # A trailing "$" is taken as the prompt: command finished.
                    if "".join(chunks).rstrip().endswith("$"):
                        break
                else:
                    idle += 1
                    if idle >= max_idle:
                        break

            # Clean and filter
            raw = self._clean("".join(chunks))
            filtered = []
            skip_echo = True
            for line in raw.split("\n"):
                s = line.strip()
                if skip_echo and s == cmd:
                    # First occurrence is the terminal echoing the command.
                    skip_echo = False
                    continue
                if s == "$":
                    continue
                filtered.append(line)
            result = "\n".join(filtered).strip()
            if result:
                self._append(result)
            return self.get_log()

    def get_history(self) -> List[str]:
        """Return up to 30 most recent commands, newest first."""
        return list(reversed(self.cmd_history[-30:]))

    def cleanup(self):
        """Stop the shell and close the PTY."""
        if self.pid and self.pid > 0:
            try:
                # Interactive bash ignores SIGTERM but exits on SIGHUP.
                os.kill(self.pid, signal.SIGHUP)
            except OSError:
                pass
        if self.master_fd is not None:
            try:
                os.close(self.master_fd)
            except OSError:
                pass
            self.master_fd = None


terminal = PTYTerminal()

# ═══════════════════════════════════════
# AI Model — Qwen2.5CMR Q4_K_M GGUF
# ═══════════════════════════════════════
_llm_instance = None
_llm_lock = threading.Lock()


def load_model():
    """Download (if needed) and load the GGUF model once; return it or None.

    The instance is cached in ``_llm_instance``. A failed load returns
    ``None`` and is retried on the next call.
    """
    global _llm_instance
    if _llm_instance is not None:
        return _llm_instance
    with _llm_lock:
        # Re-check: another thread may have finished loading while we waited.
        if _llm_instance is not None:
            return _llm_instance
        print(f"Downloading {MODEL_REPO}/{MODEL_FILE}...")
        t0 = time.time()
        try:
            model_path = hf_hub_download(
                repo_id=MODEL_REPO,
                filename=MODEL_FILE,
            )
            print(f"Downloaded in {time.time()-t0:.1f}s, loading...")
            from llama_cpp import Llama
            _llm_instance = Llama(
                model_path=model_path,
                n_ctx=4096,
                n_threads=os.cpu_count() or 4,
                n_gpu_layers=0,  # CPU only
                verbose=False,
            )
            print(f"Model ready in {time.time()-t0:.1f}s total")
            return _llm_instance
        except Exception as e:
            print(f"Model error: {e}")
            return None


def ai_gen(system_prompt, code, max_tokens=300):
    """Run one chat completion and return the reply text.

    Args:
        system_prompt: instruction placed in the system message.
        code: user message content.
        max_tokens: completion length limit.

    Returns:
        The stripped reply, "(No output from model)" when it is empty, or a
        string starting with "Error:" when loading or generation fails.
    """
    llm = load_model()
    if not llm:
        return "Error: model failed to load"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": code},
    ]
    try:
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.3,
            top_p=0.9,
            repeat_penalty=1.1,
        )
        text = response["choices"][0]["message"]["content"].strip()
        return text if text else "(No output from model)"
    except Exception as e:
        return f"Error: {e}"


# ═══════════════════════════════════════
# GUI code detection
# ═══════════════════════════════════════
GUI_HINTS = [
    "pygame", "tkinter", "turtle", "pyglet", "arcade", "kivy",
    "display.set_mode", "mainloop()", "Tk()", "Canvas(",
    "pygame.init", "pygame.display", "screen.fill",
    "from turtle import", "import turtle",
]


def is_gui_code(c):
    """Return True when ``c`` contains any ``GUI_HINTS`` substring."""
    return any(h in c for h in GUI_HINTS)


# ═══════════════════════════════════════
# Theme + CSS
# ═══════════════════════════════════════
theme = gr.themes.Default(
    primary_hue="cyan",
    neutral_hue="gray",
    font=[gr.themes.GoogleFont("IBM Plex Mono"), "monospace"]
).set(
    body_background_fill="#080808",
    body_text_color="#e0e0e0",
    button_primary_background_fill="#0ff",
    button_primary_background_fill_hover="#0dd",
    button_primary_text_color="#000",
    button_secondary_background_fill="#1a1a1a",
    button_secondary_background_fill_hover="#2a2a2a",
    button_secondary_text_color="#0ff",
    button_secondary_border_color="#0ff4",
    block_background_fill="#0d0d0d",
    block_border_color="#1a1a1a",
    block_label_text_color="#0f8",
    input_background_fill="#0a0a0a",
    input_border_color="#1a3a2a",
    input_placeholder_color="#0f83",
)

css = """
* { border-radius: 0 !important; }
.gradio-container { max-width: 100% !important; }
/* Header */
.axon-header { background: linear-gradient(90deg, #080808, #0a1a1a, #080808); border-bottom: 1px solid #0ff3; padding: 12px 20px !important; margin-bottom: 8px; }
.axon-header h1 { font-family: 'IBM Plex Mono', monospace !important; background: linear-gradient(90deg, #0ff, #0f8); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 1.4em !important; letter-spacing: 3px; margin: 0 !important; }
.axon-header p { color: #0f84 !important; font-size: 0.75em; letter-spacing: 1px; }
/* Terminal */
.term-box textarea { background: #050505 !important; color: #00ff41 !important; font-family: 'IBM Plex Mono', monospace !important; font-size: 12px !important; line-height: 1.5 !important; border: 1px solid #0f31 !important; text-shadow: 0 0 4px #0f32; }
.term-input input { background: #050505 !important; color: #00ff41 !important; font-family: 'IBM Plex Mono', monospace !important; border: 1px solid #0f32 !important; font-size: 12px !important; }
.term-input input::placeholder { color: #0f83 !important; }
.term-input input:focus { border-color: #0ff !important; box-shadow: 0 0 8px #0ff2 !important; }
/* Editor */
.code-editor textarea, .code-editor .cm-editor { font-family: 'IBM Plex Mono', monospace !important; font-size: 13px !important; }
/* Buttons */
.tb { font-size: 0.8em !important; padding: 6px 12px !important; letter-spacing: 0.5px; text-transform: uppercase; font-weight: 600 !important; border: 1px solid transparent !important; transition: all 0.2s ease !important; }
.tb:hover { border-color: #0ff !important; box-shadow: 0 0 12px #0ff2 !important; }
/* Accordion */
.gr-accordion { border: 1px solid #1a2a2a !important; background: #0a0a0a !important; }
.gr-accordion > .label-wrap { background: #0d0d0d !important; border-bottom: 1px solid #1a2a2a !important; }
.gr-accordion > .label-wrap:hover { background: #121a1a !important; }
.gr-accordion > .label-wrap span { color: #0ff !important; font-family: 'IBM Plex Mono', monospace !important; letter-spacing: 1px; font-size: 0.85em; }
/* Tabs */
.tabs > .tab-nav > button { font-family: 'IBM Plex Mono', monospace !important; letter-spacing: 1px; font-size: 0.8em; text-transform: uppercase; color: #888 !important; border-bottom: 2px solid transparent !important; }
.tabs > .tab-nav > button.selected { color: #0ff !important; border-bottom-color: #0ff !important; text-shadow: 0 0 8px #0ff4; }
/* Structure + find */
.struct-box textarea { background: #050505 !important; color: #0ff !important; font-family: 'IBM Plex Mono', monospace !important; font-size: 11px !important; }
.find-box textarea { background: #050505 !important; color: #ffab40 !important; font-family: 'IBM Plex Mono', monospace !important; font-size: 11px !important; }
/* Status bar */
.status-bar { background: #0a0a0a !important; border-top: 1px solid #1a2a2a; padding: 4px 12px !important; font-size: 11px !important; color: #0f84 !important; }
.status-bar strong { color: #0ff !important; }
/* Misc */
::-webkit-scrollbar { width: 6px; height: 6px; }
::-webkit-scrollbar-track { background: #0a0a0a; }
::-webkit-scrollbar-thumb { background: #0f83; }
input, textarea, select { color: #0f8 !important; }
.stop-btn { background: #a00 !important; color: #fff !important; border: none !important; }
.stop-btn:hover { background: #c00 !important; }
"""

# ═══════════════════════════════════════
# UI
# ═══════════════════════════════════════
with gr.Blocks(title="Axon Pro") as demo:
    with gr.Row(elem_classes="axon-header"):
        gr.Markdown("# ⬡ AXON PRO\n\nPYTHON AI IDE — v4.0")

    with gr.Row(equal_height=False):
        # ══ LEFT SIDEBAR ══
        with gr.Column(scale=1, min_width=200):
            with gr.Accordion("📁 EXPLORER", open=True):
                file_list = gr.Dropdown(choices=fs.get_all_files(), value=fs.current_file,
                                        label="Files", interactive=True)
                new_file_txt = gr.Textbox(placeholder="new_file.py", show_label=False)
                with gr.Row():
                    new_btn = gr.Button("NEW", size="sm", elem_classes="tb")
                    save_btn = gr.Button("SAVE", size="sm", elem_classes="tb")
                    del_btn = gr.Button("DEL", size="sm", elem_classes="tb stop-btn")
            with gr.Accordion("🗺 STRUCTURE", open=True):
                structure_view = gr.Textbox(
                    value=analyzer.analyze(fs.get_current_file_content()),
                    label="", lines=10, interactive=False,
                    elem_classes="struct-box", show_label=False)
            with gr.Accordion("📋 SNIPPETS", open=False):
                snip_list = gr.Dropdown(choices=snippets.get_names(), label="Saved",
                                        interactive=True)
                snip_name = gr.Textbox(placeholder="Snippet name", show_label=False)
                snip_tags = gr.Textbox(placeholder="Tags: api, util, loop", show_label=False)
                with gr.Row():
                    snip_save = gr.Button("SAVE", size="sm", elem_classes="tb")
                    snip_insert = gr.Button("INSERT", size="sm", elem_classes="tb")
                    snip_del = gr.Button("DEL", size="sm", elem_classes="tb stop-btn")
                snip_status = gr.Markdown("")

        # ══ CENTER ══
        with gr.Column(scale=4):
            editor = gr.Code(value=fs.get_current_file_content(),
                             label=f" {fs.current_file}",
                             language="python", lines=20, interactive=True,
                             elem_classes="code-editor")
            with gr.Row():
                run_btn = gr.Button("▶ RUN", variant="primary", elem_classes="tb")
                stop_btn = gr.Button("■ STOP", size="sm", elem_classes="tb stop-btn")
                ai_complete = gr.Button("✦ COMPLETE", elem_classes="tb")
                ai_explain = gr.Button("◈ EXPLAIN", elem_classes="tb")
                ai_refactor = gr.Button("⟲ REFACTOR", elem_classes="tb")

            with gr.Tabs() as bottom_tabs:
                with gr.Tab("▶ OUTPUT", id="output-tab"):
                    run_output = gr.Textbox(value="Run your code to see output here.",
                                            lines=12, max_lines=25, interactive=False,
                                            elem_classes="term-box", label="",
                                            show_label=False)
                with gr.Tab("⌘ TERMINAL", id="term-tab"):
                    term_out = gr.Textbox(value=terminal.get_log(), lines=12, max_lines=25,
                                          interactive=False, elem_classes="term-box",
                                          label="", show_label=False)
                    with gr.Row():
                        term_in = gr.Textbox(placeholder="$ pip install, git clone, ls ...",
                                             show_label=False, elem_classes="term-input")
                    with gr.Row():
                        clear_btn = gr.Button("CLEAR", size="sm", elem_classes="tb")
                        hist_dd = gr.Dropdown(choices=[], label="", interactive=True,
                                              scale=3, container=False)
                        hist_btn = gr.Button("↻ HISTORY", size="sm", elem_classes="tb")
                with gr.Tab("🔍 FIND", id="find-tab"):
                    with gr.Row():
                        find_query = gr.Textbox(label="Find", scale=4)
                        replace_query = gr.Textbox(label="Replace", scale=4)
                        find_case = gr.Checkbox(label="Case", value=False)
                    with gr.Row():
                        find_btn = gr.Button("FIND", size="sm", elem_classes="tb")
                        find_all_btn = gr.Button("FIND ALL FILES", size="sm", elem_classes="tb")
                        replace_btn = gr.Button("REPLACE ALL", size="sm", elem_classes="tb")
                    find_results = gr.Textbox(value="", lines=6, interactive=False,
                                              elem_classes="find-box", label="",
                                              show_label=False)
                with gr.Tab("💬 AI CHAT", id="chat-tab"):
                    chat_history = gr.Chatbot(label="", height=250)
                    with gr.Row():
                        chat_input = gr.Textbox(placeholder="Describe code you want...",
                                                scale=8, container=False)
                        send_btn = gr.Button("GEN", variant="primary", scale=1,
                                             elem_classes="tb")
                with gr.Tab("⇄ DIFF", id="diff-tab"):
                    diff_view = gr.Code(label="AI Changes", language="python",
                                        interactive=False, lines=15)
                    with gr.Row():
                        apply_btn = gr.Button("✓ APPLY", variant="primary", elem_classes="tb")
                        discard_btn = gr.Button("✗ DISCARD", elem_classes="tb")

        # ══ RIGHT: DISPLAY ══
        with gr.Column(scale=2, min_width=250):
            with gr.Accordion("🖥 DISPLAY OUTPUT", open=True):
                display_image = gr.Image(label="", type="filepath", interactive=False,
                                         height=420)
                with gr.Row():
                    capture_btn = gr.Button("📸 CAPTURE", size="sm", elem_classes="tb")
                    auto_capture = gr.Checkbox(label="Auto Refresh", value=False)
                    refresh_speed = gr.Dropdown(
                        choices=["0.5s", "1s", "2s", "5s"], value="1s",
                        label="", container=False, scale=1, interactive=True)
                gui_status = gr.Markdown(
                    f"Xvfb: {'● ON' if vdisplay.is_running else '○ OFF'} "
                    f"| {SCREEN_W}x{SCREEN_H} | SDL: x11 + dummy audio")

    status_bar = gr.Markdown(
        f"**AXON PRO v4.0** │ Python {sys.version.split()[0]} │ CPU │ "
        f"Qwen2.5CMR Q4_K_M │ PTY + Xvfb │ Snippets + Structure + Find",
        elem_classes="status-bar")

    # State
    diff_original = gr.State("")
    diff_modified = gr.State("")
    auto_timer = gr.Timer(1, active=False)

    # Refresh-speed dropdown label → timer interval in seconds.
    REFRESH_SECONDS = {"0.5s": 0.5, "1s": 1, "2s": 2, "5s": 5}

    # ═══════════════════════════════════════
    # HANDLERS
    # ═══════════════════════════════════════

    # --- File ops ---
    def on_editor_change(content):
        """Persist the editor content and refresh the structure outline."""
        fs.save_file(content)
        return analyzer.analyze(content)

    def on_file_select(filename):
        """Switch to ``filename``; return editor value, label update, outline."""
        fs.set_current_file(filename)
        c = fs.get_current_file_content()
        return c, gr.update(label=f" {filename}"), analyzer.analyze(c)

    def on_new_file(name):
        """Create a file named ``name`` and show it; blank names are ignored."""
        if not name or not name.strip():
            return gr.update(), gr.update(), gr.update(), gr.update()
        name = name.strip()
        fs.create_file(name)
        c = fs.get_current_file_content()
        return (gr.update(choices=fs.get_all_files(), value=name), c,
                gr.update(label=f" {name}"), analyzer.analyze(c))

    def on_save(code):
        """Save the current file and return the status-bar text."""
        fs.save_file(code)
        return (f"**AXON PRO v4.0** │ Python {sys.version.split()[0]} │ CPU │ "
                f"Qwen2.5CMR Q4_K_M │ ✓ Saved {fs.current_file}")

    def on_delete():
        """Delete the current file (never the last one) and show the next."""
        name = fs.current_file
        if fs.delete_file(name):
            c = fs.get_current_file_content()
            return (gr.update(choices=fs.get_all_files(), value=fs.current_file), c,
                    gr.update(label=f" {fs.current_file}"), analyzer.analyze(c))
        return gr.update(), gr.update(), gr.update(), gr.update()

    # --- Run (smart: auto-detects GUI vs CLI) ---
    def on_run(code):
        """Run ``code``: GUI programs on the virtual display, others captured.

        Returns (output text, screenshot, GUI status, auto-refresh checkbox).
        """
        fs.save_file(code)
        if is_gui_code(code):
            # GUI app — launch, take first screenshot, auto-enable refresh
            msg = gui_mgr.launch(code)
            time.sleep(0.5)  # Extra time for first frame
            ss = vdisplay.capture()
            status = f"Xvfb: ● ON | {gui_mgr.get_status()}"
            return msg, ss, status, gr.update(value=True)  # Enable auto-refresh

        # Normal script — run with subprocess, output goes to OUTPUT tab
        tmp = "/tmp/_axon_run.py"
        with open(tmp, "w") as f:
            f.write(code)
        try:
            env = os.environ.copy()
            env["DISPLAY"] = DISPLAY_NUM
            env["PYTHONPATH"] = fs._sync_dir + ":" + env.get("PYTHONPATH", "")
            r = subprocess.run([sys.executable, tmp], capture_output=True, text=True,
                               timeout=30, env=env, cwd=fs._sync_dir)
            streams = [s.rstrip() for s in (r.stdout, r.stderr) if s.strip()]
            output = "\n".join(streams) or "(No output)"
        except subprocess.TimeoutExpired:
            output = "[Timed out after 30s]"
        except Exception as e:
            output = f"[Error] {e}"
        return output, gr.update(), gr.update(), gr.update()

    def on_stop():
        """Stop the GUI process and switch auto-refresh off."""
        msg = gui_mgr.stop()
        status = f"Xvfb: {'● ON' if vdisplay.is_running else '○ OFF'} | ○ idle"
        return msg, status, gr.update(value=False)  # Disable auto-refresh

    # --- Terminal ---
    def on_term_cmd(cmd):
        """Run a terminal command; return the log and clear the input box."""
        return terminal.run_command(cmd), ""

    def on_clear():
        """Empty the terminal log."""
        terminal.log_lines = []
        return ""

    def on_refresh_hist():
        """Reload the history dropdown choices."""
        return gr.update(choices=terminal.get_history())

    def on_select_hist(cmd):
        """Copy the chosen history entry into the terminal input."""
        return cmd if cmd else gr.update()

    # --- AI ---
    def on_complete(code):
        """Append the model's completion to ``code``."""
        result = ai_gen("Complete this Python code. Only output the completion, no explanations.", code)
        new_code = code + "\n" + result
        return new_code, analyzer.analyze(new_code)

    def on_explain(code):
        """Write the model's explanation of ``code`` into the terminal log."""
        explanation = ai_gen("Explain this Python code concisely.", code, 512)
        terminal._append(f"[AI Explanation]\n{explanation}")
        return terminal.get_log()

    def on_refactor(code):
        """Ask the model to refactor ``code``; return (diff, original, new)."""
        refactored = ai_gen("Refactor this Python code for PEP 8 and best practices. Output only code.", code, 512)
        fs.save_file(code)
        import difflib
        diff = "\n".join(difflib.unified_diff(code.splitlines(), refactored.splitlines(), lineterm=""))
        return diff, code, refactored

    def on_generate(prompt_text, history):
        """Generate code for ``prompt_text`` and record the exchange in chat."""
        generated = ai_gen(f"Write Python code for: {prompt_text}", "", 512)
        import difflib
        diff = "\n".join(difflib.unified_diff([], generated.splitlines(), lineterm=""))
        # history may be None before the first message.
        new_h = (history or []) + [
            {"role": "user", "content": prompt_text},
            {"role": "assistant", "content": "Code generated → check DIFF tab"},
        ]
        return diff, "", generated, new_h, ""

    def on_apply(modified):
        """Put the proposed code into the editor."""
        return modified, analyzer.analyze(modified)

    # --- Snippets ---
    def on_snip_save(name, tags, code):
        """Save the editor content as a snippet."""
        msg = snippets.add(name, code, tags)
        return msg, gr.update(choices=snippets.get_names())

    def on_snip_insert(name, code):
        """Append the snippet ``name`` to the end of ``code``."""
        snip_code = snippets.get(name)
        if snip_code:
            # Append snippet at cursor position (end of file)
            return code + "\n\n" + snip_code if code.strip() else snip_code
        return code

    def on_snip_del(name):
        """Delete the snippet ``name``."""
        msg = snippets.delete(name)
        return msg, gr.update(choices=snippets.get_names())

    # --- Find ---
    def on_find(code, query, case):
        """Search the editor content."""
        return finder.find_in_file(code, query, case)

    def on_find_all(query):
        """Search every project file."""
        return finder.find_all_files(fs.files, query)

    def on_replace(code, fq, rq, case):
        """Replace all matches in the editor content and save the result."""
        new_code, msg = finder.replace_in_file(code, fq, rq, case)
        fs.save_file(new_code)
        return new_code, msg, analyzer.analyze(new_code)

    # --- Display ---
    def on_capture():
        """Take one screenshot of the virtual display."""
        return vdisplay.capture()

    def on_auto_tick():
        """Timer tick: refresh the screenshot, keeping the old one on failure."""
        # The timer is not stopped when gui_mgr's process ends.
        return vdisplay.capture() or gr.update()

    def on_auto_toggle(checked, speed="1s"):
        """Start/stop the refresh timer at the currently selected speed."""
        # Uses the chosen interval instead of resetting to 1s on every toggle.
        return gr.Timer(REFRESH_SECONDS.get(speed, 1), active=checked)

    def on_speed_change(speed, auto_on):
        """Apply a new refresh interval, keeping the timer's on/off state."""
        return gr.Timer(REFRESH_SECONDS.get(speed, 1), active=auto_on)

    # ═══════════════════════════════════════
    # WIRING
    # ═══════════════════════════════════════

    # Files
    editor.change(on_editor_change, editor, structure_view)
    file_list.change(on_file_select, file_list, [editor, editor, structure_view])
    new_btn.click(on_new_file, new_file_txt, [file_list, editor, editor, structure_view])
    save_btn.click(on_save, editor, status_bar)
    del_btn.click(on_delete, None, [file_list, editor, editor, structure_view])

    # Run
    run_btn.click(on_run, editor, [run_output, display_image, gui_status, auto_capture]
                  ).then(lambda: gr.Tabs(selected="output-tab"), None, bottom_tabs)
    stop_btn.click(on_stop, None, [run_output, gui_status, auto_capture])

    # Terminal
    term_in.submit(on_term_cmd, term_in, [term_out, term_in])
    clear_btn.click(on_clear, None, term_out)
    hist_btn.click(on_refresh_hist, None, hist_dd)
    hist_dd.change(on_select_hist, hist_dd, term_in)

    # AI
    ai_complete.click(on_complete, editor, [editor, structure_view])
    ai_explain.click(on_explain, editor, term_out)
    ai_refactor.click(on_refactor, editor, [diff_view, diff_original, diff_modified]
                      ).then(lambda: gr.Tabs(selected="diff-tab"), None, bottom_tabs)

    # Chat
    chat_input.submit(on_generate, [chat_input, chat_history],
                      [diff_view, diff_original, diff_modified, chat_history, chat_input]
                      ).then(lambda: gr.Tabs(selected="diff-tab"), None, bottom_tabs)
    send_btn.click(on_generate, [chat_input, chat_history],
                   [diff_view, diff_original, diff_modified, chat_history, chat_input]
                   ).then(lambda: gr.Tabs(selected="diff-tab"), None, bottom_tabs)

    # Diff
    apply_btn.click(on_apply, diff_modified, [editor, structure_view]
                    ).then(lambda: gr.Tabs(selected="term-tab"), None, bottom_tabs)
    discard_btn.click(lambda: (gr.update(), gr.update()), None, [editor, structure_view]
                      ).then(lambda: gr.Tabs(selected="term-tab"), None, bottom_tabs)

    # Snippets
    snip_save.click(on_snip_save, [snip_name, snip_tags, editor], [snip_status, snip_list])
    snip_insert.click(on_snip_insert, [snip_list, editor], editor)
    snip_del.click(on_snip_del, snip_list, [snip_status, snip_list])

    # Find
    find_btn.click(on_find, [editor, find_query, find_case], find_results)
    find_all_btn.click(on_find_all, find_query, find_results)
    replace_btn.click(on_replace, [editor, find_query, replace_query, find_case],
                      [editor, find_results, structure_view])

    # Display
    capture_btn.click(on_capture, None, display_image)
    auto_capture.change(on_auto_toggle, [auto_capture, refresh_speed], auto_timer)
    refresh_speed.change(on_speed_change, [refresh_speed, auto_capture], auto_timer)
    auto_timer.tick(on_auto_tick, None, display_image)

if __name__ == "__main__":
    import atexit
    atexit.register(terminal.cleanup)
    atexit.register(gui_mgr.stop)
    atexit.register(vdisplay.cleanup)
    demo.queue().launch(server_name="0.0.0.0", server_port=7860, theme=theme, css=css,
                        ssr_mode=False, allowed_paths=["/tmp"])