# app.py -- last change "Update app.py" by mrbui1990 (commit 6182fd5, verified); file size 16.3 kB.
import gradio as gr
import numpy as np
import random
import torch
import spaces
from PIL import Image
from diffusers import FlowMatchEulerDiscreteScheduler,DiffusionPipeline
from optimization import optimize_pipeline_
from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline
from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel
from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3
from huggingface_hub import InferenceClient
import math
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
from basicsr.archs.rrdbnet_arch import RRDBNet
from basicsr.utils.download_util import load_file_from_url
from realesrgan import RealESRGANer
from realesrgan.archs.srvgg_arch import SRVGGNetCompact
import cv2
import numpy
import os
import base64
from io import BytesIO
import json
import time # Added for history update delay
from gradio_client import Client, handle_file
import tempfile
from PIL import Image
import gradio as gr
# --- Upscaling ---
# Largest seed value used by the app (maximum signed 32-bit integer).
MAX_SEED = np.iinfo(np.int32).max
# Already-built RealESRGANer instances, keyed by
# (model_name, denoise_strength, device); filled in by get_upsampler().
UPSAMPLER_CACHE = {}
# NOTE(review): not referenced anywhere in the visible part of this file --
# presumably reserved for a GFPGAN face-enhancer cache; confirm or remove.
GFPGAN_FACE_ENHANCER = {}
def rnd_string(x):
    """Return a random string of ``x`` characters drawn from ``[a-z_0-9]``."""
    alphabet = "abcdefghijklmnopqrstuvwxyz_0123456789"
    picked = []
    # One random.choice() call per character, in order.
    for _ in range(x):
        picked.append(random.choice(alphabet))
    return "".join(picked)
def get_model_and_paths(model_name, denoise_strength):
    """Build the network for ``model_name`` and make sure its weights are on disk.

    Args:
        model_name: One of 'RealESRGAN_x4plus', 'RealESRNet_x4plus',
            'RealESRGAN_x4plus_anime_6B', 'RealESRGAN_x2plus' or
            'realesr-general-x4v3'.
        denoise_strength: Accepted for interface compatibility; it is not
            used here (no DNI weights are computed).

    Returns:
        A ``(model, netscale, model_path, dni_weight)`` tuple. ``dni_weight``
        is always ``None``.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name in ('RealESRGAN_x4plus', 'RealESRNet_x4plus'):
        model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4)
        netscale = 4
        if model_name == 'RealESRGAN_x4plus':
            file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth']
        else:
            file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.1/RealESRNet_x4plus.pth']
    elif model_name == 'RealESRGAN_x4plus_anime_6B':
        model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=6, num_grow_ch=32, scale=4)
        netscale = 4
        file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.2.4/RealESRGAN_x4plus_anime_6B.pth']
    elif model_name == 'RealESRGAN_x2plus':
        model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2)
        netscale = 2
        file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.1/RealESRGAN_x2plus.pth']
    elif model_name == 'realesr-general-x4v3':
        model = SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=32, upscale=4, act_type='prelu')
        netscale = 4
        # The main checkpoint is listed last so that, after the download loop
        # below, model_path points at it rather than at the "wdn" variant.
        file_url = [
            'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-wdn-x4v3.pth',
            'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-x4v3.pth'
        ]
    else:
        raise ValueError(f"Unsupported model: {model_name}")

    # Look for cached weights in the same directory the downloader writes to
    # (next to this script), so the check does not depend on the current
    # working directory.
    weights_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "weights")
    model_path = os.path.join(weights_dir, model_name + ".pth")
    if not os.path.isfile(model_path):
        for url in file_url:
            model_path = load_file_from_url(url=url, model_dir=weights_dir, progress=True)
    return model, netscale, model_path, None
def get_upsampler(model_name, denoise_strength):
    """Return a cached ``RealESRGANer`` for the given model, building it on first use.

    Instances are memoized in ``UPSAMPLER_CACHE`` under the key
    ``(model_name, float(denoise_strength), device)``.

    Args:
        model_name: Model identifier forwarded to ``get_model_and_paths``.
        denoise_strength: Forwarded to ``get_model_and_paths`` and used as
            part of the cache key.

    Returns:
        The ``RealESRGANer`` instance for this key.

    Raises:
        ValueError: Propagated from ``get_model_and_paths`` for an
            unsupported ``model_name``.
    """
    key = (model_name, float(denoise_strength), device)
    cached = UPSAMPLER_CACHE.get(key)
    if cached is not None:
        return cached

    # The fourth element (dni_weight) is currently always None and is not
    # forwarded to RealESRGANer.
    model, netscale, model_path, _dni_weight = get_model_and_paths(model_name, denoise_strength)

    on_gpu = device == "cuda"
    upsampler = RealESRGANer(
        scale=netscale,
        model_path=model_path,
        model=model,
        tile=0,
        tile_pad=10,
        pre_pad=10,
        # Request reduced precision only on CUDA: `dtype` is a module-level
        # constant, so testing it alone would also enable half precision when
        # the app has fallen back to CPU.
        half=on_gpu and dtype == torch.bfloat16,
        gpu_id=0 if on_gpu else None,
    )
    UPSAMPLER_CACHE[key] = upsampler
    return upsampler
@spaces.GPU
def realesrgan(img, model_name, denoise_strength, outscale=4, progress=gr.Progress(track_tqdm=True)):
    """Upscale a PIL image with Real-ESRGAN and write the result to a JPEG file.

    Args:
        img: Input image; it is converted to RGB before processing.
        model_name: Model identifier passed to ``get_upsampler``.
        denoise_strength: Passed to ``get_upsampler``.
        outscale: Output scale factor; converted to ``int`` before use.
        progress: Gradio progress tracker (not called directly here).

    Returns:
        ``None`` when ``img`` is falsy; the path of the written
        ``output_<random>.jpg`` file on success; the unmodified input ``img``
        when upscaling or writing the file fails.
    """
    if not img:
        return None

    upsampler = get_upsampler(model_name, denoise_strength)

    # The upsampler is fed the image as a BGR array (OpenCV channel order).
    rgb = np.array(img.convert("RGB"))
    bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    try:
        output, _ = upsampler.enhance(bgr, outscale=int(outscale))
    except Exception as e:
        # Best effort: report the failure and hand back the original image.
        print("Upscale error:", e)
        return img

    out_filename = f"output_{rnd_string(8)}.jpg"
    # cv2.imwrite reports failure through its boolean return value; without
    # this check a failed write would return a path to a file that does not
    # exist.
    if not cv2.imwrite(out_filename, output):
        print("Upscale error:", f"could not write {out_filename}")
        return img
    return out_filename
def turn_into_video(input_images, output_images, prompt, progress=gr.Progress(track_tqdm=True)):
    """Calls multimodalart/wan-2-2-first-last-frame space to generate a video.

    The first entry of ``input_images`` is used as the start frame and the
    first entry of ``output_images`` as the end frame.

    Args:
        input_images: Gallery value holding the source image(s).
        output_images: Gallery value holding the generated image(s).
        prompt: Text prompt; ``"smooth cinematic transition"`` is used when
            it is empty.
        progress: Gradio progress tracker.

    Returns:
        Whatever ``client.predict`` returns for the ``/generate_video``
        endpoint.

    Raises:
        gr.Error: If either gallery is empty or an entry has an unsupported
            type.
    """
    if not input_images or not output_images:
        raise gr.Error("Please generate an output image first.")

    progress(0.02, desc="Preparing images...")

    # Safely extract PIL images from Gradio galleries
    def extract_pil(img_entry):
        # Unwrap (payload, caption) pairs first so that both PIL and file-path
        # payloads are accepted inside a tuple.
        payload = img_entry
        if isinstance(img_entry, (tuple, list)) and img_entry:
            payload = img_entry[0]
        if isinstance(payload, Image.Image):
            return payload
        if isinstance(payload, str):
            return Image.open(payload)
        raise gr.Error(f"Unsupported image format: {type(img_entry)}")

    start_img = extract_pil(input_images[0])
    end_img = extract_pil(output_images[0])

    progress(0.10, desc="Saving temp files...")
    tmp_paths = []
    try:
        for frame in (start_img, end_img):
            # Reserve a unique name, close the handle, then save by path so
            # the file is not written while another handle to it is open.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
                tmp_paths.append(tmp.name)
            frame.save(tmp.name)
        start_path, end_path = tmp_paths

        progress(0.20, desc="Connecting to Wan space...")
        client = Client("multimodalart/wan-2-2-first-last-frame")

        progress(0.35, desc="generating video...")
        result = client.predict(
            start_image_pil={"image": handle_file(start_path)},
            end_image_pil={"image": handle_file(end_path)},
            prompt=prompt or "smooth cinematic transition",
            api_name="/generate_video"
        )
    finally:
        # The files were created with delete=False, so remove them here once
        # the request has finished (or failed).
        for path in tmp_paths:
            try:
                os.remove(path)
            except OSError:
                pass

    progress(0.95, desc="Finalizing...")
    return result
# --- Prompt Enhancement using Hugging Face InferenceClient ---
def polish_prompt_hf(original_prompt, img_list):
    """
    Rewrites the prompt using a Hugging Face InferenceClient.

    The request is sent to "Qwen/Qwen2.5-VL-72B-Instruct" through the
    "nebius" provider, with every image in ``img_list`` attached as a
    base64 PNG data URL.

    Args:
        original_prompt: The user's prompt text.
        img_list: PIL images to send along with the prompt (may be empty or
            ``None``).

    Returns:
        The rewritten prompt with surrounding whitespace stripped and
        newlines replaced by spaces. ``original_prompt`` is returned
        unchanged when ``HF_TOKEN`` is not set or when anything in the
        request or parsing fails.
    """
    # Ensure HF_TOKEN is set
    api_key = os.environ.get("HF_TOKEN")
    if not api_key:
        print("Warning: HF_TOKEN not set. Falling back to original prompt.")
        return original_prompt

    try:
        # NOTE(review): SYSTEM_PROMPT is not defined in the visible part of
        # this file. If it is missing at runtime, the NameError is caught
        # below and the original prompt is returned.
        prompt = f"{SYSTEM_PROMPT}\n\nUser Input: {original_prompt}\n\nRewritten Prompt:"

        # Initialize the client
        client = InferenceClient(
            provider="nebius",
            api_key=api_key,
        )

        # Format the messages for the chat completions API: each content
        # part carries an explicit "type", images go in as "image_url" parts.
        sys_prompt = "you are a helpful assistant, you should provide useful answers to users."
        user_content = [
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{encode_image(img)}"},
            }
            for img in (img_list or [])
        ]
        user_content.append({"type": "text", "text": f"{prompt}"})
        messages = [
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": user_content},
        ]

        # Call the API
        completion = client.chat.completions.create(
            model="Qwen/Qwen2.5-VL-72B-Instruct",
            messages=messages,
        )

        # Parse the response
        result = completion.choices[0].message.content
        polished_prompt = result

        # Try to extract JSON if present
        if '"Rewritten"' in result:
            try:
                # Clean up the response (strip Markdown code fences)
                result = result.replace('```json', '').replace('```', '')
                polished_prompt = json.loads(result).get('Rewritten', result)
            except (ValueError, AttributeError, TypeError):
                # Not valid JSON, or not a JSON object: use the cleaned text.
                polished_prompt = result

        return polished_prompt.strip().replace("\n", " ")
    except Exception as e:
        print(f"Error during API call to Hugging Face: {e}")
        # Fallback to original prompt if enhancement fails
        return original_prompt
def encode_image(pil_image):
    """Serialize ``pil_image`` as PNG and return the bytes as a base64 text string."""
    png_buffer = BytesIO()
    pil_image.save(png_buffer, format="PNG")
    raw_png = png_buffer.getvalue()
    encoded = base64.b64encode(raw_png)
    return encoded.decode("utf-8")
# --- Model Loading ---
# Everything below runs at import time.
# Precision passed to both from_pretrained() calls below.
dtype = torch.bfloat16
# Use CUDA when PyTorch reports it as available, otherwise CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Build the edit pipeline from "Qwen/Qwen-Image-Edit-2509", with its
# transformer loaded from the 'transformer' subfolder of
# "linoyts/Qwen-Image-Edit-Rapid-AIO".
# NOTE(review): the transformer is loaded with device_map='cuda' regardless of
# `device`, so the CPU fallback above presumably cannot work as written --
# confirm.
pipe = QwenImageEditPlusPipeline.from_pretrained("Qwen/Qwen-Image-Edit-2509",
    transformer= QwenImageTransformer2DModel.from_pretrained("linoyts/Qwen-Image-Edit-Rapid-AIO",
        subfolder='transformer',
        torch_dtype=dtype,
        device_map='cuda'),torch_dtype=dtype).to(device)
# Load the "next-scene" LoRA, activate it at weight 1.0, fuse it at scale 1.0,
# then unload the adapter weights.
pipe.load_lora_weights(
    "lovis93/next-scene-qwen-image-lora-2509",
    weight_name="next-scene_lora-v2-3000.safetensors", adapter_name="next-scene"
)
pipe.set_adapters(["next-scene"], adapter_weights=[1.])
pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.)
pipe.unload_lora_weights()
# Apply the same optimizations from the first version
# Rebind the transformer instance to the QwenImageTransformer2DModel class
# imported from qwenimage.transformer_qwenimage, then install the FA3
# attention processor.
pipe.transformer.__class__ = QwenImageTransformer2DModel
pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())
# --- Ahead-of-time compilation ---
# Called with two blank 1024x1024 RGB images and a placeholder prompt.
optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt")
# --- UI Constants and Helpers ---
# NOTE(review): MAX_SEED is also assigned the same expression near the top of
# this file; this re-assignment is redundant.
MAX_SEED = np.iinfo(np.int32).max
# --- Main Inference Function (with hardcoded negative prompt) ---
def _load_input_images(images):
    """Convert gallery/file entries into a list of RGB PIL images, skipping bad entries."""
    loaded = []
    for item in images or []:
        # Gallery entries normally arrive as (payload, caption) pairs, but a
        # bare image, path or file object is accepted too.
        payload = item[0] if isinstance(item, (tuple, list)) and item else item
        try:
            if isinstance(payload, Image.Image):
                loaded.append(payload.convert("RGB"))
            elif isinstance(payload, str):
                loaded.append(Image.open(payload).convert("RGB"))
            elif hasattr(payload, "name"):
                loaded.append(Image.open(payload.name).convert("RGB"))
            else:
                print(f"Skipping unsupported input image entry: {type(payload)}")
        except Exception as e:
            # Best effort: an unreadable entry is reported and skipped.
            print(f"Skipping unreadable input image: {e}")
    return loaded


@spaces.GPU(duration=60)
def infer(
    images,
    prompt,
    seed=42,
    randomize_seed=False,
    true_guidance_scale=1.0,
    num_inference_steps=4,
    height=None,
    width=None,
    num_images_per_prompt=1,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Generates an image using the local Qwen-Image diffusers pipeline.

    The prompt must contain the value of the ``deepseek_key`` environment
    variable; that value is removed from the prompt before it is used.

    Args:
        images: Gallery value with the input image(s). When no image can be
            loaded, ``1.jpg`` next to this file is used instead.
        prompt: Prompt text, including the access key.
        seed: Seed for the torch generator.
        randomize_seed: When true, replace ``seed`` with a random value in
            ``[0, MAX_SEED]``.
        true_guidance_scale: Passed to the pipeline as ``true_cfg_scale``.
        num_inference_steps: Number of denoising steps.
        height: Output height; ``None`` leaves the choice to the pipeline.
        width: Output width; ``None`` leaves the choice to the pipeline.
        num_images_per_prompt: Number of images to generate.
        progress: Gradio progress tracker.

    Returns:
        ``(images, upscaled, seed)``: the generated images, the upscaled
        version of the first image as returned by ``realesrgan``, and the
        seed used. ``(None, None, seed)`` is returned when the access key is
        missing or wrong.

    Raises:
        gr.Error: If no input image is available and ``1.jpg`` is not found.
    """
    # Hardcode the negative prompt as requested
    negative_prompt = "Vibrant colors, overexposed, static, blurry details, subtitles, style, artwork, painting, image, still, overall grayish, worst quality, low quality, JPEG compression residue, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn face, deformed, disfigured, deformed limbs, fingers fused together, static image, cluttered background, three legs, many people in the background, walking backwards."

    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # manual_seed() needs an int; UI sliders may deliver a float.
    seed = int(seed)

    # Reject the request unless the prompt carries the configured key. An
    # unset environment variable or a non-string prompt is treated as a
    # mismatch rather than raising TypeError. The tuple keeps the same arity
    # as the success path, which is wired to three outputs.
    expected_key = os.environ.get("deepseek_key")
    if expected_key is None or not isinstance(prompt, str) or expected_key not in prompt:
        print("❌ Invalid key.")
        return None, None, seed
    prompt = prompt.replace(expected_key, "")

    # Set up the generator for reproducibility
    generator = torch.Generator(device=device).manual_seed(seed)

    # Load input images into PIL Images
    pil_images = _load_input_images(images)

    # --- NEW: Load default image if no input ---
    if not pil_images:
        default_path = os.path.join(os.path.dirname(__file__), "1.jpg")
        if os.path.exists(default_path):
            pil_images = [Image.open(default_path).convert("RGB")]
            print("Loaded default image: 1.jpg")
        else:
            raise gr.Error("No input images and '1.jpg' not found in app directory.")

    # 256x256 is treated as "no size requested" (256 is the minimum of both
    # size sliders in the UI).
    if height == 256 and width == 256:
        height, width = None, None

    # Apply the default prompt before logging so the log shows what is
    # actually sent to the pipeline.
    if not prompt or prompt.strip() == "":
        prompt = "Next Scene: cinematic composition, realistic lighting"

    print(f"Calling pipeline with prompt: '{prompt}'")
    print(f"Negative Prompt: '{negative_prompt}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")

    # Generate the image
    image = pipe(
        image=pil_images,
        prompt=prompt,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=int(num_inference_steps),
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
    ).images

    # Upscale the first result (model "realesr-general-x4v3", outscale 2).
    upscaled = realesrgan(image[0], "realesr-general-x4v3", 0.5, 2)

    # Return images, upscaled image, and the seed that was used
    return image, upscaled, seed
# --- Examples and UI Layout ---
# No example rows are defined.
# NOTE(review): `examples` is not referenced by the UI code visible in this file.
examples = []
# Stylesheet passed to gr.Blocks(css=...). "col-container" is the only
# elem_id assigned in the visible layout.
# NOTE(review): no visible component uses the ids "logo-title" or
# "edit_text" -- these rules look left over; confirm.
css = """
#col-container {
margin: 0 auto;
max-width: 1024px;
}
#logo-title {
text-align: center;
}
#logo-title img {
width: 400px;
}
#edit_text{margin-top: -62px !important}
"""
# Gradio layout: input gallery, prompt, run button and advanced settings in
# one column; result gallery and upscaled image in the other.
# NOTE(review): the nesting below was reconstructed from the statement order;
# confirm it against the deployed layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        with gr.Row():
            with gr.Column():
                input_images = gr.Gallery(label="Input Images",
                    show_label=False,
                    type="pil",
                    interactive=True)
                prompt = gr.Text(
                    label="Prompt 🪄",
                    show_label=True,
                    placeholder="",
                )
                run_button = gr.Button("Edit!", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=0,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
                    with gr.Row():
                        true_guidance_scale = gr.Slider(
                            label="True guidance scale",
                            minimum=1.0,
                            maximum=10.0,
                            step=0.1,
                            value=1.0
                        )
                        num_inference_steps = gr.Slider(
                            label="Number of inference steps",
                            minimum=1,
                            maximum=40,
                            step=1,
                            value=4,
                        )
                        # infer() resets height and width to None when both
                        # arrive as 256, the minimum of these two sliders.
                        height = gr.Slider(
                            label="Height",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
                        width = gr.Slider(
                            label="Width",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
                    # NOTE(review): this checkbox is not listed in the
                    # `inputs` of the event below, so it has no effect.
                    rewrite_prompt = gr.Checkbox(label="Rewrite prompt", value=False)
            with gr.Column():
                result = gr.Gallery(label="Result", show_label=False, type="pil")
                upscaled = gr.Image(label="Upscaled")
    # Run infer() on button click or prompt submit. `inputs` follows the
    # order of infer()'s first eight parameters; `outputs` matches its
    # (images, upscaled, seed) return value.
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            input_images,
            prompt,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[result,upscaled, seed],
    )
# Launch the Gradio app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()