# Source: Hugging Face Space file app.py, uploaded by Keeby-smilyai
# Commit message: Update app.py
# Commit: eb1a5a2 (verified)
# ==============================================================================
# HuggingFace Space - Sam Model Chat Interface with Streaming
# ==============================================================================
# Loads model directly from HuggingFace Hub: Smilyai-labs/Sam-1-large
# ==============================================================================
import gradio as gr
import tensorflow as tf
import keras
import numpy as np
from tokenizers import Tokenizer
from huggingface_hub import hf_hub_download
import os
# ==============================================================================
# Model Configuration
# ==============================================================================
# Hub repository that model.keras and tokenizer.json are downloaded from at startup.
MODEL_REPO = "Smilyai-labs/Sam-1-large" # Your HuggingFace model repo
# Initial values shown by the generation-settings sliders in the Gradio UI;
# users can change them per request.
# Initial "Max tokens" slider value.
MAX_NEW_TOKENS = 512
# Initial "Temperature" slider value.
TEMPERATURE = 0.8
# Initial "Top-p" (nucleus sampling) slider value.
TOP_P = 0.9
# Initial "Top-k" slider value.
TOP_K = 50
# ==============================================================================
# Custom Keras Layers (Must match training code)
# ==============================================================================
@keras.saving.register_keras_serializable()
class RotaryEmbedding(keras.layers.Layer):
    """Rotary position embedding applied to a query/key pair.

    A cos/sin table with ``max_len`` rows is computed once in ``build`` and
    sliced down to the current sequence length in ``call``. The layer has no
    trainable weights.
    """

    def __init__(self, dim, max_len=2048, theta=10000, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        self.max_len = max_len
        self.theta = theta
        # Guards against recomputing the cos/sin tables on repeated builds.
        self.built_cache = False

    def _fill_cache(self):
        """Compute the float32 cos/sin tables, one row per position."""
        exponents = tf.range(0, self.dim, 2, dtype=tf.float32) / self.dim
        inv_freq = 1.0 / (self.theta ** exponents)
        positions = tf.range(self.max_len, dtype=tf.float32)
        angles = tf.einsum("i,j->ij", positions, inv_freq)
        # Duplicate the angles so the table matches the two halves that
        # rotate_half() swaps.
        table = tf.concat([angles, angles], axis=-1)
        self.cos_cached = tf.constant(tf.cos(table), dtype=tf.float32)
        self.sin_cached = tf.constant(tf.sin(table), dtype=tf.float32)
        self.built_cache = True

    def build(self, input_shape):
        if not self.built_cache:
            self._fill_cache()
        super().build(input_shape)

    def rotate_half(self, x):
        """Split the last axis in two halves (a, b) and return (-b, a)."""
        first_half, second_half = tf.split(x, 2, axis=-1)
        return tf.concat([-second_half, first_half], axis=-1)

    def call(self, q, k):
        """Rotate ``q`` and ``k``; axis 2 of ``q`` is taken as the sequence axis."""
        seq_len = tf.shape(q)[2]
        target_dtype = q.dtype
        # Add two leading broadcast axes in front of (seq, dim).
        cos = tf.cast(self.cos_cached[:seq_len, :], target_dtype)[None, None, :, :]
        sin = tf.cast(self.sin_cached[:seq_len, :], target_dtype)[None, None, :, :]

        def apply_rotation(tensor):
            return (tensor * cos) + (self.rotate_half(tensor) * sin)

        return apply_rotation(q), apply_rotation(k)

    def get_config(self):
        config = super().get_config()
        config.update({"dim": self.dim, "max_len": self.max_len, "theta": self.theta})
        return config
@keras.saving.register_keras_serializable()
class RMSNorm(keras.layers.Layer):
    """Root-mean-square normalisation over the last axis with a learned scale."""

    def __init__(self, epsilon=1e-5, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        # One scale factor per feature of the last axis, initialised to 1.
        feature_dim = input_shape[-1]
        self.scale = self.add_weight(name="scale", shape=(feature_dim,), initializer="ones")

    def call(self, x):
        """Divide ``x`` by its RMS over the last axis, then multiply by ``scale``."""
        mean_square = tf.reduce_mean(tf.square(x), axis=-1, keepdims=True)
        inverse_rms = tf.math.rsqrt(mean_square + self.epsilon)
        return x * inverse_rms * self.scale

    def get_config(self):
        config = super().get_config()
        config.update({"epsilon": self.epsilon})
        return config
@keras.saving.register_keras_serializable()
class TransformerBlock(keras.layers.Layer):
    """Pre-norm decoder block: causal self-attention with RoPE, then a gated FFN.

    Sub-layer attribute names and creation order are part of the saved-model
    layout and are deliberately left untouched.
    """

    def __init__(self, d_model, n_heads, ff_dim, dropout, max_len, rope_theta, layer_idx=0, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.n_heads = n_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout
        self.max_len = max_len
        self.rope_theta = rope_theta
        self.head_dim = d_model // n_heads
        self.layer_idx = layer_idx
        # Normalisation applied before each of the two residual branches.
        self.pre_attn_norm = RMSNorm()
        self.pre_ffn_norm = RMSNorm()
        # Attention projections (bias-free).
        self.q_proj = keras.layers.Dense(d_model, use_bias=False, name="q_proj")
        self.k_proj = keras.layers.Dense(d_model, use_bias=False, name="k_proj")
        self.v_proj = keras.layers.Dense(d_model, use_bias=False, name="v_proj")
        self.out_proj = keras.layers.Dense(d_model, use_bias=False, name="o_proj")
        self.rope = RotaryEmbedding(self.head_dim, max_len=max_len, theta=rope_theta)
        # Gated feed-forward projections (bias-free).
        self.gate_proj = keras.layers.Dense(ff_dim, use_bias=False, name="gate_proj")
        self.up_proj = keras.layers.Dense(ff_dim, use_bias=False, name="up_proj")
        self.down_proj = keras.layers.Dense(d_model, use_bias=False, name="down_proj")
        # A single Dropout layer is shared by both residual branches.
        self.dropout = keras.layers.Dropout(dropout)

    def call(self, x, training=None):
        """Run attention and feed-forward sub-blocks on ``x`` of shape (B, T, d_model)."""
        batch = tf.shape(x)[0]
        seq_len = tf.shape(x)[1]
        compute_dtype = x.dtype

        def to_heads(projected):
            # (B, T, d_model) -> (B, n_heads, T, head_dim)
            per_head = tf.reshape(projected, [batch, seq_len, self.n_heads, self.head_dim])
            return tf.transpose(per_head, [0, 2, 1, 3])

        # --- Self-attention branch ---
        normed = self.pre_attn_norm(x)
        q = to_heads(self.q_proj(normed))
        k = to_heads(self.k_proj(normed))
        v = to_heads(self.v_proj(normed))
        q, k = self.rope(q, k)

        scale = tf.sqrt(tf.cast(self.head_dim, compute_dtype))
        scores = tf.matmul(q, k, transpose_b=True) / scale

        # Lower-triangular ones mark allowed positions; everything above the
        # diagonal (future tokens) receives a large negative bias.
        allowed = tf.linalg.band_part(tf.ones([seq_len, seq_len], dtype=compute_dtype), -1, 0)
        causal_bias = tf.where(
            allowed == 0,
            tf.constant(-1e9, dtype=compute_dtype),
            tf.constant(0.0, dtype=compute_dtype),
        )
        scores = scores + causal_bias

        weights = tf.nn.softmax(scores, axis=-1)
        context = tf.matmul(weights, v)
        # (B, n_heads, T, head_dim) -> (B, T, d_model)
        context = tf.reshape(tf.transpose(context, [0, 2, 1, 3]), [batch, seq_len, self.d_model])
        x = x + self.dropout(self.out_proj(context), training=training)

        # --- Feed-forward branch ---
        normed = self.pre_ffn_norm(x)
        gated = keras.activations.silu(self.gate_proj(normed)) * self.up_proj(normed)
        return x + self.dropout(self.down_proj(gated), training=training)

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "n_heads": self.n_heads,
            "ff_dim": self.ff_dim,
            "dropout": self.dropout_rate,
            "max_len": self.max_len,
            "rope_theta": self.rope_theta,
            "layer_idx": self.layer_idx,
        })
        return config
@keras.saving.register_keras_serializable()
class SAM1Model(keras.Model):
    """Decoder-only language model: embedding, transformer blocks, norm, LM head.

    The hyper-parameter dict may be passed as ``config=<dict>``, as
    ``cfg=<dict>``, or spread directly as keyword arguments.
    """

    def __init__(self, **kwargs):
        super().__init__()
        nested_config = kwargs.get('config')
        if isinstance(nested_config, dict):
            # Form produced by get_config(): {"config": {...}, ...}
            self.cfg = nested_config
        elif 'vocab_size' in kwargs:
            # Hyper-parameters were passed directly as keyword arguments.
            self.cfg = kwargs
        else:
            self.cfg = kwargs.get('cfg', kwargs)

        vocab_size = self.cfg['vocab_size']
        d_model = self.cfg['d_model']
        self.embed = keras.layers.Embedding(vocab_size, d_model, name="embed_tokens")

        shared_block_kwargs = dict(
            d_model=d_model,
            n_heads=self.cfg['n_heads'],
            ff_dim=int(d_model * self.cfg['ff_mult']),
            dropout=self.cfg['dropout'],
            max_len=self.cfg['max_len'],
            rope_theta=self.cfg['rope_theta'],
        )
        self.blocks = [
            TransformerBlock(name=f"block_{index}", layer_idx=index, **shared_block_kwargs)
            for index in range(self.cfg['n_layers'])
        ]
        self.norm = RMSNorm(name="final_norm")
        self.lm_head = keras.layers.Dense(vocab_size, use_bias=False, name="lm_head")

    def call(self, input_ids, training=None):
        """Map token ids to per-position vocabulary logits."""
        hidden = self.embed(input_ids)
        for block in self.blocks:
            hidden = block(hidden, training=training)
        normed = self.norm(hidden)
        return self.lm_head(normed)

    def get_config(self):
        base_config = super().get_config()
        base_config['config'] = self.cfg
        return base_config
# ==============================================================================
# Load Model and Tokenizer from HuggingFace Hub
# ==============================================================================
# Startup: fetch model.keras and tokenizer.json from the Hub (cached under
# ./model_cache), then load both into module-level globals used by the
# generation functions: `tokenizer`, `eos_token`, `eos_token_id`, `model`.
print("🔥 Loading Sam model from HuggingFace Hub...")
print(f" Repository: {MODEL_REPO}")
try:
    # Download model file
    print("📥 Downloading model.keras...")
    model_path = hf_hub_download(
        repo_id=MODEL_REPO,
        filename="model.keras",
        cache_dir="./model_cache"
    )
    print(f"✅ Model downloaded to: {model_path}")
    # Download tokenizer
    print("📥 Downloading tokenizer.json...")
    tokenizer_path = hf_hub_download(
        repo_id=MODEL_REPO,
        filename="tokenizer.json",
        cache_dir="./model_cache"
    )
    print(f"✅ Tokenizer downloaded to: {tokenizer_path}")
    # Load tokenizer
    tokenizer = Tokenizer.from_file(tokenizer_path)
    eos_token = "<|endoftext|>"
    eos_token_id = tokenizer.token_to_id(eos_token)
    if eos_token_id is None:
        # token_to_id returns None for unknown tokens. The EOS comparison in
        # the generation loop can then never match, so say so up front
        # instead of failing silently.
        print(f"⚠️ Tokenizer has no {eos_token!r} token; generation will only stop at the token limit")
    print(f"✅ Tokenizer loaded (vocab_size={tokenizer.get_vocab_size()})")
    # Load model (relies on the custom layers registered above)
    print("🔄 Loading model into memory...")
    model = keras.models.load_model(model_path)
    print("✅ Model loaded successfully!")
except Exception as e:
    # Top-level boundary: print troubleshooting hints, then re-raise so the
    # app does not start without a model.
    print(f"❌ Error loading model: {e}")
    print("\n💡 Troubleshooting:")
    print(f"1. Make sure the model repo exists: https://huggingface.co/{MODEL_REPO}")
    print("2. Check that model.keras and tokenizer.json are in the repo")
    print("3. If repo is private, you may need to login: huggingface-cli login")
    raise
# ==============================================================================
# Generation Functions
# ==============================================================================
def sample_token(logits, temperature=1.0, top_p=0.9, top_k=50):
    """Sample one token id from a 1-D logits vector.

    Filtering is applied in this order: temperature scaling, top-k, then
    top-p (nucleus). The input array is never modified.

    Args:
        logits: 1-D array-like of unnormalised scores, one per vocabulary id.
        temperature: Divisor applied to the logits. Values <= 0 select the
            highest-scoring token deterministically (greedy decoding).
        top_p: Nucleus threshold. Tokens are kept in descending probability
            order until the cumulative probability first exceeds ``top_p``;
            the token that crosses the threshold is kept. Values >= 1.0
            disable the filter.
        top_k: Keep only the ``top_k`` highest logits (ties with the k-th
            value are kept). Values <= 0 or >= vocabulary size disable the
            filter.

    Returns:
        The sampled token id as a NumPy integer scalar.
    """
    logits = np.asarray(logits, dtype=np.float64).reshape(-1)
    vocab_size = logits.shape[0]

    # Dividing by a non-positive temperature is meaningless; fall back to
    # greedy decoding.
    if temperature <= 0:
        return np.int64(np.argmax(logits))

    logits = logits / temperature  # new array; caller's data stays intact

    # Top-k filtering: mask everything strictly below the k-th largest logit.
    top_k = int(top_k)
    if 0 < top_k < vocab_size:
        kth_largest = np.partition(logits, -top_k)[-top_k]
        logits = np.where(logits < kth_largest, -np.inf, logits)

    # Top-p (nucleus) filtering.
    if top_p < 1.0:
        order = np.argsort(-logits, kind="stable")  # descending
        sorted_logits = logits[order]
        sorted_probs = np.exp(sorted_logits - sorted_logits[0])
        sorted_probs /= sorted_probs.sum()
        exceeds = np.cumsum(sorted_probs) > top_p
        # Shift right by one so the first token crossing the threshold
        # survives; the best token is therefore always kept.
        drop_sorted = np.concatenate(([False], exceeds[:-1]))
        logits[order[drop_sorted]] = -np.inf

    # Sample from the softmax of the surviving logits.
    probs = np.exp(logits - logits.max())
    probs /= probs.sum()
    return np.int64(np.random.choice(vocab_size, p=probs))
def generate_stream(prompt, max_new_tokens=512, temperature=0.8, top_p=0.9, top_k=50):
    """Generate a reply token by token, yielding the text produced so far.

    The prompt is wrapped as ``User: {prompt}\\nSam:`` before tokenisation.
    Each step runs the whole sequence through ``model`` again (no cache),
    samples one token with ``sample_token`` and yields the decoded reply.

    Args:
        prompt: Text placed after ``User: `` in the formatted prompt.
        max_new_tokens: Upper bound on generated tokens (coerced to int).
        temperature, top_p, top_k: Passed through to ``sample_token``.

    Yields:
        The full reply generated so far, or a single error string when the
        formatted prompt exceeds 1000 tokens.
    """
    max_prompt_tokens = 1000
    max_context_tokens = 1024

    # Format prompt
    formatted_prompt = f"User: {prompt}\nSam:"
    # Tokenize
    encoding = tokenizer.encode(formatted_prompt)
    input_ids = np.array([encoding.ids], dtype=np.int32)
    # Check if prompt is too long
    if input_ids.shape[1] > max_prompt_tokens:
        yield "❌ Error: Prompt is too long (max 1000 tokens)"
        return

    generated_ids = []
    # int(): UI sliders may deliver the limit as a float.
    for _ in range(int(max_new_tokens)):
        # Logits for the last position only
        logits = model(input_ids, training=False)
        next_token_logits = logits[0, -1, :].numpy()
        # Sample next token
        next_token = int(sample_token(next_token_logits, temperature, top_p, top_k))
        # Stop if EOS
        if next_token == eos_token_id:
            break
        generated_ids.append(next_token)
        # Decode everything generated so far rather than the single new
        # token: a lone token can be a fragment of a multi-byte character
        # or lose the spacing that joins it to its neighbours.
        yield tokenizer.decode(generated_ids)
        # Append to input, keeping the int32 dtype of the prompt ids
        new_column = np.array([[next_token]], dtype=input_ids.dtype)
        input_ids = np.concatenate([input_ids, new_column], axis=1)
        # Stop if we hit max length
        if input_ids.shape[1] >= max_context_tokens:
            break
def chat_interface(message, history, temperature, top_p, top_k, max_tokens):
    """Gradio chat handler that streams the reply into ``history``.

    Args:
        message: The new user message.
        history: List of ``[user_msg, bot_msg]`` pairs; a new pair is
            appended and its second element updated as the reply streams.
        temperature, top_p, top_k: Sampling settings passed to
            ``generate_stream``.
        max_tokens: Passed as ``max_new_tokens``.

    Yields:
        The updated history after every streamed chunk.
    """
    # Defensive: treat a missing history as an empty conversation.
    history = history if history is not None else []
    if not message.strip():
        yield history
        return
    # Build conversation context from history (last 3 turns to save tokens).
    # generate_stream wraps its prompt as "User: {prompt}\nSam:", so the
    # pieces are joined with "\nUser: " and the leading "User: " is omitted.
    # After wrapping this reads:
    #   User: u1\nSam: b1\nUser: <message>\nSam:
    turns = [
        f"{user_msg}\nSam: {bot_msg}"
        for user_msg, bot_msg in history[-3:]
        if bot_msg  # Only include completed turns
    ]
    turns.append(message)
    full_prompt = "\nUser: ".join(turns)
    # Add user message to history immediately
    history.append([message, ""])
    # Generate with streaming
    for response_chunk in generate_stream(
        full_prompt,
        max_new_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k
    ):
        # Update the bot's response in history
        history[-1][1] = response_chunk
        yield history
# ==============================================================================
# Gradio Interface
# ==============================================================================
# Builds the whole Gradio UI: intro text, chat window, input row, sampling
# controls, example prompts, tips, and the event wiring. Multi-line Markdown
# literals are kept flush-left so their content is unchanged.
with gr.Blocks(theme=gr.themes.Soft(), title="Chat with Sam") as demo:
    gr.Markdown("""
# 🤖 Chat with Sam
**Sam** is a fine-tuned language model trained on math, code, reasoning, and conversational data.
### ✨ Capabilities:
- 🧮 **Math**: Solve arithmetic and word problems (trained on GSM8K)
- 💻 **Code**: Write Python, JavaScript, and more (trained on CodeAlpaca)
- 🤔 **Reasoning**: Show step-by-step thinking with `<think>` tags
- 💬 **Chat**: Natural conversations on any topic
### 📊 Model Info:
- **Architecture**: 768d, 16 layers, 12 heads (~100M parameters)
- **Context**: 1024 tokens
- **Training**: TPU v5e-8 on multi-dataset mix
""")
    chatbot = gr.Chatbot(
        label="💬 Conversation",
        height=450,
        show_copy_button=True,
        avatar_images=(None, "🤖"),
    )
    with gr.Row():
        msg = gr.Textbox(
            label="Your message",
            placeholder="Ask Sam anything... (e.g., 'What is 127 * 43?' or 'Write a function to sort a list')",
            lines=2,
            scale=4,
            autofocus=True
        )
        submit = gr.Button("Send 🚀", scale=1, variant="primary")
    with gr.Accordion("⚙️ Generation Settings", open=False):
        with gr.Row():
            temperature = gr.Slider(
                minimum=0.1,
                maximum=2.0,
                value=TEMPERATURE,
                step=0.1,
                label="Temperature",
                info="Higher = more creative/random"
            )
            top_p = gr.Slider(
                minimum=0.1,
                maximum=1.0,
                value=TOP_P,
                step=0.05,
                label="Top-p",
                info="Nucleus sampling threshold"
            )
        with gr.Row():
            top_k = gr.Slider(
                minimum=1,
                maximum=100,
                value=TOP_K,
                step=1,
                label="Top-k",
                info="Vocabulary size limit"
            )
            max_tokens = gr.Slider(
                minimum=50,
                maximum=512,
                value=MAX_NEW_TOKENS,
                step=50,
                label="Max tokens",
                info="Maximum response length"
            )
    with gr.Row():
        clear = gr.Button("🗑️ Clear Chat")
    with gr.Accordion("💡 Example Prompts", open=False):
        gr.Examples(
            examples=[
                ["What is 127 * 43?"],
                ["Write a Python function to reverse a string"],
                ["Explain how photosynthesis works"],
                ["What's the capital of France?"],
                ["Write a haiku about coding"],
                ["How do I sort a list in Python?"],
            ],
            inputs=msg,
            label="Click to try:"
        )
    gr.Markdown("""
---
### 📝 Tips:
- Sam uses conversational format: `User: ... Sam: ...`
- Watch for `<think>` tags showing reasoning process
- Adjust temperature for more creative (higher) or focused (lower) responses
- Model remembers last 3 conversation turns for context
### 🔗 Links:
- Model: [Smilyai-labs/Sam-1-large](https://huggingface.co/Smilyai-labs/Sam-1-large)
- Training: TPU v5e-8 on Kaggle
- Framework: TensorFlow/Keras
""")

    # Event handlers
    def respond(message, chat_history, temperature, top_p, top_k, max_tokens):
        """Stream Sam's reply to ``message`` into the chatbot history.

        Args:
            message: Text from the input box.
            chat_history: Current chatbot value, a list of
                ``[user_msg, bot_msg]`` pairs.
            temperature, top_p, top_k: Sampling settings from the sliders.
            max_tokens: Passed to ``generate_stream`` as ``max_new_tokens``.

        Yields:
            The updated history after each streamed chunk.
        """
        # Defensive: treat a missing chatbot value as an empty conversation.
        chat_history = chat_history if chat_history is not None else []
        # Ignore blank submissions instead of sending an empty prompt.
        if not message or not message.strip():
            yield chat_history
            return
        # Build conversation context from the last 3 earlier turns.
        # generate_stream wraps its prompt as "User: {prompt}\nSam:", so the
        # pieces are joined with "\nUser: " and the leading "User: " is
        # omitted. After wrapping this reads:
        #   User: u1\nSam: b1\nUser: <message>\nSam:
        turns = [
            f"{user_msg}\nSam: {bot_msg}"
            for user_msg, bot_msg in chat_history[-3:]
            if bot_msg  # Only include completed turns
        ]
        turns.append(message)
        full_prompt = "\nUser: ".join(turns)
        # Add the user message with an empty reply that is filled in below.
        chat_history.append([message, ""])
        # Generate with streaming
        for response_chunk in generate_stream(
            full_prompt,
            max_new_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k
        ):
            chat_history[-1][1] = response_chunk
            yield chat_history

    # Enter in the textbox and the Send button behave identically: stream the
    # reply into the chatbot, then clear the input box.
    generation_inputs = [msg, chatbot, temperature, top_p, top_k, max_tokens]
    for trigger in (msg.submit, submit.click):
        trigger(
            respond,
            generation_inputs,
            chatbot
        ).then(
            lambda: gr.Textbox(value=""),
            None,
            msg
        )
    clear.click(lambda: None, None, chatbot, queue=False)
# Launch
if __name__ == "__main__":
    # Startup banner followed by a short summary of what was loaded.
    banner_width = 70
    rule = "=" * banner_width
    print("\n" + rule)
    print("🚀 STARTING SAM CHAT INTERFACE".center(banner_width))
    print(rule)
    vocab_size = tokenizer.get_vocab_size()
    print(f"\n✅ Model loaded from: {MODEL_REPO}")
    print(f"✅ Vocab size: {vocab_size}")
    print("✅ Ready to chat!\n")
    # The request queue must be enabled for generator handlers to stream.
    demo.queue()
    demo.launch()