nano-banana / app.py
multimodalart's picture
Update app.py
40d9732 verified
import gradio as gr
from gradio_client import Client, handle_file
from google import genai
from google.genai import types
import os
from typing import Optional, List, Tuple, Union
from huggingface_hub import whoami
from PIL import Image
from io import BytesIO
import tempfile
import ffmpeg
import sqlite3
from datetime import datetime, date
from pathlib import Path
from threading import Lock
# --- Database Setup ---
# Directory that holds the SQLite usage database; created at import time if it
# does not exist yet (its parent must already exist).
DATA_DIR = Path("/data")
DATA_DIR.mkdir(exist_ok=True)
# SQLite file with one `usage` row per username (see init_db for the schema).
DB_PATH = DATA_DIR / "usage_limits.db"
# Daily credit caps for the standard model and the PRO model respectively.
DAILY_LIMIT_STANDARD = 75
DAILY_LIMIT_PRO = 50
# Usernames that bypass the daily limits entirely.
EXEMPTED_USERS = ["multimodalart"]
# Held around every database access made by the usage helpers below.
db_lock = Lock()
def init_db():
    """Create the ``usage`` table at ``DB_PATH`` or migrate it to the current schema.

    Three situations are handled:

    * the table does not exist: it is created with the current schema;
    * a legacy table with a single ``count`` column exists: ``count`` is
      renamed to ``count_standard`` and ``count_pro`` is added, keeping rows;
    * a table without ``count_standard`` (and without ``count``) exists: it is
      dropped and recreated, which discards its rows.

    Any exception is printed together with its traceback and then swallowed,
    so a database problem never prevents the module from loading.
    """
    create_usage_table = '''
        CREATE TABLE IF NOT EXISTS usage (
            username TEXT PRIMARY KEY,
            date TEXT NOT NULL,
            count_standard INTEGER NOT NULL DEFAULT 0,
            count_pro INTEGER NOT NULL DEFAULT 0
        )
    '''
    print(f"Initializing database at: {DB_PATH}")
    try:
        conn = sqlite3.connect(DB_PATH)
        # sqlite3's own context manager only commits/rolls back; close the
        # connection explicitly so the file handle is not left to the GC.
        try:
            cursor = conn.cursor()
            # Check if table exists and what columns it has
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='usage'")
            if not cursor.fetchone():
                # Create new table with updated schema
                cursor.execute(create_usage_table)
                conn.commit()
                print("Database initialized successfully")
                return

            # Check current schema
            cursor.execute("PRAGMA table_info(usage)")
            columns = [col[1] for col in cursor.fetchall()]

            if 'count' in columns and 'count_standard' not in columns:
                # Migrate old schema (only has 'count' column)
                print("Migrating database from old schema to new schema...")
                cursor.execute("ALTER TABLE usage RENAME COLUMN count TO count_standard")
                cursor.execute("ALTER TABLE usage ADD COLUMN count_pro INTEGER NOT NULL DEFAULT 0")
                conn.commit()
                print("Database migration completed successfully")
            elif 'count_standard' not in columns:
                # Table exists but doesn't have the right columns - recreate it
                print("Recreating table with new schema...")
                cursor.execute("DROP TABLE usage")
                cursor.execute(create_usage_table)
                conn.commit()
                print("Database recreated successfully")
            else:
                print("Database schema is already up to date")
        finally:
            conn.close()
    except Exception as e:
        # Deliberately best-effort: report the problem and keep the app importable.
        print(f"Error initializing database: {e}")
        import traceback
        traceback.print_exc()
def check_and_update_usage(username: str, use_pro_model: bool, credits_to_use: int = 1) -> bool:
    """
    Check the user's daily quota and, when there is room, record the usage.

    Args:
        username: Account name; primary key of the ``usage`` table.
        use_pro_model: True to check/charge the PRO counter and limit,
            False for the standard ones.
        credits_to_use: Number of credits to consume (1 for standard/1K,
            2 for 2K, 4 for 4K).

    Returns:
        True if the request may proceed: the usage was recorded, the user is
        exempted, or the database raised (the function fails open).
        False if consuming ``credits_to_use`` would exceed today's limit.
    """
    # Exempted users bypass all checks
    if username in EXEMPTED_USERS:
        print(f"User {username} is exempted from rate limits")
        return True

    limit = DAILY_LIMIT_PRO if use_pro_model else DAILY_LIMIT_STANDARD
    # Only ever one of these two literals, so interpolating it into SQL is safe.
    count_column = "count_pro" if use_pro_model else "count_standard"
    model_name = "PRO" if use_pro_model else "Standard"
    # Counter values written when a row is created or reset for a new day.
    fresh_standard, fresh_pro = (0, credits_to_use) if use_pro_model else (credits_to_use, 0)

    with db_lock:
        try:
            conn = sqlite3.connect(DB_PATH)
            # sqlite3's context manager does not close the connection, so do
            # it explicitly; uncommitted work is discarded on close.
            try:
                today = str(date.today())
                cursor = conn.cursor()
                cursor.execute("SELECT date, count_standard, count_pro FROM usage WHERE username = ?", (username,))
                result = cursor.fetchone()

                is_new_user = result is None
                is_new_day = (not is_new_user) and result[0] != today
                if is_new_user or is_new_day:
                    # Nothing has been used yet today.
                    user_count = 0
                else:
                    user_count = result[2] if use_pro_model else result[1]

                # Applies to fresh rows too, so one request can never exceed the daily limit.
                if user_count + credits_to_use > limit:
                    print(f"User {username} insufficient credits: needs {credits_to_use}, has {limit - user_count}/{limit} remaining ({model_name})")
                    return False

                if is_new_user:
                    cursor.execute("INSERT INTO usage (username, date, count_standard, count_pro) VALUES (?, ?, ?, ?)",
                                   (username, today, fresh_standard, fresh_pro))
                    conn.commit()
                    print(f"New user {username}: {credits_to_use}/{limit} ({model_name})")
                elif is_new_day:
                    # A new day resets both counters, not just the one being charged.
                    cursor.execute("UPDATE usage SET date = ?, count_standard = ?, count_pro = ? WHERE username = ?",
                                   (today, fresh_standard, fresh_pro, username))
                    conn.commit()
                    print(f"User {username} reset for new day: {credits_to_use}/{limit} ({model_name})")
                else:
                    new_count = user_count + credits_to_use
                    cursor.execute(f"UPDATE usage SET {count_column} = ? WHERE username = ?",
                                   (new_count, username))
                    conn.commit()
                    print(f"User {username} usage: {new_count}/{limit} (used {credits_to_use} credits) ({model_name})")
                return True
            finally:
                conn.close()
        except Exception as e:
            print(f"Error checking usage for {username}: {e}")
            import traceback
            traceback.print_exc()
            # On error, allow the request (fail open)
            return True
def get_remaining_generations(username: str, use_pro_model: bool) -> int:
    """Return how many credits the user still has today for the chosen model.

    Args:
        username: Account name to look up in the ``usage`` table.
        use_pro_model: True to report against the PRO limit/counter, False
            for the standard ones.

    Returns:
        999999 for exempted users; the full daily limit when the user has no
        row, the stored row is from another day, or the database raised;
        otherwise ``limit - used`` clamped at 0.
    """
    # Exempted users have unlimited generations
    if username in EXEMPTED_USERS:
        return 999999  # Return a large number to indicate unlimited

    limit = DAILY_LIMIT_PRO if use_pro_model else DAILY_LIMIT_STANDARD

    with db_lock:
        try:
            conn = sqlite3.connect(DB_PATH)
            # sqlite3's context manager does not close the connection; do it here.
            try:
                today = str(date.today())
                cursor = conn.cursor()
                cursor.execute("SELECT date, count_standard, count_pro FROM usage WHERE username = ?", (username,))
                result = cursor.fetchone()
            finally:
                conn.close()

            if result is None:
                return limit

            user_date, user_count_standard, user_count_pro = result
            # A row from a previous day counts as a fresh quota.
            if user_date != today:
                return limit

            user_count = user_count_pro if use_pro_model else user_count_standard
            return max(0, limit - user_count)
        except Exception as e:
            print(f"Error getting remaining generations for {username}: {e}")
            return limit
# Initialize database on module load
init_db()

# --- Google Gemini API Configuration ---
# The key is mandatory: importing this module fails fast when it is missing.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable not set.")
# Use the value validated above instead of reading the environment a second time.
client = genai.Client(api_key=GOOGLE_API_KEY)
# Model identifiers used for the "Nano Banana" and "Nano Banana PRO" choices.
GEMINI_MODEL_NAME = 'gemini-2.5-flash-image'
GEMINI_PRO_MODEL_NAME = 'gemini-3-pro-image-preview'
def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool:
    """Tell whether the token's owner is a PRO user or in an enterprise org.

    Accepts either a ``gr.OAuthToken`` or a raw token string. Missing tokens,
    unsupported token types and any failure of the ``whoami`` lookup all
    yield ``False``.
    """
    if not token or not isinstance(token, (gr.OAuthToken, str)):
        return False
    raw_token = token if isinstance(token, str) else token.token

    try:
        account = whoami(token=raw_token)
        personal_pro = account.get("isPro", False)
        organisations = account.get("orgs", [])
        return (
            personal_pro or
            any(entry.get("isEnterprise", False) for entry in organisations)
        )
    except Exception as e:
        print(f"Could not verify user's PRO/Enterprise status: {e}")
        return False
def get_username(token: Optional[Union[gr.OAuthToken, str]]) -> Optional[str]:
    """Resolve the account name behind a token via ``whoami``.

    Accepts either a ``gr.OAuthToken`` or a raw token string. Returns ``None``
    for missing tokens, unsupported token types, or when the lookup fails.
    """
    if not token or not isinstance(token, (gr.OAuthToken, str)):
        return None
    raw_token = token if isinstance(token, str) else token.token

    try:
        account = whoami(token=raw_token)
        resolved_name = account.get("name", None)
        print(f"Username: {resolved_name}")
        return resolved_name
    except Exception as e:
        print(f"Could not get username: {e}")
        return None
def get_credit_cost(resolution: str) -> int:
    """Map a resolution label to its credit price (4K: 4, 2K: 2, anything else: 1)."""
    # Checked in order, so a label mentioning both tiers is billed as 4K.
    for marker, cost in (("4K", 4), ("2K", 2)):
        if marker in resolution:
            return cost
    return 1
def get_resolution_value(resolution: str) -> str:
    """Reduce a dropdown label to the bare tier name: "4K", "2K", or "1K" by default."""
    # "4K" is tested before "2K"; labels containing neither fall back to "1K".
    return next((tier for tier in ("4K", "2K") if tier in resolution), "1K")
def _extract_image_data_from_response(response) -> Optional[bytes]:
    """Return the first inline image payload in the model's response.

    Only the first candidate is inspected. Every level of the structure is
    read defensively, so a response with no candidates, a candidate without
    content, content without parts, or parts without inline data returns
    ``None`` instead of raising.

    Args:
        response: Object returned by the model call; expected to expose
            ``candidates[0].content.parts[*].inline_data.data``.

    Returns:
        The first non-empty ``inline_data.data`` value, or ``None``.
    """
    candidates = getattr(response, 'candidates', None)
    if not candidates:
        return None

    content = getattr(candidates[0], 'content', None)
    parts = getattr(content, 'parts', None) or []
    for part in parts:
        # Parts without inline data are skipped rather than ending the search.
        data = getattr(getattr(part, 'inline_data', None), 'data', None)
        if data:
            return data
    return None
def _parse_frame_rate(rate) -> float:
    """Convert an ffprobe rational such as "30000/1001" or "25" to a float.

    A zero denominator (ffprobe reports "0/0" when the rate is unknown)
    yields 0.0 so callers can test for it instead of hitting a division error.
    """
    numerator, _, denominator = str(rate).partition('/')
    try:
        top = float(numerator)
        bottom = float(denominator) if denominator else 1.0
    except ValueError as exc:
        raise ValueError(f"Unrecognized frame rate: {rate!r}") from exc
    return top / bottom if bottom else 0.0


def _get_video_info(video_path: str) -> Tuple[float, Tuple[int, int]]:
    """Read the frame rate and ``(width, height)`` of a video via ffprobe.

    Args:
        video_path: Path of the video file to probe.

    Returns:
        ``(framerate, (width, height))`` for the first video stream. The
        frame rate is 0.0 when ffprobe reports a zero denominator.

    Raises:
        ValueError: If the file has no video stream or the frame rate string
            cannot be parsed.
    """
    probe = ffmpeg.probe(video_path)
    video_stream = next((s for s in probe['streams'] if s.get('codec_type') == 'video'), None)
    if not video_stream:
        raise ValueError("No video stream found in the file.")
    # The value comes from the media file's metadata, so it is parsed
    # arithmetically instead of being handed to eval().
    framerate = _parse_frame_rate(video_stream['avg_frame_rate'])
    resolution = (int(video_stream['width']), int(video_stream['height']))
    return framerate, resolution
def _resize_image(image_path: str, target_size: Tuple[int, int]) -> str:
    """Return a path to the image at ``target_size``.

    When the image already has that size the original path is returned
    unchanged; otherwise a LANCZOS-resampled copy is written to a new
    temporary file (same extension, ".png" if there is none) and that path
    is returned. The temporary file is not deleted automatically.
    """
    with Image.open(image_path) as source:
        if source.size == target_size:
            return image_path
        scaled = source.resize(target_size, Image.Resampling.LANCZOS)

    _, extension = os.path.splitext(image_path)
    holder = tempfile.NamedTemporaryFile(delete=False, suffix=extension or ".png")
    with holder:
        scaled.save(holder.name)
    return holder.name
def _trim_first_frame_fast(video_path: str) -> str:
    """Drop the first frame's worth of time from a video without re-encoding.

    The input is opened with a seek of ``1 / framerate`` seconds and the
    streams are copied into a new temporary ``.mp4`` file.

    NOTE(review): with stream copy the cut presumably snaps to a packet or
    keyframe boundary, so the trim may not be frame-exact - confirm.

    Args:
        video_path: Path of the video to trim.

    Returns:
        Path of the trimmed temporary file (the caller owns it).

    Raises:
        RuntimeError: If probing or trimming fails for any reason; the
            original exception is attached as ``__cause__``.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name
    try:
        framerate, _ = _get_video_info(video_path)
        if framerate == 0:
            raise ValueError("Framerate cannot be zero.")
        start_time = 1 / framerate
        (
            ffmpeg
            .input(video_path, ss=start_time)
            .output(output_path, c='copy', avoid_negative_ts='make_zero')
            .run(overwrite_output=True, quiet=True)
        )
        return output_path
    except Exception as e:
        # Don't leave the placeholder output behind when the trim failed.
        try:
            if os.path.exists(output_path):
                os.remove(output_path)
        except OSError:
            pass
        raise RuntimeError(f"FFmpeg trim error: {e}") from e
def _combine_videos_simple(video1_path: str, video2_path: str) -> str:
    """Concatenate two videos with ffmpeg's concat demuxer (stream copy).

    Args:
        video1_path: First video in the output.
        video2_path: Video appended after the first one.

    Returns:
        Path of the combined temporary ``.mp4`` file (the caller owns it).

    Raises:
        RuntimeError: If ffmpeg fails; carries ffmpeg's stderr when available
            and the original error as ``__cause__``.
    """
    def _concat_entry(path: str) -> str:
        # Inside a single-quoted concat entry a quote must be written as '\''.
        escaped = os.path.abspath(path).replace("'", "'\\''")
        return f"file '{escaped}'\n"

    with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=".txt", encoding="utf-8") as tmp_list_file:
        tmp_list_file.write(_concat_entry(video1_path))
        tmp_list_file.write(_concat_entry(video2_path))
        list_file_path = tmp_list_file.name
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name

    succeeded = False
    try:
        (
            ffmpeg
            .input(list_file_path, format='concat', safe=0)
            .output(output_path, c='copy')
            .run(overwrite_output=True, quiet=True)
        )
        succeeded = True
        return output_path
    except ffmpeg.Error as e:
        details = e.stderr.decode(errors="replace") if e.stderr else str(e)
        raise RuntimeError(f"FFmpeg combine error: {details}") from e
    finally:
        # The list file is always temporary; the output only survives on success.
        leftovers = [list_file_path] if succeeded else [list_file_path, output_path]
        for leftover in leftovers:
            if os.path.exists(leftover):
                os.remove(leftover)
def _generate_video_segment(input_image_path: str, output_image_path: str, prompt: str, token: str) -> str:
    """Ask the remote first/last-frame Space for a clip between two images.

    Calls the ``/generate_video`` endpoint of
    ``multimodalart/wan-2-2-first-last-frame`` with the start image, end image
    and prompt, and returns the ``"video"`` entry of the first result item.
    """
    segment_client = Client("multimodalart/wan-2-2-first-last-frame", token=token)
    request = {
        "start_image_pil": handle_file(input_image_path),
        "end_image_pil": handle_file(output_image_path),
        "prompt": prompt,
        "api_name": "/generate_video",
    }
    outputs = segment_client.predict(**request)
    first_output = outputs[0]
    return first_output["video"]
def _build_image_generation_config(use_pro_model: bool, aspect_ratio: str, resolution: str):
    """Build the ``GenerateContentConfig`` for one image request.

    ``aspect_ratio`` is forwarded unless it is "Auto"; ``image_size`` is only
    set for the PRO model. When neither applies no ``image_config`` is sent.
    """
    image_options = {}
    if aspect_ratio != "Auto":
        image_options["aspect_ratio"] = aspect_ratio
    if use_pro_model:
        image_options["image_size"] = get_resolution_value(resolution)

    config_kwargs = {"response_modalities": ["IMAGE", "TEXT"]}
    if image_options:
        config_kwargs["image_config"] = types.ImageConfig(**image_options)
    return types.GenerateContentConfig(**config_kwargs)


def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], last_frame_path: Optional[str], aspect_ratio: str, model_selection: str, resolution: str, manual_token: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Generate (or edit) an image with Gemini after access and quota checks.

    Args:
        prompt: Text instruction sent to the model.
        images: Gallery items; the file path is read from index 0 of each
            item. Empty or None means text-to-image.
        previous_video_path: Video produced earlier in the session, if any.
        last_frame_path: Image that ended ``previous_video_path``, if any.
        aspect_ratio: "Auto" or a ratio such as "16:9".
        model_selection: "Nano Banana PRO" selects the PRO model; any other
            value selects the standard model.
        resolution: Resolution label; only used (and billed) for PRO.
        manual_token: Raw token string accepted as an alternative to OAuth.
        oauth_token: OAuth token of the logged-in user, if any.

    Returns:
        ``(output_path, create_video_update, extend_video_update,
        video_group_update)``: the generated PNG path plus visibility updates.
        "Create video" is offered when exactly one input image was given;
        "extend video" additionally requires that input to be
        ``last_frame_path``. The video group is always hidden.

    Raises:
        gr.Error: On denied access, unknown user, exhausted quota, or any
            failure during generation.

    Note:
        Credits are deducted before the model is called and are not refunded
        by this function when generation fails.
    """
    if not (verify_pro_status(oauth_token) or verify_pro_status(manual_token)):
        raise gr.Error("Access Denied.")

    # Determine if using PRO model based on radio selection
    use_pro_model = (model_selection == "Nano Banana PRO")
    # Calculate credit cost based on resolution (only for PRO model)
    credits_to_use = get_credit_cost(resolution) if use_pro_model else 1

    # Check rate limit
    username = get_username(oauth_token) or get_username(manual_token)
    if not username:
        raise gr.Error("Could not identify user.")

    if not check_and_update_usage(username, use_pro_model, credits_to_use):
        # Check if user has quota on the other model
        remaining_other = get_remaining_generations(username, not use_pro_model)
        model_name = "Nano Banana PRO" if use_pro_model else "Nano Banana"
        other_model_name = "Nano Banana" if use_pro_model else "Nano Banana PRO"
        # Get remaining credits for current model
        remaining_current = get_remaining_generations(username, use_pro_model)
        if use_pro_model and 0 < remaining_current < credits_to_use:
            gr.Info(f"You need {credits_to_use} credits for {get_resolution_value(resolution)} but only have {remaining_current} credits remaining. Try a lower resolution or use Nano Banana.")
        if remaining_other > 0:
            gr.Info(f"You've reached your daily limit for {model_name}. You still have {remaining_other} generations left with {other_model_name}!")
        raise gr.Error(f"Insufficient credits. You need {credits_to_use} credits for this generation.")

    opened_images = []
    try:
        for gallery_item in images or []:
            opened_images.append(Image.open(gallery_item[0]))
        contents = [*opened_images, prompt]

        # Select model based on radio selection
        model_name = GEMINI_PRO_MODEL_NAME if use_pro_model else GEMINI_MODEL_NAME
        generate_content_config = _build_image_generation_config(use_pro_model, aspect_ratio, resolution)

        print(f"Generating image for user {username} with prompt {prompt}")
        response = client.models.generate_content(
            model=model_name,
            contents=contents,
            config=generate_content_config
        )

        image_data = _extract_image_data_from_response(response)
        if not image_data:
            raise gr.Error("No image data in response")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            Image.open(BytesIO(image_data)).save(tmp.name)
            output_path = tmp.name

        can_create_video = bool(images and len(images) == 1)
        # The crucial check for continuity: only extend when the edited image
        # is the frame the previous video ended on.
        can_extend_video = bool(
            can_create_video
            and previous_video_path
            and last_frame_path
            and images[0][0] == last_frame_path
        )

        print(f"Image generated at {output_path}")
        return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False))
    except Exception as e:
        raise gr.Error(f"Image generation failed: {e}. Rephrase your prompt to make image generation explicit and try again") from e
    finally:
        # Release the input file handles once the request is over.
        for opened in opened_images:
            opened.close()
def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Create a video going from the first gallery image to the output image.

    Returns ``(video_path, video_path, output_image)``: the clip for display,
    the same clip as the new "previous video", and the image it ends on.
    Raises ``gr.Error`` when access is denied, an image is missing, or the
    remote generation fails.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not (input_image_gallery and output_image):
        raise gr.Error("Input/output images required.")

    try:
        first_frame_path = input_image_gallery[0][0]
        segment_path = _generate_video_segment(first_frame_path, output_image, prompt_input, oauth_token.token)
    except Exception as e:
        raise gr.Error(f"Video creation failed: {e}")
    return segment_path, segment_path, output_image
def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Append a newly generated segment to the previous video.

    Both images are resized to the previous video's resolution, a segment is
    generated between them, its first frame is trimmed, and the result is
    concatenated after the previous video. Returns
    ``(video_path, video_path, output_image)``. Raises ``gr.Error`` when
    access is denied, an input is missing, or any step fails.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not previous_video_path:
        raise gr.Error("No previous video to extend.")
    if not (input_image_gallery and output_image):
        raise gr.Error("Input/output images required.")

    try:
        video_size = _get_video_info(previous_video_path)[1]
        start_frame = _resize_image(input_image_gallery[0][0], video_size)
        end_frame = _resize_image(output_image, video_size)
        fresh_segment = _generate_video_segment(start_frame, end_frame, prompt_input, oauth_token.token)
        trimmed_segment = _trim_first_frame_fast(fresh_segment)
        extended_video = _combine_videos_simple(previous_video_path, trimmed_segment)
    except Exception as e:
        raise gr.Error(f"Video extension failed: {e}")
    return extended_video, extended_video, output_image
# Custom stylesheet passed to `demo.launch(css=...)` in the `__main__` block:
# spacing/width tweaks, gallery grid layout, and the rules that show the
# `.logo-dark` image only under `.dark` and hide `.logo-light` there.
css = '''
#sub_title{margin-top: -15px !important}
.tab-wrapper{margin-bottom: -33px !important}
.tabitem{padding: 0px !important}
.fillable{max-width: 980px !important}
.dark .progress-text {color: white}
.logo-dark{display: none}
.dark .logo-dark{display: block !important}
.dark .logo-light{display: none}
.grid-container img{object-fit: contain}
.grid-container {display: grid;grid-template-columns: repeat(2, 1fr)}
.grid-container:has(> .gallery-item:only-child) {grid-template-columns: 1fr}
#wan_ad p{text-align: center;padding: .5em}
'''
# --- UI definition and event wiring ---
# NOTE(review): the indentation of this block was reconstructed from a copy
# that had lost it; confirm the container nesting (which components sit in
# which Row/Column/Group) against the deployed layout.
with gr.Blocks() as demo:
    # Header: one logo per colour scheme; the `css` rules decide which is shown.
    gr.HTML('''
<img class="logo-dark" src='https://huggingface.co/spaces/multimodalart/nano-banana/resolve/main/nano_banana_pros.png' style='margin: 0 auto; max-width: 650px' />
<img class="logo-light" src='https://huggingface.co/spaces/multimodalart/nano-banana/resolve/main/nano_banana_pros_light.png' style='margin: 0 auto; max-width: 650px' />
''')
    gr.HTML("<h3 style='text-align:center'>Hugging Face PRO users can use Google's Nano Banana and Nano Banana PRO on this Space. <a href='http://huggingface.co/subscribe/pro?source=nana_banana' target='_blank'>Subscribe to PRO</a></h3>", elem_id="sub_title")
    # Both start hidden; control_access (below) reveals at most one on page load.
    pro_message = gr.Markdown(visible=False)
    main_interface = gr.Column(visible=False)
    # Session state written by the video handlers: the latest video path and
    # the image that video ends on.
    previous_video_state = gr.State(None)
    last_frame_of_video_state = gr.State(None)
    with main_interface:
        with gr.Row():
            # Inputs: source images, prompt, model and output options.
            with gr.Column(scale=1):
                image_input_gallery = gr.Gallery(label="Upload one or more images here. Leave empty for text-to-image", file_types=["image"], height="auto")
                prompt_input = gr.Textbox(label="Prompt", placeholder="Turns this photo into a masterpiece")
                # Model selection radio
                model_radio = gr.Radio(
                    choices=["Nano Banana", "Nano Banana PRO"],
                    value="Nano Banana PRO",
                    label="Model",
                )
                with gr.Row():
                    aspect_ratio_dropdown = gr.Dropdown(
                        label="Aspect Ratio",
                        choices=["Auto", "1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5", "21:9"],
                        value="Auto",
                        interactive=True
                    )
                    resolution_dropdown = gr.Dropdown(
                        label="Resolution",
                        choices=["1K", "2K", "4K"],
                        value="1K",
                        interactive=True,
                        visible=True
                    )
                generate_button = gr.Button("Generate", variant="primary")
            # Outputs: generated image plus the video controls, which start hidden.
            with gr.Column(scale=1):
                output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
                use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
                with gr.Row():
                    create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False)
                    extend_video_button = gr.Button("Extend existing video with new scene 🎞️", variant="secondary", visible=False)
                with gr.Group(visible=False) as video_group:
                    video_output = gr.Video(label="Generated Video", buttons=["download"], autoplay=True)
                    gr.Markdown("Generate more with [Wan 2.2 first-last-frame](https://huggingface.co/spaces/multimodalart/wan-2-2-first-last-frame)", elem_id="wan_ad")
        # NOTE(review): the string is passed positionally; in gr.Textbox that is
        # presumably `value` rather than `label`, so this hidden box would submit
        # the text itself as the manual token - confirm.
        manual_token = gr.Textbox("Manual Token (to use with the API)", visible=False)
        gr.Markdown("<h2 style='text-align: center'>Thank you for being a PRO! 🤗</h2>")
    login_button = gr.LoginButton()
    # Show/hide resolution dropdown based on model selection
    def update_resolution_visibility(model_selection):
        return gr.update(visible=(model_selection == "Nano Banana PRO"))
    model_radio.change(
        fn=update_resolution_visibility,
        inputs=[model_radio],
        outputs=[resolution_dropdown]
    )
    # Clicking Generate or submitting the prompt both run image generation.
    # `oauth_token` is not listed in `inputs`; presumably Gradio injects it from
    # the handler's gr.OAuthToken annotation - confirm.
    gr.on(
        triggers=[generate_button.click, prompt_input.submit],
        fn=unified_image_generator,
        inputs=[prompt_input, image_input_gallery, previous_video_state, last_frame_of_video_state, aspect_ratio_dropdown, model_radio, resolution_dropdown, manual_token],
        outputs=[output_image, create_video_button, extend_video_button, video_group],
        api_visibility="private"
    )
    # Feed the output image back in as the only gallery item, clear the output
    # and hide both video buttons and the video group.
    use_image_button.click(
        fn=lambda img: (
            [img] if img else None, None, gr.update(visible=False),
            gr.update(visible=False), gr.update(visible=False)
        ),
        inputs=[output_image],
        outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group],
        api_visibility="private"
    )
    # Reveal the video group first, then generate the clip and store it in the
    # session state.
    create_video_button.click(
        fn=lambda: gr.update(visible=True), outputs=[video_group],
        api_visibility="private"
    ).then(
        fn=create_new_video,
        inputs=[image_input_gallery, prompt_input, output_image],
        outputs=[video_output, previous_video_state, last_frame_of_video_state],
        api_visibility="private"
    )
    # Same two-step flow, but the new segment is appended to the stored video.
    extend_video_button.click(
        fn=lambda: gr.update(visible=True), outputs=[video_group],
        api_visibility="private"
    ).then(
        fn=extend_existing_video,
        inputs=[image_input_gallery, prompt_input, output_image, previous_video_state],
        outputs=[video_output, previous_video_state, last_frame_of_video_state],
        api_visibility="private"
    )
    def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None):
        """Return visibility updates for (main_interface, pro_message).

        No profile: both hidden. PRO/enterprise user: main interface shown.
        Any other logged-in user: the upgrade message is shown instead.
        """
        if not profile: return gr.update(visible=False), gr.update(visible=False)
        if verify_pro_status(oauth_token):
            return gr.update(visible=True), gr.update(visible=False)
        else:
            message = (
                "## ✨ Exclusive Access for PRO Users\n\n"
                "Thank you for your interest! This app is available exclusively for our Hugging Face **PRO** members.\n\n"
                "To unlock this and many other cool stuff, please consider upgrading your account.\n\n"
                "### [**Become a PRO Today!**](http://huggingface.co/subscribe/pro?source=nana_banana)"
            )
            return gr.update(visible=False), gr.update(visible=True, value=message)
    # Runs on every page load to decide which of the two panels to reveal.
    demo.load(control_access, inputs=None, outputs=[main_interface, pro_message])
if __name__ == "__main__":
    # Queue configured with no max size and no default concurrency limit
    # (both passed as None), then launched with the Citrus theme and the
    # custom stylesheet defined above.
    queued_app = demo.queue(max_size=None, default_concurrency_limit=None)
    queued_app.launch(show_error=True, theme=gr.themes.Citrus(), css=css)