# ==============================================================================
# HuggingFace Space - Sam Model Chat Interface with Streaming
# ==============================================================================
# Loads model directly from HuggingFace Hub: Smilyai-labs/Sam-1-large
# ==============================================================================
import gradio as gr
import tensorflow as tf
import keras
import numpy as np
from tokenizers import Tokenizer
from huggingface_hub import hf_hub_download
import os

# ==============================================================================
# Model Configuration
# ==============================================================================
MODEL_REPO = "Smilyai-labs/Sam-1-large"  # Your HuggingFace model repo
MAX_NEW_TOKENS = 512
TEMPERATURE = 0.8
TOP_P = 0.9
TOP_K = 50
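
# Note on the sampling defaults above (a rough guide, not values tuned for this
# model): TEMPERATURE rescales the logits before sampling, TOP_K keeps only the
# K most likely tokens, and TOP_P then keeps the smallest set of those whose
# cumulative probability reaches P. See sample_token() below for the exact
# filtering order used here.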
# ==============================================================================
# Custom Keras Layers (Must match training code)
# ==============================================================================
class RotaryEmbedding(keras.layers.Layer):
    def __init__(self, dim, max_len=2048, theta=10000, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        self.max_len = max_len
        self.theta = theta
        self.built_cache = False

    def build(self, input_shape):
        if not self.built_cache:
            inv_freq = 1.0 / (self.theta ** (tf.range(0, self.dim, 2, dtype=tf.float32) / self.dim))
            t = tf.range(self.max_len, dtype=tf.float32)
            freqs = tf.einsum("i,j->ij", t, inv_freq)
            emb = tf.concat([freqs, freqs], axis=-1)
            self.cos_cached = tf.constant(tf.cos(emb), dtype=tf.float32)
            self.sin_cached = tf.constant(tf.sin(emb), dtype=tf.float32)
            self.built_cache = True
        super().build(input_shape)

    def rotate_half(self, x):
        x1, x2 = tf.split(x, 2, axis=-1)
        return tf.concat([-x2, x1], axis=-1)

    def call(self, q, k):
        seq_len = tf.shape(q)[2]
        dtype = q.dtype
        cos = tf.cast(self.cos_cached[:seq_len, :], dtype)[None, None, :, :]
        sin = tf.cast(self.sin_cached[:seq_len, :], dtype)[None, None, :, :]
        q_rotated = (q * cos) + (self.rotate_half(q) * sin)
        k_rotated = (k * cos) + (self.rotate_half(k) * sin)
        return q_rotated, k_rotated

    def get_config(self):
        config = super().get_config()
        config.update({"dim": self.dim, "max_len": self.max_len, "theta": self.theta})
        return config
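
# Rotary position embeddings (RoPE) rotate pairs of query/key channels by an
# angle proportional to the token position, so relative offsets are encoded
# directly in the attention dot products. rotate_half() supplies the (-x2, x1)
# component, making q * cos + rotate_half(q) * sin the standard RoPE rotation;
# the cos/sin tables are precomputed in build() for up to max_len positions.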
class RMSNorm(keras.layers.Layer):
    def __init__(self, epsilon=1e-5, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        self.scale = self.add_weight(name="scale", shape=(input_shape[-1],), initializer="ones")

    def call(self, x):
        variance = tf.reduce_mean(tf.square(x), axis=-1, keepdims=True)
        return x * tf.math.rsqrt(variance + self.epsilon) * self.scale

    def get_config(self):
        config = super().get_config()
        config.update({"epsilon": self.epsilon})
        return config
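
# RMSNorm normalizes by the root-mean-square of the features only,
# x / sqrt(mean(x^2) + eps) * scale; unlike LayerNorm it does not subtract the
# mean and carries no bias term.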
class TransformerBlock(keras.layers.Layer):
    def __init__(self, d_model, n_heads, ff_dim, dropout, max_len, rope_theta, layer_idx=0, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.n_heads = n_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout
        self.max_len = max_len
        self.rope_theta = rope_theta
        self.head_dim = d_model // n_heads
        self.layer_idx = layer_idx
        self.pre_attn_norm = RMSNorm()
        self.pre_ffn_norm = RMSNorm()
        self.q_proj = keras.layers.Dense(d_model, use_bias=False, name="q_proj")
        self.k_proj = keras.layers.Dense(d_model, use_bias=False, name="k_proj")
        self.v_proj = keras.layers.Dense(d_model, use_bias=False, name="v_proj")
        self.out_proj = keras.layers.Dense(d_model, use_bias=False, name="o_proj")
        self.rope = RotaryEmbedding(self.head_dim, max_len=max_len, theta=rope_theta)
        self.gate_proj = keras.layers.Dense(ff_dim, use_bias=False, name="gate_proj")
        self.up_proj = keras.layers.Dense(ff_dim, use_bias=False, name="up_proj")
        self.down_proj = keras.layers.Dense(d_model, use_bias=False, name="down_proj")
        self.dropout = keras.layers.Dropout(dropout)

    def call(self, x, training=None):
        B, T, D = tf.shape(x)[0], tf.shape(x)[1], self.d_model
        dtype = x.dtype
        res = x
        y = self.pre_attn_norm(x)
        q = tf.transpose(tf.reshape(self.q_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3])
        k = tf.transpose(tf.reshape(self.k_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3])
        v = tf.transpose(tf.reshape(self.v_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3])
        q, k = self.rope(q, k)
        scores = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.head_dim, dtype))
        mask = tf.where(
            tf.linalg.band_part(tf.ones([T, T], dtype=dtype), -1, 0) == 0,
            tf.constant(-1e9, dtype=dtype),
            tf.constant(0.0, dtype=dtype)
        )
        scores += mask
        attn = tf.matmul(tf.nn.softmax(scores, axis=-1), v)
        attn = tf.reshape(tf.transpose(attn, [0, 2, 1, 3]), [B, T, D])
        x = res + self.dropout(self.out_proj(attn), training=training)
        res = x
        y = self.pre_ffn_norm(x)
        ffn = self.down_proj(keras.activations.silu(self.gate_proj(y)) * self.up_proj(y))
        return res + self.dropout(ffn, training=training)

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "n_heads": self.n_heads,
            "ff_dim": self.ff_dim,
            "dropout": self.dropout_rate,
            "max_len": self.max_len,
            "rope_theta": self.rope_theta,
            "layer_idx": self.layer_idx
        })
        return config
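
# Each block is pre-norm: RMSNorm -> causal multi-head self-attention (RoPE
# applied to q/k) -> residual add, then RMSNorm -> SwiGLU feed-forward
# (silu(gate_proj(y)) * up_proj(y) -> down_proj) -> residual add. The
# lower-triangular band_part mask adds -1e9 to future positions so attention
# stays causal.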
class SAM1Model(keras.Model):
    def __init__(self, **kwargs):
        super().__init__()
        if 'config' in kwargs and isinstance(kwargs['config'], dict):
            self.cfg = kwargs['config']
        elif 'vocab_size' in kwargs:
            self.cfg = kwargs
        else:
            self.cfg = kwargs.get('cfg', kwargs)
        self.embed = keras.layers.Embedding(self.cfg['vocab_size'], self.cfg['d_model'], name="embed_tokens")
        ff_dim = int(self.cfg['d_model'] * self.cfg['ff_mult'])
        block_args = {
            'd_model': self.cfg['d_model'],
            'n_heads': self.cfg['n_heads'],
            'ff_dim': ff_dim,
            'dropout': self.cfg['dropout'],
            'max_len': self.cfg['max_len'],
            'rope_theta': self.cfg['rope_theta']
        }
        self.blocks = []
        for i in range(self.cfg['n_layers']):
            block = TransformerBlock(name=f"block_{i}", layer_idx=i, **block_args)
            self.blocks.append(block)
        self.norm = RMSNorm(name="final_norm")
        self.lm_head = keras.layers.Dense(self.cfg['vocab_size'], use_bias=False, name="lm_head")

    def call(self, input_ids, training=None):
        x = self.embed(input_ids)
        for block in self.blocks:
            x = block(x, training=training)
        return self.lm_head(self.norm(x))

    def get_config(self):
        base_config = super().get_config()
        base_config['config'] = self.cfg
        return base_config
# ==============================================================================
# Load Model and Tokenizer from HuggingFace Hub
# ==============================================================================
print("🔥 Loading Sam model from HuggingFace Hub...")
print(f"   Repository: {MODEL_REPO}")

try:
    # Download model file
    print("📥 Downloading model.keras...")
    model_path = hf_hub_download(
        repo_id=MODEL_REPO,
        filename="model.keras",
        cache_dir="./model_cache"
    )
    print(f"✅ Model downloaded to: {model_path}")

    # Download tokenizer
    print("📥 Downloading tokenizer.json...")
    tokenizer_path = hf_hub_download(
        repo_id=MODEL_REPO,
        filename="tokenizer.json",
        cache_dir="./model_cache"
    )
    print(f"✅ Tokenizer downloaded to: {tokenizer_path}")

    # Load tokenizer
    tokenizer = Tokenizer.from_file(tokenizer_path)
    eos_token = "<|endoftext|>"
    eos_token_id = tokenizer.token_to_id(eos_token)
    print(f"✅ Tokenizer loaded (vocab_size={tokenizer.get_vocab_size()})")
    # Load model (pass the custom classes defined above so Keras can
    # deserialize them from model.keras)
    print("🔄 Loading model into memory...")
    custom_objects = {"RotaryEmbedding": RotaryEmbedding, "RMSNorm": RMSNorm,
                      "TransformerBlock": TransformerBlock, "SAM1Model": SAM1Model}
    model = keras.models.load_model(model_path, custom_objects=custom_objects)
    print("✅ Model loaded successfully!")
except Exception as e:
    print(f"❌ Error loading model: {e}")
    print("\n💡 Troubleshooting:")
    print("1. Make sure the model repo exists: https://huggingface.co/Smilyai-labs/Sam-1-large")
    print("2. Check that model.keras and tokenizer.json are in the repo")
    print("3. If repo is private, you may need to login: huggingface-cli login")
    raise
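
# Note: custom_objects is one way to make the custom classes visible to the
# loader. If the training script instead registered them with
# @keras.saving.register_keras_serializable() under the same names, Keras can
# resolve them without this argument; passing it here is harmless either way.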
# ==============================================================================
# Generation Functions
# ==============================================================================
def sample_token(logits, temperature=1.0, top_p=0.9, top_k=50):
    """Sample the next token with temperature, top-k, and top-p (nucleus) filtering."""
    top_k = int(top_k)  # Gradio sliders may deliver floats
    logits = logits / temperature

    # Top-k filtering: keep only the k highest-scoring tokens
    if top_k > 0:
        top_k_logits, top_k_indices = tf.nn.top_k(logits, k=min(top_k, logits.shape[-1]))
        keep_mask = tf.reduce_any(
            tf.equal(tf.expand_dims(tf.range(logits.shape[-1]), 0),
                     tf.expand_dims(top_k_indices, -1)),
            axis=0
        )
        logits = tf.where(keep_mask, logits, tf.fill(logits.shape, -1e10))

    # Top-p (nucleus) filtering: drop tokens outside the smallest set whose
    # cumulative probability exceeds top_p (always keeping the top token)
    if top_p < 1.0:
        sorted_logits = tf.sort(logits, direction='DESCENDING')
        sorted_probs = tf.nn.softmax(sorted_logits)
        cumsum_probs = tf.cumsum(sorted_probs)
        sorted_indices_to_remove = cumsum_probs > top_p
        sorted_indices_to_remove = tf.concat([
            [False],
            sorted_indices_to_remove[:-1]
        ], axis=0)
        sorted_indices = tf.argsort(logits, direction='DESCENDING')
        indices_to_remove = tf.gather(sorted_indices_to_remove, tf.argsort(sorted_indices))
        logits = tf.where(indices_to_remove, -1e10, logits)

    # Sample from the filtered distribution
    probs = tf.nn.softmax(logits)
    next_token = tf.random.categorical(tf.math.log(probs[None, :]), num_samples=1)[0, 0]
    return int(next_token.numpy())
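
# Illustrative use (hypothetical values): given a vocabulary-sized logits
# vector, sample_token returns a single token id, e.g.
#     demo_logits = np.array([2.0, 1.0, 0.1, -3.0], dtype=np.float32)
#     next_id = sample_token(demo_logits, temperature=0.7, top_p=0.9, top_k=3)
# A near-greedy setting is top_k=1 (temperature and top_p then barely matter).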
def generate_stream(prompt, max_new_tokens=512, temperature=0.8, top_p=0.9, top_k=50):
    """Generate text with streaming (yields the text produced so far)."""
    # Format prompt: callers may pass either a bare user message or an
    # already-formatted "User: ... Sam:" conversation; only bare messages are wrapped.
    if prompt.rstrip().endswith("Sam:"):
        formatted_prompt = prompt
    else:
        formatted_prompt = f"User: {prompt}\nSam:"

    # Tokenize
    encoding = tokenizer.encode(formatted_prompt)
    input_ids = np.array([encoding.ids], dtype=np.int32)

    # Check if prompt is too long
    if input_ids.shape[1] > 1000:
        yield "❌ Error: Prompt is too long (max 1000 tokens)"
        return

    generated_ids = []
    for _ in range(int(max_new_tokens)):
        # Get logits and take the distribution for the last position
        logits = model(input_ids, training=False)
        next_token_logits = logits[0, -1, :].numpy()

        # Sample next token
        next_token = sample_token(next_token_logits, temperature, top_p, top_k)

        # Stop if EOS
        if next_token == eos_token_id:
            break

        # Decode everything generated so far (decoding token-by-token can lose
        # spacing with subword tokenizers)
        generated_ids.append(next_token)
        generated_text = tokenizer.decode(generated_ids)

        # Yield for streaming
        yield generated_text

        # Append to input
        input_ids = np.concatenate([input_ids, [[next_token]]], axis=1)

        # Stop if we hit the model's 1024-token context limit
        if input_ids.shape[1] >= 1024:
            break
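
# Illustrative use outside the UI (hypothetical prompt): stream partial text as
# it is generated, e.g.
#     for partial in generate_stream("What is 2 + 2?", max_new_tokens=64):
#         print(partial)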
def chat_interface(message, history, temperature, top_p, top_k, max_tokens):
    """Gradio chat handler with streaming (alternative handler; the UI below wires up respond())."""
    if not message.strip():
        yield history
        return

    # Build conversation context from history (last 3 turns to save tokens)
    conversation = ""
    recent_history = history[-3:] if len(history) > 3 else history
    for user_msg, bot_msg in recent_history:
        conversation += f"User: {user_msg}\nSam: {bot_msg}\n"

    # Add current message in the same "User: ... Sam:" format
    full_prompt = conversation + f"User: {message}\nSam:"

    # Add user message to history immediately
    history.append([message, ""])

    # Generate with streaming
    for response_chunk in generate_stream(
        full_prompt,
        max_new_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k
    ):
        # Update the bot's response in history
        history[-1][1] = response_chunk
        yield history
# ==============================================================================
# Gradio Interface
# ==============================================================================
with gr.Blocks(theme=gr.themes.Soft(), title="Chat with Sam") as demo:
    gr.Markdown("""
# 🤖 Chat with Sam

**Sam** is a fine-tuned language model trained on math, code, reasoning, and conversational data.

### ✨ Capabilities:
- 🧮 **Math**: Solve arithmetic and word problems (trained on GSM8K)
- 💻 **Code**: Write Python, JavaScript, and more (trained on CodeAlpaca)
- 🤔 **Reasoning**: Show step-by-step thinking with `<think>` tags
- 💬 **Chat**: Natural conversations on any topic

### 📊 Model Info:
- **Architecture**: 768d, 16 layers, 12 heads (~100M parameters)
- **Context**: 1024 tokens
- **Training**: TPU v5e-8 on multi-dataset mix
""")
    chatbot = gr.Chatbot(
        label="💬 Conversation",
        height=450,
        show_copy_button=True,
        avatar_images=(None, "🤖"),
    )

    with gr.Row():
        msg = gr.Textbox(
            label="Your message",
            placeholder="Ask Sam anything... (e.g., 'What is 127 * 43?' or 'Write a function to sort a list')",
            lines=2,
            scale=4,
            autofocus=True
        )
        submit = gr.Button("Send 🚀", scale=1, variant="primary")

    with gr.Accordion("⚙️ Generation Settings", open=False):
        with gr.Row():
            temperature = gr.Slider(
                minimum=0.1,
                maximum=2.0,
                value=TEMPERATURE,
                step=0.1,
                label="Temperature",
                info="Higher = more creative/random"
            )
            top_p = gr.Slider(
                minimum=0.1,
                maximum=1.0,
                value=TOP_P,
                step=0.05,
                label="Top-p",
                info="Nucleus sampling threshold"
            )
        with gr.Row():
            top_k = gr.Slider(
                minimum=1,
                maximum=100,
                value=TOP_K,
                step=1,
                label="Top-k",
                info="Sample only from the K most likely tokens"
            )
            max_tokens = gr.Slider(
                minimum=50,
                maximum=512,
                value=MAX_NEW_TOKENS,
                step=50,
                label="Max tokens",
                info="Maximum response length"
            )

    with gr.Row():
        clear = gr.Button("🗑️ Clear Chat")

    with gr.Accordion("💡 Example Prompts", open=False):
        gr.Examples(
            examples=[
                ["What is 127 * 43?"],
                ["Write a Python function to reverse a string"],
                ["Explain how photosynthesis works"],
                ["What's the capital of France?"],
                ["Write a haiku about coding"],
                ["How do I sort a list in Python?"],
            ],
            inputs=msg,
            label="Click to try:"
        )

    gr.Markdown("""
---
### 📝 Tips:
- Sam uses conversational format: `User: ... Sam: ...`
- Watch for `<think>` tags showing the reasoning process
- Adjust temperature for more creative (higher) or focused (lower) responses
- The model remembers the last 3 conversation turns for context

### 🔗 Links:
- Model: [Smilyai-labs/Sam-1-large](https://huggingface.co/Smilyai-labs/Sam-1-large)
- Training: TPU v5e-8 on Kaggle
- Framework: TensorFlow/Keras
""")
    # Event handlers
    def respond(message, chat_history, temperature, top_p, top_k, max_tokens):
        """Handle a message and stream the generated response into the chat history."""
        # Add user message to history
        chat_history.append([message, None])

        # Build conversation context from history (last 3 turns to save tokens)
        conversation = ""
        recent_history = chat_history[:-1][-3:] if len(chat_history) > 1 else []
        for user_msg, bot_msg in recent_history:
            if bot_msg:  # Only include completed turns
                conversation += f"User: {user_msg}\nSam: {bot_msg}\n"

        # Add current message in the same "User: ... Sam:" format
        full_prompt = conversation + f"User: {message}\nSam:"

        # Generate with streaming
        chat_history[-1][1] = ""
        for response_chunk in generate_stream(
            full_prompt,
            max_new_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k
        ):
            chat_history[-1][1] = response_chunk
            yield chat_history

    msg.submit(
        respond,
        [msg, chatbot, temperature, top_p, top_k, max_tokens],
        chatbot
    ).then(
        lambda: gr.Textbox(value=""),
        None,
        msg
    )

    submit.click(
        respond,
        [msg, chatbot, temperature, top_p, top_k, max_tokens],
        chatbot
    ).then(
        lambda: gr.Textbox(value=""),
        None,
        msg
    )

    clear.click(lambda: None, None, chatbot, queue=False)
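
    # Both the Enter key (msg.submit) and the Send button run respond(), whose
    # yielded chat_history updates stream into the Chatbot; the chained .then()
    # clears the input textbox once generation finishes.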
# Launch
if __name__ == "__main__":
    print("\n" + "="*70)
    print("🚀 STARTING SAM CHAT INTERFACE".center(70))
    print("="*70)
    print(f"\n✅ Model loaded from: {MODEL_REPO}")
    print(f"✅ Vocab size: {tokenizer.get_vocab_size()}")
    print("✅ Ready to chat!\n")

    demo.queue()  # Enable streaming
    demo.launch()