Spaces:
Running
on
Zero
Running
on
Zero
| import torch | |
| import spaces | |
| import gradio as gr | |
| import sys | |
| import platform | |
| import diffusers | |
| import transformers | |
| import psutil | |
| import os | |
| import time | |
| from diffusers import BitsAndBytesConfig as DiffusersBitsAndBytesConfig | |
| from diffusers import ZImagePipeline, AutoModel | |
| from transformers import BitsAndBytesConfig as TransformersBitsAndBytesConfig | |
# Module-level list; nothing in the visible source reads or appends to it.
latent_history = []

# ============================================================
# LOGGING BUFFER
# ============================================================
# Running transcript of everything passed to log(), one message per line.
LOGS = ""


def log(msg):
    """Print *msg* and append it, newline-terminated, to the LOGS buffer.

    Args:
        msg: Text to record. It must be a ``str``; anything else raises
            ``TypeError`` when it is concatenated into the buffer (after it
            has already been printed).

    Returns:
        *msg* unchanged, so the call can be used inline in an expression.
    """
    global LOGS
    print(msg)
    LOGS = "".join((LOGS, msg, "\n"))
    return msg
| # ============================================================ | |
| # SYSTEM METRICS — LIVE GPU + CPU MONITORING | |
| # ============================================================ | |
def log_system_stats(tag=""):
    """Log a snapshot of GPU memory, CPU load and RAM usage.

    Every line is emitted through ``log``. GPU figures are only reported when
    ``torch.cuda.is_available()`` is true and always refer to device index 0.
    All byte counts are divided by 1e9 and labelled "GB".

    Any exception raised while gathering or logging the numbers is caught and
    reported as a single warning line instead of propagating.

    Args:
        tag: Free-form label inserted into the header line of the report.
    """
    try:
        log(f"\n===== π₯ SYSTEM STATS {tag} =====")

        # ---------------- GPU ----------------
        if torch.cuda.is_available():
            gpu_index = 0
            allocated_gb = torch.cuda.memory_allocated(gpu_index) / 1e9
            reserved_gb = torch.cuda.memory_reserved(gpu_index) / 1e9
            total_gb = torch.cuda.get_device_properties(gpu_index).total_memory / 1e9
            # "Free" is derived as total minus *allocated* (not reserved); it
            # is not queried from the driver.
            free_gb = total_gb - allocated_gb

            log(f"π GPU Total : {total_gb:.2f} GB")
            log(f"π GPU Allocated : {allocated_gb:.2f} GB")
            log(f"π GPU Reserved : {reserved_gb:.2f} GB")
            log(f"π GPU Free : {free_gb:.2f} GB")

        # ---------------- CPU / RAM ----------------
        cpu_load = psutil.cpu_percent()
        ram_used_gb = psutil.virtual_memory().used / 1e9
        ram_total_gb = psutil.virtual_memory().total / 1e9

        log(f"π§ CPU Usage : {cpu_load}%")
        log(f"π§ RAM Used : {ram_used_gb:.2f} GB / {ram_total_gb:.2f} GB")
    except Exception as err:
        log(f"β οΈ Failed to log system stats: {err}")
| # ============================================================ | |
| # ENVIRONMENT INFO | |
| # ============================================================ | |
# Startup banner followed by interpreter / platform / library versions.
# Everything goes through log(), so it is printed and kept in the log buffer.
log("===================================================")
log("π Z-IMAGE-TURBO DEBUGGING + LIVE METRIC LOGGER")
log("===================================================\n")
# chr(10) is "\n": sys.version may span several lines, so flatten it.
log(f"π PYTHON VERSION : {sys.version.replace(chr(10),' ')}")
log(f"π PLATFORM : {platform.platform()}")
log(f"π TORCH VERSION : {torch.__version__}")
log(f"π TRANSFORMERS VERSION : {transformers.__version__}")
log(f"π DIFFUSERS VERSION : {diffusers.__version__}")
log(f"π CUDA AVAILABLE : {torch.cuda.is_available()}")
log_system_stats("AT STARTUP")
# Hard requirement: abort module start-up when no CUDA device is visible.
if not torch.cuda.is_available():
    raise RuntimeError("β CUDA Required")
# Device string used by the model loaders, pipe.to() and torch.Generator.
device = "cuda"
# Defined here but not referenced anywhere else in the visible source.
gpu_id = 0
| # ============================================================ | |
| # MODEL SETTINGS | |
| # ============================================================ | |
# Directory passed as cache_dir to the component from_pretrained calls.
model_cache = "./weights/"
# Repository id shared by the transformer, text-encoder and pipeline loads.
model_id = "Tongyi-MAI/Z-Image-Turbo"
# Used both as the load dtype and as the 4-bit compute dtype.
torch_dtype = torch.bfloat16
# CPU-offload toggle; its value is reported in the configuration dump below.
USE_CPU_OFFLOAD = False
# Echo the effective configuration, then snapshot memory before any model
# weights are loaded.
log("\n===================================================")
log("π§ MODEL CONFIGURATION")
log("===================================================")
log(f"Model ID : {model_id}")
log(f"Model Cache Directory : {model_cache}")
log(f"torch_dtype : {torch_dtype}")
log(f"USE_CPU_OFFLOAD : {USE_CPU_OFFLOAD}")
log_system_stats("BEFORE TRANSFORMER LOAD")
| # ============================================================ | |
| # FUNCTION TO CONVERT LATENTS TO IMAGE | |
| # ============================================================ | |
def latent_to_image(latent):
    """Decode a VAE latent tensor into a PIL image (best effort).

    The latent is decoded with ``pipe.vae``, mapped from [-1, 1] to [0, 1],
    and the first item of the batch is converted to an 8-bit PIL image.

    Args:
        latent: Latent tensor accepted by ``pipe.vae.decode``.
            NOTE(review): assumes the caller has already undone any VAE
            scaling/shift factor the pipeline applies — confirm.

    Returns:
        A ``PIL.Image.Image`` on success, or ``None`` if decoding fails for
        any reason (the failure is logged, never raised).
    """
    try:
        # Local import: the previous implementation relied on an alias ``T``
        # that is never imported in this file, so every call failed with a
        # NameError that the except clause silently turned into ``None``.
        from PIL import Image

        with torch.no_grad():
            # return_dict=False yields a tuple whose first item is the decoded
            # tensor; the default return value is an output object that does
            # not support arithmetic.
            decoded = pipe.vae.decode(latent, return_dict=False)[0]

        # [-1, 1] -> [0, 1]
        img_tensor = (decoded / 2 + 0.5).clamp(0, 1)

        # First batch item: CHW float -> HWC uint8. float() comes first
        # because bfloat16 tensors cannot be converted to NumPy directly.
        pixels = img_tensor[0].float().mul(255).byte().cpu().permute(1, 2, 0).numpy()
        return Image.fromarray(pixels)
    except Exception as e:
        log(f"β οΈ Failed to decode latent: {e}")
        return None
| # ============================================================ | |
| # SAFE TRANSFORMER INSPECTION | |
| # ============================================================ | |
def inspect_transformer(model, name):
    """Log a short structural summary of *model*.

    The first attribute that exists on *model* among ``transformer_blocks``,
    ``blocks``, ``layers``, ``encoder`` and ``model`` (checked in that order)
    is treated as the block container. If it exposes ``__len__`` its length
    is logged, followed by the class name of up to the first ten entries.

    Any exception raised during inspection is caught and logged.

    Args:
        model: Object to inspect.
        name: Human-readable label used in the log lines.
    """
    log(f"\nπ Inspecting {name}")
    try:
        container = next(
            (
                getattr(model, attr_name)
                for attr_name in ("transformer_blocks", "blocks", "layers", "encoder", "model")
                if hasattr(model, attr_name)
            ),
            None,
        )
        if container is None:
            log(f"β οΈ No block structure found in {name}")
            return

        has_length = hasattr(container, "__len__")
        if has_length:
            log(f"Total Blocks = {len(container)}")
        else:
            log("β οΈ Blocks exist but are not iterable")

        # Containers without a length get no per-block listing at all.
        shown = min(10, len(container)) if has_length else 0
        for index in range(shown):
            log(f"Block {index} = {container[index].__class__.__name__}")
    except Exception as err:
        log(f"β οΈ Transformer inspect error: {err}")
| # ============================================================ | |
| # LOAD TRANSFORMER — WITH LIVE STATS | |
| # ============================================================ | |
# Load the transformer sub-model with 4-bit quantization, taking a memory
# snapshot before and after so the cost of the load shows up in the logs.
log("\n===================================================")
log("π§ LOADING TRANSFORMER BLOCK")
log("===================================================")
log("π Logging memory before load:")
log_system_stats("START TRANSFORMER LOAD")
try:
    # 4-bit NF4 quantization with double quantization enabled; compute is
    # carried out in torch_dtype.
    quant_cfg = DiffusersBitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch_dtype,
        bnb_4bit_use_double_quant=True,
    )
    # Only the "transformer" subfolder of the repository is loaded here.
    transformer = AutoModel.from_pretrained(
        model_id,
        cache_dir=model_cache,
        subfolder="transformer",
        quantization_config=quant_cfg,
        torch_dtype=torch_dtype,
        device_map=device,
    )
    log("β Transformer loaded successfully.")
except Exception as e:
    # Best effort: record the failure and continue with no transformer.
    log(f"β Transformer load failed: {e}")
    transformer = None
log_system_stats("AFTER TRANSFORMER LOAD")
# Skipped when the load failed and transformer is None.
if transformer:
    inspect_transformer(transformer, "Transformer")
| # ============================================================ | |
| # LOAD TEXT ENCODER | |
| # ============================================================ | |
# Load the text encoder with 4-bit quantization, again bracketed by memory
# snapshots.
log("\n===================================================")
log("π§ LOADING TEXT ENCODER")
log("===================================================")
log_system_stats("START TEXT ENCODER LOAD")
try:
    # Same NF4 / double-quant settings as the transformer, but expressed with
    # the transformers-library config class.
    quant_cfg2 = TransformersBitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch_dtype,
        bnb_4bit_use_double_quant=True,
    )
    # NOTE(review): AutoModel is the class imported from diffusers at the top
    # of the file, combined here with the transformers quantization config —
    # confirm this pairing is intended for the "text_encoder" subfolder.
    text_encoder = AutoModel.from_pretrained(
        model_id,
        cache_dir=model_cache,
        subfolder="text_encoder",
        quantization_config=quant_cfg2,
        torch_dtype=torch_dtype,
        device_map=device,
    )
    log("β Text encoder loaded successfully.")
except Exception as e:
    # Best effort: record the failure and continue with no text encoder.
    log(f"β Text encoder load failed: {e}")
    text_encoder = None
log_system_stats("AFTER TEXT ENCODER LOAD")
# Skipped when the load failed and text_encoder is None.
if text_encoder:
    inspect_transformer(text_encoder, "Text Encoder")
| # ============================================================ | |
| # BUILD PIPELINE | |
| # ============================================================ | |
# Assemble the ZImagePipeline around the pre-loaded (quantized) components.
# On any failure `pipe` is set to None and start-up continues.
log("\n===================================================")
log("π§ BUILDING PIPELINE")
log("===================================================")
log_system_stats("START PIPELINE BUILD")
try:
    # Passing `None` for a component tells diffusers to leave that component
    # out of the pipeline, which would produce a pipeline that "builds" but
    # cannot run. Only override the components that actually loaded; anything
    # missing is loaded from the checkpoint with its default settings.
    _component_overrides = {}
    if transformer is not None:
        _component_overrides["transformer"] = transformer
    else:
        log("WARNING: quantized transformer unavailable; loading the checkpoint default instead.")
    if text_encoder is not None:
        _component_overrides["text_encoder"] = text_encoder
    else:
        log("WARNING: quantized text encoder unavailable; loading the checkpoint default instead.")

    pipe = ZImagePipeline.from_pretrained(
        model_id,
        # Same cache directory as the individual component loads, so all
        # weights end up in one place.
        cache_dir=model_cache,
        torch_dtype=torch_dtype,
        **_component_overrides,
    )
    # Honour the USE_CPU_OFFLOAD setting; with the flag off the whole
    # pipeline is moved to `device`, exactly as before.
    if USE_CPU_OFFLOAD:
        pipe.enable_model_cpu_offload()
    else:
        pipe.to(device)
    log("β Pipeline built successfully.")
except Exception as e:
    log(f"β Pipeline build failed: {e}")
    pipe = None
log_system_stats("AFTER PIPELINE BUILD")
| import torch | |
| from PIL import Image | |
| import io | |
# Not referenced anywhere else in the visible source.
logs = []
# Placeholder only: the UI section rebinds this name to a gr.Gallery component.
latent_gallery = []
| import torch | |
| from PIL import Image | |
# Global log storage.
# NOTE: this rebinds LOGS (a string buffer at the top of the file) to a fresh
# list, so messages recorded before this point are no longer reachable
# through LOGS.
LOGS = []


def log(msg):
    """Append *msg* to the LOGS list and echo it to stdout.

    This definition replaces the earlier ``log`` in this file from here on.

    Args:
        msg: Message to record; stored as-is.

    Returns:
        *msg* unchanged, matching the contract of the earlier ``log``
        definition so the call can still be used inline.
    """
    LOGS.append(msg)
    print(msg)
    return msg
def generate_image(prompt, height, width, steps, seed, guidance_scale=0.0, return_latents=False):
    """
    Generate an image from a prompt.

    Tries the prepared-latents path first; if anything in that path raises,
    the reason is logged and the pipeline is called again without explicit
    latents, using a freshly seeded generator.

    Args:
        prompt: Text prompt forwarded to the pipeline.
        height: Forwarded to the pipeline as ``height`` after an ``int`` cast.
        width: Forwarded to the pipeline as ``width`` after an ``int`` cast.
        steps: Forwarded as ``num_inference_steps`` after an ``int`` cast.
        seed: Seed for the ``torch.Generator`` (cast to ``int``).
        guidance_scale: Forwarded unchanged to the pipeline.
        return_latents: When True *and* the image was produced from the
            prepared latents, that latents tensor is returned in the middle
            slot of the result.

    Returns:
        Always a 3-tuple ``(image, middle, logs_text)``, one value per Gradio
        output component wired to this callback:

        * ``image`` - the first generated image, or ``None`` on failure.
        * ``middle`` - the latents tensor (see ``return_latents``), otherwise
          an empty list.
        * ``logs_text`` - every recorded log message joined with newlines.
    """
    # NOTE(review): `spaces` is imported at the top of the file but never
    # used; on a ZeroGPU Space the GPU-bound callback is normally decorated
    # with `@spaces.GPU` - confirm whether that is needed here.

    def _logs_text():
        # LOGS starts out as a str at the top of the file and is rebound to a
        # list further down; accept either so the Textbox always gets text.
        if isinstance(LOGS, str):
            return LOGS
        return "\n".join(str(entry) for entry in LOGS)

    latents = None
    try:
        if pipe is None:
            raise RuntimeError("pipeline is not available (it failed to build at startup)")

        # UI widgets may deliver floats; the pipeline and generator need ints.
        height, width, steps, seed = int(height), int(width), int(steps), int(seed)
        generator = torch.Generator(device).manual_seed(seed)

        # Try advanced latent preparation.
        # NOTE(review): the pipeline is constructed with a `transformer`
        # component, so `pipe.unet` probably does not exist and this path
        # likely always falls back - confirm against the installed diffusers.
        try:
            batch_size = 1
            num_channels_latents = getattr(pipe.unet, "in_channels", None)
            if num_channels_latents is None:
                raise AttributeError("pipe.unet.in_channels not found, fallback to standard pipeline")
            latents = pipe.prepare_latents(
                batch_size=batch_size,
                num_channels=num_channels_latents,
                height=height,
                width=width,
                dtype=torch.float32,
                device=device,
                generator=generator,
            )
            log(f"β Latents prepared: {latents.shape}")
            # Generate image using prepared latents
            output = pipe(
                prompt=prompt,
                height=height,
                width=width,
                num_inference_steps=steps,
                guidance_scale=guidance_scale,
                generator=generator,
                latents=latents,
            )
        except Exception as e_inner:
            # If advanced method fails, fallback to standard pipeline
            log(f"β οΈ Advanced latent method failed: {e_inner}")
            log("π Falling back to standard pipeline...")
            # Any latents prepared above were not used for the final image.
            latents = None
            # The failed attempt may already have drawn from the generator;
            # re-seed so the fallback result depends only on `seed`.
            generator = torch.Generator(device).manual_seed(seed)
            output = pipe(
                prompt=prompt,
                height=height,
                width=width,
                num_inference_steps=steps,
                guidance_scale=guidance_scale,
                generator=generator,
            )
        image = output.images[0]
        log("β Inference finished successfully.")
    except Exception as e:
        log(f"β Inference failed entirely: {e}")
        return None, [], _logs_text()

    if return_latents and latents is not None:
        return image, latents, _logs_text()
    return image, [], _logs_text()
| # ============================================================ | |
| # UI | |
| # ============================================================ | |
# Two-column layout: generation controls on the left, results on the right.
with gr.Blocks(title="Z-Image-Turbo Generator") as demo:
    gr.Markdown("# **π Z-Image-Turbo β Final Image & Latents**")
    with gr.Row():
        # Left column: inputs for generate_image, in positional order.
        with gr.Column(scale=1):
            prompt = gr.Textbox(label="Prompt", value="Realistic mid-aged male image")
            # Height/width range from 256 to 2048 in steps of 8.
            height = gr.Slider(256, 2048, value=1024, step=8, label="Height")
            width = gr.Slider(256, 2048, value=1024, step=8, label="Width")
            steps = gr.Slider(1, 50, value=20, step=1, label="Inference Steps")
            seed = gr.Number(value=42, label="Seed")
            run_btn = gr.Button("Generate Image")
        # Right column: output components.
        with gr.Column(scale=1):
            final_image = gr.Image(label="Final Image")
            # Rebinds the module-level name `latent_gallery` to this component.
            latent_gallery = gr.Gallery(
                label="Latent Steps",
                columns=4,
                height=256,
                preview=True
            )
            logs_box = gr.Textbox(label="Logs", lines=15)
    # The callback receives the five inputs positionally; Gradio expects it to
    # return one value per component listed in `outputs` (three here).
    run_btn.click(
        generate_image,
        inputs=[prompt, height, width, steps, seed],
        outputs=[final_image, latent_gallery, logs_box]
    )
# Runs at module level (no __main__ guard): the app starts as soon as the
# file is executed.
demo.launch()