| | """Google Colab notebook generator for model merging, quantization, and deployment.""" |
| |
|
| | import json |
| | from typing import Optional |
| | from .config_generator import MergeConfig, generate_yaml, MERGE_METHODS |
| |
|
| |
|
| | def _cell(source: str, cell_type: str = "code") -> dict: |
| | """Create a notebook cell.""" |
| | return { |
| | "cell_type": cell_type, |
| | "metadata": {}, |
| | "source": source.split("\n"), |
| | "outputs": [] if cell_type == "code" else [], |
| | **({"execution_count": None} if cell_type == "code" else {}), |
| | } |
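
# Rough sketch of the dict _cell produces for a code cell (illustrative values only):
# _cell('print("hi")\nprint("bye")') ==> {
#     "cell_type": "code",
#     "metadata": {},
#     "source": ['print("hi")\n', 'print("bye")'],
#     "outputs": [],
#     "execution_count": None,
# }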


def _md(text: str) -> dict:
    return _cell(text, "markdown")


def generate_merge_notebook(
    config: MergeConfig,
    output_model_name: str = "",
    hf_username: str = "",
    include_quantize: bool = True,
    include_deploy: bool = True,
    quant_types: Optional[list[str]] = None,
) -> dict:
    """Generate a complete Colab notebook for merging models.

    Args:
        config: MergeConfig with all merge parameters
        output_model_name: Name for the merged model (e.g., "My-Merged-7B")
        hf_username: HF username for upload (e.g., "AIencoder")
        include_quantize: Include GGUF quantization cells
        include_deploy: Include HF Space deployment cells
        quant_types: List of quantization types (default: ["Q5_K_M", "Q4_K_M"])

    Returns:
        Complete notebook dict (nbformat v4)
    """
    if quant_types is None:
        quant_types = ["Q5_K_M", "Q4_K_M"]

    if not output_model_name:
        output_model_name = "ForgeKit-Merged-Model"

    yaml_config = generate_yaml(config)
    method_info = MERGE_METHODS.get(config.method, {})

    # Runtime recommendation based on the largest model size in the merge
    # (checked biggest-first so a 70B model is not misreported as High-RAM friendly).
    ram_note = ""
    if config.models:
        if any("70b" in m.lower() for m in config.models):
            ram_note = "⚠️ 70B models need an **A100 GPU** (Colab Pro+). This won't work on the free tier."
        elif any("14b" in m.lower() or "13b" in m.lower() for m in config.models):
            ram_note = "⚠️ 13-14B models need a **High-RAM runtime** (48 GB). Go to Runtime → Change runtime type → High-RAM."
        elif any("7b" in m.lower() or "8b" in m.lower() for m in config.models):
            ram_note = "💡 7-8B models work on a **High-RAM CPU** runtime (free tier). No GPU needed."

    cells = []

    cells.append(_md(f"""# 🔥 ForgeKit – Model Merge Notebook

**Generated by [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)**

This notebook will:
1. ✅ Install mergekit and dependencies
2. ✅ Merge your selected models using **{method_info.get('name', config.method)}**
3. {'✅' if include_quantize else '⬜'} Quantize to GGUF format
4. {'✅' if include_deploy else '⬜'} Upload to HuggingFace Hub

**Models being merged:**
{chr(10).join(f'- `{m}`' for m in config.models)}

**Method:** {method_info.get('name', config.method)} – {method_info.get('description', '')}

{ram_note}

---
⚡ **Quick Start:** Click **Runtime → Run all** to execute everything."""))

    cells.append(_md("## 1️⃣ Install Dependencies"))
    cells.append(_cell("""# Install mergekit and dependencies
!pip install -q mergekit[all] huggingface_hub transformers accelerate
!pip install -q pyyaml sentencepiece protobuf

print("✅ All dependencies installed!")"""))

    cells.append(_md("## 2️⃣ HuggingFace Login\nRequired for downloading gated models and uploading your merge."))
    cells.append(_cell("""from huggingface_hub import notebook_login
notebook_login()"""))

    cells.append(_md(f"""## 3️⃣ Merge Configuration

Your merge config (auto-generated by ForgeKit). Edit the YAML below if you want to tweak weights or parameters."""))

    # The YAML is embedded verbatim inside a triple-quoted string, so no escaping is needed.
    cells.append(_cell(f"""# === CONFIGURATION ===
MODEL_NAME = "{output_model_name}"
USERNAME = "{hf_username}"  # Change to your HF username

YAML_CONFIG = \"\"\"
{yaml_config}\"\"\"

# Display the config
print("📋 Merge Configuration:")
print("=" * 50)
print(YAML_CONFIG)
print("=" * 50)
print(f"\\n📦 Output: {{USERNAME}}/{{MODEL_NAME}}" if USERNAME else f"\\n📦 Output: {{MODEL_NAME}}")"""))

    cells.append(_md("""## 4️⃣ Execute Merge

This is the main merge step. Time depends on model sizes:

| Size | Estimated Time |
|------|----------------|
| 1-3B | 5-15 min |
| 7B | 15-30 min |
| 14B | 30-60 min |"""))

| | cells.append(_cell("""import yaml |
| | import os |
| | import time |
| | |
| | # Write config to file |
| | with open("merge_config.yaml", "w") as f: |
| | f.write(YAML_CONFIG) |
| | |
| | # Create output directory |
| | os.makedirs("merged_model", exist_ok=True) |
| | |
| | print("π₯ Starting merge...") |
| | print(f" Method: {yaml.safe_load(YAML_CONFIG).get('merge_method', 'unknown')}") |
| | print(f" Models: {len(yaml.safe_load(YAML_CONFIG).get('models', []))}") |
| | print() |
| | |
| | start = time.time() |
| | |
| | # Run mergekit |
| | !mergekit-yaml merge_config.yaml merged_model --copy-tokenizer --allow-crimes --lazy-unpickle |
| | |
| | elapsed = time.time() - start |
| | print(f"\\nβ
Merge complete in {elapsed/60:.1f} minutes!") |
| | print(f"π Output: ./merged_model/") |
| | |
| | # Show output size |
| | total = sum( |
| | os.path.getsize(os.path.join("merged_model", f)) |
| | for f in os.listdir("merged_model") |
| | if os.path.isfile(os.path.join("merged_model", f)) |
| | ) |
| | print(f"πΎ Total size: {total / (1024**3):.2f} GB")""")) |
| |
|
    cells.append(_md("## 5️⃣ Quick Test\nVerify the merged model loads and generates text."))
    cells.append(_cell("""from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

print("🧪 Loading merged model for testing...")

tokenizer = AutoTokenizer.from_pretrained("merged_model", trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    "merged_model",
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
)

# Test prompts
test_prompts = [
    "Write a Python function to calculate fibonacci numbers:",
    "Explain what machine learning is in simple terms:",
    "What is 15 * 23 + 7?",
]

print("\\n" + "=" * 60)
for prompt in test_prompts:
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        # Greedy decoding; sampling parameters are omitted since do_sample=False.
        output = model.generate(
            **inputs,
            max_new_tokens=100,
            do_sample=False,
        )
    response = tokenizer.decode(output[0], skip_special_tokens=True)
    print(f"\\n📝 Prompt: {prompt}")
    print(f"🤖 Response: {response[len(prompt):].strip()[:200]}...")
    print("-" * 60)

print("\\n✅ Model test complete!")

# Clean up GPU memory
del model
if torch.cuda.is_available():
    torch.cuda.empty_cache()"""))

    cells.append(_md("## 6️⃣ Upload to HuggingFace Hub"))

    model_card = _generate_model_card(config, output_model_name, hf_username)
    # Escape any triple quotes so the card can be embedded safely in the generated cell.
    escaped_card = model_card.replace('"""', '\\"\\"\\"')

    cells.append(_cell(f"""from huggingface_hub import HfApi, create_repo

REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME

# Create repo
try:
    create_repo(REPO_ID, exist_ok=True, repo_type="model")
    print(f"📦 Repo ready: https://huggingface.co/{{REPO_ID}}")
except Exception as e:
    print(f"⚠️ Repo creation: {{e}}")

# Write model card
MODEL_CARD = \"\"\"{escaped_card}\"\"\"

with open("merged_model/README.md", "w") as f:
    f.write(MODEL_CARD)

# Upload
api = HfApi()
print("⬆️ Uploading merged model (this may take a while)...")
api.upload_folder(
    repo_id=REPO_ID,
    folder_path="merged_model",
    commit_message=f"Upload {{MODEL_NAME}} merged with ForgeKit",
)
print(f"\\n✅ Model uploaded!")
print(f"🔗 https://huggingface.co/{{REPO_ID}}")"""))

    if include_quantize:
        cells.append(_md(f"""## 7️⃣ Quantize to GGUF

Convert to GGUF format for use with llama.cpp, Ollama, LM Studio, etc.

**Quantization types:** {', '.join(quant_types)}"""))

        # Each generated line carries its own 4-space indent so the commands sit
        # inside the `if os.path.exists(...):` block of the cell below.
        quant_cmds = "\n".join(
            f'    !./llama.cpp/llama-quantize model-f16.gguf {output_model_name}-{q}.gguf {q}\n'
            f'    print("✅ {q} done: {output_model_name}-{q}.gguf")'
            for q in quant_types
        )
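
        # For the default quant_types, quant_cmds expands to roughly (illustrative
        # sketch, assuming output_model_name == "ForgeKit-Merged-Model"):
        #     !./llama.cpp/llama-quantize model-f16.gguf ForgeKit-Merged-Model-Q5_K_M.gguf Q5_K_M
        #     print("✅ Q5_K_M done: ForgeKit-Merged-Model-Q5_K_M.gguf")
        #     !./llama.cpp/llama-quantize model-f16.gguf ForgeKit-Merged-Model-Q4_K_M.gguf Q4_K_M
        #     print("✅ Q4_K_M done: ForgeKit-Merged-Model-Q4_K_M.gguf")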

        cells.append(_cell(f"""import os

print("📦 Setting up llama.cpp for GGUF conversion...")

# Clone and build llama.cpp
if not os.path.exists("llama.cpp"):
    !git clone --depth 1 https://github.com/ggerganov/llama.cpp
    !cd llama.cpp && make -j$(nproc) llama-quantize

# Install conversion deps
!pip install -q gguf

# Convert to f16 GGUF first
print("\\n🔄 Converting to GGUF (f16)...")
!python llama.cpp/convert_hf_to_gguf.py merged_model --outfile model-f16.gguf --outtype f16

# Quantize to each target
print("\\n🗜️ Quantizing...")
if os.path.exists("model-f16.gguf"):
{quant_cmds}

    # Show file sizes
    print("\\n📊 Output sizes:")
    for f in os.listdir("."):
        if f.endswith(".gguf"):
            size_gb = os.path.getsize(f) / (1024**3)
            print(f" {{f}}: {{size_gb:.2f}} GB")
else:
    print("❌ f16 conversion failed. Check errors above.")"""))

        cells.append(_cell(f"""# Upload GGUF files to the same repo
import os
from huggingface_hub import HfApi

api = HfApi()
REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME

gguf_files = [f for f in os.listdir(".") if f.endswith(".gguf") and f != "model-f16.gguf"]

for gf in gguf_files:
    print(f"⬆️ Uploading {{gf}}...")
    api.upload_file(
        path_or_fileobj=gf,
        path_in_repo=gf,
        repo_id=REPO_ID,
    )
    print(f" ✅ Done")

print(f"\\n🎉 All GGUF files uploaded to https://huggingface.co/{{REPO_ID}}")"""))

    if include_deploy:
        cells.append(_md("""## 8️⃣ Deploy to HuggingFace Space

Create a Gradio chat Space running your merged model."""))

        # Resolve the model repo id at generation time so the generated app.py stays readable.
        model_repo = f"{hf_username}/{output_model_name}" if hf_username else output_model_name

        cells.append(_cell(f"""from huggingface_hub import HfApi, create_repo

SPACE_ID = f"{{USERNAME}}/{{MODEL_NAME}}-chat" if USERNAME else f"{{MODEL_NAME}}-chat"
REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME

# Create Space
try:
    create_repo(SPACE_ID, repo_type="space", space_sdk="gradio", exist_ok=True)
    print(f"🚀 Space created: https://huggingface.co/spaces/{{SPACE_ID}}")
except Exception as e:
    print(f"⚠️ {{e}}")

# Generate app.py
APP_CODE = '''import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
import torch
from threading import Thread

MODEL_ID = "{model_repo}"

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True
)

def chat(message, history):
    messages = []
    for h in history:
        messages.append({{"role": "user", "content": h[0]}})
        if h[1]:
            messages.append({{"role": "assistant", "content": h[1]}})
    messages.append({{"role": "user", "content": message}})

    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(text, return_tensors="pt").to(model.device)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    thread = Thread(target=model.generate, kwargs={{
        **inputs, "max_new_tokens": 512, "streamer": streamer, "do_sample": True, "temperature": 0.7
    }})
    thread.start()

    response = ""
    for token in streamer:
        response += token
        yield response

demo = gr.ChatInterface(chat, title="🔥 {output_model_name}", description="Merged with ForgeKit")
demo.launch()
'''

api = HfApi()

# Upload app.py
api.upload_file(
    path_or_fileobj=APP_CODE.encode(),
    path_in_repo="app.py",
    repo_id=SPACE_ID,
    repo_type="space",
)

# Upload requirements.txt
reqs = "transformers\\ntorch\\naccelerate\\nsentencepiece\\nprotobuf"
api.upload_file(
    path_or_fileobj=reqs.encode(),
    path_in_repo="requirements.txt",
    repo_id=SPACE_ID,
    repo_type="space",
)

print(f"\\n🎉 Space deployed!")
print(f"🔗 https://huggingface.co/spaces/{{SPACE_ID}}")
print(f"\\n⏳ It may take a few minutes to build and start.")"""))

    cells.append(_md(f"""## 🎉 All Done!

Your merged model **{output_model_name}** is ready. Here's what was created:

| Output | Link |
|--------|------|
| Model | `https://huggingface.co/{hf_username or 'YOUR_USERNAME'}/{output_model_name}` |
{'| GGUF Files | Same repo (quantized versions) |' if include_quantize else ''}
{'| Chat Space | `https://huggingface.co/spaces/' + (hf_username or 'YOUR_USERNAME') + '/' + output_model_name + '-chat` |' if include_deploy else ''}

---

**Made with [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** – Forge your perfect AI model 🔥"""))

    notebook = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            "kernelspec": {
                "display_name": "Python 3",
                "language": "python",
                "name": "python3",
            },
            "language_info": {"name": "python", "version": "3.10.0"},
            "colab": {
                "provenance": [],
                "gpuType": "T4",
            },
            "accelerator": "GPU",
        },
        "cells": cells,
    }

    return notebook


def _generate_model_card(config: MergeConfig, name: str, username: str) -> str:
    """Generate a model card README.md for the merged model."""
    method_info = MERGE_METHODS.get(config.method, {})
    models_list = "\n".join(f"- [{m}](https://huggingface.co/{m})" for m in config.models)
    base_link = f"[{config.base_model}](https://huggingface.co/{config.base_model})" if config.base_model else "N/A"
    repo_id = f"{username}/{name}" if username else name

    return f"""---
tags:
- merge
- mergekit
- forgekit
base_model: {config.base_model or (config.models[0] if config.models else '')}
license: apache-2.0
---

# {name}

This model was created using **[ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** – an open-source model merging platform.

## Merge Details

| Parameter | Value |
|-----------|-------|
| **Method** | {method_info.get('name', config.method)} |
| **Base Model** | {base_link} |
| **dtype** | {config.dtype} |

### Source Models

{models_list}

### Configuration

```yaml
{generate_yaml(config)}
```

## Usage

```python
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("{repo_id}")
model = AutoModelForCausalLM.from_pretrained("{repo_id}")
```

---

*Made with [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)* 🔥
"""


def notebook_to_json(notebook: dict) -> str:
    """Serialize notebook to JSON string."""
    return json.dumps(notebook, indent=2, ensure_ascii=False)


def save_notebook(notebook: dict, path: str):
    """Save notebook to .ipynb file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(notebook, f, indent=2, ensure_ascii=False)
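

# Minimal usage sketch (kept as a comment so nothing runs on import). The MergeConfig
# constructor arguments below are assumptions based on the attributes this module reads
# (method, models, base_model, dtype); adjust them to the real config_generator API.
#
# cfg = MergeConfig(
#     method="slerp",
#     models=["mistralai/Mistral-7B-v0.1", "HuggingFaceH4/zephyr-7b-beta"],
#     base_model="mistralai/Mistral-7B-v0.1",
#     dtype="bfloat16",
# )
# nb = generate_merge_notebook(cfg, output_model_name="My-Merged-7B", hf_username="AIencoder")
# save_notebook(nb, "merge_my_model.ipynb")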