# Axon-Pro-IDE / app.py
# AIencoder's picture
# Update app.py
# e80361d verified
import warnings
warnings.filterwarnings("ignore")
import gradio as gr
import time
import sys
import subprocess
import os
import pty
import select
import signal
import fcntl
import struct
import termios
import threading
import re
import ast
import json
import tempfile
from pathlib import Path
from huggingface_hub import hf_hub_download
from typing import List, Dict
from functools import lru_cache
# Exported into this process's environment, so child processes inherit it too.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Hugging Face Hub repository and file name of the GGUF model fetched by load_model().
MODEL_REPO = "AIencoder/Qwen2.5CMR-Q4_K_M-GGUF"
MODEL_FILE = "qwen2.5cmr-q4_k_m.gguf"
# X display used for the Xvfb server and exported as DISPLAY to child processes.
DISPLAY_NUM = ":99"
# Virtual screen size in pixels.
SCREEN_W, SCREEN_H = 800, 600
# JSON file in which SnippetLibrary persists its snippets.
SNIPPETS_FILE = "/tmp/axon_snippets.json"
# ═══════════════════════════════════════
# Virtual Display (Optimized for pygame/tkinter/turtle)
# ═══════════════════════════════════════
class VirtualDisplay:
    """Manage a headless Xvfb server and take screenshots of its root window.

    Creating an instance immediately tries to start Xvfb on ``DISPLAY_NUM``
    and, on success, exports ``DISPLAY`` into ``os.environ``.
    """

    # A capture file must be larger than this many bytes to count as valid.
    _MIN_CAPTURE_BYTES = 100

    def __init__(self):
        self.xvfb_proc = None
        self.display = DISPLAY_NUM
        self._last_capture = None
        self._start_xvfb()

    def _launch_xvfb(self, extra_args, settle):
        """Spawn Xvfb with ``extra_args`` and wait ``settle`` seconds.

        Returns the ``Popen`` object if the server is still alive after the
        wait, otherwise ``None``.
        """
        proc = subprocess.Popen(
            ["Xvfb", self.display,
             "-screen", "0", f"{SCREEN_W}x{SCREEN_H}x24",
             "-ac",  # No access control
             *extra_args],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(settle)
        return proc if proc.poll() is None else None

    def _start_xvfb(self):
        """Start Xvfb, retrying once without extensions; sets ``self.xvfb_proc``."""
        # Kill any server already matching this display. A missing or hanging
        # pkill is reported but must not prevent the start attempt itself.
        try:
            subprocess.run(["pkill", "-f", f"Xvfb {self.display}"],
                           capture_output=True, timeout=5)
        except (OSError, subprocess.SubprocessError) as e:
            print(f"[Xvfb] {e}")
        time.sleep(0.3)
        try:
            # Full-featured Xvfb: GLX for OpenGL, RENDER for anti-aliasing,
            # 24-bit color, no TCP listener.
            proc = self._launch_xvfb(
                ["-nolisten", "tcp", "+extension", "GLX", "+extension", "RENDER"],
                settle=0.8)
            message = f"[Xvfb] Running on {self.display} ({SCREEN_W}x{SCREEN_H})"
            if proc is None:
                # Fallback: simpler Xvfb without extensions
                proc = self._launch_xvfb([], settle=0.5)
                message = "[Xvfb] Running (fallback mode)"
        except (OSError, subprocess.SubprocessError) as e:
            print(f"[Xvfb] {e}")
            self.xvfb_proc = None
            return
        self.xvfb_proc = proc
        if proc is None:
            print("[Xvfb] Failed to start")
            return
        os.environ["DISPLAY"] = self.display
        print(message)

    def _discard_last_capture(self):
        """Delete the previous screenshot file and forget its path."""
        if self._last_capture:
            try:
                os.unlink(self._last_capture)
            except OSError:
                pass
            # Never keep a path that points at a deleted file.
            self._last_capture = None

    def _grab_xwd(self, path, env):
        """Dump the root window with xwd and convert it to ``path``."""
        xwd = subprocess.run(
            ["xwd", "-root", "-display", self.display, "-silent"],
            capture_output=True, timeout=3, env=env)
        if xwd.returncode != 0 or not xwd.stdout:
            return False
        conv = subprocess.run(
            ["convert", "xwd:-", path],
            input=xwd.stdout, capture_output=True, timeout=3)
        return conv.returncode == 0

    def _grab_scrot(self, path, env):
        """Take the screenshot with scrot."""
        r = subprocess.run(["scrot", path], capture_output=True, timeout=3, env=env)
        return r.returncode == 0

    def _grab_import(self, path, env):
        """Take the screenshot with ImageMagick's ``import``."""
        # Argument list instead of a shell string: nothing is re-parsed by a shell.
        r = subprocess.run(
            ["import", "-window", "root", "-display", self.display, path],
            capture_output=True, timeout=3, env=env)
        return r.returncode == 0

    def capture(self):
        """Capture the virtual display. Returns filepath or None."""
        if not self.is_running:
            return None
        # Clean up previous capture to avoid /tmp filling up
        self._discard_last_capture()
        tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        tmp.close()
        path = tmp.name
        cap_env = {**os.environ, "DISPLAY": self.display}
        # Tried in order: xwd + convert, then scrot, then ImageMagick import.
        for grab in (self._grab_xwd, self._grab_scrot, self._grab_import):
            try:
                succeeded = grab(path, cap_env)
            except (OSError, subprocess.SubprocessError):
                # Tool missing or timed out: fall through to the next method.
                succeeded = False
            if succeeded and self._is_valid_capture(path):
                self._last_capture = path
                return path
        try:
            os.unlink(path)
        except OSError:
            pass
        return None

    def _is_valid_capture(self, path):
        """Return True when ``path`` exists and is larger than the minimum size."""
        try:
            return os.path.getsize(path) > self._MIN_CAPTURE_BYTES
        except OSError:
            return False

    @property
    def is_running(self):
        """True while the Xvfb child process exists and has not exited."""
        return self.xvfb_proc is not None and self.xvfb_proc.poll() is None

    def cleanup(self):
        """Remove the last screenshot and stop Xvfb (terminate, then kill after 3 s)."""
        self._discard_last_capture()
        if self.xvfb_proc:
            try:
                self.xvfb_proc.terminate()
                self.xvfb_proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self.xvfb_proc.kill()
            except OSError:
                pass
# Module-level singleton; constructing it runs VirtualDisplay._start_xvfb() at import time.
vdisplay = VirtualDisplay()
# ═══════════════════════════════════════
# GUI Process Manager (pygame/tkinter/turtle)
# ═══════════════════════════════════════
# File that receives stdout and stderr of the script started by GUIProcessManager.launch().
GUI_LOG = "/tmp/_axon_gui.log"
class GUIProcessManager:
    """Run one user GUI script at a time as a child process on the virtual display."""

    def __init__(self):
        self.process = None

    def launch(self, code):
        """Stop any running script, write ``code`` to disk and start it.

        Returns a human-readable status line. Output of the script goes to
        ``GUI_LOG``; it is included in the status if the script exits within
        the first second.
        """
        self.stop()
        tmp = "/tmp/_axon_gui_run.py"
        with open(tmp, "w") as f:
            f.write(code)
        # Full environment for GUI frameworks
        env = os.environ.copy()
        env.update({
            "DISPLAY": DISPLAY_NUM,
            # SDL / pygame
            "SDL_VIDEODRIVER": "x11",
            "SDL_AUDIODRIVER": "dummy",          # No audio device in headless
            "PYGAME_HIDE_SUPPORT_PROMPT": "1",   # Suppress pygame welcome
            # Tkinter / general X11
            "XDG_RUNTIME_DIR": "/tmp",
            "MESA_GL_VERSION_OVERRIDE": "3.3",   # OpenGL compat
            "LIBGL_ALWAYS_SOFTWARE": "1",        # Software rendering (no GPU)
            # Python
            "PYTHONUNBUFFERED": "1",
            "PYTHONPATH": "/tmp/axon_workspace:" + env.get("PYTHONPATH", ""),
        })
        try:
            # Log file instead of PIPE — PIPE deadlocks pygame's output.
            # The child inherits its own copy of the descriptor, so the parent
            # closes its handle right after the spawn instead of leaking one
            # open file per launch.
            with open(GUI_LOG, "w") as log_f:
                self.process = subprocess.Popen(
                    [sys.executable, "-u", tmp],  # -u = unbuffered
                    stdout=log_f, stderr=log_f,
                    env=env, preexec_fn=os.setsid, cwd="/tmp/axon_workspace")
            time.sleep(1.0)  # Give pygame time to create the window
            if self.process.poll() is not None:
                try:
                    with open(GUI_LOG) as f:
                        msg = f.read().strip()
                except OSError:
                    msg = ""
                return f"[Exited immediately]\n{msg}" if msg else "[Exited — no output]"
            return f"[GUI launched — PID {self.process.pid}] Auto-refresh enabled."
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            return f"[Error] {e}"

    def get_log(self):
        """Return the last 2000 characters of the GUI log, or "" if unreadable."""
        try:
            with open(GUI_LOG) as f:
                return f.read()[-2000:]
        except OSError:
            return ""

    def stop(self):
        """Terminate the running script's process group; returns a status string."""
        proc, self.process = self.process, None
        if proc is None or proc.poll() is not None:
            return "[No process]"
        pid = proc.pid
        try:
            # The child was started with setsid, so signal its whole group.
            os.killpg(os.getpgid(pid), signal.SIGTERM)
            proc.wait(timeout=3)
        except (OSError, subprocess.TimeoutExpired):
            try:
                os.killpg(os.getpgid(pid), signal.SIGKILL)
                proc.wait(timeout=1)  # reap so no zombie is left behind
            except (OSError, subprocess.TimeoutExpired):
                pass
        return f"[Stopped PID {pid}]"

    @property
    def is_running(self):
        """True while a launched script exists and has not exited."""
        return self.process is not None and self.process.poll() is None

    def get_status(self):
        """Short status label for the UI."""
        return f"● PID {self.process.pid}" if self.is_running else "○ idle"
# Module-level singleton used by the run/stop handlers; no process is started here.
gui_mgr = GUIProcessManager()
# ═══════════════════════════════════════
# File System
# ═══════════════════════════════════════
class FileSystem:
    """In-memory project files mirrored to a workspace directory on disk.

    ``files`` maps a relative file name to its text. Every change is also
    written below ``_sync_dir`` so that scripts can import each other.
    """

    def __init__(self):
        self.files: Dict[str, str] = {
            "main.py": '# Start coding here\nfrom utils import add\n\nresult = add(3, 7)\nprint(f"3 + 7 = {result}")',
            "utils.py": "def add(a, b):\n return a + b\n\ndef multiply(a, b):\n return a * b",
        }
        self.current_file = "main.py"
        self._sync_dir = "/tmp/axon_workspace"
        self._sync_to_disk()

    def _resolve(self, name):
        """Return the absolute path of ``name`` inside the workspace.

        Returns ``None`` when the name would land outside the workspace
        (absolute paths, ``..`` components) or names the workspace itself.
        """
        root = os.path.realpath(self._sync_dir)
        path = os.path.realpath(os.path.join(root, name))
        if not path.startswith(root + os.sep):
            return None
        return path

    def _write(self, name, content):
        """Write one file below the workspace, creating parent directories."""
        path = self._resolve(name)
        if path is None:
            return
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            f.write(content)

    def _sync_to_disk(self):
        """Write every in-memory file to the workspace directory."""
        os.makedirs(self._sync_dir, exist_ok=True)
        for name, content in self.files.items():
            self._write(name, content)

    def save_file(self, content):
        """Store ``content`` as the current file, in memory and on disk."""
        if self.current_file:
            self.files[self.current_file] = content
            os.makedirs(self._sync_dir, exist_ok=True)
            self._write(self.current_file, content)

    def get_current_file_content(self):
        """Return the text of the current file, or "" if it is unknown."""
        return self.files.get(self.current_file, "")

    def set_current_file(self, f):
        """Make ``f`` the current file; unknown names are ignored."""
        if f in self.files:
            self.current_file = f

    def create_file(self, f, c=""):
        """Create file ``f`` with content ``c`` and make it current.

        Does nothing for an empty name, an existing name, or a name that
        would be written outside the workspace directory.
        """
        if not f or f in self.files or self._resolve(f) is None:
            return
        self.files[f] = c
        self.current_file = f
        self._sync_to_disk()

    def delete_file(self, f):
        """Delete ``f`` unless it is the last file; returns True on success."""
        if f not in self.files or len(self.files) <= 1:
            return False
        del self.files[f]
        path = self._resolve(f)
        if path is not None and os.path.exists(path):
            os.unlink(path)
        self.current_file = next(iter(self.files))
        return True

    def get_all_files(self):
        """Return all file names in insertion order."""
        return list(self.files)
# Module-level singleton; constructing it writes the starter files to /tmp/axon_workspace.
fs = FileSystem()
# ═══════════════════════════════════════
# Snippet Library
# ═══════════════════════════════════════
class SnippetLibrary:
    """Named code snippets persisted as JSON in ``SNIPPETS_FILE``."""

    def __init__(self):
        self.snippets = {}
        self._load()

    def _load(self):
        """Load snippets from disk; a missing or invalid file yields no snippets."""
        try:
            with open(SNIPPETS_FILE) as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Missing file, unreadable file or malformed JSON.
            data = {}
        # Only a JSON object is a usable snippet table.
        self.snippets = data if isinstance(data, dict) else {}

    def _save(self):
        """Write all snippets to disk; failures are reported but not raised."""
        try:
            with open(SNIPPETS_FILE, "w") as f:
                json.dump(self.snippets, f)
        except (OSError, TypeError, ValueError) as e:
            print(f"[Snippets] save failed: {e}")

    def add(self, name, code, tags=""):
        """Store ``code`` under the stripped ``name``; returns a status message."""
        key = (name or "").strip()
        if not key:
            return "Error: name required"
        self.snippets[key] = {
            "code": code,
            "tags": tags,
            "created": time.strftime("%Y-%m-%d"),
        }
        self._save()
        return f"Saved: {name}"

    def get(self, name):
        """Return the code of snippet ``name``, or "" if absent or malformed."""
        entry = self.snippets.get(name)
        if not isinstance(entry, dict):
            return ""
        return entry.get("code", "")

    def delete(self, name):
        """Remove snippet ``name``; returns a status message."""
        if name not in self.snippets:
            return "Not found"
        del self.snippets[name]
        self._save()
        return f"Deleted: {name}"

    def get_names(self):
        """Return all snippet names in insertion order."""
        return list(self.snippets)
# Module-level singleton; constructing it loads SNIPPETS_FILE if that file exists.
snippets = SnippetLibrary()
# ═══════════════════════════════════════
# Code Analyzer & FindReplace
# ═══════════════════════════════════════
class CodeAnalyzer:
    """Build a short text outline (imports, functions, classes) of Python source."""

    @staticmethod
    def _signature(args) -> str:
        """Render the parameter names of an ``ast.arguments`` node."""
        parts = [a.arg for a in args.posonlyargs + args.args]
        if args.vararg:
            parts.append("*" + args.vararg.arg)
        elif args.kwonlyargs:
            parts.append("*")  # bare marker that starts keyword-only parameters
        parts.extend(a.arg for a in args.kwonlyargs)
        if args.kwarg:
            parts.append("**" + args.kwarg.arg)
        return ", ".join(parts)

    @staticmethod
    def analyze(code: str) -> str:
        """Return the outline of ``code``.

        Yields "Empty file" for blank input and a one-line warning when the
        source cannot be parsed. Otherwise the first line is the line count,
        followed by the imports (if any) and one line per function or class.
        """
        if not code or not code.strip():
            return "Empty file"
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return f"⚠ Syntax error line {e.lineno}: {e.msg}"
        except ValueError as e:
            # Some Python versions reject e.g. null bytes with ValueError.
            return f"⚠ Syntax error: {e}"
        lines = [f"📊 {len(code.splitlines())} lines"]
        imports = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                imports.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for "from . import x"; keep the dots
                # so relative imports do not show up as "from None".
                imports.append("from " + "." * node.level + (node.module or ""))
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                args = CodeAnalyzer._signature(node.args)
                lines.append(f" L{node.lineno} ⚡ def {node.name}({args})")
            elif isinstance(node, ast.ClassDef):
                lines.append(f" L{node.lineno} 🏗 class {node.name}")
        if imports:
            lines.insert(1, f"📦 {', '.join(imports)}")
        return "\n".join(lines)
analyzer = CodeAnalyzer()
class FindReplace:
    """Plain-text search and replace helpers for the editor."""

    @staticmethod
    def find_in_file(code, query, case):
        """List the lines of ``code`` containing ``query``.

        ``case`` selects a case-sensitive match. Returns "" for an empty
        query, otherwise a summary followed by one " L<n>: <line>" entry
        per hit, or "No matches.".
        """
        if not query:
            return ""
        needle = query if case else query.lower()
        res = []
        for i, line in enumerate((code or "").splitlines(), 1):
            haystack = line if case else line.lower()
            if needle in haystack:
                res.append(f" L{i}: {line.strip()}")
        return f"Found {len(res)} match(es):\n" + "\n".join(res) if res else "No matches."

    @staticmethod
    def find_all_files(files, query):
        """Case-insensitive search of ``query`` across a name -> content mapping."""
        if not query:
            return ""
        needle = query.lower()
        res = [
            f" {fname}:L{i}: {line.strip()}"
            for fname, content in files.items()
            for i, line in enumerate(content.splitlines(), 1)
            if needle in line.lower()
        ]
        return f"Found {len(res)} across all files:\n" + "\n".join(res) if res else "No matches."

    @staticmethod
    def replace_in_file(code, find, replace, case):
        """Replace every literal occurrence of ``find`` in ``code``.

        Returns ``(new_code, message)``. Both ``find`` and ``replace`` are
        treated as plain text, never as regular-expression syntax.
        """
        if not find:
            return code, "Empty find."
        flags = 0 if case else re.IGNORECASE
        replacement = "" if replace is None else replace
        # A callable replacement is inserted verbatim; a string would be
        # parsed as a template, so "\d" would raise and "\1" or "\n" would
        # be rewritten.
        new_code, count = re.subn(re.escape(find), lambda _m: replacement,
                                  code or "", flags=flags)
        return new_code, f"Replaced {count} occurrence(s)."
finder = FindReplace()
# ═══════════════════════════════════════
# PTY Terminal (fixed)
# ═══════════════════════════════════════
class PTYTerminal:
    """A bash session on a pseudo-terminal plus a bounded scrollback log.

    Commands are written to the PTY master; output is read back, stripped
    of escape sequences and appended to ``log_lines``.
    """

    STRIP_ANSI = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\].*?\x07|\x1b\[.*?[@-~]|\r')
    # Commands that get the long (180 s) timeout and the longer idle allowance.
    LONG_CMDS = frozenset(["pip", "pip3", "npm", "npx", "apt-get", "apt", "git",
                           "wget", "curl", "make", "cmake", "cargo", "yarn", "conda"])

    def __init__(self):
        self.master_fd = None
        self.pid = None
        self.lock = threading.Lock()
        self.log_lines: List[str] = []
        self.max_lines = 800
        self.cmd_history: List[str] = []
        self._spawn()
        self._append("AXON TERMINAL v4.0")
        self._append("══════════════════════════════════════")
        self._append("Full PTY shell · Cross-file imports")
        self._append(" pip/npm/apt-get/git/curl/make")
        self._append(" GUI apps → Display tab")
        self._append("══════════════════════════════════════")

    @staticmethod
    def _exec_shell(master_fd, slave_fd):
        """Child side of the fork: attach to the PTY slave and exec bash.

        Never returns. If anything fails the child exits immediately
        instead of continuing to run a copy of the parent application.
        """
        try:
            os.close(master_fd)
            os.setsid()
            fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
            for std_fd in (0, 1, 2):
                os.dup2(slave_fd, std_fd)
            if slave_fd > 2:
                os.close(slave_fd)
            env = os.environ.copy()
            env.update({"TERM": "dumb", "PS1": "$ ",
                        "DEBIAN_FRONTEND": "noninteractive", "DISPLAY": DISPLAY_NUM})
            os.execvpe("/bin/bash", ["/bin/bash", "--norc", "--noprofile", "-i"], env)
        finally:
            # Only reached when exec (or the setup before it) failed.
            os._exit(127)

    def _spawn(self):
        """Fork the shell and keep the non-blocking PTY master in ``master_fd``."""
        master_fd = slave_fd = None
        try:
            master_fd, slave_fd = pty.openpty()
            self.pid = os.fork()
            if self.pid == 0:
                # Child — become the shell
                self._exec_shell(master_fd, slave_fd)
            # Parent — hold the master fd
            os.close(slave_fd)
            slave_fd = None
            self.master_fd = master_fd
            # Set non-blocking properly
            flags = fcntl.fcntl(self.master_fd, fcntl.F_GETFL)
            fcntl.fcntl(self.master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            # Set terminal size
            fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ,
                        struct.pack("HHHH", 40, 120, 0, 0))
            time.sleep(0.3)
            self._read_raw(0.2)  # Drain initial output
        except OSError as e:
            self._append(f"[Shell Error] {e}")
            self.master_fd = None
            # Do not leak whichever descriptors were already opened.
            for fd in (master_fd, slave_fd):
                if fd is not None:
                    try:
                        os.close(fd)
                    except OSError:
                        pass

    def _read_raw(self, timeout=0.1) -> str:
        """Read raw bytes from PTY.

        Waits up to ``timeout`` seconds for the first data; every chunk
        received extends the deadline by 0.15 s. Returns the decoded text.
        """
        if self.master_fd is None:
            return ""
        out = []
        deadline = time.time() + timeout
        while True:
            rem = deadline - time.time()
            if rem <= 0:
                break
            try:
                r, _, _ = select.select([self.master_fd], [], [], min(rem, 0.05))
                if r:
                    c = os.read(self.master_fd, 4096)
                    if not c:
                        break
                    out.append(c.decode("utf-8", errors="replace"))
                    deadline = time.time() + 0.15
                elif out:
                    break
            except OSError:
                break
        return "".join(out)

    def _clean(self, text):
        """Strip ANSI escape codes and remaining control characters."""
        c = self.STRIP_ANSI.sub("", text)
        return "".join(ch for ch in c if ch in "\n\t" or ord(ch) >= 32)

    def _append(self, text):
        """Append one entry to the log, dropping the oldest beyond ``max_lines``."""
        self.log_lines.append(text)
        excess = len(self.log_lines) - self.max_lines
        if excess > 0:
            del self.log_lines[:excess]

    def get_log(self):
        """Return the whole log as one newline-joined string."""
        return "\n".join(self.log_lines)

    def _collect_output(self, base):
        """Read command output until a prompt, the idle limit or the timeout."""
        long_running = base in self.LONG_CMDS
        if long_running:
            wait = 180
        elif base in ("python", "python3", "node"):
            wait = 60
        else:
            wait = 15
        idle_limit = 10 if long_running else 3
        collected = ""
        idle = 0
        start = time.time()
        while time.time() - start < wait:
            piece = self._read_raw(0.3)
            if piece:
                idle = 0
                collected += piece
                # A trailing "$" is taken as the shell prompt (PS1 is "$ ").
                if collected.rstrip().endswith("$"):
                    break
            else:
                idle += 1
                if idle >= idle_limit:
                    break
        return collected

    def run_command(self, cmd):
        """Run ``cmd`` in the shell and return the updated log text.

        "clear" empties the log and returns ""; "history" lists the last
        20 commands without touching the shell.
        """
        cmd = (cmd or "").strip()
        if not cmd:
            return self.get_log()
        with self.lock:
            # Save history (inside the lock so concurrent calls cannot interleave)
            if not self.cmd_history or self.cmd_history[-1] != cmd:
                self.cmd_history.append(cmd)
                if len(self.cmd_history) > 100:
                    self.cmd_history.pop(0)
            lowered = cmd.lower()
            if lowered == "clear":
                self.log_lines = []
                return ""
            if lowered == "history":
                self._append("Command history:")
                for i, c in enumerate(self.cmd_history[-20:], 1):
                    self._append(f" {i:3d} {c}")
                return self.get_log()
            if self.master_fd is None:
                self._append(f"$ {cmd}")
                self._append("[Error] No shell.")
                return self.get_log()
            # Drain stale output
            self._read_raw(0.05)
            self._append(f"$ {cmd}")
            try:
                os.write(self.master_fd, (cmd + "\n").encode())
            except OSError as e:
                self._append(f"[Write Error] {e}")
                return self.get_log()
            parts = cmd.split()
            base = parts[0].lower() if parts else ""
            raw = self._clean(self._collect_output(base))
            # Drop the first echo of the command and bare prompt lines.
            filtered = []
            skip_echo = True
            for line in raw.split("\n"):
                s = line.strip()
                if skip_echo and s == cmd:
                    skip_echo = False
                    continue
                if s == "$":
                    continue
                filtered.append(line)
            result = "\n".join(filtered).strip()
            if result:
                self._append(result)
            return self.get_log()

    def get_history(self) -> List[str]:
        """Return up to the 30 most recent commands, newest first."""
        return list(reversed(self.cmd_history[-30:]))

    def cleanup(self):
        """Signal the shell, close the PTY master and forget both handles."""
        pid, self.pid = self.pid, None
        # Cleared first so later calls report "No shell" instead of writing
        # to a descriptor number that may have been reused.
        fd, self.master_fd = self.master_fd, None
        if pid and pid > 0:
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError:
                pass
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                pass
        if pid and pid > 0:
            try:
                os.waitpid(pid, os.WNOHANG)  # best-effort reap, never blocks
            except OSError:
                pass
# Module-level singleton; constructing it forks the bash child (PTYTerminal._spawn) at import time.
terminal = PTYTerminal()
# ═══════════════════════════════════════
# AI Model — Qwen2.5CMR Q4_K_M GGUF
# ═══════════════════════════════════════
# Model handle created on first use by load_model(); None until then.
_llm_instance = None
# Held by load_model() while it downloads and constructs the model.
_llm_lock = threading.Lock()
def load_model():
    """Return the shared model instance, downloading and loading it on first use.

    Returns ``None`` if the download or the model construction fails; the
    next call then tries again.
    """
    global _llm_instance
    if _llm_instance is None:
        with _llm_lock:
            # Re-check: another caller may have finished loading while we waited.
            if _llm_instance is None:
                print(f"Downloading {MODEL_REPO}/{MODEL_FILE}...")
                started = time.time()
                try:
                    gguf_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
                    print(f"Downloaded in {time.time()-started:.1f}s, loading...")
                    from llama_cpp import Llama
                    settings = {
                        "model_path": gguf_path,
                        "n_ctx": 4096,
                        "n_threads": os.cpu_count() or 4,
                        "n_gpu_layers": 0,  # CPU only
                        "verbose": False,
                    }
                    _llm_instance = Llama(**settings)
                    print(f"Model ready in {time.time()-started:.1f}s total")
                except Exception as e:
                    print(f"Model error: {e}")
                    return None
    return _llm_instance
def ai_gen(system_prompt, code, max_tokens=300):
    """Run one chat completion and return the reply text.

    ``system_prompt`` becomes the system message and ``code`` the user
    message. Failures are returned as strings starting with "Error:".
    """
    model = load_model()
    if not model:
        return "Error: model failed to load"
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": code},
    ]
    try:
        reply = model.create_chat_completion(
            messages=conversation,
            max_tokens=max_tokens,
            temperature=0.3,
            top_p=0.9,
            repeat_penalty=1.1,
        )
        content = reply["choices"][0]["message"]["content"].strip()
    except Exception as e:
        return f"Error: {e}"
    return content or "(No output from model)"
# ═══════════════════════════════════════
# GUI code detection
# ═══════════════════════════════════════
# Substrings whose presence in a script marks it as a GUI program.
GUI_HINTS = [
    "pygame", "tkinter", "turtle", "pyglet", "arcade", "kivy",
    "display.set_mode", "mainloop()", "Tk()", "Canvas(",
    "pygame.init", "pygame.display", "screen.fill",
    "from turtle import", "import turtle",
]


def is_gui_code(c):
    """Return True if the source text ``c`` contains any entry of GUI_HINTS."""
    for hint in GUI_HINTS:
        if hint in c:
            return True
    return False
# ═══════════════════════════════════════
# Theme + CSS
# ═══════════════════════════════════════
# Gradio theme: cyan/gray hues, IBM Plex Mono font and explicit dark color overrides.
# Passed to demo.launch() in the __main__ guard.
theme = gr.themes.Default(
primary_hue="cyan", neutral_hue="gray",
font=[gr.themes.GoogleFont("IBM Plex Mono"), "monospace"]
).set(
body_background_fill="#080808", body_text_color="#e0e0e0",
button_primary_background_fill="#0ff", button_primary_background_fill_hover="#0dd",
button_primary_text_color="#000",
button_secondary_background_fill="#1a1a1a", button_secondary_background_fill_hover="#2a2a2a",
button_secondary_text_color="#0ff", button_secondary_border_color="#0ff4",
block_background_fill="#0d0d0d", block_border_color="#1a1a1a",
block_label_text_color="#0f8",
input_background_fill="#0a0a0a", input_border_color="#1a3a2a", input_placeholder_color="#0f83",
)
# Custom stylesheet passed to demo.launch(). The class selectors (.axon-header, .term-box,
# .term-input, .code-editor, .tb, .struct-box, .find-box, .status-bar, .stop-btn) match the
# elem_classes values given to components in the UI layout.
css = """
* { border-radius: 0 !important; }
.gradio-container { max-width: 100% !important; }
/* Header */
.axon-header {
background: linear-gradient(90deg, #080808, #0a1a1a, #080808);
border-bottom: 1px solid #0ff3; padding: 12px 20px !important; margin-bottom: 8px;
}
.axon-header h1 {
font-family: 'IBM Plex Mono', monospace !important;
background: linear-gradient(90deg, #0ff, #0f8);
-webkit-background-clip: text; -webkit-text-fill-color: transparent;
font-size: 1.4em !important; letter-spacing: 3px; margin: 0 !important;
}
.axon-header p { color: #0f84 !important; font-size: 0.75em; letter-spacing: 1px; }
/* Terminal */
.term-box textarea {
background: #050505 !important; color: #00ff41 !important;
font-family: 'IBM Plex Mono', monospace !important;
font-size: 12px !important; line-height: 1.5 !important;
border: 1px solid #0f31 !important; text-shadow: 0 0 4px #0f32;
}
.term-input input {
background: #050505 !important; color: #00ff41 !important;
font-family: 'IBM Plex Mono', monospace !important;
border: 1px solid #0f32 !important; font-size: 12px !important;
}
.term-input input::placeholder { color: #0f83 !important; }
.term-input input:focus { border-color: #0ff !important; box-shadow: 0 0 8px #0ff2 !important; }
/* Editor */
.code-editor textarea, .code-editor .cm-editor {
font-family: 'IBM Plex Mono', monospace !important; font-size: 13px !important;
}
/* Buttons */
.tb {
font-size: 0.8em !important; padding: 6px 12px !important; letter-spacing: 0.5px;
text-transform: uppercase; font-weight: 600 !important;
border: 1px solid transparent !important; transition: all 0.2s ease !important;
}
.tb:hover { border-color: #0ff !important; box-shadow: 0 0 12px #0ff2 !important; }
/* Accordion */
.gr-accordion { border: 1px solid #1a2a2a !important; background: #0a0a0a !important; }
.gr-accordion > .label-wrap { background: #0d0d0d !important; border-bottom: 1px solid #1a2a2a !important; }
.gr-accordion > .label-wrap:hover { background: #121a1a !important; }
.gr-accordion > .label-wrap span {
color: #0ff !important; font-family: 'IBM Plex Mono', monospace !important;
letter-spacing: 1px; font-size: 0.85em;
}
/* Tabs */
.tabs > .tab-nav > button {
font-family: 'IBM Plex Mono', monospace !important;
letter-spacing: 1px; font-size: 0.8em; text-transform: uppercase;
color: #888 !important; border-bottom: 2px solid transparent !important;
}
.tabs > .tab-nav > button.selected {
color: #0ff !important; border-bottom-color: #0ff !important; text-shadow: 0 0 8px #0ff4;
}
/* Structure + find */
.struct-box textarea { background: #050505 !important; color: #0ff !important;
font-family: 'IBM Plex Mono', monospace !important; font-size: 11px !important; }
.find-box textarea { background: #050505 !important; color: #ffab40 !important;
font-family: 'IBM Plex Mono', monospace !important; font-size: 11px !important; }
/* Status bar */
.status-bar {
background: #0a0a0a !important; border-top: 1px solid #1a2a2a;
padding: 4px 12px !important; font-size: 11px !important; color: #0f84 !important;
}
.status-bar strong { color: #0ff !important; }
/* Misc */
::-webkit-scrollbar { width: 6px; height: 6px; }
::-webkit-scrollbar-track { background: #0a0a0a; }
::-webkit-scrollbar-thumb { background: #0f83; }
input, textarea, select { color: #0f8 !important; }
.stop-btn { background: #a00 !important; color: #fff !important; border: none !important; }
.stop-btn:hover { background: #c00 !important; }
"""
# ═══════════════════════════════════════
# UI
# ═══════════════════════════════════════
# UI layout: header row, then one row with three columns (sidebar, editor + tabs, display),
# then the status bar and the non-visual state/timer components.
# NOTE(review): this file's indentation was lost, so the nesting of the `with` blocks below
# cannot be read from the text — confirm against the original source before reformatting.
with gr.Blocks(title="Axon Pro") as demo:
with gr.Row(elem_classes="axon-header"):
gr.Markdown("# ⬡ AXON PRO\n\nPYTHON AI IDE — v4.0")
with gr.Row(equal_height=False):
# ══ LEFT SIDEBAR ══
with gr.Column(scale=1, min_width=200):
with gr.Accordion("📁 EXPLORER", open=True):
# Choices and initial value come from the FileSystem singleton.
file_list = gr.Dropdown(choices=fs.get_all_files(), value=fs.current_file,
label="Files", interactive=True)
new_file_txt = gr.Textbox(placeholder="new_file.py", show_label=False)
with gr.Row():
new_btn = gr.Button("NEW", size="sm", elem_classes="tb")
save_btn = gr.Button("SAVE", size="sm", elem_classes="tb")
del_btn = gr.Button("DEL", size="sm", elem_classes="tb stop-btn")
with gr.Accordion("🗺 STRUCTURE", open=True):
# Read-only outline produced by CodeAnalyzer.analyze for the current file.
structure_view = gr.Textbox(
value=analyzer.analyze(fs.get_current_file_content()),
label="", lines=10, interactive=False, elem_classes="struct-box",
show_label=False)
with gr.Accordion("📋 SNIPPETS", open=False):
snip_list = gr.Dropdown(choices=snippets.get_names(), label="Saved",
interactive=True)
snip_name = gr.Textbox(placeholder="Snippet name", show_label=False)
snip_tags = gr.Textbox(placeholder="Tags: api, util, loop", show_label=False)
with gr.Row():
snip_save = gr.Button("SAVE", size="sm", elem_classes="tb")
snip_insert = gr.Button("INSERT", size="sm", elem_classes="tb")
snip_del = gr.Button("DEL", size="sm", elem_classes="tb stop-btn")
snip_status = gr.Markdown("")
# ══ CENTER ══
with gr.Column(scale=4):
editor = gr.Code(value=fs.get_current_file_content(),
label=f" {fs.current_file}", language="python",
lines=20, interactive=True, elem_classes="code-editor")
with gr.Row():
run_btn = gr.Button("▶ RUN", variant="primary", elem_classes="tb")
stop_btn = gr.Button("■ STOP", size="sm", elem_classes="tb stop-btn")
ai_complete = gr.Button("✦ COMPLETE", elem_classes="tb")
ai_explain = gr.Button("◈ EXPLAIN", elem_classes="tb")
ai_refactor = gr.Button("⟲ REFACTOR", elem_classes="tb")
# The tab ids below are the values the wiring passes to gr.Tabs(selected=...).
with gr.Tabs() as bottom_tabs:
with gr.Tab("▶ OUTPUT", id="output-tab"):
run_output = gr.Textbox(value="Run your code to see output here.",
lines=12, max_lines=25, interactive=False,
elem_classes="term-box", label="", show_label=False)
with gr.Tab("⌘ TERMINAL", id="term-tab"):
term_out = gr.Textbox(value=terminal.get_log(), lines=12, max_lines=25,
interactive=False, elem_classes="term-box",
label="", show_label=False)
with gr.Row():
term_in = gr.Textbox(placeholder="$ pip install, git clone, ls ...",
show_label=False, elem_classes="term-input")
with gr.Row():
clear_btn = gr.Button("CLEAR", size="sm", elem_classes="tb")
hist_dd = gr.Dropdown(choices=[], label="", interactive=True,
scale=3, container=False)
hist_btn = gr.Button("↻ HISTORY", size="sm", elem_classes="tb")
with gr.Tab("🔍 FIND", id="find-tab"):
with gr.Row():
find_query = gr.Textbox(label="Find", scale=4)
replace_query = gr.Textbox(label="Replace", scale=4)
find_case = gr.Checkbox(label="Case", value=False)
with gr.Row():
find_btn = gr.Button("FIND", size="sm", elem_classes="tb")
find_all_btn = gr.Button("FIND ALL FILES", size="sm", elem_classes="tb")
replace_btn = gr.Button("REPLACE ALL", size="sm", elem_classes="tb")
find_results = gr.Textbox(value="", lines=6, interactive=False,
elem_classes="find-box", label="", show_label=False)
with gr.Tab("💬 AI CHAT", id="chat-tab"):
chat_history = gr.Chatbot(label="", height=250)
with gr.Row():
chat_input = gr.Textbox(placeholder="Describe code you want...",
scale=8, container=False)
send_btn = gr.Button("GEN", variant="primary", scale=1, elem_classes="tb")
with gr.Tab("⇄ DIFF", id="diff-tab"):
diff_view = gr.Code(label="AI Changes", language="python",
interactive=False, lines=15)
with gr.Row():
apply_btn = gr.Button("✓ APPLY", variant="primary", elem_classes="tb")
discard_btn = gr.Button("✗ DISCARD", elem_classes="tb")
# ══ RIGHT: DISPLAY ══
with gr.Column(scale=2, min_width=250):
with gr.Accordion("🖥 DISPLAY OUTPUT", open=True):
# type="filepath": the capture handlers return a path to a PNG file.
display_image = gr.Image(label="", type="filepath", interactive=False, height=420)
with gr.Row():
capture_btn = gr.Button("📸 CAPTURE", size="sm", elem_classes="tb")
auto_capture = gr.Checkbox(label="Auto Refresh", value=False)
# Labels are mapped to seconds in on_speed_change.
refresh_speed = gr.Dropdown(
choices=["0.5s", "1s", "2s", "5s"], value="1s",
label="", container=False, scale=1, interactive=True)
gui_status = gr.Markdown(
f"<small>Xvfb: {'● ON' if vdisplay.is_running else '○ OFF'} "
f"| {SCREEN_W}x{SCREEN_H} | SDL: x11 + dummy audio</small>")
status_bar = gr.Markdown(
f"**AXON PRO v4.0** │ Python {sys.version.split()[0]} │ CPU │ "
f"Qwen2.5CMR Q4_K_M │ PTY + Xvfb │ Snippets + Structure + Find",
elem_classes="status-bar")
# State
# diff_original / diff_modified receive the second and third outputs of
# on_refactor and on_generate; diff_modified is the input of on_apply.
diff_original = gr.State("")
diff_modified = gr.State("")
# Starts inactive; switched by on_auto_toggle / on_speed_change, ticks call on_auto_tick.
auto_timer = gr.Timer(1, active=False)
# ═══════════════════════════════════════
# HANDLERS
# ═══════════════════════════════════════
# --- File ops ---
def on_editor_change(content):
    """Save the editor text to the current file and return its structure outline."""
    fs.save_file(content)
    outline = analyzer.analyze(content)
    return outline
def on_file_select(filename):
    """Switch to ``filename``; returns (content, editor label update, outline)."""
    fs.set_current_file(filename)
    content = fs.get_current_file_content()
    label_update = gr.update(label=f" {filename}")
    outline = analyzer.analyze(content)
    return content, label_update, outline
def on_new_file(name):
    """Create a file named ``name`` and show it in the editor.

    Returns updates for (file dropdown, editor content, editor label,
    outline); a blank name leaves all four untouched.
    """
    cleaned = name.strip() if name else ""
    if not cleaned:
        return gr.update(), gr.update(), gr.update(), gr.update()
    fs.create_file(cleaned)
    content = fs.get_current_file_content()
    dropdown_update = gr.update(choices=fs.get_all_files(), value=cleaned)
    label_update = gr.update(label=f" {cleaned}")
    return dropdown_update, content, label_update, analyzer.analyze(content)
def on_save(code):
    """Save ``code`` to the current file and return the status-bar text."""
    fs.save_file(code)
    python_version = sys.version.split()[0]
    prefix = f"**AXON PRO v4.0** │ Python {python_version} │ CPU │ "
    suffix = f"Qwen2.5CMR Q4_K_M │ ✓ Saved {fs.current_file}"
    return prefix + suffix
def on_delete():
    """Delete the current file and show whichever file becomes current.

    Returns updates for (file dropdown, editor content, editor label,
    outline); all four are no-ops when the file could not be deleted.
    """
    target = fs.current_file
    if not fs.delete_file(target):
        return gr.update(), gr.update(), gr.update(), gr.update()
    content = fs.get_current_file_content()
    dropdown_update = gr.update(choices=fs.get_all_files(), value=fs.current_file)
    label_update = gr.update(label=f" {fs.current_file}")
    return dropdown_update, content, label_update, analyzer.analyze(content)
# --- Run (smart: auto-detects GUI vs CLI) ---
def on_run(code):
    """Save and run ``code``; GUI scripts go to the display, others to OUTPUT.

    Returns (output text, screenshot, GUI status, auto-refresh checkbox).
    For non-GUI scripts the last three are no-op updates.
    """
    fs.save_file(code)
    if is_gui_code(code):
        # GUI app — launch, take first screenshot, auto-enable refresh
        launch_msg = gui_mgr.launch(code)
        time.sleep(0.5)  # Extra time for first frame
        screenshot = vdisplay.capture()
        gui_state = f"<small>Xvfb: ● ON | {gui_mgr.get_status()}</small>"
        return launch_msg, screenshot, gui_state, gr.update(value=True)

    # Normal script — run with subprocess, output goes to OUTPUT tab
    script_path = "/tmp/_axon_run.py"
    with open(script_path, "w") as f:
        f.write(code)
    try:
        run_env = os.environ.copy()
        run_env["DISPLAY"] = DISPLAY_NUM
        run_env["PYTHONPATH"] = fs._sync_dir + ":" + run_env.get("PYTHONPATH", "")
        completed = subprocess.run(
            [sys.executable, script_path], capture_output=True, text=True,
            timeout=30, env=run_env, cwd=fs._sync_dir)
        # stdout first, then stderr, each only if it has non-whitespace text.
        streams = [s.rstrip() for s in (completed.stdout, completed.stderr) if s.strip()]
        output = "\n".join(streams) or "(No output)"
    except subprocess.TimeoutExpired:
        output = "[Timed out after 30s]"
    except Exception as e:
        output = f"[Error] {e}"
    return output, gr.update(), gr.update(), gr.update()
def on_stop():
    """Stop the GUI script; returns (message, GUI status, auto-refresh off)."""
    stop_msg = gui_mgr.stop()
    xvfb_state = '● ON' if vdisplay.is_running else '○ OFF'
    gui_state = f"<small>Xvfb: {xvfb_state} | ○ idle</small>"
    return stop_msg, gui_state, gr.update(value=False)  # Disable auto-refresh
# --- Terminal ---
def on_term_cmd(cmd):
    """Run ``cmd`` in the terminal; returns (log text, "" to clear the input box)."""
    log_text = terminal.run_command(cmd)
    return log_text, ""
def on_clear():
    """Empty the terminal log and return "" for the terminal view."""
    terminal.log_lines = []
    return ""
def on_refresh_hist():
    """Refill the history dropdown with the terminal's recent commands."""
    recent = terminal.get_history()
    return gr.update(choices=recent)
def on_select_hist(cmd):
    """Copy the chosen history entry into the input; an empty choice changes nothing."""
    if cmd:
        return cmd
    return gr.update()
# --- AI ---
def on_complete(code):
    """Append the model's completion to ``code``; returns (new code, outline)."""
    completion = ai_gen(
        "Complete this Python code. Only output the completion, no explanations.",
        code)
    combined = "\n".join((code, completion))
    return combined, analyzer.analyze(combined)
def on_explain(code):
    """Ask the model to explain ``code``, log the answer and return the terminal log."""
    answer = ai_gen("Explain this Python code concisely.", code, 512)
    terminal._append(f"[AI Explanation]\n{answer}")
    return terminal.get_log()
def on_refactor(code):
    """Ask the model for a refactoring; returns (unified diff, original, refactored)."""
    import difflib

    refactored = ai_gen(
        "Refactor this Python code for PEP 8 and best practices. Output only code.",
        code, 512)
    fs.save_file(code)
    diff_lines = difflib.unified_diff(
        code.splitlines(), refactored.splitlines(), lineterm="")
    return "\n".join(diff_lines), code, refactored
def on_generate(prompt_text, history):
    """Generate code for a chat prompt.

    Returns (diff of the generated code against nothing, "" as the
    original, the generated code, the extended chat history, "" to clear
    the chat input).
    """
    import difflib

    generated = ai_gen(f"Write Python code for: {prompt_text}", "", 512)
    diff_text = "\n".join(
        difflib.unified_diff([], generated.splitlines(), lineterm=""))
    user_turn = {"role": "user", "content": prompt_text}
    assistant_turn = {"role": "assistant", "content": "Code generated → check DIFF tab"}
    updated_history = history + [user_turn, assistant_turn]
    return diff_text, "", generated, updated_history, ""
def on_apply(modified):
    """Put the pending AI result into the editor; returns (code, outline)."""
    outline = analyzer.analyze(modified)
    return modified, outline
# --- Snippets ---
def on_snip_save(name, tags, code):
    """Save the editor text as a snippet; returns (status, refreshed dropdown)."""
    status = snippets.add(name, code, tags)
    dropdown_update = gr.update(choices=snippets.get_names())
    return status, dropdown_update
def on_snip_insert(name, code):
    """Return ``code`` with snippet ``name`` appended at the end.

    An unknown or empty snippet leaves ``code`` unchanged; a blank editor
    is replaced by the snippet alone.
    """
    snippet_text = snippets.get(name)
    if not snippet_text:
        return code
    if not code.strip():
        return snippet_text
    return code + "\n\n" + snippet_text
def on_snip_del(name):
    """Delete snippet ``name``; returns (status, refreshed dropdown)."""
    status = snippets.delete(name)
    dropdown_update = gr.update(choices=snippets.get_names())
    return status, dropdown_update
# --- Find ---
def on_find(code, query, case):
    """Search the editor text for ``query`` and return the result listing."""
    listing = finder.find_in_file(code, query, case)
    return listing
def on_find_all(query):
    """Search every project file for ``query`` and return the result listing."""
    listing = finder.find_all_files(fs.files, query)
    return listing
def on_replace(code, fq, rq, case):
    """Replace ``fq`` with ``rq`` in the editor text and save the result.

    Returns (new code, status message, outline).
    """
    replaced, status = finder.replace_in_file(code, fq, rq, case)
    fs.save_file(replaced)
    outline = analyzer.analyze(replaced)
    return replaced, status, outline
# --- Display ---
def on_capture():
    """Take one screenshot of the virtual display; returns its path or None."""
    screenshot = vdisplay.capture()
    return screenshot
def on_auto_tick():
    """Timer callback: return a fresh screenshot, or a no-op update if none was taken.

    NOTE(review): the GUI process state is checked but does not change the
    result, so the timer is not switched off when the process has died.
    """
    screenshot = vdisplay.capture()
    _gui_alive = gui_mgr.is_running  # still evaluated; the outcome is the same either way
    if screenshot:
        return screenshot
    return gr.update()
def on_auto_toggle(checked):
    """Return a 1-second timer whose active flag follows the checkbox."""
    timer = gr.Timer(1, active=checked)
    return timer
def on_speed_change(speed, auto_on):
    """Return a timer with the chosen interval, active only if auto-refresh is on.

    Unknown speed labels fall back to 1 second.
    """
    seconds_by_label = {"0.5s": 0.5, "1s": 1, "2s": 2, "5s": 5}
    if speed in seconds_by_label:
        interval = seconds_by_label[speed]
    else:
        interval = 1
    return gr.Timer(interval, active=auto_on)
# ═══════════════════════════════════════
# WIRING
# ═══════════════════════════════════════
# Files
# Each call below binds one handler: (function, input components, output components).
# Where `editor` is listed twice as an output, the handler returns the new content for
# the first slot and a gr.update(label=...) for the second.
editor.change(on_editor_change, editor, structure_view)
file_list.change(on_file_select, file_list, [editor, editor, structure_view])
new_btn.click(on_new_file, new_file_txt, [file_list, editor, editor, structure_view])
save_btn.click(on_save, editor, status_bar)
del_btn.click(on_delete, None, [file_list, editor, editor, structure_view])
# Run
# After the run handler finishes, the chained step selects the OUTPUT tab.
run_btn.click(on_run, editor, [run_output, display_image, gui_status, auto_capture]
).then(lambda: gr.Tabs(selected="output-tab"), None, bottom_tabs)
stop_btn.click(on_stop, None, [run_output, gui_status, auto_capture])
# Terminal
term_in.submit(on_term_cmd, term_in, [term_out, term_in])
clear_btn.click(on_clear, None, term_out)
hist_btn.click(on_refresh_hist, None, hist_dd)
hist_dd.change(on_select_hist, hist_dd, term_in)
# AI
ai_complete.click(on_complete, editor, [editor, structure_view])
# The explanation is appended to the terminal log, so it is shown in term_out.
ai_explain.click(on_explain, editor, term_out)
ai_refactor.click(on_refactor, editor, [diff_view, diff_original, diff_modified]
).then(lambda: gr.Tabs(selected="diff-tab"), None, bottom_tabs)
# Chat
# Pressing Enter in the chat box and clicking GEN run the same handler.
chat_input.submit(on_generate, [chat_input, chat_history],
[diff_view, diff_original, diff_modified, chat_history, chat_input]
).then(lambda: gr.Tabs(selected="diff-tab"), None, bottom_tabs)
send_btn.click(on_generate, [chat_input, chat_history],
[diff_view, diff_original, diff_modified, chat_history, chat_input]
).then(lambda: gr.Tabs(selected="diff-tab"), None, bottom_tabs)
# Diff
# APPLY copies diff_modified into the editor; DISCARD returns two no-op updates.
# Both then select the TERMINAL tab.
apply_btn.click(on_apply, diff_modified, [editor, structure_view]
).then(lambda: gr.Tabs(selected="term-tab"), None, bottom_tabs)
discard_btn.click(lambda: (gr.update(), gr.update()), None, [editor, structure_view]
).then(lambda: gr.Tabs(selected="term-tab"), None, bottom_tabs)
# Snippets
snip_save.click(on_snip_save, [snip_name, snip_tags, editor], [snip_status, snip_list])
snip_insert.click(on_snip_insert, [snip_list, editor], editor)
snip_del.click(on_snip_del, snip_list, [snip_status, snip_list])
# Find
find_btn.click(on_find, [editor, find_query, find_case], find_results)
find_all_btn.click(on_find_all, find_query, find_results)
replace_btn.click(on_replace, [editor, find_query, replace_query, find_case],
[editor, find_results, structure_view])
# Display
capture_btn.click(on_capture, None, display_image)
# Both handlers return a new gr.Timer for auto_timer; on_auto_toggle always uses a
# 1-second interval, on_speed_change uses the interval chosen in refresh_speed.
auto_capture.change(on_auto_toggle, auto_capture, auto_timer)
refresh_speed.change(on_speed_change, [refresh_speed, auto_capture], auto_timer)
auto_timer.tick(on_auto_tick, None, display_image)
if __name__ == "__main__":
    import atexit

    # Shutdown hooks, registered in this order: terminal, GUI process, Xvfb.
    for shutdown_hook in (terminal.cleanup, gui_mgr.stop, vdisplay.cleanup):
        atexit.register(shutdown_hook)

    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "theme": theme,
        "css": css,
        "ssr_mode": False,
        "allowed_paths": ["/tmp"],
    }
    demo.queue().launch(**launch_options)