# ============================================================================== # HuggingFace Space - Sam Model Chat Interface with Streaming # ============================================================================== # Loads model directly from HuggingFace Hub: Smilyai-labs/Sam-1-large # ============================================================================== import gradio as gr import tensorflow as tf import keras import numpy as np from tokenizers import Tokenizer from huggingface_hub import hf_hub_download import os # ============================================================================== # Model Configuration # ============================================================================== MODEL_REPO = "Smilyai-labs/Sam-1-large" # Your HuggingFace model repo MAX_NEW_TOKENS = 512 TEMPERATURE = 0.8 TOP_P = 0.9 TOP_K = 50 # ============================================================================== # Custom Keras Layers (Must match training code) # ============================================================================== @keras.saving.register_keras_serializable() class RotaryEmbedding(keras.layers.Layer): def __init__(self, dim, max_len=2048, theta=10000, **kwargs): super().__init__(**kwargs) self.dim = dim self.max_len = max_len self.theta = theta self.built_cache = False def build(self, input_shape): if not self.built_cache: inv_freq = 1.0 / (self.theta ** (tf.range(0, self.dim, 2, dtype=tf.float32) / self.dim)) t = tf.range(self.max_len, dtype=tf.float32) freqs = tf.einsum("i,j->ij", t, inv_freq) emb = tf.concat([freqs, freqs], axis=-1) self.cos_cached = tf.constant(tf.cos(emb), dtype=tf.float32) self.sin_cached = tf.constant(tf.sin(emb), dtype=tf.float32) self.built_cache = True super().build(input_shape) def rotate_half(self, x): x1, x2 = tf.split(x, 2, axis=-1) return tf.concat([-x2, x1], axis=-1) def call(self, q, k): seq_len = tf.shape(q)[2] dtype = q.dtype cos = tf.cast(self.cos_cached[:seq_len, :], dtype)[None, None, :, :] sin = tf.cast(self.sin_cached[:seq_len, :], dtype)[None, None, :, :] q_rotated = (q * cos) + (self.rotate_half(q) * sin) k_rotated = (k * cos) + (self.rotate_half(k) * sin) return q_rotated, k_rotated def get_config(self): config = super().get_config() config.update({"dim": self.dim, "max_len": self.max_len, "theta": self.theta}) return config @keras.saving.register_keras_serializable() class RMSNorm(keras.layers.Layer): def __init__(self, epsilon=1e-5, **kwargs): super().__init__(**kwargs) self.epsilon = epsilon def build(self, input_shape): self.scale = self.add_weight(name="scale", shape=(input_shape[-1],), initializer="ones") def call(self, x): variance = tf.reduce_mean(tf.square(x), axis=-1, keepdims=True) return x * tf.math.rsqrt(variance + self.epsilon) * self.scale def get_config(self): config = super().get_config() config.update({"epsilon": self.epsilon}) return config @keras.saving.register_keras_serializable() class TransformerBlock(keras.layers.Layer): def __init__(self, d_model, n_heads, ff_dim, dropout, max_len, rope_theta, layer_idx=0, **kwargs): super().__init__(**kwargs) self.d_model = d_model self.n_heads = n_heads self.ff_dim = ff_dim self.dropout_rate = dropout self.max_len = max_len self.rope_theta = rope_theta self.head_dim = d_model // n_heads self.layer_idx = layer_idx self.pre_attn_norm = RMSNorm() self.pre_ffn_norm = RMSNorm() self.q_proj = keras.layers.Dense(d_model, use_bias=False, name="q_proj") self.k_proj = keras.layers.Dense(d_model, use_bias=False, name="k_proj") self.v_proj = keras.layers.Dense(d_model, use_bias=False, name="v_proj") self.out_proj = keras.layers.Dense(d_model, use_bias=False, name="o_proj") self.rope = RotaryEmbedding(self.head_dim, max_len=max_len, theta=rope_theta) self.gate_proj = keras.layers.Dense(ff_dim, use_bias=False, name="gate_proj") self.up_proj = keras.layers.Dense(ff_dim, use_bias=False, name="up_proj") self.down_proj = keras.layers.Dense(d_model, use_bias=False, name="down_proj") self.dropout = keras.layers.Dropout(dropout) def call(self, x, training=None): B, T, D = tf.shape(x)[0], tf.shape(x)[1], self.d_model dtype = x.dtype res = x y = self.pre_attn_norm(x) q = tf.transpose(tf.reshape(self.q_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3]) k = tf.transpose(tf.reshape(self.k_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3]) v = tf.transpose(tf.reshape(self.v_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3]) q, k = self.rope(q, k) scores = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.head_dim, dtype)) mask = tf.where( tf.linalg.band_part(tf.ones([T, T], dtype=dtype), -1, 0) == 0, tf.constant(-1e9, dtype=dtype), tf.constant(0.0, dtype=dtype) ) scores += mask attn = tf.matmul(tf.nn.softmax(scores, axis=-1), v) attn = tf.reshape(tf.transpose(attn, [0, 2, 1, 3]), [B, T, D]) x = res + self.dropout(self.out_proj(attn), training=training) res = x y = self.pre_ffn_norm(x) ffn = self.down_proj(keras.activations.silu(self.gate_proj(y)) * self.up_proj(y)) return res + self.dropout(ffn, training=training) def get_config(self): config = super().get_config() config.update({ "d_model": self.d_model, "n_heads": self.n_heads, "ff_dim": self.ff_dim, "dropout": self.dropout_rate, "max_len": self.max_len, "rope_theta": self.rope_theta, "layer_idx": self.layer_idx }) return config @keras.saving.register_keras_serializable() class SAM1Model(keras.Model): def __init__(self, **kwargs): super().__init__() if 'config' in kwargs and isinstance(kwargs['config'], dict): self.cfg = kwargs['config'] elif 'vocab_size' in kwargs: self.cfg = kwargs else: self.cfg = kwargs.get('cfg', kwargs) self.embed = keras.layers.Embedding(self.cfg['vocab_size'], self.cfg['d_model'], name="embed_tokens") ff_dim = int(self.cfg['d_model'] * self.cfg['ff_mult']) block_args = { 'd_model': self.cfg['d_model'], 'n_heads': self.cfg['n_heads'], 'ff_dim': ff_dim, 'dropout': self.cfg['dropout'], 'max_len': self.cfg['max_len'], 'rope_theta': self.cfg['rope_theta'] } self.blocks = [] for i in range(self.cfg['n_layers']): block = TransformerBlock(name=f"block_{i}", layer_idx=i, **block_args) self.blocks.append(block) self.norm = RMSNorm(name="final_norm") self.lm_head = keras.layers.Dense(self.cfg['vocab_size'], use_bias=False, name="lm_head") def call(self, input_ids, training=None): x = self.embed(input_ids) for block in self.blocks: x = block(x, training=training) return self.lm_head(self.norm(x)) def get_config(self): base_config = super().get_config() base_config['config'] = self.cfg return base_config # ============================================================================== # Load Model and Tokenizer from HuggingFace Hub # ============================================================================== print("šŸ”„ Loading Sam model from HuggingFace Hub...") print(f" Repository: {MODEL_REPO}") try: # Download model file print("šŸ“„ Downloading model.keras...") model_path = hf_hub_download( repo_id=MODEL_REPO, filename="model.keras", cache_dir="./model_cache" ) print(f"āœ… Model downloaded to: {model_path}") # Download tokenizer print("šŸ“„ Downloading tokenizer.json...") tokenizer_path = hf_hub_download( repo_id=MODEL_REPO, filename="tokenizer.json", cache_dir="./model_cache" ) print(f"āœ… Tokenizer downloaded to: {tokenizer_path}") # Load tokenizer tokenizer = Tokenizer.from_file(tokenizer_path) eos_token = "<|endoftext|>" eos_token_id = tokenizer.token_to_id(eos_token) print(f"āœ… Tokenizer loaded (vocab_size={tokenizer.get_vocab_size()})") # Load model print("šŸ”„ Loading model into memory...") model = keras.models.load_model(model_path) print(f"āœ… Model loaded successfully!") except Exception as e: print(f"āŒ Error loading model: {e}") print("\nšŸ’” Troubleshooting:") print("1. Make sure the model repo exists: https://huggingface.co/Smilyai-labs/Sam-1-large") print("2. Check that model.keras and tokenizer.json are in the repo") print("3. If repo is private, you may need to login: huggingface-cli login") raise # ============================================================================== # Generation Functions # ============================================================================== def sample_token(logits, temperature=1.0, top_p=0.9, top_k=50): """Sample next token with temperature, top-p, and top-k""" logits = logits / temperature # Top-k filtering if top_k > 0: top_k_logits, top_k_indices = tf.nn.top_k(logits, k=min(top_k, logits.shape[-1])) logits = tf.where( tf.reduce_any(tf.equal(tf.expand_dims(tf.range(logits.shape[-1]), 0), tf.expand_dims(top_k_indices, -1)), axis=1), logits, tf.fill(logits.shape, -1e10) ) # Top-p (nucleus) filtering if top_p < 1.0: sorted_logits = tf.sort(logits, direction='DESCENDING') sorted_probs = tf.nn.softmax(sorted_logits) cumsum_probs = tf.cumsum(sorted_probs) sorted_indices_to_remove = cumsum_probs > top_p sorted_indices_to_remove = tf.concat([ [False], sorted_indices_to_remove[:-1] ], axis=0) sorted_indices = tf.argsort(logits, direction='DESCENDING') indices_to_remove = tf.gather(sorted_indices_to_remove, tf.argsort(sorted_indices)) logits = tf.where(indices_to_remove, -1e10, logits) # Sample probs = tf.nn.softmax(logits) next_token = tf.random.categorical(tf.math.log(probs[None, :]), num_samples=1)[0, 0] return next_token.numpy() def generate_stream(prompt, max_new_tokens=512, temperature=0.8, top_p=0.9, top_k=50): """Generate text with streaming (yields tokens as they're generated)""" # Format prompt formatted_prompt = f"User: {prompt}\nSam:" # Tokenize encoding = tokenizer.encode(formatted_prompt) input_ids = np.array([encoding.ids], dtype=np.int32) # Check if prompt is too long if input_ids.shape[1] > 1000: yield "āŒ Error: Prompt is too long (max 1000 tokens)" return generated_text = "" for _ in range(max_new_tokens): # Get logits logits = model(input_ids, training=False) next_token_logits = logits[0, -1, :].numpy() # Sample next token next_token = sample_token(next_token_logits, temperature, top_p, top_k) # Stop if EOS if next_token == eos_token_id: break # Decode token token_text = tokenizer.decode([next_token]) generated_text += token_text # Yield for streaming yield generated_text # Append to input input_ids = np.concatenate([input_ids, [[next_token]]], axis=1) # Stop if we hit max length if input_ids.shape[1] >= 1024: break def chat_interface(message, history, temperature, top_p, top_k, max_tokens): """Gradio chat interface with streaming""" if not message.strip(): yield history return # Build conversation context from history (last 3 turns to save tokens) conversation = "" recent_history = history[-3:] if len(history) > 3 else history for user_msg, bot_msg in recent_history: conversation += f"User: {user_msg}\nSam: {bot_msg}\n" # Add current message full_prompt = conversation + message if conversation else message # Add user message to history immediately history.append([message, ""]) # Generate with streaming for response_chunk in generate_stream( full_prompt, max_new_tokens=max_tokens, temperature=temperature, top_p=top_p, top_k=top_k ): # Update the bot's response in history history[-1][1] = response_chunk yield history # ============================================================================== # Gradio Interface # ============================================================================== with gr.Blocks(theme=gr.themes.Soft(), title="Chat with Sam") as demo: gr.Markdown(""" # šŸ¤– Chat with Sam **Sam** is a fine-tuned language model trained on math, code, reasoning, and conversational data. ### ✨ Capabilities: - 🧮 **Math**: Solve arithmetic and word problems (trained on GSM8K) - šŸ’» **Code**: Write Python, JavaScript, and more (trained on CodeAlpaca) - šŸ¤” **Reasoning**: Show step-by-step thinking with `` tags - šŸ’¬ **Chat**: Natural conversations on any topic ### šŸ“Š Model Info: - **Architecture**: 768d, 16 layers, 12 heads (~100M parameters) - **Context**: 1024 tokens - **Training**: TPU v5e-8 on multi-dataset mix """) chatbot = gr.Chatbot( label="šŸ’¬ Conversation", height=450, show_copy_button=True, avatar_images=(None, "šŸ¤–"), ) with gr.Row(): msg = gr.Textbox( label="Your message", placeholder="Ask Sam anything... (e.g., 'What is 127 * 43?' or 'Write a function to sort a list')", lines=2, scale=4, autofocus=True ) submit = gr.Button("Send šŸš€", scale=1, variant="primary") with gr.Accordion("āš™ļø Generation Settings", open=False): with gr.Row(): temperature = gr.Slider( minimum=0.1, maximum=2.0, value=TEMPERATURE, step=0.1, label="Temperature", info="Higher = more creative/random" ) top_p = gr.Slider( minimum=0.1, maximum=1.0, value=TOP_P, step=0.05, label="Top-p", info="Nucleus sampling threshold" ) with gr.Row(): top_k = gr.Slider( minimum=1, maximum=100, value=TOP_K, step=1, label="Top-k", info="Vocabulary size limit" ) max_tokens = gr.Slider( minimum=50, maximum=512, value=MAX_NEW_TOKENS, step=50, label="Max tokens", info="Maximum response length" ) with gr.Row(): clear = gr.Button("šŸ—‘ļø Clear Chat") with gr.Accordion("šŸ’” Example Prompts", open=False): gr.Examples( examples=[ ["What is 127 * 43?"], ["Write a Python function to reverse a string"], ["Explain how photosynthesis works"], ["What's the capital of France?"], ["Write a haiku about coding"], ["How do I sort a list in Python?"], ], inputs=msg, label="Click to try:" ) gr.Markdown(""" --- ### šŸ“ Tips: - Sam uses conversational format: `User: ... Sam: ...` - Watch for `` tags showing reasoning process - Adjust temperature for more creative (higher) or focused (lower) responses - Model remembers last 3 conversation turns for context ### šŸ”— Links: - Model: [Smilyai-labs/Sam-1-large](https://huggingface.co/Smilyai-labs/Sam-1-large) - Training: TPU v5e-8 on Kaggle - Framework: TensorFlow/Keras """) # Event handlers def respond(message, chat_history, temperature, top_p, top_k, max_tokens): """Handle message and generate response""" # Add user message to history chat_history.append([message, None]) # Build conversation context from history (last 3 turns to save tokens) conversation = "" recent_history = chat_history[:-1][-3:] if len(chat_history) > 1 else [] for user_msg, bot_msg in recent_history: if bot_msg: # Only include completed turns conversation += f"User: {user_msg}\nSam: {bot_msg}\n" # Add current message full_prompt = conversation + message if conversation else message # Generate with streaming chat_history[-1][1] = "" for response_chunk in generate_stream( full_prompt, max_new_tokens=max_tokens, temperature=temperature, top_p=top_p, top_k=top_k ): chat_history[-1][1] = response_chunk yield chat_history msg.submit( respond, [msg, chatbot, temperature, top_p, top_k, max_tokens], chatbot ).then( lambda: gr.Textbox(value=""), None, msg ) submit.click( respond, [msg, chatbot, temperature, top_p, top_k, max_tokens], chatbot ).then( lambda: gr.Textbox(value=""), None, msg ) clear.click(lambda: None, None, chatbot, queue=False) # Launch if __name__ == "__main__": print("\n" + "="*70) print("šŸš€ STARTING SAM CHAT INTERFACE".center(70)) print("="*70) print(f"\nāœ… Model loaded from: {MODEL_REPO}") print(f"āœ… Vocab size: {tokenizer.get_vocab_size()}") print(f"āœ… Ready to chat!\n") demo.queue() # Enable streaming demo.launch()