Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import json | |
| import os | |
| import time | |
| from collections import defaultdict | |
# Root of the JigsawStack REST API; endpoint paths are appended to this.
BASE_URL = "https://api.jigsawstack.com/v1"

# Headers sent with every API call. The key is read from the environment and
# is None when JIGSAWSTACK_API_KEY is not set.
headers = {"x-api-key": os.environ.get("JIGSAWSTACK_API_KEY")}

# Rate limiting configuration
MAX_REQUESTS = 20  # Maximum requests per time window
TIME_WINDOW = 60 * 60  # Time window in seconds (1 hour)

# Timestamps (time.time() values) of accepted requests, keyed by client IP.
request_times = defaultdict(list)
def get_real_ip(request: gr.Request):
    """Return the best-guess client IP address for *request*.

    The first entry of the ``x-forwarded-for`` header is preferred; when the
    header is missing or its first entry is blank, the connection peer
    address (``request.client.host``) is used instead.

    Args:
        request: The incoming Gradio request, or a falsy value when no
            request information is available.

    Returns:
        The IP address as a string, or ``"unknown"`` when it cannot be
        determined (no request, no usable header and no client address).

    NOTE(review): ``x-forwarded-for`` is supplied by the caller unless a
    trusted proxy overwrites it, so this value can be spoofed - confirm the
    deployment sits behind such a proxy before relying on it for security.
    """
    if not request:
        return "unknown"

    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # First IP in the list is the client's; later ones are proxies.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop

    # The client address may be absent (None), so never dereference it blindly.
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"
def check_rate_limit(request: gr.Request):
    """Decide whether the caller behind *request* may make another call.

    Timestamps of accepted requests are kept per client IP in
    ``request_times``. Entries older than ``TIME_WINDOW`` seconds are
    discarded on every call, and a request is accepted only while fewer than
    ``MAX_REQUESTS`` timestamps remain in the window.

    Returns:
        A ``(allowed, message)`` tuple. ``message`` is empty when the request
        is accepted and recorded, and explains the wait time when it is
        rejected. When no request info is available the call is allowed
        (and not recorded) with an explanatory message.
    """
    if not request:
        return True, "Rate limit check failed - no request info"

    client_ip = get_real_ip(request)
    current = time.time()

    # Keep only the timestamps that still fall inside the sliding window.
    recent = []
    for stamp in request_times[client_ip]:
        if current - stamp < TIME_WINDOW:
            recent.append(stamp)
    request_times[client_ip] = recent

    # Under the limit: record this request and let it through.
    if len(recent) < MAX_REQUESTS:
        recent.append(current)
        return True, ""

    # Over the limit: the oldest surviving timestamp decides when a slot frees up.
    seconds_left = int(TIME_WINDOW - (current - recent[0]))
    time_remaining_minutes = round(seconds_left / 60, 1)
    time_window_minutes = round(TIME_WINDOW / 60, 1)
    return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes."
| # ----------------- JigsawStack API Wrappers ------------------ | |
# (connect, read) timeouts in seconds for the vOCR HTTP call. Without a
# timeout, requests waits indefinitely on a stalled connection.
_VOCR_TIMEOUT = (10, 120)


def _parse_page_range(page_range_str):
    """Convert a "start,end" string into a validated ``[start, end]`` list.

    Raises:
        ValueError: If the text is not exactly two positive integers in
            ascending order, or the range covers more than 10 pages.
    """
    parts = [int(p.strip()) for p in page_range_str.split(",")]
    if len(parts) != 2:
        raise ValueError("Page range must be two numbers (e.g., 1,10).")
    start_page, end_page = parts
    if not (start_page > 0 and end_page > 0):
        raise ValueError("Page numbers must be positive.")
    if start_page > end_page:
        raise ValueError("Start page cannot be greater than end page.")
    # The range is inclusive: "1,10" is 10 pages (difference 9) and is allowed.
    if (end_page - start_page) >= 10:
        raise ValueError("Page range cannot span more than 10 pages.")
    return [start_page, end_page]


def vocr(source_type, image_url, file_store_key, prompt_str, page_range_str, request: gr.Request):
    """Run a vOCR request against the JigsawStack API and format the result for the UI.

    Args:
        source_type: ``"URL"`` or ``"File Store Key"``; selects which of the
            next two arguments identifies the image.
        image_url: Image URL, used when ``source_type`` is ``"URL"``.
        file_store_key: File store key, used when ``source_type`` is
            ``"File Store Key"``.
        prompt_str: Required comma-separated prompts; sent to the API as a list.
        page_range_str: Optional ``"start,end"`` page range (at most 10 pages).
        request: The Gradio request, used for per-IP rate limiting.

    Returns:
        A 6-tuple matching the click handler's outputs: status text, image to
        display, and updates for the context JSON, tags, has_text and
        sections components. On any failure the four result components are
        hidden and the status text carries the error message.
    """

    def error_response(message, img_src=None):
        """Build the output tuple for a failure: message, image, four hidden panels."""
        return (
            message,
            img_src,
            # context JSON, tags, has_text, sections JSON
            *(gr.update(visible=False) for _ in range(4)),
        )

    # Check rate limit first
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        return error_response(rate_limit_msg)

    # Only a non-blank URL is echoed back to the image component; a blank
    # value is normalized to None so the component is simply cleared.
    image_to_display = None
    if source_type == "URL" and image_url and image_url.strip():
        image_to_display = image_url.strip()

    try:
        payload = {}

        # Validate prompts - ensure a prompt is always provided.
        if not prompt_str or not prompt_str.strip():
            return error_response("Error: Prompt is required.", image_to_display)
        prompts = [p.strip() for p in prompt_str.split(",") if p.strip()]
        if not prompts:
            return error_response("Error: Prompt cannot be empty or just commas.", image_to_display)
        # The API can handle an array of prompts, which is more robust
        # and avoids potential issues with the response format.
        payload["prompt"] = prompts

        # Validate page range
        if page_range_str and page_range_str.strip():
            try:
                payload["page_range"] = _parse_page_range(page_range_str)
            except (ValueError, TypeError) as e:
                return error_response(f"Error: Invalid page range format - {e}", image_to_display)

        if source_type == "URL":
            if not image_url or not image_url.strip():
                return error_response("Error: Image URL is required.", image_to_display)
            payload["url"] = image_url.strip()
        elif source_type == "File Store Key":
            if not file_store_key or not file_store_key.strip():
                return error_response("Error: File Store Key is required.", image_to_display)
            payload["file_store_key"] = file_store_key.strip()
        else:
            return error_response("Error: Invalid image source selected.", image_to_display)

        # A timeout surfaces as requests.exceptions.Timeout, which is a
        # RequestException and is reported through the handler below.
        response = requests.post(
            f"{BASE_URL}/vocr",
            headers=headers,
            json=payload,
            timeout=_VOCR_TIMEOUT,
        )
        response.raise_for_status()
        result = response.json()

        if not result.get("success"):
            return error_response(f"Error: vOCR failed - {result.get('message', 'Unknown error')}", image_to_display)

        context = result.get("context", {})
        # Tolerate a null "tags" field and non-string entries.
        tags = ", ".join(str(tag) for tag in (result.get("tags") or []))
        has_text = str(result.get("has_text", "N/A"))
        sections = result.get("sections", [])
        status = "✅ Successfully processed image with vOCR."

        # Empty results stay hidden; has_text is always shown on success.
        return (
            status,
            image_to_display,
            gr.update(value=context, visible=bool(context)),
            gr.update(value=tags, visible=bool(tags)),
            gr.update(value=has_text, visible=True),
            gr.update(value=sections, visible=bool(sections)),
        )
    except requests.exceptions.RequestException as e:
        return error_response(f"Request failed: {str(e)}", image_to_display)
    except Exception as e:
        # Top-level UI boundary: report the failure in the status box instead of crashing the handler.
        return error_response(f"An unexpected error occurred: {str(e)}", image_to_display)
| # ----------------- Gradio UI ------------------ | |
with gr.Blocks() as demo:
    # Page header: title, tagline and a link to the API documentation.
    gr.Markdown("""
<div style='text-align: center; margin-bottom: 24px;'>
<h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 vOCR</h1>
<p style='font-size:1.2em; margin-top: 0;'>Extract text from images with advanced AI models.</p>
<p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/vocr' target='_blank'>documentation</a>.</p>
</div>
""")

    with gr.Row():
        # Left column: image source, prompts, page range and the submit button.
        with gr.Column(scale=1):
            gr.Markdown("#### Image Source")
            vocr_source_type = gr.Radio(
                label="Choose Image Source",
                choices=["URL", "File Store Key"],
                value="URL",
            )
            # Only one of the two source textboxes is visible at a time;
            # the URL box is the default.
            vocr_image_url = gr.Textbox(
                label="Image URL",
                placeholder="https://media.snopes.com/2021/08/239918331_10228097135359041_3825446756894757753_n.jpg",
                visible=True,
            )
            vocr_file_key = gr.Textbox(
                label="File Store Key",
                placeholder="your-file-store-key",
                visible=False,
            )
            vocr_prompts = gr.Textbox(
                label="Prompts (comma-separated)",
                info="Prompts to guide data extraction from the image.",
                placeholder="total_price, tax, store_name",
            )
            vocr_page_range = gr.Textbox(
                label="Page Range (Optional)",
                info="For multi-page docs. Max 10 pages.",
                placeholder="e.g., 1,10",
            )
            vocr_btn = gr.Button("Analyze Image", variant="primary")

        # Right column: status line plus result panels, which start hidden
        # and are revealed by the handler's updates.
        with gr.Column(scale=2):
            gr.Markdown("#### Analysis Results")
            vocr_status = gr.Textbox(label="Status", interactive=False)
            vocr_image_display = gr.Image(label="Analyzed Image")
            vocr_context = gr.JSON(label="Extracted Context", visible=False)
            vocr_tags = gr.Textbox(label="Detected Tags", interactive=False, visible=False)
            vocr_has_text = gr.Textbox(label="Text Detected?", interactive=False, visible=False)
            vocr_sections = gr.JSON(label="Full OCR Sections", visible=False)

    def update_vocr_source(source_type):
        """Show the textbox matching the selected source and hide the other one."""
        show_url = source_type == "URL"
        url_box_update = gr.update(visible=show_url)
        key_box_update = gr.update(visible=not show_url)
        return url_box_update, key_box_update

    # Toggle the source textboxes whenever the radio selection changes.
    vocr_source_type.change(
        update_vocr_source,
        inputs=vocr_source_type,
        outputs=[vocr_image_url, vocr_file_key],
    )

    # Order of inputs/outputs must match vocr's parameters and return tuple.
    vocr_btn.click(
        vocr,
        inputs=[
            vocr_source_type,
            vocr_image_url,
            vocr_file_key,
            vocr_prompts,
            vocr_page_range,
        ],
        outputs=[
            vocr_status,
            vocr_image_display,
            vocr_context,
            vocr_tags,
            vocr_has_text,
            vocr_sections,
        ],
    )

demo.launch()