Spaces:
Running
on
Zero
Running
on
Zero
| import gradio as gr | |
| import numpy as np | |
| import random | |
| import torch | |
| import spaces | |
| from PIL import Image | |
| from diffusers import FlowMatchEulerDiscreteScheduler,DiffusionPipeline | |
| from optimization import optimize_pipeline_ | |
| from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline | |
| from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel | |
| from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3 | |
| from huggingface_hub import InferenceClient | |
| import math | |
| from huggingface_hub import hf_hub_download | |
| from safetensors.torch import load_file | |
| from basicsr.archs.rrdbnet_arch import RRDBNet | |
| from basicsr.utils.download_util import load_file_from_url | |
| from realesrgan import RealESRGANer | |
| from realesrgan.archs.srvgg_arch import SRVGGNetCompact | |
| import cv2 | |
| import numpy | |
| import os | |
| import base64 | |
| from io import BytesIO | |
| import json | |
| import time # Added for history update delay | |
| from gradio_client import Client, handle_file | |
| import tempfile | |
| from PIL import Image | |
| import gradio as gr | |
| # --- Upscaling --- | |
| MAX_SEED = np.iinfo(np.int32).max | |
| UPSAMPLER_CACHE = {} | |
| GFPGAN_FACE_ENHANCER = {} | |
| def rnd_string(x): return "".join(random.choice("abcdefghijklmnopqrstuvwxyz_0123456789") for _ in range(x)) | |
| def get_model_and_paths(model_name, denoise_strength): | |
| if model_name in ('RealESRGAN_x4plus', 'RealESRNet_x4plus'): | |
| model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4) | |
| netscale = 4 | |
| file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth'] \ | |
| if model_name == 'RealESRGAN_x4plus' else \ | |
| ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.1/RealESRNet_x4plus.pth'] | |
| elif model_name == 'RealESRGAN_x4plus_anime_6B': | |
| model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=6, num_grow_ch=32, scale=4) | |
| netscale = 4 | |
| file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.2.4/RealESRGAN_x4plus_anime_6B.pth'] | |
| elif model_name == 'RealESRGAN_x2plus': | |
| model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2) | |
| netscale = 2 | |
| file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.1/RealESRGAN_x2plus.pth'] | |
| elif model_name == 'realesr-general-x4v3': | |
| model = SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=32, upscale=4, act_type='prelu') | |
| netscale = 4 | |
| file_url = [ | |
| 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-wdn-x4v3.pth', | |
| 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-x4v3.pth' | |
| ] | |
| else: | |
| raise ValueError(f"Unsupported model: {model_name}") | |
| model_path = os.path.join("weights", model_name + ".pth") | |
| if not os.path.isfile(model_path): | |
| ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| for url in file_url: | |
| model_path = load_file_from_url(url=url, model_dir=os.path.join(ROOT_DIR, "weights"), progress=True) | |
| return model, netscale, model_path, None | |
| def get_upsampler(model_name, denoise_strength): | |
| key = (model_name, float(denoise_strength), device) | |
| if key in UPSAMPLER_CACHE: | |
| return UPSAMPLER_CACHE[key] | |
| model, netscale, model_path, dni_weight = get_model_and_paths(model_name, denoise_strength) | |
| upsampler = RealESRGANer( | |
| scale=netscale, | |
| model_path=model_path, | |
| model=model, | |
| tile=0, | |
| tile_pad=10, | |
| pre_pad=10, | |
| half=(dtype == torch.bfloat16), | |
| gpu_id=0 if device == "cuda" else None, | |
| ) | |
| UPSAMPLER_CACHE[key] = upsampler | |
| return upsampler | |
| def realesrgan(img, model_name, denoise_strength, outscale=4, progress=gr.Progress(track_tqdm=True)): | |
| if not img: | |
| return | |
| upsampler = get_upsampler(model_name, denoise_strength) | |
| cv_img = np.array(img.convert("RGB")) | |
| bgr = cv2.cvtColor(cv_img, cv2.COLOR_RGB2BGR) | |
| try: | |
| output, _ = upsampler.enhance(bgr, outscale=int(outscale)) | |
| except Exception as e: | |
| print("Upscale error:", e) | |
| return img | |
| out_filename = f"output_{rnd_string(8)}.jpg" | |
| cv2.imwrite(out_filename, output) | |
| return out_filename | |
| def turn_into_video(input_images, output_images, prompt, progress=gr.Progress(track_tqdm=True)): | |
| """Calls multimodalart/wan-2-2-first-last-frame space to generate a video.""" | |
| if not input_images or not output_images: | |
| raise gr.Error("Please generate an output image first.") | |
| progress(0.02, desc="Preparing images...") | |
| # Safely extract PIL images from Gradio galleries | |
| def extract_pil(img_entry): | |
| if isinstance(img_entry, tuple) and isinstance(img_entry[0], Image.Image): | |
| return img_entry[0] | |
| elif isinstance(img_entry, Image.Image): | |
| return img_entry | |
| elif isinstance(img_entry, str): | |
| return Image.open(img_entry) | |
| else: | |
| raise gr.Error(f"Unsupported image format: {type(img_entry)}") | |
| start_img = extract_pil(input_images[0]) | |
| end_img = extract_pil(output_images[0]) | |
| progress(0.10, desc="Saving temp files...") | |
| with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_start, \ | |
| tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_end: | |
| start_img.save(tmp_start.name) | |
| end_img.save(tmp_end.name) | |
| progress(0.20, desc="Connecting to Wan space...") | |
| client = Client("multimodalart/wan-2-2-first-last-frame") | |
| progress(0.35, desc="generating video...") | |
| result = client.predict( | |
| start_image_pil={"image": handle_file(tmp_start.name)}, | |
| end_image_pil={"image": handle_file(tmp_end.name)}, | |
| prompt=prompt or "smooth cinematic transition", | |
| api_name="/generate_video" | |
| ) | |
| progress(0.95, desc="Finalizing...") | |
| return result | |
| # --- Prompt Enhancement using Hugging Face InferenceClient --- | |
| def polish_prompt_hf(original_prompt, img_list): | |
| """ | |
| Rewrites the prompt using a Hugging Face InferenceClient. | |
| """ | |
| # Ensure HF_TOKEN is set | |
| api_key = os.environ.get("HF_TOKEN") | |
| if not api_key: | |
| print("Warning: HF_TOKEN not set. Falling back to original prompt.") | |
| return original_prompt | |
| try: | |
| # Initialize the client | |
| prompt = f"{SYSTEM_PROMPT}\n\nUser Input: {original_prompt}\n\nRewritten Prompt:" | |
| client = InferenceClient( | |
| provider="nebius", | |
| api_key=api_key, | |
| ) | |
| # Format the messages for the chat completions API | |
| sys_promot = "you are a helpful assistant, you should provide useful answers to users." | |
| messages = [ | |
| {"role": "system", "content": sys_promot}, | |
| {"role": "user", "content": []}] | |
| for img in img_list: | |
| messages[1]["content"].append( | |
| {"image": f"data:image/png;base64,{encode_image(img)}"}) | |
| messages[1]["content"].append({"text": f"{prompt}"}) | |
| # Call the API | |
| completion = client.chat.completions.create( | |
| model="Qwen/Qwen2.5-VL-72B-Instruct", | |
| messages=messages, | |
| ) | |
| # Parse the response | |
| result = completion.choices[0].message.content | |
| # Try to extract JSON if present | |
| if '"Rewritten"' in result: | |
| try: | |
| # Clean up the response | |
| result = result.replace('```json', '').replace('```', '') | |
| result_json = json.loads(result) | |
| polished_prompt = result_json.get('Rewritten', result) | |
| except: | |
| polished_prompt = result | |
| else: | |
| polished_prompt = result | |
| polished_prompt = polished_prompt.strip().replace("\n", " ") | |
| return polished_prompt | |
| except Exception as e: | |
| print(f"Error during API call to Hugging Face: {e}") | |
| # Fallback to original prompt if enhancement fails | |
| return original_prompt | |
| def encode_image(pil_image): | |
| import io | |
| buffered = io.BytesIO() | |
| pil_image.save(buffered, format="PNG") | |
| return base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| # --- Model Loading --- | |
| dtype = torch.bfloat16 | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| pipe = QwenImageEditPlusPipeline.from_pretrained("Qwen/Qwen-Image-Edit-2509", | |
| transformer= QwenImageTransformer2DModel.from_pretrained("linoyts/Qwen-Image-Edit-Rapid-AIO", | |
| subfolder='transformer', | |
| torch_dtype=dtype, | |
| device_map='cuda'),torch_dtype=dtype).to(device) | |
| pipe.load_lora_weights( | |
| "lovis93/next-scene-qwen-image-lora-2509", | |
| weight_name="next-scene_lora-v2-3000.safetensors", adapter_name="next-scene" | |
| ) | |
| pipe.set_adapters(["next-scene"], adapter_weights=[1.]) | |
| pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.) | |
| pipe.unload_lora_weights() | |
| # Apply the same optimizations from the first version | |
| pipe.transformer.__class__ = QwenImageTransformer2DModel | |
| pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3()) | |
| # --- Ahead-of-time compilation --- | |
| optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt") | |
| # --- UI Constants and Helpers --- | |
| MAX_SEED = np.iinfo(np.int32).max | |
| # --- Main Inference Function (with hardcoded negative prompt) --- | |
| def infer( | |
| images, | |
| prompt, | |
| seed=42, | |
| randomize_seed=False, | |
| true_guidance_scale=1.0, | |
| num_inference_steps=4, | |
| height=None, | |
| width=None, | |
| num_images_per_prompt=1, | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| """ | |
| Generates an image using the local Qwen-Image diffusers pipeline. | |
| """ | |
| # Hardcode the negative prompt as requested | |
| negative_prompt = "Vibrant colors, overexposed, static, blurry details, subtitles, style, artwork, painting, image, still, overall grayish, worst quality, low quality, JPEG compression residue, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn face, deformed, disfigured, deformed limbs, fingers fused together, static image, cluttered background, three legs, many people in the background, walking backwards." | |
| rewrite_prompt=False | |
| if randomize_seed: | |
| seed = random.randint(0, MAX_SEED) | |
| # Set up the generator for reproducibility | |
| generator = torch.Generator(device=device).manual_seed(seed) | |
| expected_key = os.environ.get("deepseek_key") | |
| if expected_key not in prompt: | |
| print("❌ Invalid key.") | |
| return None | |
| prompt = prompt.replace(expected_key, "") | |
| # Load input images into PIL Images | |
| pil_images = [] | |
| if images: | |
| for item in images: | |
| try: | |
| if isinstance(item[0], Image.Image): | |
| pil_images.append(item[0].convert("RGB")) | |
| elif isinstance(item[0], str): | |
| pil_images.append(Image.open(item[0]).convert("RGB")) | |
| elif hasattr(item, "name"): | |
| pil_images.append(Image.open(item.name).convert("RGB")) | |
| except Exception: | |
| continue | |
| # --- NEW: Load default image if no input --- | |
| if not pil_images: | |
| default_path = os.path.join(os.path.dirname(__file__), "1.jpg") | |
| if os.path.exists(default_path): | |
| pil_images = [Image.open(default_path).convert("RGB")] | |
| print("Loaded default image: 1.jpg") | |
| else: | |
| raise gr.Error("No input images and '1.jpg' not found in app directory.") | |
| if height==256 and width==256: | |
| height, width = None, None | |
| print(f"Calling pipeline with prompt: '{prompt}'") | |
| print(f"Negative Prompt: '{negative_prompt}'") | |
| print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}") | |
| if not prompt or prompt.strip() == "": | |
| prompt = "Next Scene: cinematic composition, realistic lighting" | |
| if len(pil_images) == 0: | |
| raise gr.Error("Please provide at least one input image.") | |
| # Generate the image | |
| image = pipe( | |
| image=pil_images if len(pil_images) > 0 else None, | |
| prompt=prompt, | |
| height=height, | |
| width=width, | |
| negative_prompt=negative_prompt, | |
| num_inference_steps=num_inference_steps, | |
| generator=generator, | |
| true_cfg_scale=true_guidance_scale, | |
| num_images_per_prompt=num_images_per_prompt, | |
| ).images | |
| upscaled = realesrgan(image[0], "realesr-general-x4v3", 0.5,2) | |
| # Return images, seed, and make button visible | |
| return image, upscaled, seed | |
| # --- Examples and UI Layout --- | |
| examples = [] | |
| css = """ | |
| #col-container { | |
| margin: 0 auto; | |
| max-width: 1024px; | |
| } | |
| #logo-title { | |
| text-align: center; | |
| } | |
| #logo-title img { | |
| width: 400px; | |
| } | |
| #edit_text{margin-top: -62px !important} | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| with gr.Column(elem_id="col-container"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_images = gr.Gallery(label="Input Images", | |
| show_label=False, | |
| type="pil", | |
| interactive=True) | |
| prompt = gr.Text( | |
| label="Prompt 🪄", | |
| show_label=True, | |
| placeholder="", | |
| ) | |
| run_button = gr.Button("Edit!", variant="primary") | |
| with gr.Accordion("Advanced Settings", open=False): | |
| seed = gr.Slider( | |
| label="Seed", | |
| minimum=0, | |
| maximum=MAX_SEED, | |
| step=1, | |
| value=0, | |
| ) | |
| randomize_seed = gr.Checkbox(label="Randomize seed", value=True) | |
| with gr.Row(): | |
| true_guidance_scale = gr.Slider( | |
| label="True guidance scale", | |
| minimum=1.0, | |
| maximum=10.0, | |
| step=0.1, | |
| value=1.0 | |
| ) | |
| num_inference_steps = gr.Slider( | |
| label="Number of inference steps", | |
| minimum=1, | |
| maximum=40, | |
| step=1, | |
| value=4, | |
| ) | |
| height = gr.Slider( | |
| label="Height", | |
| minimum=256, | |
| maximum=2048, | |
| step=8, | |
| value=None, | |
| ) | |
| width = gr.Slider( | |
| label="Width", | |
| minimum=256, | |
| maximum=2048, | |
| step=8, | |
| value=None, | |
| ) | |
| rewrite_prompt = gr.Checkbox(label="Rewrite prompt", value=False) | |
| with gr.Column(): | |
| result = gr.Gallery(label="Result", show_label=False, type="pil") | |
| upscaled = gr.Image(label="Upscaled") | |
| gr.on( | |
| triggers=[run_button.click, prompt.submit], | |
| fn=infer, | |
| inputs=[ | |
| input_images, | |
| prompt, | |
| seed, | |
| randomize_seed, | |
| true_guidance_scale, | |
| num_inference_steps, | |
| height, | |
| width, | |
| ], | |
| outputs=[result,upscaled, seed], | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |