# VibeThinker / app.py
# Residue from the web file viewer this source was copied from (not program code):
#   VladBoyko's picture | Update app.py | 8a46019 verified | raw | history blame | 16.3 kB
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread, Event
import re
import time
import html
# --- Configuration ---
# Model identifier passed to `from_pretrained` for both the tokenizer and the model.
MODEL_ID = "WeiboAI/VibeThinker-1.5B"
class VibeThinkerModel:
    """Lazily loaded wrapper around the causal LM with streaming generation.

    The model and tokenizer are loaded on first use. Generation runs in a
    background thread and is consumed through a ``TextIteratorStreamer``;
    ``stop_signal`` is the single switch used both by the UI "stop" action
    and by the built-in repetition detector to end a generation early.
    """

    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Set to request that the in-flight generation stops.
        self.stop_signal = Event()

    def load_model(self):
        """Load the tokenizer and model once; later calls are no-ops.

        Raises:
            Exception: whatever ``from_pretrained`` raises is propagated.
        """
        if self.model is not None:
            return
        print(f"🔄 Loading {MODEL_ID}...")
        # Only query CUDA capabilities when CUDA exists; on CPU use float32
        # because half precision is poorly supported there.
        if torch.cuda.is_available():
            dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        else:
            dtype = torch.float32
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            torch_dtype=dtype,
            device_map="auto",
            trust_remote_code=True,
            low_cpu_mem_usage=True,
        )
        print("✅ Model loaded.")

    def stop_generation(self):
        """Request that the current generation stops as soon as possible."""
        self.stop_signal.set()

    def _detect_tail_loop(self, text, min_phrase_len=3, max_phrase_len=10, threshold=20):
        """Detect whether the END of ``text`` is stuck repeating a phrase.

        Criteria: for some phrase length between ``min_phrase_len`` and
        ``max_phrase_len`` words, the last ``phrase_len * threshold`` words
        consist solely of the final phrase repeated ``threshold`` times.
        Shorter repeating units are still caught through their multiples
        (a 1-word loop is also a 3-word loop).

        Args:
            text: Text generated so far.
            min_phrase_len: Smallest phrase length (in words) to test.
            max_phrase_len: Largest phrase length (in words) to test.
            threshold: Required number of consecutive repetitions.

        Returns:
            True if a tail loop is found, otherwise False.
        """
        if min_phrase_len < 1 or threshold < 1:
            # Degenerate settings cannot describe a repetition.
            return False
        words = text.split()
        if len(words) < min_phrase_len * threshold:
            return False
        for phrase_len in range(min_phrase_len, max_phrase_len + 1):
            required_len = phrase_len * threshold
            if len(words) < required_len:
                # Longer phrases need even more words, so stop here.
                break
            tail = words[-required_len:]
            if tail == tail[-phrase_len:] * threshold:
                return True
        return False

    def _progress(self, text, start_time, generating):
        """Build the stats dict that accompanies each streamed update."""
        return {
            "time": time.time() - start_time,
            # NOTE: re-encodes the full text on every update to report a token count.
            "tokens": len(self.tokenizer.encode(text)),
            "generating": generating,
        }

    def generate_response_streaming(self, prompt, temperature=0.6, max_new_tokens=32000):
        """Stream a response for ``prompt``.

        Args:
            prompt: User problem statement.
            temperature: Sampling temperature.
            max_new_tokens: Upper bound on generated tokens (coerced to int).

        Yields:
            ``(text, stats)`` tuples where ``text`` is everything generated so
            far and ``stats`` has the keys ``time``, ``tokens`` and
            ``generating``. The last successful tuple has
            ``generating=False``. On failure a single
            ``("Error: <message>", None)`` tuple is yielded instead.
        """
        if self.model is None:
            self.load_model()
        self.stop_signal.clear()
        try:
            # Local import: these names are not imported at module level.
            from transformers import StoppingCriteria, StoppingCriteriaList

            stop_signal = self.stop_signal

            class _StopOnSignal(StoppingCriteria):
                """Ends ``generate`` once the stop event is set.

                Without this the background thread would keep producing tokens
                until ``max_new_tokens`` even after the consumer stopped reading.
                """

                def __call__(self, input_ids, scores, **kwargs):
                    return torch.full(
                        (input_ids.shape[0],),
                        stop_signal.is_set(),
                        dtype=torch.bool,
                        device=input_ids.device,
                    )

            start_time = time.time()
            # Optimized Prompt for VibeThinker
            messages = [
                {"role": "system", "content": "You are an expert algorithm engineer. Analyze the problem deeply, then provide a clean Python solution."},
                {"role": "user", "content": prompt},
            ]
            text_input = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            inputs = self.tokenizer(text_input, return_tensors="pt").to(self.device)
            streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
            generation_kwargs = dict(
                **inputs,
                max_new_tokens=int(max_new_tokens),
                temperature=temperature,
                top_p=0.95,
                top_k=50,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                streamer=streamer,
                stopping_criteria=StoppingCriteriaList([_StopOnSignal()]),
            )

            worker_errors = []

            def _run_generate():
                try:
                    self.model.generate(**generation_kwargs)
                except Exception as exc:
                    # Surface the failure to the consumer and unblock the
                    # streamer, which would otherwise wait forever.
                    worker_errors.append(exc)
                    streamer.end()

            thread = Thread(target=_run_generate)
            thread.start()

            generated_text = ""
            # Counts streamed chunks; used to throttle the loop check.
            chunk_count = 0
            for new_text in streamer:
                if self.stop_signal.is_set():
                    break
                generated_text += new_text
                chunk_count += 1
                # Check for loops every 10 chunks to save CPU
                if chunk_count % 10 == 0 and self._detect_tail_loop(generated_text):
                    self.stop_signal.set()  # Picked up by _StopOnSignal
                    generated_text = generated_text + "\n\n[⚠️ Generation stopped: Infinite loop detected]"
                    break
                yield generated_text, self._progress(generated_text, start_time, True)

            # The stopping criterion makes the worker finish promptly on every
            # path, so it is always safe to wait for it.
            thread.join()
            if worker_errors:
                raise worker_errors[0]

            yield generated_text, self._progress(generated_text, start_time, False)
        except Exception as e:
            yield f"Error: {str(e)}", None
# Module-level model wrapper used by the UI callbacks; construction does not load any weights.
vibe_model = VibeThinkerModel()
class ModernUIParser:
    """Parses text into a structured, modern UI.

    Splits model output into an optional "thinking" preamble and a solution,
    renders fenced code blocks with light syntax colouring, converts a small
    Markdown subset (``##``/``###`` headers, ``**bold**``) to HTML and wraps
    everything in a styled container with a stats bar.
    """

    # Single-pass tokenizer: every character is claimed by at most one token
    # kind, so keywords inside comments/strings are left alone and the markup
    # we insert is never re-scanned. Group names double as CSS class names.
    _TOKEN_RE = re.compile(
        r"(?P<c>#[^\n]*)"
        r'|(?P<s>"[^"\n]*"'
        r"|'[^'\n]*')"
        r"|(?P<k>\b(?:def|class|return|if|else|elif|for|while|import|from|try|except"
        r"|with|as|pass|None|True|False)\b)"
        r"|(?P<nf>\b(?:print|len|range|enumerate|zip|super|__init__)\b)"
    )

    # Fenced block: closed by ``` or, while still streaming, by end of text.
    _FENCE_SPLIT_RE = re.compile(r"(```[\w+#.\-]*\n.*?(?:```|\Z))", re.DOTALL)
    _FENCE_RE = re.compile(r"```([\w+#.\-]*)\n(.*?)(?:```|\Z)", re.DOTALL)

    # Text before the first of these is treated as chain-of-thought.
    _MARKERS = ("```", "Here is the solution", "### Solution", "Implementation:")

    # CSS Styles (Modern Dark Theme)
    _CSS = """
    <style>
    :root { --bg: #0f1117; --card: #1e293b; --accent: #6366f1; --text: #e2e8f0; --dim: #94a3b8; }
    .ui-container { font-family: 'Inter', system-ui, sans-serif; color: var(--text); line-height: 1.6; }
    /* Stats Bar */
    .stats-bar { display: flex; gap: 15px; margin-bottom: 20px; font-size: 12px; text-transform: uppercase; letter-spacing: 1px; }
    .stat-pill { background: #334155; padding: 4px 10px; border-radius: 20px; color: #cbd5e1; display: flex; align-items: center; gap: 6px; }
    .stat-active { border: 1px solid var(--accent); color: var(--accent); background: rgba(99, 102, 241, 0.1); }
    /* Thinking Section */
    details.thinking-box { margin-bottom: 20px; border: 1px solid #312e81; border-radius: 8px; background: rgba(49, 46, 129, 0.1); overflow: hidden; }
    details.thinking-box summary { padding: 12px 16px; cursor: pointer; font-weight: 600; color: #818cf8; list-style: none; outline: none; user-select: none; }
    details.thinking-box summary::marker { display: none; }
    details.thinking-box summary:hover { background: rgba(49, 46, 129, 0.2); }
    .thought-content { padding: 16px; font-family: 'JetBrains Mono', monospace; font-size: 13px; color: #a5b4fc; border-top: 1px solid #312e81; }
    /* Solution Section */
    .solution-box { background: var(--bg); padding: 10px 0; }
    .text-content { margin-bottom: 10px; }
    h2, h3 { color: white; margin-top: 20px; margin-bottom: 10px; font-weight: 600; }
    strong { color: #fff; font-weight: 700; }
    /* Code Blocks */
    .code-block { background: #0d1117; border: 1px solid #30363d; border-radius: 8px; margin: 15px 0; overflow: hidden; }
    .code-header { background: #161b22; padding: 6px 12px; display: flex; justify-content: space-between; align-items: center; border-bottom: 1px solid #30363d; }
    .lang-tag { font-size: 11px; color: #8b949e; text-transform: uppercase; font-weight: bold; }
    .copy-btn { font-size: 11px; cursor: pointer; color: #58a6ff; }
    .copy-btn:hover { text-decoration: underline; }
    pre { margin: 0; padding: 16px; overflow-x: auto; font-family: 'Fira Code', 'Consolas', monospace; font-size: 14px; color: #c9d1d9; }
    /* Syntax Highlighting Colors */
    .k { color: #ff7b72; } /* Keyword */
    .s { color: #a5d6ff; } /* String */
    .c { color: #8b949e; font-style: italic; } /* Comment */
    .nf { color: #d2a8ff; } /* Function */
    /* Cursor Animation */
    .cursor { display: inline-block; width: 8px; height: 18px; background: var(--accent); vertical-align: text-bottom; animation: blink 1s step-end infinite; margin-left: 2px; }
    @keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } }
    </style>
    """

    def format_code(self, code, lang="python"):
        """Return ``code`` as escaped HTML with basic syntax highlighting.

        Comments, quoted strings, a fixed keyword list and a few builtins are
        wrapped in ``<span class="c|s|k|nf">``. All text is HTML-escaped.

        Args:
            code: Raw (unescaped) source text.
            lang: Language tag of the block; accepted for interface
                compatibility, the same rules are applied to every language.

        Returns:
            HTML-safe string suitable for placing inside ``<pre>``.
        """
        pieces = []
        cursor_pos = 0
        for token in self._TOKEN_RE.finditer(code):
            pieces.append(html.escape(code[cursor_pos:token.start()]))
            pieces.append(
                f'<span class="{token.lastgroup}">{html.escape(token.group())}</span>'
            )
            cursor_pos = token.end()
        pieces.append(html.escape(code[cursor_pos:]))
        return "".join(pieces)

    def _split_thinking(self, text):
        """Split ``text`` into ``(thinking, solution)`` at the first marker."""
        hits = [idx for idx in (text.find(m) for m in self._MARKERS) if idx != -1]
        first_marker_idx = min(hits, default=len(text))
        # Require a meaningful preamble (> 50 chars) before calling it thinking.
        if 50 < first_marker_idx < len(text):
            return text[:first_marker_idx].strip(), text[first_marker_idx:].strip()
        return "", text

    def _render_code_block(self, fence):
        """Render one fenced block (closed or still streaming) as a code card."""
        match = self._FENCE_RE.match(fence)
        lang = html.escape(match.group(1) or "text")
        highlighted = self.format_code(match.group(2), lang)
        return f"""
        <div class="code-block">
        <div class="code-header">
        <span class="lang-tag">{lang}</span>
        <span class="copy-btn" onclick="navigator.clipboard.writeText(this.parentElement.nextElementSibling.innerText)">Copy</span>
        </div>
        <pre>{highlighted}</pre>
        </div>"""

    def _render_text(self, part):
        """Render a non-code span: escape, then headers, bold and line breaks."""
        if part.startswith('```'):
            # A fence the splitter could not parse (e.g. no newline yet):
            # show it verbatim with whitespace preserved.
            return f"<pre>{html.escape(part)}</pre>"
        clean_text = html.escape(part)
        clean_text = re.sub(r'^### (.*?)$', r'<h3>\1</h3>', clean_text, flags=re.M)
        clean_text = re.sub(r'^## (.*?)$', r'<h2>\1</h2>', clean_text, flags=re.M)
        clean_text = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', clean_text)
        clean_text = clean_text.replace('\n', '<br>')
        return f"<div class='text-content'>{clean_text}</div>"

    def parse_and_render(self, text, stats):
        """Render generated ``text`` plus ``stats`` as one HTML document.

        Args:
            text: Full text generated so far.
            stats: Dict with ``time`` (seconds), ``tokens`` and ``generating``
                keys, or ``None`` (treated as finished with zeroed counters).

        Returns:
            HTML string containing the stylesheet, stats bar, optional
            collapsible thinking section and the rendered solution.
        """
        # 1. Separate Thinking from Content
        thinking, solution = self._split_thinking(text)

        # 2. Process Solution Text (Markdown-ish to HTML).
        # re.split with one capture group alternates text, fence, text, ...
        rendered = []
        for position, part in enumerate(self._FENCE_SPLIT_RE.split(solution)):
            if position % 2 == 1:
                rendered.append(self._render_code_block(part))
            elif part:
                rendered.append(self._render_text(part))
        solution_html = "".join(rendered)

        # 3. Process Thinking
        thinking_html = html.escape(thinking).replace('\n', '<br>')

        # 4. Stats & Cursor
        stats = stats or {}
        is_gen = bool(stats.get('generating', False))
        elapsed = stats.get('time', 0) or 0
        token_total = stats.get('tokens', 0) or 0
        t_sec = token_total / elapsed if elapsed > 0 else 0
        cursor = '<span class="cursor"></span>' if is_gen else ''
        pill_class = 'stat-active' if is_gen else ''
        status_label = '🟢 GENERATING' if is_gen else '⚪ COMPLETE'

        html_out = f"""{self._CSS}
        <div class="ui-container">
        <div class="stats-bar">
        <div class="stat-pill {pill_class}">
        {status_label}
        </div>
        <div class="stat-pill">⏱️ {elapsed:.1f}s</div>
        <div class="stat-pill">⚡ {t_sec:.1f} T/s</div>
        <div class="stat-pill">📝 {token_total} Tok</div>
        </div>
        """
        if thinking:
            # Open by default if generating, closed if done
            is_open = "open" if is_gen else ""
            html_out += f"""
            <details class="thinking-box" {is_open}>
            <summary>🧠 Chain of Thought (Process)</summary>
            <div class="thought-content">
            {thinking_html} {cursor if not solution else ''}
            </div>
            </details>
            """
        html_out += f"""
        <div class="solution-box">
        {solution_html} {cursor if solution or not thinking else ''}
        </div>
        </div>
        """
        return html_out
# Shared renderer used by `run_gen` to turn streamed model text into HTML.
parser = ModernUIParser()
def run_gen(prompt, temp, max_tokens):
    """Gradio streaming callback: generate for ``prompt`` and yield HTML.

    Args:
        prompt: Problem statement from the input box.
        temp: Sampling temperature.
        max_tokens: Maximum number of new tokens to generate.

    Yields:
        HTML strings: a rendered view for every streamed update, or a red
        error box when the model reports a failure. For an empty prompt a
        single plain message is yielded.
    """
    if not prompt or not prompt.strip():
        # This function is a generator, so the message must be yielded;
        # a bare `return "..."` would never reach the UI.
        yield "Please enter a prompt."
        return
    for text, stats in vibe_model.generate_response_streaming(prompt, temp, max_tokens):
        if stats:
            yield parser.parse_and_render(text, stats)
        else:
            # The model already prefixes failures with "Error: "; escape the
            # message because exception text is not trusted HTML.
            yield f"<div style='color:red'>{html.escape(text)}</div>"
def stop_action():
    """Stop-button callback: ask the shared model wrapper to halt generation."""
    vibe_model.stop_generation()
# --- GRADIO INTERFACE ---
# Static pieces of the page are named up front so the layout below reads as structure only.
_ide_theme = gr.themes.Base(
    primary_hue="indigo",
    neutral_hue="slate",
    font=("Inter", "sans-serif"),
)
_page_css = ".gradio-container { background-color: #0f1117 !important; border: none; }"
_banner_markdown = (
    "\n"
    '<div style="text-align: center; margin-bottom: 20px;">\n'
    '<h1 style="color: white; font-size: 2rem;">⚡ VibeThinker IDE</h1>\n'
    '<p style="color: #94a3b8;">Specialized 1.5B Model for Algorithms & Competitive Coding</p>\n'
    "</div>\n"
)
_example_prompts = [
    ["Determine if a Sudoku board is valid. Provide a Python solution with O(1) space complexity if possible."],
    ["Explain the Knuth-Morris-Pratt (KMP) algorithm and implement it in Python."],
    ["Solve the 'Trapping Rain Water' problem using the two-pointer approach."],
]

with gr.Blocks(title="VibeThinker IDE", theme=_ide_theme, css=_page_css) as demo:
    gr.Markdown(_banner_markdown)

    with gr.Row():
        # Left Column: Inputs
        with gr.Column(scale=1, variant="panel"):
            input_text = gr.Textbox(
                label="Problem Statement",
                lines=8,
                placeholder="Paste a LeetCode problem or ask for a specific algorithm...",
                elem_id="input-box",
            )
            with gr.Accordion("Settings", open=False):
                temp = gr.Slider(0.1, 1.0, value=0.6, label="Temperature")
                tokens = gr.Slider(1024, 32000, value=8192, label="Max Tokens")
            with gr.Row():
                btn_run = gr.Button("▶ Run", variant="primary", scale=2)
                btn_stop = gr.Button("⏹ Stop", variant="stop", scale=1)

        # Right Column: Output
        with gr.Column(scale=2):
            out_html = gr.HTML(label="Result Console")

    # Run streams rendered HTML into the console; Stop only flips the model's stop flag.
    btn_run.click(run_gen, inputs=[input_text, temp, tokens], outputs=out_html)
    btn_stop.click(stop_action, None, None)

    gr.Examples(examples=_example_prompts, inputs=input_text)

if __name__ == "__main__":
    demo.launch()