#!/usr/bin/env python3
"""Packed unary loader. Loads weights, passes pointers to C engine."""
import ctypes
import json
import os
import sys
import time
from ctypes import POINTER, c_float, c_int, c_uint8, c_uint64, c_void_p

import numpy as np


class PackedEngine:
    """Python front end for the C inference engine in a shared library.

    Weight files are read from ``model_dir`` into NumPy arrays and the raw
    buffer addresses are handed to the C library. Every array is retained in
    ``self.arrays`` so the memory the engine points at is never garbage
    collected while this object is alive.
    """

    # Per-layer linear projections, in the order they are passed to
    # ``layer_set_linears``.
    _LINEAR_NAMES = (
        "self_attn.q_proj",
        "self_attn.k_proj",
        "self_attn.v_proj",
        "self_attn.o_proj",
        "mlp.gate_proj",
        "mlp.up_proj",
        "mlp.down_proj",
    )

    def __init__(self, model_dir, engine_path="./packed_engine.so"):
        """Open the engine library, read the metadata and load all weights.

        Args:
            model_dir: Directory containing ``manifest.json``, ``config.json``
                and the weight files.
            engine_path: Path of the shared library to load with ctypes.

        Raises:
            OSError: If the shared library or a metadata file cannot be opened.
            MemoryError: If ``model_alloc()`` returns a NULL handle.
        """
        self.lib = ctypes.CDLL(engine_path)
        self.lib.model_alloc.restype = c_void_p
        self.lib.forward_token.restype = POINTER(c_float)
        self.model_dir = model_dir
        with open(os.path.join(model_dir, "manifest.json")) as f:
            self.manifest = json.load(f)
        with open(os.path.join(model_dir, "config.json")) as f:
            self.config = json.load(f)
        self.arrays = []  # prevent GC

        handle = self.lib.model_alloc()
        if handle is None:  # ctypes maps a NULL c_void_p result to None
            raise MemoryError("model_alloc() returned NULL")
        # Keep the handle as c_void_p: no argtypes are declared, and a bare
        # Python int argument would be converted to a 32-bit C int, which
        # truncates 64-bit addresses.
        self.model = c_void_p(handle)
        self._load_weights()

    def _keep(self, arr):
        """Retain ``arr`` and return its buffer address as a ``c_void_p``.

        The array is appended to ``self.arrays`` so its buffer stays alive.
        The address is wrapped in ``c_void_p`` so ctypes passes it at full
        pointer width instead of as a default C int.
        """
        self.arrays.append(arr)
        return c_void_p(arr.ctypes.data)

    def _load_file(self, key, ext, dtype):
        """Read one weight file as a flat array of ``dtype``.

        The file name is ``key`` with every ``.`` replaced by ``_``, followed
        by ``ext``, inside ``self.model_dir``.
        """
        path = os.path.join(self.model_dir, key.replace(".", "_") + ext)
        return np.fromfile(path, dtype=dtype)

    def _load_fp16_as_f32(self, key):
        """Read a ``.fp16`` file as float16 values and widen them to float32."""
        return self._load_file(key, ".fp16", np.float16).astype(np.float32)

    def _load_weights(self):
        """Load every tensor named by the manifest and register it with the engine.

        Embeddings and the LM head are passed as raw 16-bit buffers; norm and
        bias vectors are converted to float32 first; each packed linear layer
        contributes four buffers (``.mags``, ``.signs``, ``.scales``, ``.rmm``)
        plus its two shape values.
        """
        t0 = time.time()
        fp16_keys = self.manifest["fp16"]
        packed_keys = self.manifest["packed"]
        model = self.model
        lib = self.lib

        # Embeddings
        emb = self._load_file("model.embed_tokens.weight", ".fp16", np.uint16)
        lib.model_set_embed(model, self._keep(emb))
        print(f" Embeddings: {emb.nbytes/1e6:.1f} MB")

        # LM head
        lm = self._load_file("lm_head.weight", ".fp16", np.uint16)
        od, id_ = fp16_keys["lm_head.weight"]
        lib.model_set_lm_head(model, self._keep(lm), od, id_)
        print(f" LM head: {lm.nbytes/1e6:.1f} MB")

        # Final norm: stored as fp16, the engine is given float32. The file
        # is read once as float16; reading the bits as uint16 and casting
        # would give wrong values.
        fn = self._load_fp16_as_f32("model.norm.weight")
        lib.model_set_final_norm(model, self._keep(fn))

        n_layers = self.config["num_hidden_layers"]
        for layer in range(n_layers):
            pfx = f"model.layers.{layer}"

            # Norms
            in_f = self._load_fp16_as_f32(f"{pfx}.input_layernorm.weight")
            pn_f = self._load_fp16_as_f32(
                f"{pfx}.post_attention_layernorm.weight"
            )
            lib.layer_set_norms(
                model, layer, self._keep(in_f), self._keep(pn_f)
            )

            # Biases (Q/K/V): the presence of the Q bias in the manifest
            # decides whether all three are loaded.
            if f"{pfx}.self_attn.q_proj.bias" in fp16_keys:
                qb = self._load_fp16_as_f32(f"{pfx}.self_attn.q_proj.bias")
                kb = self._load_fp16_as_f32(f"{pfx}.self_attn.k_proj.bias")
                vb = self._load_fp16_as_f32(f"{pfx}.self_attn.v_proj.bias")
                lib.layer_set_bias(
                    model,
                    layer,
                    self._keep(qb),
                    self._keep(kb),
                    self._keep(vb),
                )
            else:
                lib.layer_set_bias(model, layer, None, None, None)

            # 7 linear layers: q,k,v,o,gate,up,down
            args = []
            for name in self._LINEAR_NAMES:
                key = f"{pfx}.{name}.weight"
                od, id_ = packed_keys[key]
                mags = self._load_file(key, ".mags", np.uint8)
                signs = self._load_file(key, ".signs", np.uint64)
                scales = self._load_file(key, ".scales", np.float32)
                rmm = self._load_file(key, ".rmm", np.uint8)
                args.extend([
                    self._keep(mags),
                    self._keep(signs),
                    self._keep(scales),
                    self._keep(rmm),
                    od,
                    id_,
                ])
            lib.layer_set_linears(model, layer, *args)

            if (layer + 1) % 7 == 0 or layer == n_layers - 1:
                print(f" Loaded {layer+1}/{n_layers} layers")

        dt = time.time() - t0
        total = sum(a.nbytes for a in self.arrays)
        print(f"\nModel loaded in {dt:.1f}s, {total/1e6:.0f} MB in Python arrays")

    def generate(self, token_ids, max_new_tokens=100, temperature=0.6,
                 top_p=0.9, eos_id=151643):
        """Reset the engine cache and generate tokens for a prompt.

        Args:
            token_ids: Prompt token ids (sequence of ints).
            max_new_tokens: Size of the output buffer given to the engine.
            temperature: Passed to the engine as a C float.
            top_p: Passed to the engine as a C float.
            eos_id: Passed to the engine as an integer.

        Returns:
            A tuple ``(tokens, n, dt)``: the first ``n`` entries of the output
            buffer as a list, the count ``n`` returned by the engine's
            ``generate`` call, and the wall-clock seconds that call took.
        """
        prompt = (c_int * len(token_ids))(*token_ids)
        output = (c_int * max_new_tokens)()
        self.lib.model_reset_cache(self.model)
        t0 = time.time()
        n = self.lib.generate(
            self.model,
            prompt,
            len(token_ids),
            output,
            max_new_tokens,
            c_float(temperature),
            c_float(top_p),
            eos_id,
        )
        dt = time.time() - t0
        tokens = [output[i] for i in range(n)]
        return tokens, n, dt


def main():
    """Load the tokenizer and engine, then run a few sample prompts."""
    from transformers import AutoTokenizer

    model_dir = sys.argv[1] if len(sys.argv) > 1 else "deepseek-r1-1.5b-packed"
    tok_dir = sys.argv[2] if len(sys.argv) > 2 else "deepseek-r1-1.5b-hf"

    print("Loading tokenizer...")
    tok = AutoTokenizer.from_pretrained(tok_dir, trust_remote_code=True)
    print("Loading packed unary engine...")
    engine = PackedEngine(model_dir, "./packed_engine.so")

    prompts = [
        "What is 2+2?",
        "Explain gravity in one sentence.",
        "Write a haiku about snow.",
    ]
    for prompt in prompts:
        msgs = [{"role": "user", "content": prompt}]
        ids = tok.apply_chat_template(msgs, add_generation_prompt=True)
        tokens, n, dt = engine.generate(
            ids, max_new_tokens=100, temperature=0.6
        )
        text = tok.decode(tokens, skip_special_tokens=False)
        # Guard against a zero elapsed time on coarse clocks.
        rate = n / dt if dt > 0 else 0.0
        print(f"\n[{prompt}] ({n} tok, {dt:.1f}s, {rate:.1f} tok/s)")
        print(text[:300])
        print("---")


if __name__ == "__main__":
    main()