Spaces:
Sleeping
Sleeping
| import boto3 | |
| import os | |
| import json | |
| import gradio as gr | |
| from typing import List, Dict, Tuple, Optional, Any | |
# ── S3 CONFIG ─────────────────────────────────────────────────────────────────
# Storage layout: the seed dataset is a single JSON object at INIT_KEY, and
# each validated record is written as its own object under VALID_PREFIX.
BUCKET = "doccano-processed"
INIT_KEY = "gradio/aug25_recleaning_train_and_holdout.json"
VALID_PREFIX = "aug25_recleaning_train_and_holdout/"

# Module-wide S3 client. Credentials and region are read from environment
# variables; the region falls back to "ap-southeast-2" when unset.
s3 = boto3.client(
    "s3",
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    region_name=os.environ.get("AWS_DEFAULT_REGION", "ap-southeast-2"),
)
# ── Helpers to load & save from S3 ────────────────────────────────────────────
def load_initial_data() -> List[Dict]:
    """Download the seed dataset from S3 and return it as a list of records.

    The object at ``INIT_KEY`` is parsed as JSON. Every record that does not
    already carry a ``"validated"`` key gets one set to ``False``.

    Returns:
        The parsed records, modified in place with the default flag.
    """
    response = s3.get_object(Bucket=BUCKET, Key=INIT_KEY)
    records = json.loads(response["Body"].read())
    # The ner_text spans in these records are assumed to be half-open,
    # i.e. the end index is not included in the span.
    for record in records:
        if "validated" not in record:
            record["validated"] = False
    return records
def load_all_validations() -> Dict[int, Dict]:
    """Fetch every validated record stored under ``VALID_PREFIX``.

    Objects are expected to be named ``<index>.json`` (the naming used by
    ``save_single_validation``). Keys whose file stem is not an integer, for
    example a zero-byte "folder" placeholder whose key ends in ``/``, are
    skipped instead of aborting the whole load with ``ValueError``.

    Returns:
        Mapping of example index to its stored record. Records that lack a
        ``"validated"`` key get it set to ``True``.
    """
    validated: Dict[int, Dict] = {}
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUCKET, Prefix=VALID_PREFIX):
        # A page without a "Contents" entry contributes nothing.
        for entry in page.get("Contents", []):
            key = entry["Key"]
            stem = os.path.splitext(os.path.basename(key))[0]
            try:
                idx = int(stem)
            except ValueError:
                # Not one of our "<index>.json" objects; ignore it.
                continue
            body = s3.get_object(Bucket=BUCKET, Key=key)["Body"].read()
            record = json.loads(body)
            record.setdefault("validated", True)
            validated[idx] = record
    return validated
def save_single_validation(idx: int, record: Dict):
    """Upload one record to S3 as indented JSON.

    The object key is ``VALID_PREFIX`` followed by ``<idx>.json``.

    Args:
        idx: Example index used to build the object key.
        record: JSON-serialisable record to store.
    """
    payload = json.dumps(record, indent=2).encode('utf-8')
    s3.put_object(
        Bucket=BUCKET,
        Key="{}{}.json".format(VALID_PREFIX, idx),
        Body=payload,
        ContentType='application/json',
    )
# ── In-memory dataset cursor ──────────────────────────────────────────────────
class DynamicDataset:
    """Cursor over an in-memory list of record dicts.

    Attributes:
        data: The records, held by reference (not copied).
        len: Number of records, captured at construction time.
        current: Index of the record the cursor points at.

    A record counts as validated when its ``"validated"`` entry is truthy;
    a missing entry is treated as "not validated". All accessors raise
    ``IndexError`` on an empty dataset.
    """

    def __init__(self, data: List[Dict]):
        self.data = data
        self.len = len(data)
        self.current = 0

    def _is_validated(self, position: int) -> bool:
        """Return the validation flag at *position* (missing flag -> False)."""
        return bool(self.data[position].get("validated", False))

    def example(self, idx: int) -> Dict:
        """Move the cursor to *idx*, clamped to the valid range, and return that record.

        *idx* is passed through ``int()`` first so that a whole-number float
        (as a UI slider may supply) can be used as a list index.
        """
        self.current = max(0, min(self.len - 1, int(idx)))
        return self.data[self.current]

    def next(self) -> Dict:
        """Advance the cursor by one, staying on the last record at the end."""
        if self.current < self.len - 1:
            self.current += 1
        return self.data[self.current]

    def prev(self) -> Dict:
        """Step the cursor back by one, staying on the first record at the start."""
        if self.current > 0:
            self.current -= 1
        return self.data[self.current]

    def jump_next_unvalidated(self) -> Dict:
        """Move to the nearest later unvalidated record; stay put if there is none."""
        later = range(self.current + 1, self.len)
        self.current = next(
            (i for i in later if not self._is_validated(i)),
            self.current,
        )
        return self.data[self.current]

    def jump_prev_unvalidated(self) -> Dict:
        """Move to the nearest earlier unvalidated record; stay put if there is none."""
        earlier = range(self.current - 1, -1, -1)
        self.current = next(
            (i for i in earlier if not self._is_validated(i)),
            self.current,
        )
        return self.data[self.current]

    def validate(self):
        """Flag the record under the cursor as validated."""
        self.data[self.current]["validated"] = True
# ── Highlight utils using raw text (half-open intervals) ──────────────────────
def prepare_for_highlight(data: Dict) -> List[Tuple[str, Optional[str]]]:
    """Split a record's text into ``(chunk, label)`` segments.

    Spans are read from ``data["ner_annotated"]`` when that key is present,
    otherwise from ``data["ner_text"]``. Each span is ``(start, end, label)``
    with a half-open ``[start, end)`` range. Text not covered by any span is
    returned with a ``None`` label.

    Spans are processed in order of start offset. A span that overlaps an
    earlier one is clipped to the part that has not been emitted yet, and a
    span with nothing left (nested, zero-width, or past the end of the text)
    is dropped. As a result the chunks always concatenate back to the
    original text, with no duplicated or empty pieces.

    Args:
        data: Record dict; ``"text"`` defaults to the empty string.

    Returns:
        Segments in text order.
    """
    text = data.get("text", "")
    spans = data.get("ner_annotated", data.get("ner_text", []))

    segments: List[Tuple[str, Optional[str]]] = []
    cursor = 0  # everything before this offset has already been emitted
    for start, end, label in sorted(spans, key=lambda span: span[0]):
        start = max(start, cursor)
        end = min(end, len(text))
        if start >= end:
            continue
        if start > cursor:
            segments.append((text[cursor:start], None))
        segments.append((text[start:end], label))
        cursor = end
    if cursor < len(text):
        segments.append((text[cursor:], None))
    return segments
def align_spans_to_text(highlighted: List[Dict[str, Any]], text: str) -> List[Tuple[int, int, str]]:
    """Convert highlighted-text entries back into character spans of *text*.

    Entries are matched left to right: each entry's ``"token"`` is searched
    for in *text* starting where the previous match ended, so repeated
    substrings resolve to successive occurrences. The label is the first
    truthy value among the entry's ``"class_or_confidence"``, ``"class"`` and
    ``"label"`` keys.

    Only labelled chunks become spans. Unlabelled chunks still advance the
    search position but are not returned, so the result holds entity spans
    only and never a span whose label is ``None``. Empty tokens are ignored.
    A token that cannot be found is reported on stdout and skipped without
    moving the search position.

    Args:
        highlighted: Entries with a ``"token"`` key and an optional label key.
        text: The raw text the tokens were cut from.

    Returns:
        Half-open ``(start, end, label)`` spans in text order.
    """
    spans: List[Tuple[int, int, str]] = []
    cursor = 0
    for entry in highlighted:
        chunk = entry["token"]
        if not chunk:
            # str.find("") always "matches" and would yield a zero-width span.
            continue
        label = entry.get("class_or_confidence") or entry.get("class") or entry.get("label")
        pos = text.find(chunk, cursor)
        if pos < 0:
            print(f"โ ๏ธ Couldnโt align chunk: {chunk!r}")
            continue
        cursor = pos + len(chunk)
        if label:
            spans.append((pos, cursor, label))
    return spans
# ── Gradio demo ───────────────────────────────────────────────────────────────
def create_demo() -> gr.Blocks:
    """Build the Gradio UI for reviewing and validating NER annotations.

    The seed dataset and all previously validated records are loaded from S3
    once, when this function runs. Event handlers close over a single
    ``DynamicDataset`` and a single ``validated_store`` dict.

    NOTE(review): because that state lives in this closure, it looks like it
    is shared by every client connected to the app -- confirm this is intended.

    NOTE(review): several UI strings below contain mis-encoded glyphs in this
    copy of the file. They are reproduced as found; restore the intended
    characters from the original source.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    data = load_initial_data()
    validated_store = load_all_validations()

    # Validations from earlier sessions only exist in S3. Mirror their flag
    # onto the in-memory records so the "unvalidated" skip buttons, which read
    # that flag, do not stop on already-validated examples after a restart.
    for stored_idx, stored_rec in validated_store.items():
        if 0 <= stored_idx < len(data) and stored_rec.get("validated", False):
            data[stored_idx]["validated"] = True

    dynamic_dataset = DynamicDataset(data)

    def make_info(rec: Dict) -> str:
        """Render the record's source metadata as Markdown."""
        fn = rec.get("filename", "โ")
        pg = rec.get("page", "โ")
        sg = rec.get("segment", "โ")
        return f"**File:** `{fn}` \n**Page:** `{pg}`\n**Segment:** `{sg}`"

    def select(idx) -> Dict:
        """Point the dataset cursor at *idx* (clamped) and return the editable record."""
        return dynamic_dataset.example(int(idx))

    def load_example(idx: int):
        """Show example *idx*, preferring its validated copy when one exists."""
        editable = select(idx)
        idx = dynamic_dataset.current
        rec = validated_store.get(idx, editable)
        segs = prepare_for_highlight(rec)
        return segs, rec.get("validated", False), idx, make_info(rec)

    def update_example(highlighted, idx: int):
        """Apply the edited highlights to the in-memory record and un-validate it."""
        # Always edit the dynamic data, not the validated copy.
        rec = select(idx)
        idx = dynamic_dataset.current
        # Spans are stored as half-open intervals.
        rec["ner_annotated"] = align_spans_to_text(highlighted or [], rec.get("text", ""))
        rec["validated"] = False
        return prepare_for_highlight(rec), rec["validated"], idx, make_info(rec)

    def do_validate(highlighted, idx: int):
        """Apply the edited highlights, persist the record to S3, and mark it validated."""
        # Sync the cursor to the slider value so the record that gets flagged
        # is the one being displayed, not whatever the cursor last pointed at.
        rec = select(idx)
        idx = dynamic_dataset.current
        rec["ner_annotated"] = align_spans_to_text(highlighted or [], rec.get("text", ""))

        rec_to_save = rec.copy()
        rec_to_save["validated"] = True
        # Upload first: if it raises, nothing is marked validated locally.
        save_single_validation(idx, rec_to_save)
        dynamic_dataset.validate()
        validated_store[idx] = rec_to_save
        return prepare_for_highlight(rec_to_save), True, make_info(rec_to_save)

    def nav(fn):
        """Run a cursor-moving method and show the record it lands on."""
        fn()
        idx = dynamic_dataset.current
        # If there is a validated version, show that; else show dynamic data.
        rec = validated_store.get(idx, dynamic_dataset.data[idx])
        segs = prepare_for_highlight(rec)
        return segs, rec.get("validated", False), idx, make_info(rec)

    with gr.Blocks() as demo:
        prog = gr.Slider(
            minimum=0,
            maximum=dynamic_dataset.len - 1,
            value=0,
            step=1,
            label="Example # (slide to navigate)",
            interactive=True,
        )
        inp_box = gr.HighlightedText(label="Sentence", interactive=True)
        info_md = gr.Markdown(label="Source")
        status = gr.Checkbox(label="Validated?", value=False, interactive=False)
        gr.Markdown("[๐ Entity Tag Guide](https://huggingface.co/spaces/rafmacalaba/datause-annotation/blob/main/guidelines.md)")

        with gr.Row():
            prev_btn = gr.Button("โ๏ธ Previous")
            apply_btn = gr.Button("๐ Apply Changes")
            next_btn = gr.Button("Next โถ๏ธ")
        with gr.Row():
            skip_prev = gr.Button("โฎ๏ธ Prev Unvalidated")
            validate_btn = gr.Button("โ Validate")
            skip_next = gr.Button("โญ๏ธ Next Unvalidated")

        # ── Wiring events ────────────────────────────────────────────────────
        view = [inp_box, status, prog, info_md]
        prog.release(
            fn=load_example,
            inputs=[prog],
            outputs=view,
        )
        demo.load(load_example, inputs=prog, outputs=view)
        apply_btn.click(update_example, inputs=[inp_box, prog], outputs=view)
        prev_btn.click(lambda: nav(dynamic_dataset.prev), inputs=None, outputs=view)
        next_btn.click(lambda: nav(dynamic_dataset.next), inputs=None, outputs=view)
        skip_prev.click(lambda: nav(dynamic_dataset.jump_prev_unvalidated), inputs=None, outputs=view)
        skip_next.click(lambda: nav(dynamic_dataset.jump_next_unvalidated), inputs=None, outputs=view)
        validate_btn.click(do_validate, inputs=[inp_box, prog], outputs=[inp_box, status, info_md])
    return demo
# Script entry point: build the app and serve it locally (no public share
# link) with Gradio's debug mode switched on.
if __name__ == "__main__":
    app = create_demo()
    app.launch(share=False, debug=True)