import gradio as gr
from gradio_client import Client, handle_file
from google import genai
from google.genai import types
import os
from typing import Optional, List, Tuple, Union
from huggingface_hub import whoami
from PIL import Image
from io import BytesIO
import tempfile
import ffmpeg
import sqlite3
from datetime import datetime, date
from pathlib import Path
from threading import Lock

# --- Database Setup ---
DATA_DIR = Path("/data")
DATA_DIR.mkdir(exist_ok=True)
DB_PATH = DATA_DIR / "usage_limits.db"

DAILY_LIMIT_STANDARD = 75
DAILY_LIMIT_PRO = 50
EXEMPTED_USERS = ["multimodalart"]

db_lock = Lock()

def init_db():
    """Initialize the SQLite database."""
    print(f"Initializing database at: {DB_PATH}")
    try:
        with sqlite3.connect(DB_PATH) as conn:
            cursor = conn.cursor()

            # Check whether the table exists and which columns it has
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='usage'")
            table_exists = cursor.fetchone()

            if table_exists:
                # Inspect the current schema
                cursor.execute("PRAGMA table_info(usage)")
                columns = [col[1] for col in cursor.fetchall()]

                # Migrate if the old schema (single 'count' column) is found
                if 'count' in columns and 'count_standard' not in columns:
                    print("Migrating database from old schema to new schema...")
                    # Rename the old count column to count_standard and add count_pro
                    cursor.execute("ALTER TABLE usage RENAME COLUMN count TO count_standard")
                    cursor.execute("ALTER TABLE usage ADD COLUMN count_pro INTEGER NOT NULL DEFAULT 0")
                    conn.commit()
                    print("Database migration completed successfully")
                elif 'count_standard' not in columns:
                    # Table exists but doesn't have the expected columns - recreate it
                    print("Recreating table with new schema...")
                    cursor.execute("DROP TABLE usage")
                    cursor.execute('''
                        CREATE TABLE usage (
                            username TEXT PRIMARY KEY,
                            date TEXT NOT NULL,
                            count_standard INTEGER NOT NULL DEFAULT 0,
                            count_pro INTEGER NOT NULL DEFAULT 0
                        )
                    ''')
                    conn.commit()
                    print("Database recreated successfully")
                else:
                    print("Database schema is already up to date")
            else:
                # Create a new table with the current schema
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS usage (
                        username TEXT PRIMARY KEY,
                        date TEXT NOT NULL,
                        count_standard INTEGER NOT NULL DEFAULT 0,
                        count_pro INTEGER NOT NULL DEFAULT 0
                    )
                ''')
                conn.commit()
                print("Database initialized successfully")
    except Exception as e:
        print(f"Error initializing database: {e}")
        import traceback
        traceback.print_exc()

def check_and_update_usage(username: str, use_pro_model: bool, credits_to_use: int = 1) -> bool:
    """
    Check whether the user has reached their daily limit and update usage.
    Returns True if the user can generate, False if the limit is reached.
    credits_to_use: number of credits to consume (1 for standard/1K, 2 for 2K, 4 for 4K).
    """
    # Exempted users bypass all checks
    if username in EXEMPTED_USERS:
        print(f"User {username} is exempted from rate limits")
        return True

    limit = DAILY_LIMIT_PRO if use_pro_model else DAILY_LIMIT_STANDARD
    count_column = "count_pro" if use_pro_model else "count_standard"
    model_name = "PRO" if use_pro_model else "Standard"

    with db_lock:
        try:
            with sqlite3.connect(DB_PATH) as conn:
                today = str(date.today())
                cursor = conn.cursor()

                # Get the user's record
                cursor.execute("SELECT date, count_standard, count_pro FROM usage WHERE username = ?", (username,))
                result = cursor.fetchone()

                if result is None:
                    # New user - create a record
                    if use_pro_model:
                        cursor.execute("INSERT INTO usage (username, date, count_standard, count_pro) VALUES (?, ?, ?, ?)",
                                       (username, today, 0, credits_to_use))
                    else:
                        cursor.execute("INSERT INTO usage (username, date, count_standard, count_pro) VALUES (?, ?, ?, ?)",
                                       (username, today, credits_to_use, 0))
                    conn.commit()
                    print(f"New user {username}: {credits_to_use}/{limit} ({model_name})")
                    return True

                user_date, user_count_standard, user_count_pro = result
                user_count = user_count_pro if use_pro_model else user_count_standard

                # Reset the counters on a new day
                if user_date != today:
                    if use_pro_model:
                        cursor.execute("UPDATE usage SET date = ?, count_standard = ?, count_pro = ? WHERE username = ?",
                                       (today, 0, credits_to_use, username))
                    else:
                        cursor.execute("UPDATE usage SET date = ?, count_standard = ?, count_pro = ? WHERE username = ?",
                                       (today, credits_to_use, 0, username))
                    conn.commit()
                    print(f"User {username} reset for new day: {credits_to_use}/{limit} ({model_name})")
                    return True

                # Check whether the user has enough credits remaining
                if user_count + credits_to_use > limit:
                    print(f"User {username} insufficient credits: needs {credits_to_use}, has {limit - user_count}/{limit} remaining ({model_name})")
                    return False

                # Increment the count by the credits used
                new_count = user_count + credits_to_use
                cursor.execute(f"UPDATE usage SET {count_column} = ? WHERE username = ?",
                               (new_count, username))
                conn.commit()
                print(f"User {username} usage: {new_count}/{limit} (used {credits_to_use} credits) ({model_name})")
                return True
        except Exception as e:
            print(f"Error checking usage for {username}: {e}")
            import traceback
            traceback.print_exc()
            # On error, allow the request (fail open)
            return True
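
# Illustrative sketch of how the credit accounting above plays out for a PRO user
# (example numbers, not stored defaults):
#   day starts          -> count_pro = 0, DAILY_LIMIT_PRO = 50
#   one 4K generation   -> check_and_update_usage(user, True, 4) -> count_pro = 4
#   one 2K generation   -> check_and_update_usage(user, True, 2) -> count_pro = 6
#   a request needing 4 credits when only 3 remain returns False and stores nothing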

def get_remaining_generations(username: str, use_pro_model: bool) -> int:
    """Get the number of remaining generations for today."""
    # Exempted users have unlimited generations
    if username in EXEMPTED_USERS:
        return 999999  # Large number to indicate "unlimited"

    limit = DAILY_LIMIT_PRO if use_pro_model else DAILY_LIMIT_STANDARD

    with db_lock:
        try:
            with sqlite3.connect(DB_PATH) as conn:
                today = str(date.today())
                cursor = conn.cursor()
                cursor.execute("SELECT date, count_standard, count_pro FROM usage WHERE username = ?", (username,))
                result = cursor.fetchone()

                if result is None:
                    return limit

                user_date, user_count_standard, user_count_pro = result
                user_count = user_count_pro if use_pro_model else user_count_standard

                # A new day means the counter is effectively reset
                if user_date != today:
                    return limit

                return max(0, limit - user_count)
        except Exception as e:
            print(f"Error getting remaining generations for {username}: {e}")
            return limit

# Initialize the database on module load
init_db()

# --- Google Gemini API Configuration ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable not set.")

client = genai.Client(api_key=GOOGLE_API_KEY)
GEMINI_MODEL_NAME = 'gemini-2.5-flash-image'
GEMINI_PRO_MODEL_NAME = 'gemini-3-pro-image-preview'

def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool:
    """Verifies whether the user is a Hugging Face PRO user or a member of an enterprise org."""
    if not token:
        return False

    if isinstance(token, gr.OAuthToken):
        token_str = token.token
    elif isinstance(token, str):
        token_str = token
    else:
        return False

    try:
        user_info = whoami(token=token_str)
        return (
            user_info.get("isPro", False) or
            any(org.get("isEnterprise", False) for org in user_info.get("orgs", []))
        )
    except Exception as e:
        print(f"Could not verify user's PRO/Enterprise status: {e}")
        return False

def get_username(token: Optional[Union[gr.OAuthToken, str]]) -> Optional[str]:
    """Get the username from the token."""
    if not token:
        return None

    if isinstance(token, gr.OAuthToken):
        token_str = token.token
    elif isinstance(token, str):
        token_str = token
    else:
        return None

    try:
        user_info = whoami(token=token_str)
        username = user_info.get("name", None)
        print(f"Username: {username}")
        return username
    except Exception as e:
        print(f"Could not get username: {e}")
        return None

def get_credit_cost(resolution: str) -> int:
    """Get the credit cost for a given resolution."""
    if "4K" in resolution:
        return 4
    elif "2K" in resolution:
        return 2
    else:  # 1K
        return 1


def get_resolution_value(resolution: str) -> str:
    """Extract the resolution value from the dropdown selection."""
    if "4K" in resolution:
        return "4K"
    elif "2K" in resolution:
        return "2K"
    else:
        return "1K"

def _extract_image_data_from_response(response) -> Optional[bytes]:
    """Helper to extract image data from the model's response."""
    if hasattr(response, 'candidates') and response.candidates:
        for part in response.candidates[0].content.parts:
            if hasattr(part, 'inline_data') and hasattr(part.inline_data, 'data'):
                return part.inline_data.data
    return None


def _get_video_info(video_path: str) -> Tuple[float, Tuple[int, int]]:
    """Quickly gets the framerate and (width, height) of a video using ffprobe."""
    probe = ffmpeg.probe(video_path)
    video_stream = next((s for s in probe['streams'] if s['codec_type'] == 'video'), None)
    if not video_stream:
        raise ValueError("No video stream found in the file.")
    # avg_frame_rate is a string such as "30000/1001"; parse it instead of using eval()
    num, _, den = video_stream['avg_frame_rate'].partition('/')
    framerate = float(num) / float(den) if den and float(den) != 0 else float(num)
    resolution = (int(video_stream['width']), int(video_stream['height']))
    return framerate, resolution

def _resize_image(image_path: str, target_size: Tuple[int, int]) -> str:
    """Resizes an image to a target size and saves it to a new temp file."""
    with Image.open(image_path) as img:
        if img.size == target_size:
            return image_path
        resized_img = img.resize(target_size, Image.Resampling.LANCZOS)
        suffix = os.path.splitext(image_path)[1] or ".png"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            resized_img.save(tmp_file.name)
            return tmp_file.name

def _trim_first_frame_fast(video_path: str) -> str:
    """Removes exactly the first frame of a video without re-encoding."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name
    try:
        framerate, _ = _get_video_info(video_path)
        if framerate == 0:
            raise ValueError("Framerate cannot be zero.")
        start_time = 1 / framerate
        (
            ffmpeg
            .input(video_path, ss=start_time)
            .output(output_path, c='copy', avoid_negative_ts='make_zero')
            .run(overwrite_output=True, quiet=True)
        )
        return output_path
    except Exception as e:
        raise RuntimeError(f"FFmpeg trim error: {e}")

def _combine_videos_simple(video1_path: str, video2_path: str) -> str:
    """Combines two videos using the fast concat demuxer."""
    with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=".txt") as tmp_list_file:
        tmp_list_file.write(f"file '{os.path.abspath(video1_path)}'\n")
        tmp_list_file.write(f"file '{os.path.abspath(video2_path)}'\n")
        list_file_path = tmp_list_file.name
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name
    try:
        (
            ffmpeg
            .input(list_file_path, format='concat', safe=0)
            .output(output_path, c='copy')
            .run(overwrite_output=True, quiet=True)
        )
        return output_path
    except ffmpeg.Error as e:
        raise RuntimeError(f"FFmpeg combine error: {e.stderr.decode()}")
    finally:
        if os.path.exists(list_file_path):
            os.remove(list_file_path)
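
# For reference, the two ffmpeg-python pipelines above correspond roughly to these
# CLI invocations (paths are placeholders, not files created by this app):
#   ffmpeg -ss <1/fps> -i segment.mp4 -c copy -avoid_negative_ts make_zero trimmed.mp4
#   ffmpeg -f concat -safe 0 -i list.txt -c copy combined.mp4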

def _generate_video_segment(input_image_path: str, output_image_path: str, prompt: str, token: str) -> str:
    """Generates a single video segment using the external Wan 2.2 Space."""
    video_client = Client("multimodalart/wan-2-2-first-last-frame", token=token)
    result = video_client.predict(
        start_image_pil=handle_file(input_image_path),
        end_image_pil=handle_file(output_image_path),
        prompt=prompt,
        api_name="/generate_video"
    )
    return result[0]["video"]

def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], last_frame_path: Optional[str], aspect_ratio: str, model_selection: str, resolution: str, manual_token: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    if not (verify_pro_status(oauth_token) or verify_pro_status(manual_token)):
        raise gr.Error("Access Denied.")

    # Determine whether to use the PRO model based on the radio selection
    use_pro_model = (model_selection == "Nano Banana PRO")

    # Calculate the credit cost based on resolution (only for the PRO model)
    credits_to_use = get_credit_cost(resolution) if use_pro_model else 1

    # Check the rate limit
    username = get_username(oauth_token) or get_username(manual_token)
    if not username:
        raise gr.Error("Could not identify user.")

    can_generate = check_and_update_usage(username, use_pro_model, credits_to_use)
    if not can_generate:
        # Check whether the user still has quota on the other model
        remaining_other = get_remaining_generations(username, not use_pro_model)
        limit_current = DAILY_LIMIT_PRO if use_pro_model else DAILY_LIMIT_STANDARD
        model_name = "Nano Banana PRO" if use_pro_model else "Nano Banana"
        other_model_name = "Nano Banana" if use_pro_model else "Nano Banana PRO"

        # Get remaining credits for the current model
        remaining_current = get_remaining_generations(username, use_pro_model)

        if use_pro_model and 0 < remaining_current < credits_to_use:
            gr.Info(f"You need {credits_to_use} credits for {get_resolution_value(resolution)} but only have {remaining_current} credits remaining. Try a lower resolution or use Nano Banana.")
        if remaining_other > 0:
            gr.Info(f"You've reached your daily limit for {model_name}. You still have {remaining_other} generations left with {other_model_name}!")
        raise gr.Error(f"Insufficient credits. You need {credits_to_use} credits for this generation.")

    try:
        # Gallery items arrive as (filepath, caption) tuples, hence the [0] index
        contents = [Image.open(image_path[0]) for image_path in images] if images else []
        contents.append(prompt)

        # Select the model based on the radio selection
        model_name = GEMINI_PRO_MODEL_NAME if use_pro_model else GEMINI_MODEL_NAME

        # Build the config with aspect ratio and, for the PRO model, resolution
        if use_pro_model:
            # PRO model: use both aspect_ratio and image_size
            resolution_value = get_resolution_value(resolution)
            if aspect_ratio == "Auto":
                generate_content_config = types.GenerateContentConfig(
                    response_modalities=["IMAGE", "TEXT"],
                    image_config=types.ImageConfig(
                        image_size=resolution_value,
                    ),
                )
            else:
                generate_content_config = types.GenerateContentConfig(
                    response_modalities=["IMAGE", "TEXT"],
                    image_config=types.ImageConfig(
                        aspect_ratio=aspect_ratio,
                        image_size=resolution_value,
                    ),
                )
        else:
            # Standard model: only aspect_ratio
            if aspect_ratio == "Auto":
                generate_content_config = types.GenerateContentConfig(
                    response_modalities=["IMAGE", "TEXT"],
                )
            else:
                generate_content_config = types.GenerateContentConfig(
                    response_modalities=["IMAGE", "TEXT"],
                    image_config=types.ImageConfig(
                        aspect_ratio=aspect_ratio,
                    ),
                )

        print(f"Generating image for user {username} with prompt {prompt}")
        response = client.models.generate_content(
            model=model_name,
            contents=contents,
            config=generate_content_config
        )

        image_data = _extract_image_data_from_response(response)
        if not image_data:
            raise gr.Error("No image data in response")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            Image.open(BytesIO(image_data)).save(tmp.name)
            output_path = tmp.name

        can_create_video = bool(images and len(images) == 1)
        can_extend_video = False
        if can_create_video and previous_video_path and last_frame_path:
            # The crucial continuity check: the current input must be the last frame of the previous video
            if images[0][0] == last_frame_path:
                can_extend_video = True

        print(f"Image generated at {output_path}")
        return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False))
    except Exception as e:
        raise gr.Error(f"Image generation failed: {e}. Rephrase your prompt to make image generation explicit and try again")

def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not input_image_gallery or not output_image:
        raise gr.Error("Input/output images required.")
    try:
        new_segment_path = _generate_video_segment(input_image_gallery[0][0], output_image, prompt_input, oauth_token.token)
        return new_segment_path, new_segment_path, output_image
    except Exception as e:
        raise gr.Error(f"Video creation failed: {e}")


def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not previous_video_path:
        raise gr.Error("No previous video to extend.")
    if not input_image_gallery or not output_image:
        raise gr.Error("Input/output images required.")
    try:
        _, target_resolution = _get_video_info(previous_video_path)
        resized_input_path = _resize_image(input_image_gallery[0][0], target_resolution)
        resized_output_path = _resize_image(output_image, target_resolution)

        new_segment_path = _generate_video_segment(resized_input_path, resized_output_path, prompt_input, oauth_token.token)
        trimmed_segment_path = _trim_first_frame_fast(new_segment_path)
        final_video_path = _combine_videos_simple(previous_video_path, trimmed_segment_path)
        return final_video_path, final_video_path, output_image
    except Exception as e:
        raise gr.Error(f"Video extension failed: {e}")

css = '''
#sub_title{margin-top: -15px !important}
.tab-wrapper{margin-bottom: -33px !important}
.tabitem{padding: 0px !important}
.fillable{max-width: 980px !important}
.dark .progress-text {color: white}
.logo-dark{display: none}
.dark .logo-dark{display: block !important}
.dark .logo-light{display: none}
.grid-container img{object-fit: contain}
.grid-container {display: grid;grid-template-columns: repeat(2, 1fr)}
.grid-container:has(> .gallery-item:only-child) {grid-template-columns: 1fr}
#wan_ad p{text-align: center;padding: .5em}
'''

with gr.Blocks(css=css, theme=gr.themes.Citrus()) as demo:
    gr.HTML('''
    <img class="logo-dark" src='https://huggingface.co/spaces/multimodalart/nano-banana/resolve/main/nano_banana_pros.png' style='margin: 0 auto; max-width: 650px' />
    <img class="logo-light" src='https://huggingface.co/spaces/multimodalart/nano-banana/resolve/main/nano_banana_pros_light.png' style='margin: 0 auto; max-width: 650px' />
    ''')
    gr.HTML("<h3 style='text-align:center'>Hugging Face PRO users can use Google's Nano Banana and Nano Banana PRO on this Space. <a href='http://huggingface.co/subscribe/pro?source=nana_banana' target='_blank'>Subscribe to PRO</a></h3>", elem_id="sub_title")

    pro_message = gr.Markdown(visible=False)
    main_interface = gr.Column(visible=False)
    previous_video_state = gr.State(None)
    last_frame_of_video_state = gr.State(None)

    with main_interface:
        with gr.Row():
            with gr.Column(scale=1):
                image_input_gallery = gr.Gallery(label="Upload one or more images here. Leave empty for text-to-image", file_types=["image"], height="auto")
                prompt_input = gr.Textbox(label="Prompt", placeholder="Turns this photo into a masterpiece")

                # Model selection radio
                model_radio = gr.Radio(
                    choices=["Nano Banana", "Nano Banana PRO"],
                    value="Nano Banana PRO",
                    label="Model",
                )

                with gr.Row():
                    aspect_ratio_dropdown = gr.Dropdown(
                        label="Aspect Ratio",
                        choices=["Auto", "1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5", "21:9"],
                        value="Auto",
                        interactive=True
                    )
                    resolution_dropdown = gr.Dropdown(
                        label="Resolution",
                        choices=["1K", "2K", "4K"],
                        value="1K",
                        interactive=True,
                        visible=True
                    )

                generate_button = gr.Button("Generate", variant="primary")
            with gr.Column(scale=1):
                output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
                use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
                with gr.Row():
                    create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False)
                    extend_video_button = gr.Button("Extend existing video with new scene 🎞️", variant="secondary", visible=False)
                with gr.Group(visible=False) as video_group:
                    video_output = gr.Video(label="Generated Video", buttons=["download"], autoplay=True)
                    gr.Markdown("Generate more with [Wan 2.2 first-last-frame](https://huggingface.co/spaces/multimodalart/wan-2-2-first-last-frame)", elem_id="wan_ad")
        manual_token = gr.Textbox(label="Manual Token (to use with the API)", visible=False)
        gr.Markdown("<h2 style='text-align: center'>Thank you for being a PRO! 🤗</h2>")

    login_button = gr.LoginButton()

    # Show/hide the resolution dropdown based on the model selection
    def update_resolution_visibility(model_selection):
        return gr.update(visible=(model_selection == "Nano Banana PRO"))

    model_radio.change(
        fn=update_resolution_visibility,
        inputs=[model_radio],
        outputs=[resolution_dropdown]
    )

    gr.on(
        triggers=[generate_button.click, prompt_input.submit],
        fn=unified_image_generator,
        inputs=[prompt_input, image_input_gallery, previous_video_state, last_frame_of_video_state, aspect_ratio_dropdown, model_radio, resolution_dropdown, manual_token],
        outputs=[output_image, create_video_button, extend_video_button, video_group],
        api_visibility="private"
    )

    use_image_button.click(
        fn=lambda img: (
            [img] if img else None, None, gr.update(visible=False),
            gr.update(visible=False), gr.update(visible=False)
        ),
        inputs=[output_image],
        outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group],
        api_visibility="private"
    )

    create_video_button.click(
        fn=lambda: gr.update(visible=True), outputs=[video_group],
        api_visibility="private"
    ).then(
        fn=create_new_video,
        inputs=[image_input_gallery, prompt_input, output_image],
        outputs=[video_output, previous_video_state, last_frame_of_video_state],
        api_visibility="private"
    )

    extend_video_button.click(
        fn=lambda: gr.update(visible=True), outputs=[video_group],
        api_visibility="private"
    ).then(
        fn=extend_existing_video,
        inputs=[image_input_gallery, prompt_input, output_image, previous_video_state],
        outputs=[video_output, previous_video_state, last_frame_of_video_state],
        api_visibility="private"
    )

    def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None):
        if not profile:
            return gr.update(visible=False), gr.update(visible=False)
        if verify_pro_status(oauth_token):
            return gr.update(visible=True), gr.update(visible=False)
        else:
            message = (
                "## ✨ Exclusive Access for PRO Users\n\n"
                "Thank you for your interest! This app is available exclusively for our Hugging Face **PRO** members.\n\n"
                "To unlock this and many other cool features, please consider upgrading your account.\n\n"
                "### [**Become a PRO Today!**](http://huggingface.co/subscribe/pro?source=nana_banana)"
            )
            return gr.update(visible=False), gr.update(visible=True, value=message)

    demo.load(control_access, inputs=None, outputs=[main_interface, pro_message])

if __name__ == "__main__":
    demo.queue(max_size=None, default_concurrency_limit=None).launch(show_error=True)