import torch import spaces import gradio as gr import sys import platform import diffusers import transformers import psutil import os import time from diffusers import BitsAndBytesConfig as DiffusersBitsAndBytesConfig from diffusers import ZImagePipeline, AutoModel from transformers import BitsAndBytesConfig as TransformersBitsAndBytesConfig latent_history = [] # ============================================================ # LOGGING BUFFER # ============================================================ LOGS = "" def log(msg): global LOGS print(msg) LOGS += msg + "\n" return msg # ============================================================ # SYSTEM METRICS — LIVE GPU + CPU MONITORING # ============================================================ def log_system_stats(tag=""): try: log(f"\n===== šŸ”„ SYSTEM STATS {tag} =====") # ============= GPU STATS ============= if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated(0) / 1e9 reserved = torch.cuda.memory_reserved(0) / 1e9 total = torch.cuda.get_device_properties(0).total_memory / 1e9 free = total - allocated log(f"šŸ’  GPU Total : {total:.2f} GB") log(f"šŸ’  GPU Allocated : {allocated:.2f} GB") log(f"šŸ’  GPU Reserved : {reserved:.2f} GB") log(f"šŸ’  GPU Free : {free:.2f} GB") # ============= CPU STATS ============ cpu = psutil.cpu_percent() ram_used = psutil.virtual_memory().used / 1e9 ram_total = psutil.virtual_memory().total / 1e9 log(f"🧠 CPU Usage : {cpu}%") log(f"🧠 RAM Used : {ram_used:.2f} GB / {ram_total:.2f} GB") except Exception as e: log(f"āš ļø Failed to log system stats: {e}") # ============================================================ # ENVIRONMENT INFO # ============================================================ log("===================================================") log("šŸ” Z-IMAGE-TURBO DEBUGGING + LIVE METRIC LOGGER") log("===================================================\n") log(f"šŸ“Œ PYTHON VERSION : {sys.version.replace(chr(10),' ')}") log(f"šŸ“Œ PLATFORM : {platform.platform()}") log(f"šŸ“Œ TORCH VERSION : {torch.__version__}") log(f"šŸ“Œ TRANSFORMERS VERSION : {transformers.__version__}") log(f"šŸ“Œ DIFFUSERS VERSION : {diffusers.__version__}") log(f"šŸ“Œ CUDA AVAILABLE : {torch.cuda.is_available()}") log_system_stats("AT STARTUP") if not torch.cuda.is_available(): raise RuntimeError("āŒ CUDA Required") device = "cuda" gpu_id = 0 # ============================================================ # MODEL SETTINGS # ============================================================ model_cache = "./weights/" model_id = "Tongyi-MAI/Z-Image-Turbo" torch_dtype = torch.bfloat16 USE_CPU_OFFLOAD = False log("\n===================================================") log("🧠 MODEL CONFIGURATION") log("===================================================") log(f"Model ID : {model_id}") log(f"Model Cache Directory : {model_cache}") log(f"torch_dtype : {torch_dtype}") log(f"USE_CPU_OFFLOAD : {USE_CPU_OFFLOAD}") log_system_stats("BEFORE TRANSFORMER LOAD") # ============================================================ # FUNCTION TO CONVERT LATENTS TO IMAGE # ============================================================ def latent_to_image(latent): try: img_tensor = pipe.vae.decode(latent) img_tensor = (img_tensor / 2 + 0.5).clamp(0, 1) pil_img = T.ToPILImage()(img_tensor[0]) return pil_img except Exception as e: log(f"āš ļø Failed to decode latent: {e}") return None # ============================================================ # SAFE TRANSFORMER INSPECTION # ============================================================ def inspect_transformer(model, name): log(f"\nšŸ” Inspecting {name}") try: candidates = ["transformer_blocks", "blocks", "layers", "encoder", "model"] blocks = None for attr in candidates: if hasattr(model, attr): blocks = getattr(model, attr) break if blocks is None: log(f"āš ļø No block structure found in {name}") return if hasattr(blocks, "__len__"): log(f"Total Blocks = {len(blocks)}") else: log("āš ļø Blocks exist but are not iterable") for i in range(min(10, len(blocks) if hasattr(blocks, "__len__") else 0)): log(f"Block {i} = {blocks[i].__class__.__name__}") except Exception as e: log(f"āš ļø Transformer inspect error: {e}") # ============================================================ # LOAD TRANSFORMER — WITH LIVE STATS # ============================================================ log("\n===================================================") log("šŸ”§ LOADING TRANSFORMER BLOCK") log("===================================================") log("šŸ“Œ Logging memory before load:") log_system_stats("START TRANSFORMER LOAD") try: quant_cfg = DiffusersBitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch_dtype, bnb_4bit_use_double_quant=True, ) transformer = AutoModel.from_pretrained( model_id, cache_dir=model_cache, subfolder="transformer", quantization_config=quant_cfg, torch_dtype=torch_dtype, device_map=device, ) log("āœ… Transformer loaded successfully.") except Exception as e: log(f"āŒ Transformer load failed: {e}") transformer = None log_system_stats("AFTER TRANSFORMER LOAD") if transformer: inspect_transformer(transformer, "Transformer") # ============================================================ # LOAD TEXT ENCODER # ============================================================ log("\n===================================================") log("šŸ”§ LOADING TEXT ENCODER") log("===================================================") log_system_stats("START TEXT ENCODER LOAD") try: quant_cfg2 = TransformersBitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch_dtype, bnb_4bit_use_double_quant=True, ) text_encoder = AutoModel.from_pretrained( model_id, cache_dir=model_cache, subfolder="text_encoder", quantization_config=quant_cfg2, torch_dtype=torch_dtype, device_map=device, ) log("āœ… Text encoder loaded successfully.") except Exception as e: log(f"āŒ Text encoder load failed: {e}") text_encoder = None log_system_stats("AFTER TEXT ENCODER LOAD") if text_encoder: inspect_transformer(text_encoder, "Text Encoder") # ============================================================ # BUILD PIPELINE # ============================================================ log("\n===================================================") log("šŸ”§ BUILDING PIPELINE") log("===================================================") log_system_stats("START PIPELINE BUILD") try: pipe = ZImagePipeline.from_pretrained( model_id, transformer=transformer, text_encoder=text_encoder, torch_dtype=torch_dtype, ) pipe.to(device) log("āœ… Pipeline built successfully.") except Exception as e: log(f"āŒ Pipeline build failed: {e}") pipe = None log_system_stats("AFTER PIPELINE BUILD") import torch from PIL import Image import io logs = [] latent_gallery = [] import torch from PIL import Image # Global log storage LOGS = [] def log(msg): LOGS.append(msg) print(msg) @spaces.GPU def generate_image(prompt, height, width, steps, seed, guidance_scale=0.0, return_latents=False): """ Generate an image from a prompt. Tries advanced latent-based method; falls back to standard pipeline if anything fails. """ try: generator = torch.Generator(device).manual_seed(int(seed)) # Try advanced latent preparation try: batch_size = 1 num_channels_latents = getattr(pipe.unet, "in_channels", None) if num_channels_latents is None: raise AttributeError("pipe.unet.in_channels not found, fallback to standard pipeline") latents = pipe.prepare_latents( batch_size=batch_size, num_channels=num_channels_latents, height=height, width=width, dtype=torch.float32, device=device, generator=generator ) log(f"āœ… Latents prepared: {latents.shape}") # Generate image using prepared latents output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator, latents=latents ) except Exception as e_inner: # If advanced method fails, fallback to standard pipeline log(f"āš ļø Advanced latent method failed: {e_inner}") log("šŸ” Falling back to standard pipeline...") output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator ) image = output.images[0] log("āœ… Inference finished successfully.") if return_latents and 'latents' in locals(): return image, latents, LOGS else: return image, LOGS except Exception as e: log(f"āŒ Inference failed entirely: {e}") return None, LOGS # ============================================================ # UI # ============================================================ with gr.Blocks(title="Z-Image-Turbo Generator") as demo: gr.Markdown("# **šŸš€ Z-Image-Turbo — Final Image & Latents**") with gr.Row(): with gr.Column(scale=1): prompt = gr.Textbox(label="Prompt", value="Realistic mid-aged male image") height = gr.Slider(256, 2048, value=1024, step=8, label="Height") width = gr.Slider(256, 2048, value=1024, step=8, label="Width") steps = gr.Slider(1, 50, value=20, step=1, label="Inference Steps") seed = gr.Number(value=42, label="Seed") run_btn = gr.Button("Generate Image") with gr.Column(scale=1): final_image = gr.Image(label="Final Image") latent_gallery = gr.Gallery( label="Latent Steps", columns=4, height=256, preview=True ) logs_box = gr.Textbox(label="Logs", lines=15) run_btn.click( generate_image, inputs=[prompt, height, width, steps, seed], outputs=[final_image, latent_gallery, logs_box] ) demo.launch()