Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
| from threading import Thread, Event | |
| import re | |
| import time | |
| import html | |
# --- Configuration ---
# Hugging Face Hub repository id passed to ``from_pretrained`` for both the
# tokenizer and the causal language model.
MODEL_ID: str = "WeiboAI/VibeThinker-1.5B"
class VibeThinkerModel:
    """Lazily loaded wrapper around the ``MODEL_ID`` causal LM with streamed output.

    The tokenizer and model are created on first use. Generation runs in a
    worker thread and is consumed through a ``TextIteratorStreamer``; the
    shared ``stop_signal`` event ends both the streaming loop and the
    underlying ``generate()`` call.
    """

    def __init__(self):
        # Both are populated by load_model() on first use.
        self.model = None
        self.tokenizer = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Set by stop_generation() or by loop detection to end the current run.
        self.stop_signal = Event()

    def _select_dtype(self):
        """Pick the weight dtype: float32 on CPU, bf16 (or fp16) on CUDA."""
        if self.device != "cuda":
            # Half precision is poorly supported on CPU, and the CUDA bf16
            # probe is meaningless (or raises on older torch) without a GPU.
            return torch.float32
        return torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16

    def load_model(self):
        """Load the tokenizer and model once; later calls are no-ops.

        Any exception raised by ``from_pretrained`` propagates to the caller.
        """
        if self.model is not None:
            return
        print(f"🔄 Loading {MODEL_ID}...")
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            torch_dtype=self._select_dtype(),
            device_map="auto",
            trust_remote_code=True,
            low_cpu_mem_usage=True,
        )
        print("✅ Model loaded.")

    def stop_generation(self):
        """Request that the generation currently in progress stops."""
        self.stop_signal.set()

    def _build_stopping_criteria(self):
        """Return a criteria list that halts ``generate()`` once ``stop_signal`` is set."""
        # Imported locally so the file's top-level import block is untouched.
        from transformers import StoppingCriteria, StoppingCriteriaList

        stop_signal = self.stop_signal

        class _StopOnSignal(StoppingCriteria):
            def __call__(self, input_ids, scores, **kwargs):
                return stop_signal.is_set()

        return StoppingCriteriaList([_StopOnSignal()])

    def _detect_tail_loop(self, text, min_phrase_len=3, max_phrase_len=10, threshold=20):
        """Report whether ``text`` ends in a phrase repeated back-to-back.

        A loop is a phrase of ``min_phrase_len``..``max_phrase_len``
        whitespace-separated words that makes up the last
        ``phrase_len * threshold`` words of the text, i.e. it is repeated
        ``threshold`` times consecutively at the very end.

        Returns:
            True if such a repeated tail exists, False otherwise.
        """
        words = text.split()
        # Too short to contain even the smallest phrase `threshold` times.
        if len(words) < min_phrase_len * threshold:
            return False

        # Only the tail matters; this bounds the work per check.
        recent_words = words[-(max_phrase_len * threshold):]

        for phrase_len in range(min_phrase_len, max_phrase_len + 1):
            required_len = phrase_len * threshold
            if len(recent_words) < required_len:
                continue
            candidate_phrase = recent_words[-phrase_len:]
            # The tail loops iff it equals the candidate phrase tiled `threshold` times.
            if recent_words[-required_len:] == candidate_phrase * threshold:
                return True
        return False

    def generate_response_streaming(self, prompt, temperature=0.6, max_new_tokens=32000):
        """Stream a sampled completion for ``prompt``.

        Args:
            prompt: User message placed after the fixed system prompt.
            temperature: Sampling temperature passed to ``generate()``.
            max_new_tokens: Upper bound on generated tokens (coerced to int).

        Yields:
            ``(text, stats)`` tuples. ``text`` is everything generated so far;
            ``stats`` is a dict with ``"time"`` (seconds elapsed), ``"tokens"``
            (token count of ``text``) and ``"generating"`` (False only on the
            final item). If an exception occurs, a single
            ``("Error: <message>", None)`` item is yielded instead.
        """
        if self.model is None:
            self.load_model()
        self.stop_signal.clear()

        thread = None
        try:
            start_time = time.time()

            # Optimized Prompt for VibeThinker
            messages = [
                {"role": "system", "content": "You are an expert algorithm engineer. Analyze the problem deeply, then provide a clean Python solution."},
                {"role": "user", "content": prompt},
            ]
            text_input = self.tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            inputs = self.tokenizer(text_input, return_tensors="pt").to(self.device)
            streamer = TextIteratorStreamer(
                self.tokenizer, skip_prompt=True, skip_special_tokens=True
            )

            generation_kwargs = dict(
                **inputs,
                max_new_tokens=int(max_new_tokens),
                temperature=temperature,
                top_p=0.95,
                top_k=50,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                streamer=streamer,
                # Without this, setting stop_signal would only end the loop
                # below while generate() kept running in the worker thread.
                stopping_criteria=self._build_stopping_criteria(),
            )

            thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
            thread.start()

            generated_text = ""
            chunk_count = 0

            for new_text in streamer:
                if self.stop_signal.is_set():
                    break
                generated_text += new_text
                chunk_count += 1

                # Check for loops every 10 chunks to save CPU.
                if chunk_count % 10 == 0 and self._detect_tail_loop(generated_text):
                    self.stop_signal.set()
                    generated_text += "\n\n[⚠️ Generation stopped: Infinite loop detected]"
                    break

                yield generated_text, {
                    "time": time.time() - start_time,
                    "tokens": len(self.tokenizer.encode(generated_text)),
                    "generating": True,
                }

            # generate() ends either naturally or via the stopping criteria,
            # so waiting here is bounded and prevents overlapping runs.
            thread.join()

            yield generated_text, {
                "time": time.time() - start_time,
                "tokens": len(self.tokenizer.encode(generated_text)),
                "generating": False,
            }
        except Exception as e:
            yield f"Error: {str(e)}", None
        finally:
            # If the consumer abandons the generator early (or an error
            # occurred), make sure the worker thread is told to stop too.
            if thread is not None and thread.is_alive():
                self.stop_signal.set()
# Module-level model wrapper shared by the UI callbacks. Constructing it does
# not load any weights; loading happens on the first generation request.
vibe_model: VibeThinkerModel = VibeThinkerModel()
class ModernUIParser:
    """Render streamed model output as a self-contained HTML fragment.

    The text is split heuristically into a "thinking" prefix and a "solution"
    remainder. Fenced code blocks in the solution get lightweight syntax
    highlighting, other text gets minimal Markdown handling, and a stats bar
    plus inline CSS are added around the result.
    """

    # Substrings that mark where free-form reasoning ends and the answer begins.
    _SOLUTION_MARKERS = ("```", "Here is the solution", "### Solution", "Implementation:")

    # Complete fenced blocks: optional language tag, newline, body, closing fence.
    _FENCE_SPLIT_RE = re.compile(r'(```\w*\n.*?```)', re.DOTALL)
    _FENCE_RE = re.compile(r'```(\w*)\n(.*?)```', re.DOTALL)

    # One alternation scanned left to right, so a '#' inside a string or a
    # quote inside a comment is owned by whichever token starts first.
    # Group names double as the CSS class of the emitted <span>.
    _TOKEN_RE = re.compile(
        r'(?P<c>#[^\n]*)'
        r'|(?P<s>"(?:\\.|[^"\\\n])*"|\'(?:\\.|[^\'\\\n])*\')'
        r'|(?P<k>\b(?:def|class|return|if|else|elif|for|while|import|from|try|except|with|as|pass|None|True|False)\b)'
        r'|(?P<nf>\b(?:print|len|range|enumerate|zip|super|__init__)\b)'
    )

    # CSS Styles (Modern Dark Theme)
    _CSS = """
<style>
:root { --bg: #0f1117; --card: #1e293b; --accent: #6366f1; --text: #e2e8f0; --dim: #94a3b8; }
.ui-container { font-family: 'Inter', system-ui, sans-serif; color: var(--text); line-height: 1.6; }
/* Stats Bar */
.stats-bar { display: flex; gap: 15px; margin-bottom: 20px; font-size: 12px; text-transform: uppercase; letter-spacing: 1px; }
.stat-pill { background: #334155; padding: 4px 10px; border-radius: 20px; color: #cbd5e1; display: flex; align-items: center; gap: 6px; }
.stat-active { border: 1px solid var(--accent); color: var(--accent); background: rgba(99, 102, 241, 0.1); }
/* Thinking Section */
details.thinking-box { margin-bottom: 20px; border: 1px solid #312e81; border-radius: 8px; background: rgba(49, 46, 129, 0.1); overflow: hidden; }
details.thinking-box summary { padding: 12px 16px; cursor: pointer; font-weight: 600; color: #818cf8; list-style: none; outline: none; user-select: none; }
details.thinking-box summary::marker { display: none; }
details.thinking-box summary:hover { background: rgba(49, 46, 129, 0.2); }
.thought-content { padding: 16px; font-family: 'JetBrains Mono', monospace; font-size: 13px; color: #a5b4fc; border-top: 1px solid #312e81; }
/* Solution Section */
.solution-box { background: var(--bg); padding: 10px 0; }
.text-content { margin-bottom: 10px; }
h2, h3 { color: white; margin-top: 20px; margin-bottom: 10px; font-weight: 600; }
strong { color: #fff; font-weight: 700; }
/* Code Blocks */
.code-block { background: #0d1117; border: 1px solid #30363d; border-radius: 8px; margin: 15px 0; overflow: hidden; }
.code-header { background: #161b22; padding: 6px 12px; display: flex; justify-content: space-between; align-items: center; border-bottom: 1px solid #30363d; }
.lang-tag { font-size: 11px; color: #8b949e; text-transform: uppercase; font-weight: bold; }
.copy-btn { font-size: 11px; cursor: pointer; color: #58a6ff; }
.copy-btn:hover { text-decoration: underline; }
pre { margin: 0; padding: 16px; overflow-x: auto; font-family: 'Fira Code', 'Consolas', monospace; font-size: 14px; color: #c9d1d9; }
/* Syntax Highlighting Colors */
.k { color: #ff7b72; } /* Keyword */
.s { color: #a5d6ff; } /* String */
.c { color: #8b949e; font-style: italic; } /* Comment */
.nf { color: #d2a8ff; } /* Function */
/* Cursor Animation */
.cursor { display: inline-block; width: 8px; height: 18px; background: var(--accent); vertical-align: text-bottom; animation: blink 1s step-end infinite; margin-left: 2px; }
@keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } }
</style>
"""

    def format_code(self, code, lang="python"):
        """Return ``code`` HTML-escaped with ``<span>`` highlighting.

        Comments, quoted strings, a fixed keyword list and a fixed list of
        builtins are wrapped in spans with classes ``c``, ``s``, ``k`` and
        ``nf``. Tokenising happens on the raw text in a single pass and each
        piece is escaped individually, so inserted markup and HTML entities
        are never re-matched by a later rule.

        Args:
            code: Raw source text.
            lang: Accepted for interface compatibility; the same Python-style
                rules are applied whatever its value.
        """
        pieces = []
        position = 0
        for token in self._TOKEN_RE.finditer(code):
            pieces.append(html.escape(code[position:token.start()]))
            pieces.append(
                f'<span class="{token.lastgroup}">{html.escape(token.group())}</span>'
            )
            position = token.end()
        pieces.append(html.escape(code[position:]))
        return "".join(pieces)

    def _split_thinking(self, text):
        """Split ``text`` into ``(thinking, solution)`` at the earliest marker."""
        hits = [idx for idx in (text.find(m) for m in self._SOLUTION_MARKERS) if idx != -1]
        split_at = min(hits, default=len(text))
        # A marker within the first 50 characters is treated as "no thinking".
        if 50 < split_at < len(text):
            return text[:split_at].strip(), text[split_at:].strip()
        return "", text

    def _render_solution(self, solution):
        """Convert the solution text (Markdown-ish) to HTML."""
        chunks = []
        for part in self._FENCE_SPLIT_RE.split(solution):
            if part.startswith('```'):
                fence = self._FENCE_RE.match(part)
                if fence:
                    lang = fence.group(1) or "text"
                    highlighted = self.format_code(fence.group(2), lang)
                    chunks.append(f"""
<div class="code-block">
<div class="code-header">
<span class="lang-tag">{lang}</span>
<span class="copy-btn" onclick="navigator.clipboard.writeText(this.parentElement.nextElementSibling.innerText)">Copy</span>
</div>
<pre>{highlighted}</pre>
</div>""")
                else:
                    # Fence still open (e.g. mid-stream): show it verbatim.
                    chunks.append(f"<pre>{html.escape(part)}</pre>")
            else:
                clean_text = html.escape(part)
                # Headers
                clean_text = re.sub(r'^### (.*?)$', r'<h3>\1</h3>', clean_text, flags=re.M)
                clean_text = re.sub(r'^## (.*?)$', r'<h2>\1</h2>', clean_text, flags=re.M)
                clean_text = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', clean_text)
                # Line breaks
                clean_text = clean_text.replace('\n', '<br>')
                chunks.append(f"<div class='text-content'>{clean_text}</div>")
        return "".join(chunks)

    def parse_and_render(self, text, stats):
        """Build the full HTML fragment for ``text``.

        Args:
            text: Model output accumulated so far.
            stats: Dict with ``"time"``, ``"tokens"`` and ``"generating"``
                keys, or ``None``; missing stats render as zeros in the
                "complete" state.

        Returns:
            An HTML string containing the inline CSS, the stats bar, an
            optional collapsible thinking section, and the solution section.
        """
        # 1. Separate Thinking from Content
        thinking, solution = self._split_thinking(text)

        # 2. Process Solution Text / 3. Process Thinking
        solution_html = self._render_solution(solution)
        thinking_html = html.escape(thinking).replace('\n', '<br>')

        # 4. Stats & Cursor
        stats = stats or {}
        is_gen = bool(stats.get('generating'))
        elapsed = stats.get('time') or 0
        token_total = stats.get('tokens') or 0
        t_sec = token_total / elapsed if elapsed > 0 else 0
        cursor = '<span class="cursor"></span>' if is_gen else ''

        html_out = f"""{self._CSS}
<div class="ui-container">
<div class="stats-bar">
<div class="stat-pill {'stat-active' if is_gen else ''}">
{'🟢 GENERATING' if is_gen else '⚪ COMPLETE'}
</div>
<div class="stat-pill">⏱️ {elapsed:.1f}s</div>
<div class="stat-pill">⚡ {t_sec:.1f} T/s</div>
<div class="stat-pill">📝 {token_total} Tok</div>
</div>
"""
        if thinking:
            # Open by default if generating, closed if done
            is_open = "open" if is_gen else ""
            html_out += f"""
<details class="thinking-box" {is_open}>
<summary>🧠 Chain of Thought (Process)</summary>
<div class="thought-content">
{thinking_html} {cursor if not solution else ''}
</div>
</details>
"""
        html_out += f"""
<div class="solution-box">
{solution_html} {cursor if solution or not thinking else ''}
</div>
</div>
"""
        return html_out
# Module-level renderer instance used to turn streamed text into HTML.
parser: ModernUIParser = ModernUIParser()
def run_gen(prompt, temp, max_tokens):
    """Stream rendered HTML for ``prompt`` to the output component.

    Args:
        prompt: Problem statement typed by the user.
        temp: Sampling temperature forwarded to the model wrapper.
        max_tokens: Generation length limit forwarded to the model wrapper.

    Yields:
        HTML strings: the rendered partial result for each streamed update,
        a red error box when the model wrapper reports a failure, or a single
        hint message when ``prompt`` is empty.
    """
    if not prompt:
        # This function is a generator, so a `return <value>` would be
        # swallowed into StopIteration and never displayed; yield it instead.
        yield "Please enter a prompt."
        return

    for text, stats in vibe_model.generate_response_streaming(prompt, temp, max_tokens):
        if stats is not None:
            yield parser.parse_and_render(text, stats)
        else:
            # `text` already starts with "Error: "; escape it because exception
            # messages are arbitrary text being placed into HTML.
            yield f"<div style='color:red'>{html.escape(text)}</div>"
def stop_action() -> None:
    """Stop-button callback: ask the shared model wrapper to end generation."""
    vibe_model.stop_generation()
# --- GRADIO INTERFACE ---
# Static pieces are named up front so the layout below reads top to bottom.
_APP_THEME = gr.themes.Base(
    primary_hue="indigo",
    neutral_hue="slate",
    font=("Inter", "sans-serif"),
)
_APP_CSS = ".gradio-container { background-color: #0f1117 !important; border: none; }"
_HEADER_MARKDOWN = """
<div style="text-align: center; margin-bottom: 20px;">
<h1 style="color: white; font-size: 2rem;">⚡ VibeThinker IDE</h1>
<p style="color: #94a3b8;">Specialized 1.5B Model for Algorithms & Competitive Coding</p>
</div>
"""
_EXAMPLE_PROMPTS = [
    ["Determine if a Sudoku board is valid. Provide a Python solution with O(1) space complexity if possible."],
    ["Explain the Knuth-Morris-Pratt (KMP) algorithm and implement it in Python."],
    ["Solve the 'Trapping Rain Water' problem using the two-pointer approach."],
]

with gr.Blocks(title="VibeThinker IDE", theme=_APP_THEME, css=_APP_CSS) as demo:
    gr.Markdown(_HEADER_MARKDOWN)

    with gr.Row():
        # Left Column: Inputs
        with gr.Column(scale=1, variant="panel"):
            input_text = gr.Textbox(
                label="Problem Statement",
                lines=8,
                placeholder="Paste a LeetCode problem or ask for a specific algorithm...",
                elem_id="input-box",
            )
            with gr.Accordion("Settings", open=False):
                temp = gr.Slider(0.1, 1.0, value=0.6, label="Temperature")
                tokens = gr.Slider(1024, 32000, value=8192, label="Max Tokens")
            with gr.Row():
                btn_run = gr.Button("▶ Run", variant="primary", scale=2)
                btn_stop = gr.Button("⏹ Stop", variant="stop", scale=1)

        # Right Column: Output
        with gr.Column(scale=2):
            out_html = gr.HTML(label="Result Console")

    # Run streams HTML into the output pane; Stop only flips the model's stop flag.
    btn_run.click(fn=run_gen, inputs=[input_text, temp, tokens], outputs=out_html)
    btn_stop.click(fn=stop_action, inputs=None, outputs=None)

    gr.Examples(examples=_EXAMPLE_PROMPTS, inputs=input_text)

if __name__ == "__main__":
    demo.launch()