Spaces:
Paused
Paused
File size: 7,553 Bytes
fb9ecd6 7fa80e9 c12d71c fb9ecd6 de6a4a2 7fa80e9 dc3c3e6 7fa80e9 dc3c3e6 de6a4a2 fb9ecd6 dc3c3e6 de6a4a2 dc3c3e6 fb9ecd6 de6a4a2 fb9ecd6 de6a4a2 fb9ecd6 de6a4a2 dc3c3e6 de6a4a2 dc3c3e6 de6a4a2 dc3c3e6 de6a4a2 dc3c3e6 560a2a7 dc3c3e6 de6a4a2 fb9ecd6 de6a4a2 560a2a7 de6a4a2 dc3c3e6 fb9ecd6 de6a4a2 dc3c3e6 de6a4a2 dc3c3e6 de6a4a2 dc3c3e6 de6a4a2 dc3c3e6 de6a4a2 fb9ecd6 dc3c3e6 de6a4a2 fb9ecd6 dc3c3e6 fb9ecd6 de6a4a2 dc3c3e6 de6a4a2 fb9ecd6 de6a4a2 fb9ecd6 de6a4a2 fb9ecd6 398c325 fb9ecd6 398c325 d1e80e2 398c325 247df58 398c325 fb9ecd6 de6a4a2 52f78d8 dc3c3e6 5dcf285 dc3c3e6 247df58 dc3c3e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
import os
import gradio as gr
# --- Patch gradio_client boolean-schema bug ---
import gradio_client.utils as gcu
# Keep a reference to the library's own converter so the wrapper defined
# below can delegate every non-boolean schema to it.
orig_json_schema_to_python_type = gcu._json_schema_to_python_type
def _safe_json_schema_to_python_type(schema, defs):
# Fix: handle boolean schema values for additionalProperties
if isinstance(schema, bool):
# True → any type allowed; False → never allowed
return "Any" if schema else "Never"
return orig_json_schema_to_python_type(schema, defs)
# Install the wrapper in place of the library function (module-level
# monkeypatch); it must run before anything asks gradio_client for a schema.
gcu._json_schema_to_python_type = _safe_json_schema_to_python_type
# ------------------------------------------------
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import base64
from PIL import Image, ImageDraw
from io import BytesIO
import re
# -------- Runtime / device --------
# Force CPU usage
device = "cpu"

# Hugging Face Spaces port (taken from the PORT env var, 7860 if unset)
PORT = int(os.environ.get("PORT", "7860"))

# -------- Model / Processor --------
# Single checkpoint served by this demo; used as the key of both registries.
_MODEL_ID = "OS-Copilot/OS-Atlas-Base-7B"

# NOTE: device_map=None + .to(device) keeps everything on CPU.
# 'dtype' is the newer keyword, used instead of the deprecated 'torch_dtype'.
_model = Qwen2VLForConditionalGeneration.from_pretrained(
    _MODEL_ID,
    dtype="auto",
    device_map=None,
)

# Registries keyed by model id; run_example() and the UI dropdown read these.
models = {_MODEL_ID: _model.to(device)}
processors = {_MODEL_ID: AutoProcessor.from_pretrained(_MODEL_ID)}
# -------- Helpers --------
def image_to_base64(image: Image.Image) -> str:
    """Serialize *image* as PNG and return the bytes as base64 text.

    Args:
        image: The PIL image to encode.

    Returns:
        The base64 encoding of the PNG data, as a ``str``.
    """
    with BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode("utf-8")
def draw_bounding_boxes(image: Image.Image, bounding_boxes, outline_color="red", line_width=2):
    """Outline each bounding box on *image* and return the image.

    The drawing happens directly on the object passed in, so callers that
    need the untouched picture should pass a copy.

    Args:
        image: PIL image to draw on.
        bounding_boxes: Iterable of 4-item boxes ``(x0, y0, x1, y1)`` in the
            coordinate space of *image*. ``None`` or an empty iterable draws
            nothing. The two corners may be given in either order.
        outline_color: Colour of the rectangle outline.
        line_width: Outline thickness passed to Pillow as ``width``.

    Returns:
        The same image object that was passed in.

    Raises:
        ValueError: If a box does not unpack into exactly four values.
    """
    draw = ImageDraw.Draw(image)
    for box in bounding_boxes or []:
        x0, y0, x1, y1 = box
        # Recent Pillow releases reject rectangles whose second corner lies
        # above or to the left of the first, so put the corners in order.
        left, right = sorted((x0, x1))
        top, bottom = sorted((y0, y1))
        draw.rectangle([left, top, right, bottom], outline=outline_color, width=line_width)
    return image
def rescale_bounding_boxes(bounding_boxes, original_width, original_height, scaled_width=1000, scaled_height=1000):
    """Map boxes from a scaled coordinate grid onto the original image size.

    Args:
        bounding_boxes: Iterable of ``(xmin, ymin, xmax, ymax)`` boxes
            expressed on a ``scaled_width`` x ``scaled_height`` grid.
            ``None`` or an empty sequence yields an empty list.
        original_width: Width of the target image.
        original_height: Height of the target image.
        scaled_width: Width of the grid the boxes are expressed in.
        scaled_height: Height of the grid the boxes are expressed in.

    Returns:
        A new list of ``[xmin, ymin, xmax, ymax]`` lists in target-image
        coordinates (floats, because the scale factors use true division).
    """
    rescaled = []
    if bounding_boxes:
        x_factor = original_width / scaled_width
        y_factor = original_height / scaled_height
        for left, top, right, bottom in bounding_boxes:
            rescaled.append(
                [left * x_factor, top * y_factor, right * x_factor, bottom * y_factor]
            )
    return rescaled
# -------- Inference --------
# Markup the model wraps around the referenced element and its box.
_OBJECT_REF_PATTERN = re.compile(r"<\|object_ref_start\|>(.*?)<\|object_ref_end\|>")
_BOX_PATTERN = re.compile(r"<\|box_start\|>(.*?)<\|box_end\|>")
# Boundary between the two corner points; whitespace around the comma is allowed.
_POINT_SEPARATOR = re.compile(r"\)\s*,\s*\(")


def _parse_first_box(box_content):
    """Turn ``"(x1,y1),(x2,y2)"`` into ``[[x1, y1, x2, y2]]``; ``[]`` if malformed."""
    points = [p.strip() for p in _POINT_SEPARATOR.split(box_content)]
    points[0] = points[0].lstrip("(")
    points[-1] = points[-1].rstrip(")")
    try:
        coords = [tuple(int(n) for n in point.split(",")) for point in points]
        # Fewer than two points, or a point that is not a pair, fails to
        # unpack here and is reported the same way as a non-integer value.
        (x1, y1), (x2, y2) = coords[:2]
    except ValueError:
        return []
    return [[x1, y1, x2, y2]]


def run_example(image, text_input, model_id="OS-Copilot/OS-Atlas-Base-7B"):
    """Ask the model where a UI element is and draw its box on the screenshot.

    Args:
        image: Screenshot as a PIL image, or ``None`` when nothing was uploaded.
        text_input: The command describing the element to locate.
        model_id: Key into the module-level ``models`` / ``processors`` dicts.

    Returns:
        A 3-tuple ``(object_ref, scaled_boxes, annotated)``:

        * ``object_ref`` - text between the object-ref markers, or ``""``.
        * ``scaled_boxes`` - at most one ``[xmin, ymin, xmax, ymax]`` box,
          rescaled to the image size by ``rescale_bounding_boxes``; ``[]``
          when no well-formed box was found.
        * ``annotated`` - a copy of *image* with the box drawn, or *image*
          itself when there is nothing to draw.

        If *image* is ``None`` or *text_input* is empty/blank, returns
        ``("", [], image)`` without running the model.

    Raises:
        KeyError: If *model_id* is not a registered model.
    """
    # Basic validation so the Space doesn't 500
    if image is None or text_input is None or not str(text_input).strip():
        return "", [], image

    model = models[model_id].eval()
    processor = processors[model_id]

    prompt = f'In this UI screenshot, what is the position of the element corresponding to the command "{text_input}" (with bbox)?'
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": f"data:image;base64,{image_to_base64(image)}"},
                {"type": "text", "text": prompt},
            ],
        }
    ]

    # Build inputs
    chat_text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[chat_text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    # Move tensors to the configured device explicitly; non-tensor entries pass through
    inputs = {k: (v.to(device) if hasattr(v, "to") else v) for k, v in inputs.items()}

    # Generate
    with torch.no_grad():
        generated_ids = model.generate(**inputs, max_new_tokens=128)

    # Drop the echoed prompt tokens so only the completion is decoded
    completion_ids = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs["input_ids"], generated_ids)
    ]
    # Special tokens are kept because the object-ref / box markers are parsed below
    decoded = processor.batch_decode(
        completion_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False
    )
    output_text = (decoded[0] if decoded else "") or ""

    # Parse object_ref and bbox defensively
    object_match = _OBJECT_REF_PATTERN.search(output_text)
    box_match = _BOX_PATTERN.search(output_text)
    object_ref = object_match.group(1).strip() if object_match else ""
    box_content = box_match.group(1).strip() if box_match else ""
    boxes = _parse_first_box(box_content) if box_content else []

    if not boxes:
        return object_ref, [], image

    scaled_boxes = rescale_bounding_boxes(boxes, image.width, image.height)
    # Draw on a copy so the caller's image is left untouched
    annotated = draw_bounding_boxes(image.copy(), scaled_boxes) if scaled_boxes else image
    return object_ref, scaled_boxes, annotated
# -------- UI --------
css = """
#output {
height: 500px;
overflow: auto;
border: 1px solid #ccc;
}
"""

# NOTE(review): the nesting below was reconstructed from a copy of the file
# whose indentation was lost; confirm that the examples and the click handler
# belong directly under the Blocks context rather than inside the Row.
with gr.Blocks(css=css) as demo:
    gr.Markdown("# Demo for OS-ATLAS: A Foundation Action Model For Generalist GUI Agents")

    with gr.Row():
        # First column: everything the user supplies.
        with gr.Column():
            input_img = gr.Image(label="Input Image", type="pil")
            model_selector = gr.Dropdown(
                choices=list(models),
                label="Model",
                value="OS-Copilot/OS-Atlas-Base-7B",
            )
            text_input = gr.Textbox(label="User Prompt")
            submit_btn = gr.Button(value="Submit")
        # Second column: the three values returned by run_example.
        with gr.Column():
            model_output_text = gr.Textbox(label="Model Output Text")
            model_output_box = gr.Textbox(label="Model Output Box")
            annotated_image = gr.Image(label="Annotated Image")

    # Both examples use the same screenshot with a different command.
    # No fn/outputs are given, so picking an example only prefills the inputs.
    _example_screenshot = "assets/web_6f93090a-81f6-489e-bb35-1a2838b18c01.png"
    gr.Examples(
        examples=[
            [_example_screenshot, "select search textfield"],
            [_example_screenshot, "switch to discussions"],
        ],
        inputs=[input_img, text_input],
    )

    submit_btn.click(
        fn=run_example,
        inputs=[input_img, text_input, model_selector],
        outputs=[model_output_text, model_output_box, annotated_image],
    )
# ---- Make Gradio/Starlette error responses small & safe (no Content-Length drama) ----
from fastapi import Request
from starlette.responses import PlainTextResponse
app = demo.app  # FastAPI app behind Gradio Blocks


async def _catch_all_exceptions(request: Request, exc: Exception):
    """Answer any unhandled exception with a fixed plain-text 500 response.

    Neither the request nor the exception is inspected; the body is always
    the same short string so the response size never depends on the error.
    """
    return PlainTextResponse("Internal Server Error", status_code=500)


# NOTE(review): confirm that launch() serves this same app object; if it
# builds a new one, the handler registered here is never consulted.
app.add_exception_handler(Exception, _catch_all_exceptions)
# --------------------------------------------------------------------------------------
# -------- Launch (Spaces-friendly) --------
_launch_options = {
    # Listen on all interfaces, on the port chosen at the top of the file.
    "server_name": "0.0.0.0",
    "server_port": PORT,
    # Keep error responses small: no large HTML error bodies ...
    "show_error": False,
    # ... and no big pretty tracebacks (which were linked to Content-Length mismatches).
    "debug": False,
    # Key setting: intended to disable the /api/info schema generation.
    "show_api": False,
}
# NOTE(review): the original author suggested `api_open=False` as a fallback
# for Gradio versions that do not accept `show_api`; verify before relying on
# it, since `api_open` appears to be a queue() option rather than an alias.
demo.queue().launch(**_launch_options)
|