Spaces:
Paused
Paused
| import os | |
| import gradio as gr | |
| # --- Patch gradio_client boolean-schema bug --- | |
| import gradio_client.utils as gcu | |
| orig_json_schema_to_python_type = gcu._json_schema_to_python_type | |
| def _safe_json_schema_to_python_type(schema, defs): | |
| # Fix: handle boolean schema values for additionalProperties | |
| if isinstance(schema, bool): | |
| # True → any type allowed; False → never allowed | |
| return "Any" if schema else "Never" | |
| return orig_json_schema_to_python_type(schema, defs) | |
| gcu._json_schema_to_python_type = _safe_json_schema_to_python_type | |
| # ------------------------------------------------ | |
| from transformers import Qwen2VLForConditionalGeneration, AutoProcessor | |
| from qwen_vl_utils import process_vision_info | |
| import torch | |
| import base64 | |
| from PIL import Image, ImageDraw | |
| from io import BytesIO | |
| import re | |
| # -------- Runtime / device -------- | |
| # Force CPU usage | |
| device = "cpu" | |
| # Hugging Face Spaces port | |
| PORT = int(os.getenv("PORT", "7860")) | |
| # -------- Model / Processor -------- | |
| # NOTE: device_map=None + .to(device) keeps everything on CPU | |
| models = { | |
| "OS-Copilot/OS-Atlas-Base-7B": Qwen2VLForConditionalGeneration.from_pretrained( | |
| "OS-Copilot/OS-Atlas-Base-7B", | |
| dtype="auto", # use 'dtype' (new) rather than deprecated 'torch_dtype' | |
| device_map=None | |
| ).to(device) | |
| } | |
| processors = { | |
| "OS-Copilot/OS-Atlas-Base-7B": AutoProcessor.from_pretrained("OS-Copilot/OS-Atlas-Base-7B") | |
| } | |
| # -------- Helpers -------- | |
| def image_to_base64(image: Image.Image) -> str: | |
| buffered = BytesIO() | |
| image.save(buffered, format="PNG") | |
| return base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| def draw_bounding_boxes(image: Image.Image, bounding_boxes, outline_color="red", line_width=2): | |
| draw = ImageDraw.Draw(image) | |
| for box in bounding_boxes or []: | |
| xmin, ymin, xmax, ymax = box | |
| draw.rectangle([xmin, ymin, xmax, ymax], outline=outline_color, width=line_width) | |
| return image | |
| def rescale_bounding_boxes(bounding_boxes, original_width, original_height, scaled_width=1000, scaled_height=1000): | |
| if not bounding_boxes: | |
| return [] | |
| x_scale = original_width / scaled_width | |
| y_scale = original_height / scaled_height | |
| return [ | |
| [xmin * x_scale, ymin * y_scale, xmax * x_scale, ymax * y_scale] | |
| for (xmin, ymin, xmax, ymax) in bounding_boxes | |
| ] | |
| # -------- Inference -------- | |
| def run_example(image, text_input, model_id="OS-Copilot/OS-Atlas-Base-7B"): | |
| # Basic validation so the Space doesn't 500 | |
| if image is None or (text_input is None or str(text_input).strip() == ""): | |
| return "", [], image | |
| model = models[model_id].eval() | |
| processor = processors[model_id] | |
| prompt = f'In this UI screenshot, what is the position of the element corresponding to the command "{text_input}" (with bbox)?' | |
| messages = [ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "image", "image": f"data:image;base64,{image_to_base64(image)}"}, | |
| {"type": "text", "text": prompt}, | |
| ], | |
| } | |
| ] | |
| # Build inputs | |
| text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| image_inputs, video_inputs = process_vision_info(messages) | |
| inputs = processor( | |
| text=[text], | |
| images=image_inputs, | |
| videos=video_inputs, | |
| padding=True, | |
| return_tensors="pt", | |
| ) | |
| # Move tensors to CPU explicitly | |
| inputs = {k: (v.to(device) if hasattr(v, "to") else v) for k, v in inputs.items()} | |
| # Generate | |
| with torch.no_grad(): | |
| generated_ids = model.generate(**inputs, max_new_tokens=128) | |
| # Post-process | |
| generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs["input_ids"], generated_ids)] | |
| output_texts = processor.batch_decode( | |
| generated_ids_trimmed, skip_special_tokens=False, clean_up_tokenization_spaces=False | |
| ) | |
| text = output_texts[0] if output_texts else "" | |
| # Parse object_ref and bbox defensively | |
| object_ref_pattern = r"<\|object_ref_start\|>(.*?)<\|object_ref_end\|>" | |
| box_pattern = r"<\|box_start\|>(.*?)<\|box_end\|>" | |
| object_match = re.search(object_ref_pattern, text or "") | |
| box_match = re.search(box_pattern, text or "") | |
| object_ref = object_match.group(1).strip() if object_match else "" | |
| box_content = box_match.group(1).strip() if box_match else "" | |
| boxes = [] | |
| if box_content: | |
| try: | |
| # Expecting "(x1,y1),(x2,y2)" -> convert to [xmin, ymin, xmax, ymax] | |
| parts = [p.strip() for p in box_content.split("),(")] | |
| parts[0] = parts[0].lstrip("(") | |
| parts[-1] = parts[-1].rstrip(")") | |
| coords = [tuple(map(int, p.split(","))) for p in parts] | |
| if len(coords) >= 2: | |
| (x1, y1), (x2, y2) = coords[0], coords[1] | |
| boxes = [[x1, y1, x2, y2]] | |
| except Exception: | |
| boxes = [] | |
| scaled_boxes = rescale_bounding_boxes(boxes, image.width, image.height) if boxes else [] | |
| annotated = draw_bounding_boxes(image.copy(), scaled_boxes) if scaled_boxes else image | |
| return object_ref, scaled_boxes, annotated | |
| # -------- UI -------- | |
| css = """ | |
| #output { | |
| height: 500px; | |
| overflow: auto; | |
| border: 1px solid #ccc; | |
| } | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| gr.Markdown("# Demo for OS-ATLAS: A Foundation Action Model For Generalist GUI Agents") | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_img = gr.Image(label="Input Image", type="pil") | |
| model_selector = gr.Dropdown( | |
| choices=list(models.keys()), | |
| label="Model", | |
| value="OS-Copilot/OS-Atlas-Base-7B" | |
| ) | |
| text_input = gr.Textbox(label="User Prompt") | |
| submit_btn = gr.Button(value="Submit") | |
| with gr.Column(): | |
| model_output_text = gr.Textbox(label="Model Output Text") | |
| model_output_box = gr.Textbox(label="Model Output Box") | |
| annotated_image = gr.Image(label="Annotated Image") | |
| gr.Examples( | |
| examples=[ | |
| ["assets/web_6f93090a-81f6-489e-bb35-1a2838b18c01.png", "select search textfield"], | |
| ["assets/web_6f93090a-81f6-489e-bb35-1a2838b18c01.png", "switch to discussions"], | |
| ], | |
| inputs=[input_img, text_input], | |
| # remove fn/outputs so examples only prefill inputs | |
| ) | |
| submit_btn.click( | |
| run_example, | |
| [input_img, text_input, model_selector], | |
| [model_output_text, model_output_box, annotated_image], | |
| ) | |
| # ---- Make Gradio/Starlette error responses small & safe (no Content-Length drama) ---- | |
| from fastapi import Request | |
| from starlette.responses import PlainTextResponse | |
| app = demo.app # FastAPI app behind Gradio Blocks | |
| async def _catch_all_exceptions(request: Request, exc: Exception): | |
| # Return a very small body so Starlette/Uvicorn never miscounts bytes | |
| return PlainTextResponse("Internal Server Error", status_code=500) | |
| # -------------------------------------------------------------------------------------- | |
| # -------- Launch (Spaces-friendly) -------- | |
| demo.queue().launch( | |
| server_name="0.0.0.0", | |
| server_port=PORT, | |
| show_error=False, # avoid large HTML error bodies | |
| debug=False, # avoid big pretty tracebacks (and Content-Length mismatch) | |
| show_api=False # <— key: disables /api/info schema generation | |
| # api_open=False # if your Gradio version expects the old name, use this instead of show_api | |
| ) | |