#!/usr/bin/env python3
"""
Gradio app for validating dataset mentions from a stratified validation sample.

This app allows users to:
1. Review dataset mentions with context
2. Validate each mention as dataset or non-dataset
3. Compare the extraction model vs. the judge (GPT-5.2)
4. Track validation progress with live statistics

Adapted from annotation_app.py for the direct_judge validation workflow.

Usage:
    python validation_annotation_app.py --input validation_sample.jsonl
"""
import argparse
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import gradio as gr
class ValidationAnnotator:
    """
    Handle validation annotation logic and state management.

    Note: This works with stratified validation samples from direct_judge outputs.
    No 4o data available - only judge (GPT-5.2) verdicts are shown.
    """

    def __init__(self, input_file: str):
        self.input_file = Path(input_file)
        self.output_file = self.input_file.parent / f"{self.input_file.stem}_human_validated.jsonl"

        # Load data
        self.records = self._load_records()
        self.annotations = self._load_annotations()

        # Build chunk index for navigation
        self._build_chunk_index()

        # Current position
        self.current_idx = 0

        # Move to first unannotated record
        self._find_next_unannotated()

    def _load_records(self) -> List[Dict]:
        """Load records from the input JSONL file."""
        records = []
        with open(self.input_file, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():
                    records.append(json.loads(line))
        return records
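    # Note (editor's assumption): blank lines are skipped, but any malformed
    # line will raise json.JSONDecodeError; the input is assumed to be
    # well-formed JSONL produced by the sampling script.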
    def _build_chunk_index(self):
        """Build an index mapping chunk_id to record indices."""
        self.chunk_ids = []  # Ordered list of unique chunk_ids
        self.chunk_to_indices = {}  # chunk_id -> list of record indices
        for idx, record in enumerate(self.records):
            chunk_id = record.get('chunk_id', f'unknown_{idx}')
            if chunk_id not in self.chunk_to_indices:
                self.chunk_ids.append(chunk_id)
                self.chunk_to_indices[chunk_id] = []
            self.chunk_to_indices[chunk_id].append(idx)
        self.total_chunks = len(self.chunk_ids)
        self.total_datasets = len(self.records)

    def _get_chunk_info(self, idx: int) -> Tuple[int, int, int]:
        """Get chunk info for a given record index.

        Returns: (chunk_number, dataset_in_chunk, total_in_chunk)
        """
        if idx >= len(self.records):
            return (0, 0, 0)
        record = self.records[idx]
        chunk_id = record.get('chunk_id', f'unknown_{idx}')
        chunk_number = self.chunk_ids.index(chunk_id) + 1 if chunk_id in self.chunk_ids else 0
        chunk_indices = self.chunk_to_indices.get(chunk_id, [idx])
        dataset_in_chunk = chunk_indices.index(idx) + 1 if idx in chunk_indices else 1
        total_in_chunk = len(chunk_indices)
        return (chunk_number, dataset_in_chunk, total_in_chunk)
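    # For illustration (hypothetical ids): three records spanning two chunks give
    #   chunk_ids        = ['docA_chunk_0', 'docA_chunk_1']
    #   chunk_to_indices = {'docA_chunk_0': [0, 1], 'docA_chunk_1': [2]}
    # so record index 1 displays as "Dataset: 2/2 in this chunk" within chunk 1/2.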
    def _load_annotations(self) -> Dict:
        """Load existing annotations if available."""
        annotations = {}
        if self.output_file.exists():
            with open(self.output_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        ann = json.loads(line)
                        annotations[ann['sample_id']] = ann
        return annotations
    def _save_annotation(self, sample_id: int, verdict: str, notes: str = ""):
        """Save a single annotation to file."""
        record = self.records[self.current_idx]

        # Determine whether the extraction/judge said dataset.
        # Dataset = any tag that is NOT "non-dataset" (includes named, descriptive, vague)
        extraction_is_dataset = record['extraction_tag'] != 'non-dataset'
        judge_is_dataset = record['judge_tag'] != 'non-dataset'
        human_is_dataset = verdict == 'dataset'

        annotation = {
            'sample_id': sample_id,
            'text': record['text'],
            'document': record['document'],
            'stratum': record['stratum'],
            # Human annotation
            'human_verdict': verdict,  # 'dataset' or 'non-dataset'
            'human_notes': notes,
            'annotated_at': datetime.now().isoformat(),
            # Original extraction
            'extraction_tag': record['extraction_tag'],
            'extraction_confidence': record['extraction_confidence'],
            # Judge (GPT-5.2)
            'judge_tag': record['judge_tag'],
            'judge_confidence': record['judge_confidence'],
            'judge_reasoning': record.get('judge_reasoning', ''),
            'judge_data_type': record.get('judge_data_type', ''),
            # Computed agreements
            'human_agrees_extraction': human_is_dataset == extraction_is_dataset,
            'human_agrees_judge': human_is_dataset == judge_is_dataset,
            'extraction_agrees_judge': extraction_is_dataset == judge_is_dataset,
        }

        # Update in-memory annotations
        self.annotations[sample_id] = annotation

        # Append to file
        with open(self.output_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(annotation, ensure_ascii=False) + '\n')
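        # Because annotations are appended rather than rewritten, re-annotating
        # a sample leaves duplicate lines in the output file; _load_annotations
        # keeps the last line per sample_id, so the newest verdict wins on reload.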
    def _extract_context(self, text: str, dataset_name: str, context_sentences: int = 1) -> list:
        """
        Extract context around a dataset mention and format it for highlighting.

        Returns:
            List of tuples: [(text, label), ...] where label is "DATASET" for the dataset name
        """
        # Note: context_sentences is currently unused; a fixed +/-200-character
        # window around the match is applied instead.
        if not text:
            return [(f"[No context available for '{dataset_name}']", None)]

        # Normalize text: remove line breaks and extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        dataset_name_clean = re.sub(r'\s+', ' ', dataset_name).strip()

        # Find the dataset name in the text (case-insensitive)
        pattern = re.escape(dataset_name_clean)
        match = re.search(pattern, text, re.IGNORECASE)
        if not match:
            # Return truncated context without highlighting
            return [(text[:500] + "..." if len(text) > 500 else text, None)]

        # Get position of match
        start_pos = match.start()
        end_pos = match.end()

        # Get context around match
        context_start = max(0, start_pos - 200)
        context_end = min(len(text), end_pos + 200)

        before = ("..." if context_start > 0 else "") + text[context_start:start_pos]
        dataset = text[start_pos:end_pos]
        after = text[end_pos:context_end] + ("..." if context_end < len(text) else "")

        return [
            (before, None),
            (dataset, "DATASET"),
            (after, None),
        ]
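    # Example (hypothetical strings): with text "We used the DHS 2019 survey."
    # and dataset_name "DHS 2019", this method returns
    #   [('We used the ', None), ('DHS 2019', 'DATASET'), (' survey.', None)]
    # which gr.HighlightedText renders with the middle span highlighted.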
    def _is_annotated(self, idx: int) -> bool:
        """Check whether a record has been annotated."""
        sample_id = self.records[idx].get('sample_id', idx)
        return sample_id in self.annotations

    def _should_skip(self, idx: int) -> bool:
        """Check whether a record is a one-word vague/descriptive mention that should be skipped."""
        if idx >= len(self.records):
            return False
        record = self.records[idx]
        text = record.get('text', '')
        word_count = len(text.split())
        ext_tag = record.get('extraction_tag', '')
        judge_tag = record.get('judge_tag', '')
        # Skip one-word vague/descriptive mentions
        skip_tags = {'vague', 'descriptive'}
        if word_count == 1 and (ext_tag in skip_tags or judge_tag in skip_tags):
            return True
        return False

    def _find_next_unannotated(self):
        """Find the next unannotated record (skipping one-word vague/descriptive mentions)."""
        for i in range(len(self.records)):
            if not self._is_annotated(i) and not self._should_skip(i):
                self.current_idx = i
                return
        # All records annotated or skippable
        self.current_idx = len(self.records)
    def get_current_display(self) -> Tuple[str, list, str, str, str, str, Dict]:
        """Get the current record for display."""
        if self.current_idx >= len(self.records):
            return ("🎉 All samples validated!", [], "", "",
                    f"Progress: {len(self.annotations)}/{len(self.records)} (100%)",
                    "✅ Complete", {})

        record = self.records[self.current_idx]

        # Get context with highlighting
        context = self._extract_context(
            record.get('full_context', '') or record.get('usage_context', ''),
            record['text']
        )

        # Build AI verdicts (judge only - no 4o in direct_judge).
        # Dataset = any tag that is NOT "non-dataset" (includes named, descriptive, vague)

        # Extraction model verdict
        ext_tag = record['extraction_tag']
        ext_is_dataset = ext_tag != 'non-dataset'
        ext_emoji = "✅" if ext_is_dataset else "❌"
        ai_verdicts_str = f"### 🤖 Extraction Model:\n"
        ai_verdicts_str += f"**Verdict:** {ext_emoji} {'Dataset' if ext_is_dataset else 'Non-Dataset'}\n"
        ai_verdicts_str += f"**Tag:** `{ext_tag}`\n"
        ai_verdicts_str += f"**Confidence:** {record['extraction_confidence']:.1%}\n"

        # Judge (GPT-5.2) verdict
        judge_tag = record['judge_tag']
        judge_is_dataset = judge_tag != 'non-dataset'
        judge_emoji = "✅" if judge_is_dataset else "❌"
        ai_verdicts_str += f"\n### 🧑‍⚖️ Judge (GPT-5.2):\n"
        ai_verdicts_str += f"**Verdict:** {judge_emoji} {'Dataset' if judge_is_dataset else 'Non-Dataset'}\n"
        ai_verdicts_str += f"**Tag:** `{judge_tag}`\n"
        ai_verdicts_str += f"**Confidence:** {record['judge_confidence']:.1%}\n"
        if record.get('judge_data_type'):
            ai_verdicts_str += f"**Data Type:** {record['judge_data_type']}\n"
        if record.get('judge_reasoning'):
            reasoning = record['judge_reasoning'][:300]
            ai_verdicts_str += f"\n*Reasoning:* {reasoning}..."

        # Metadata
        metadata_parts = []
        metadata_parts.append(f"**Stratum:** `{record['stratum']}`")
        metadata_parts.append(f"**Document:** `{record['document'][:50]}...`")
        is_primary = record.get('is_primary', True)
        metadata_parts.append(f"**Type:** {'Primary sample' if is_primary else 'Sibling (same chunk)'}")
        if record.get('geography'):
            geo = record['geography']
            if isinstance(geo, dict):
                geo = geo.get('text', str(geo))
            metadata_parts.append(f"**Geography:** {geo}")
        metadata_str = "\n".join(metadata_parts)

        # Get chunk info
        chunk_num, ds_in_chunk, total_in_chunk = self._get_chunk_info(self.current_idx)

        # Progress: annotated / total datasets
        annotated = len(self.annotations)
        progress = f"Datasets: {annotated}/{self.total_datasets} ({annotated/self.total_datasets*100:.1f}%)"

        # Status
        if self._is_annotated(self.current_idx):
            ann = self.annotations.get(record.get('sample_id', self.current_idx), {})
            status = f"✅ Validated as: {ann.get('human_verdict', 'unknown')}"
        else:
            status = "⏳ Pending Validation"

        # Navigation info with chunk details
        nav = {
            'chunk_info': f"Input Text: {chunk_num}/{self.total_chunks}",
            'dataset_in_chunk': f"Dataset: {ds_in_chunk}/{total_in_chunk} in this chunk",
            'record_info': f"Overall: {self.current_idx + 1}/{self.total_datasets}",
            'can_prev': self.current_idx > 0,
            'can_next': self.current_idx < self.total_datasets - 1,
        }

        return record['text'], context, metadata_str, ai_verdicts_str, progress, status, nav
    def annotate(self, verdict: str, notes: str = "") -> Tuple[str, list, str, str, str, str]:
        """Annotate the current record and move to the next one."""
        if self.current_idx < len(self.records):
            record = self.records[self.current_idx]
            sample_id = record.get('sample_id', self.current_idx)
            self._save_annotation(sample_id, verdict, notes)
            self.next_record()
        return self.get_current_display()[:6]

    def next_record(self):
        """Move to the next record."""
        if self.current_idx < len(self.records) - 1:
            self.current_idx += 1

    def prev_record(self):
        """Move to the previous record."""
        if self.current_idx > 0:
            self.current_idx -= 1

    def skip_to_next_unannotated(self):
        """Skip to the next unannotated record (also skipping one-word vague/descriptive mentions)."""
        for i in range(self.current_idx + 1, len(self.records)):
            if not self._is_annotated(i) and not self._should_skip(i):
                self.current_idx = i
                return
    def get_statistics(self) -> str:
        """Get current annotation statistics as markdown."""
        if not self.annotations:
            return "_No annotations yet_"
        total = len(self.annotations)
        human_dataset = sum(1 for a in self.annotations.values() if a['human_verdict'] == 'dataset')
        human_non = total - human_dataset
        agrees_ext = sum(1 for a in self.annotations.values() if a['human_agrees_extraction'])
        agrees_judge = sum(1 for a in self.annotations.values() if a['human_agrees_judge'])
        stats = f"""**Annotated:** {total}/{len(self.records)}

**Human Verdicts:**
- Dataset: {human_dataset}
- Non-Dataset: {human_non}

**Agreement Rates:**
- Extraction Model: {agrees_ext/total*100:.1f}%
- Judge (GPT-5.2): {agrees_judge/total*100:.1f}%
"""
        return stats
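    # Note: the agreement rates compare the binary dataset/non-dataset verdicts
    # computed in _save_annotation, not the finer-grained tags, so a human
    # "dataset" verdict still agrees with an extraction tagged "vague" or
    # "descriptive".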
def create_app(input_file: str):
    """Create and configure the Gradio app."""
    annotator = ValidationAnnotator(input_file)

    # Custom CSS for the green accept button and the dark mode toggle
    css = """
    #accept_btn {
        background-color: #22c55e !important;
        color: white !important;
    }
    #accept_btn:hover {
        background-color: #16a34a !important;
    }
    #theme_toggle {
        position: fixed;
        top: 10px;
        right: 10px;
        z-index: 1000;
        padding: 8px 16px;
        border-radius: 20px;
        cursor: pointer;
        font-size: 14px;
    }
    """
    # JavaScript for the dark mode toggle
    js = """
    function toggleDarkMode() {
        const body = document.body;
        const isDark = body.classList.contains('dark');
        if (isDark) {
            body.classList.remove('dark');
            localStorage.setItem('theme', 'light');
            document.getElementById('theme_toggle').textContent = '🌙 Dark Mode';
        } else {
            body.classList.add('dark');
            localStorage.setItem('theme', 'dark');
            document.getElementById('theme_toggle').textContent = '☀️ Light Mode';
        }
    }

    // Apply the saved theme on load
    document.addEventListener('DOMContentLoaded', function() {
        const savedTheme = localStorage.getItem('theme');
        if (savedTheme === 'dark') {
            document.body.classList.add('dark');
            const btn = document.getElementById('theme_toggle');
            if (btn) btn.textContent = '☀️ Light Mode';
        }
    });
    """
    with gr.Blocks(title="Dataset Annotation Tool", css=css, js=js) as app:
        # Theme toggle button
        gr.HTML('<button id="theme_toggle" onclick="toggleDarkMode()">🌙 Dark Mode</button>')

        gr.Markdown("# 📋 Dataset Annotation Tool")
        gr.Markdown("Review and annotate dataset mentions. Each annotation is saved in real time.")

        with gr.Row():
            with gr.Column(scale=2):
                dataset_name = gr.Textbox(label="Dataset Name", interactive=False, max_lines=2)
                context_box = gr.HighlightedText(
                    label="Context (dataset highlighted, ±200 characters)",
                    color_map={"DATASET": "yellow"},
                    show_legend=False,
                    combine_adjacent=True
                )
                metadata_box = gr.Markdown(label="Metadata")
                show_ai_checkbox = gr.Checkbox(label="🤖 Show what the AI thinks", value=False)
                ai_verdicts_box = gr.Markdown(label="AI Analysis", visible=False)

            with gr.Column(scale=1):
                progress_box = gr.Textbox(label="Progress", interactive=False, lines=1)
                chunk_info_box = gr.Textbox(label="Input Text Position", interactive=False, lines=1)
                dataset_in_chunk_box = gr.Textbox(label="Dataset in Chunk", interactive=False, lines=1)
                status_box = gr.Textbox(label="Status", interactive=False, lines=1)
                notes_box = gr.Textbox(
                    label="Notes (optional)",
                    placeholder="Add any comments about this dataset...",
                    lines=3
                )

        with gr.Row():
            accept_btn = gr.Button("✅ DATASET", variant="primary", size="lg", elem_id="accept_btn")
            reject_btn = gr.Button("❌ NOT A DATASET", variant="stop", size="lg")

        gr.Markdown("---")

        with gr.Row():
            prev_btn = gr.Button("⬅ Previous", size="sm")
            next_btn = gr.Button("Next ➡", size="sm")
            skip_btn = gr.Button("⏭️ Skip to Next Unannotated", size="sm")

        gr.Markdown("---")

        with gr.Accordion("📊 Live Statistics", open=True):
            stats_box = gr.Markdown()

        gr.Markdown("---")
        gr.Markdown(f"**Input:** `{Path(input_file).name}`")
        gr.Markdown(f"**Output:** `{annotator.output_file.name}`")

        nav_state = gr.State({})
        def update_display():
            name, context, metadata, ai_verdicts, progress, status, nav = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, nav, stats

        def accept_and_next(notes):
            name, context, metadata, ai_verdicts, progress, status = annotator.annotate('dataset', notes)
            _, _, _, _, _, _, nav = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, "", nav, stats

        def reject_and_next(notes):
            name, context, metadata, ai_verdicts, progress, status = annotator.annotate('non-dataset', notes)
            _, _, _, _, _, _, nav = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, "", nav, stats

        def go_next():
            annotator.next_record()
            return update_display()

        def go_prev():
            annotator.prev_record()
            return update_display()

        def skip_unannotated():
            annotator.skip_to_next_unannotated()
            return update_display()

        def toggle_ai_verdicts(show_ai):
            return gr.update(visible=show_ai)

        # Outputs - updated with chunk_info and dataset_in_chunk; the annotate
        # variant also includes notes_box so it can be cleared after each verdict
        outputs_list = [dataset_name, context_box, metadata_box, ai_verdicts_box, progress_box,
                        chunk_info_box, dataset_in_chunk_box, status_box, nav_state, stats_box]
        outputs_annotate = [dataset_name, context_box, metadata_box, ai_verdicts_box, progress_box,
                            chunk_info_box, dataset_in_chunk_box, status_box, notes_box, nav_state, stats_box]

        accept_btn.click(accept_and_next, inputs=[notes_box], outputs=outputs_annotate)
        reject_btn.click(reject_and_next, inputs=[notes_box], outputs=outputs_annotate)
        next_btn.click(go_next, outputs=outputs_list)
        prev_btn.click(go_prev, outputs=outputs_list)
        skip_btn.click(skip_unannotated, outputs=outputs_list)
        show_ai_checkbox.change(toggle_ai_verdicts, inputs=[show_ai_checkbox], outputs=[ai_verdicts_box])

        app.load(update_display, outputs=outputs_list)

    return app
def main():
    parser = argparse.ArgumentParser(description="Validation annotation Gradio app")
    parser.add_argument(
        "--input",
        type=str,
        default="/Users/rafaelmacalaba/WBG/monitoring_of_datause/revalidation/analysis/unhcr_reliefweb/validation/validation_sample.jsonl",
        help="Input JSONL file with validation samples"
    )
    parser.add_argument(
        "--share",
        action="store_true",
        help="Create a public share link"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=7860,
        help="Port to run the app on (default: 7860)"
    )
    args = parser.parse_args()

    if not Path(args.input).exists():
        print(f"Error: Input file not found: {args.input}")
        print("\nRun the sampling script first:")
        print("  python sample_for_validation.py")
        return

    app = create_app(args.input)
    app.launch(share=args.share, server_port=args.port)


if __name__ == "__main__":
    main()
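# Example invocations (file path is illustrative):
#   python validation_annotation_app.py --input validation_sample.jsonl
#   python validation_annotation_app.py --input validation_sample.jsonl --port 7861 --share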