#!/usr/bin/env python3
"""
Maaza Nano-Orchestrator 9.6M - Custom BPE Tokenizer

Train a tool-focused tokenizer with 8k vocab.
Key goal: Tool names become single tokens (maaza_extract_json = 1 token, not 5)
"""

import json
import re
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from collections import Counter
import argparse

# ============================================================================
# SPECIAL TOKENS
# ============================================================================

SPECIAL_TOKENS = [
    "<|pad|>",
    "<|unk|>",
    "<|bos|>",
    "<|eos|>",
    "<|tool_start|>",
    "<|tool_end|>",
    "<|param_start|>",
    "<|param_end|>",
    "<|user|>",
    "<|assistant|>",
    "<|system|>",
]

# Tool names as special tokens (single tokens)
TOOL_TOKENS = [
    # Core CycleCore
    "maaza_extract_json", "mcpbodega_deploy", "mcpbodega_list", "doom_mcp",
    "bitchat_send", "crypto_lookup", "scratchpad_mcp", "voice_mcp",
    # Web & Browser
    "web_search", "web_fetch", "puppeteer_navigate", "puppeteer_click",
    "puppeteer_screenshot", "puppeteer_extract",
    # Data & Files
    "file_read", "file_write", "database_query", "csv_parse",
    "json_validate", "image_caption",
    # Code & Compute
    "code_execute_python", "code_execute_js", "calculator", "regex_match",
    "shell_command",
    # External APIs
    "weather_lookup", "stock_lookup", "news_fetch", "email_send", "calendar_add",
    # New Tools (31-36)
    "mcpbodega_chat", "health_check", "slmbench_query", "slack_send",
    "github_issue", "cyclecore_terminal",
]

# Common JSON/programming tokens
JSON_TOKENS = [
    '{"tool"', '"params"', '"action"', '"retry"', '"fallback"',
    "true", "false", "null",
]

# Failure recovery action tokens
RECOVERY_TOKENS = [
    "retry", "fallback", "timeout", "rate_limit", "unavailable",
    "max_retries", "backoff", "exponential", "alternative",
]

# ============================================================================
# BPE TOKENIZER IMPLEMENTATION
# ============================================================================


class BPETokenizer:
    """Custom BPE tokenizer optimized for tool routing."""

    def __init__(self, vocab_size: int = 8000):
        self.vocab_size = vocab_size
        self.vocab: Dict[str, int] = {}
        self.inverse_vocab: Dict[int, str] = {}
        self.merges: List[Tuple[str, str]] = []

        # Initialize with special tokens
        self._init_special_tokens()

    def _init_special_tokens(self):
        """Initialize vocabulary with special tokens."""
        idx = 0

        # Add special tokens
        for token in SPECIAL_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        # Add tool tokens (critical for single-token tool names)
        for token in TOOL_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        # Add JSON tokens
        for token in JSON_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        # Add recovery tokens
        for token in RECOVERY_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        # Add basic ASCII characters (byte placeholders for non-printables)
        for i in range(256):
            char = chr(i) if 32 <= i < 127 else f"<0x{i:02X}>"
            if char not in self.vocab:
                self.vocab[char] = idx
                self.inverse_vocab[idx] = char
                idx += 1

        self.base_vocab_size = idx

    def _get_pairs(self, word: List[str]) -> Counter:
        """Get all adjacent pairs in word."""
        pairs = Counter()
        for i in range(len(word) - 1):
            pairs[(word[i], word[i + 1])] += 1
        return pairs

    def _merge_pair(self, pair: Tuple[str, str], word: List[str]) -> List[str]:
        """Merge a specific pair in the word."""
        new_word = []
        i = 0
        while i < len(word):
            if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
                new_word.append(pair[0] + pair[1])
                i += 2
            else:
                new_word.append(word[i])
                i += 1
        return new_word

    def _tokenize_word(self, word: str) -> List[str]:
        """Tokenize a single word to characters."""
        # Check if it's a special/tool token first
        if word in self.vocab:
            return [word]

        # Check for tool tokens within the word
        for tool in TOOL_TOKENS:
            if tool in word:
                parts = word.split(tool)
                result = []
                for i, part in enumerate(parts):
                    if part:
                        result.extend(list(part))
                    if i < len(parts) - 1:
                        result.append(tool)
                return result

        return list(word)

    def train(self, texts: List[str], verbose: bool = True):
        """Train BPE on a corpus of texts."""
        if verbose:
            print(f"Training BPE tokenizer (target vocab: {self.vocab_size})")
            print(f"  Base vocab size: {self.base_vocab_size}")

        # Build initial word frequencies
        word_freqs = Counter()
        for text in texts:
            # Pre-tokenize: split on whitespace and punctuation
            words = re.findall(r'\w+|[^\w\s]', text.lower())
            word_freqs.update(words)

        # Convert words to character lists
        splits = {}
        for word, freq in word_freqs.items():
            splits[word] = (self._tokenize_word(word), freq)

        # BPE merging
        num_merges = self.vocab_size - len(self.vocab)
        if verbose:
            print(f"  Performing {num_merges} merges...")

        for merge_idx in range(num_merges):
            # Count all pairs
            pair_freqs = Counter()
            for word, (split, freq) in splits.items():
                pairs = self._get_pairs(split)
                for pair, count in pairs.items():
                    pair_freqs[pair] += count * freq

            if not pair_freqs:
                break

            # Find most frequent pair
            best_pair = pair_freqs.most_common(1)[0][0]
            self.merges.append(best_pair)

            # Add merged token to vocab
            merged = best_pair[0] + best_pair[1]
            if merged not in self.vocab:
                idx = len(self.vocab)
                self.vocab[merged] = idx
                self.inverse_vocab[idx] = merged

            # Apply merge to all words
            for word in splits:
                split, freq = splits[word]
                splits[word] = (self._merge_pair(best_pair, split), freq)

            if verbose and (merge_idx + 1) % 500 == 0:
                print(f"    Merge {merge_idx + 1}: '{best_pair[0]}' + '{best_pair[1]}' -> '{merged}'")

        if verbose:
            print(f"  Final vocab size: {len(self.vocab)}")

    def encode(self, text: str) -> List[int]:
        """Encode text to token IDs."""
        tokens = []

        # First, extract special tokens as whole units
        # Build regex pattern for special tokens (escape special chars)
        special_pattern = '|'.join(re.escape(t) for t in SPECIAL_TOKENS)
        tool_pattern = '|'.join(re.escape(t) for t in TOOL_TOKENS)
        combined_pattern = f'({special_pattern}|{tool_pattern})'

        # Split text while preserving special/tool tokens
        parts = re.split(combined_pattern, text)

        for part in parts:
            if not part:
                continue

            # Check if this part is a special or tool token
            if part in self.vocab:
                tokens.append(self.vocab[part])
                continue

            # Pre-tokenize the non-special part
            words = re.findall(r'\w+|[^\w\s]|\s+', part)

            for word in words:
                # Check for exact matches first
                if word in self.vocab:
                    tokens.append(self.vocab[word])
                    continue

                # Lowercase for matching
                word_lower = word.lower()
                if word_lower in self.vocab:
                    tokens.append(self.vocab[word_lower])
                    continue

                # Check for tool tokens in word
                found_tool = False
                for tool in TOOL_TOKENS:
                    if tool in word_lower:
                        parts_inner = word_lower.split(tool)
                        for i, p in enumerate(parts_inner):
                            if p:
                                tokens.extend(self._encode_subword(p))
                            if i < len(parts_inner) - 1:
                                tokens.append(self.vocab[tool])
                        found_tool = True
                        break

                if found_tool:
                    continue

                # Apply BPE to word
                tokens.extend(self._encode_subword(word_lower))

        return tokens

    def _encode_subword(self, word: str) -> List[int]:
        """Apply BPE merges to encode a subword."""
        if not word:
            return []

        if word in self.vocab:
            return [self.vocab[word]]

        # Start with characters
        word_tokens = list(word)

        # Apply merges
        for pair in self.merges:
            i = 0
            while i < len(word_tokens) - 1:
                if word_tokens[i] == pair[0] and word_tokens[i + 1] == pair[1]:
                    word_tokens = word_tokens[:i] + [pair[0] + pair[1]] + word_tokens[i + 2:]
                else:
                    i += 1

        # Convert to IDs
        ids = []
        for token in word_tokens:
            if token in self.vocab:
                ids.append(self.vocab[token])
            else:
                # Unknown token - fall back to <|unk|>
                ids.append(self.vocab["<|unk|>"])
        return ids

    def decode(self, ids: List[int]) -> str:
        """Decode token IDs back to text."""
        tokens = [self.inverse_vocab.get(i, "<|unk|>") for i in ids]
        text = "".join(tokens)

        # Clean up special tokens from output
        for special in SPECIAL_TOKENS:
            text = text.replace(special, "")
        return text

    def save(self, path: str):
        """Save tokenizer to file."""
        data = {
            "vocab_size": self.vocab_size,
            "vocab": self.vocab,
            "merges": self.merges,
            "special_tokens": SPECIAL_TOKENS,
            "tool_tokens": TOOL_TOKENS,
        }
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        print(f"Tokenizer saved to {path}")

    @classmethod
    def load(cls, path: str) -> "BPETokenizer":
        """Load tokenizer from file."""
        with open(path) as f:
            data = json.load(f)

        tokenizer = cls(vocab_size=data["vocab_size"])
        tokenizer.vocab = data["vocab"]
        tokenizer.inverse_vocab = {int(v): k for k, v in data["vocab"].items()}
        tokenizer.merges = [tuple(m) for m in data["merges"]]
        return tokenizer

    def __len__(self):
        return len(self.vocab)


def train_from_dataset(dataset_path: str, output_path: str = "tokenizer.json", vocab_size: int = 8000):
    """Train tokenizer from dataset file."""
    print(f"Loading dataset from {dataset_path}")

    texts = []
    with open(dataset_path) as f:
        for line in f:
            data = json.loads(line)
            texts.append(data["prompt"])
            texts.append(json.dumps(data["tool_calls"]))

    print(f"Loaded {len(texts)} text samples")

    tokenizer = BPETokenizer(vocab_size=vocab_size)
    tokenizer.train(texts, verbose=True)
    tokenizer.save(output_path)

    # Test tokenization
    print("\n=== Tokenization Tests ===")
    test_cases = [
        "extract the invoice details",
        '{"tool": "maaza_extract_json", "params": {"text": "test"}}',
        "puppeteer_navigate to google.com",
        "The crypto_lookup tool failed with timeout",
        "retry with exponential backoff",
    ]

    for text in test_cases:
        ids = tokenizer.encode(text)
        decoded = tokenizer.decode(ids)
        print(f"\nInput: '{text}'")
        print(f"Tokens: {ids}")
        print(f"Decoded: '{decoded}'")
        print(f"Length: {len(ids)} tokens")

    # Verify tool names are single tokens
    print("\n=== Tool Token Verification ===")
    for tool in TOOL_TOKENS[:5]:  # Check first 5
        ids = tokenizer.encode(tool)
        if len(ids) == 1:
            print(f"✓ {tool} = single token (ID: {ids[0]})")
        else:
            print(f"✗ {tool} = {len(ids)} tokens: {ids}")

    return tokenizer


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train custom BPE tokenizer")
    parser.add_argument("--input", required=True, help="Input dataset (JSONL)")
    parser.add_argument("--output", default="tokenizer.json", help="Output path")
    parser.add_argument("--vocab-size", type=int, default=8000, help="Vocabulary size")

    args = parser.parse_args()

    train_from_dataset(
        dataset_path=args.input,
        output_path=args.output,
        vocab_size=args.vocab_size,
    )

    print(f"\n✓ Tokenizer trained and saved to {args.output}")
    print("Next step: python model.py")
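
# ----------------------------------------------------------------------------
# Usage sketch (comments only, not executed): one way downstream code might
# consume the saved artifact via BPETokenizer.load. The module name
# "tokenizer" and the path "tokenizer.json" are assumptions, not fixed by
# this script.
#
#   from tokenizer import BPETokenizer
#
#   tok = BPETokenizer.load("tokenizer.json")
#   ids = tok.encode("<|user|> check BTC price with crypto_lookup <|assistant|>")
#   assert tok.vocab["crypto_lookup"] in ids   # tool name stays a single token
#   print(len(ids), tok.decode(ids))
# ----------------------------------------------------------------------------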