"""Gradio audio-evaluation UI.

Presents audio samples with configurable scoring criteria (loaded from a
YAML config file) and records annotator responses to a local CSV, optionally
pushing each response to a private Hugging Face dataset.

Hugging Face setup:
  1. Install the required libraries: ``pip install huggingface_hub datasets``
  2. Set the following environment variables before running the script:
     - ``HF_TOKEN``: a Hugging Face access token with write permissions.
     - ``HF_DATASET_ID``: the private dataset repo id (e.g. "username/my-dataset").
  If these are not set, saving to the HF Hub is skipped.
"""

import csv
import glob
import io
import json
import os
import random
import uuid
from datetime import datetime, timezone

import gradio as gr
import pandas as pd
import yaml  # You need to install this: pip install pyyaml

# fcntl is POSIX-only. All locking calls below are best-effort, so degrade
# gracefully (no advisory locking) on platforms where it is unavailable.
try:
    import fcntl
except ImportError:
    fcntl = None

# --- Start of Local Mode Implementation --------------------------------------
# When GRADIO_LOCAL_MODE is truthy, every Hugging Face code path is disabled.
IS_LOCAL_MODE = os.environ.get("GRADIO_LOCAL_MODE", "false").lower() in ["true", "1"]

if IS_LOCAL_MODE:
    print("Running in LOCAL mode. Hugging Face functionalities are disabled.")
    HfApi = None
else:
    try:
        from huggingface_hub import HfApi, hf_hub_download
        print("Hugging Face libraries found. HF push functionality is available.")
    except ImportError:
        print("Hugging Face libraries not found. HF push functionality will be disabled.")
        HfApi = None
# --- End of Local Mode Implementation ----------------------------------------


# --- Configuration Loading ---
def load_config(config_path='config.yaml'):
    """Load the UI and criteria configuration from a YAML file.

    Returns the parsed config dict, or None when the file is missing or
    cannot be parsed / lacks a 'criteria' list.
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        if 'criteria' not in config or not isinstance(config['criteria'], list):
            raise ValueError("Config must contain a list of 'criteria'.")
        return config
    except FileNotFoundError:
        return None
    except Exception as e:
        print(f"ERROR: Could not parse {config_path}: {e}")
        return None


def find_config_files():
    """Find all .yaml and .yml files in the current working directory."""
    return glob.glob("*.yaml") + glob.glob("*.yml")


# --- Static & File I/O Functions ---
OUTPUT_CSV = "responses.csv"
MAX_CRITERIA = 15  # Maximum number of sliders to support


def list_samples(samples_dir):
    """Return a sorted list of audio filenames found in `samples_dir`.

    Returns an empty list (with a warning) when the directory is missing.
    """
    if not os.path.isdir(samples_dir):
        print(f"WARNING: Samples directory '{samples_dir}' not found.")
        return []
    files = [f for f in os.listdir(samples_dir)
             if f.lower().endswith(('.wav', '.mp3', '.ogg', '.flac'))]
    files.sort()
    return files


def save_responses_to_hf(rows, repo_id: str | None = None, token: str | None = None):
    """Append new rows to a CSV file in a private Hugging Face dataset.

    - Reads the existing CSV (if present).
    - Appends the new rows.
    - Uploads the updated file back to the repo.

    Each 'row' should be a dict with consistent keys.

    NOTE:
    - Replaces the entire CSV on each update (no true append on the server
      side), so concurrent writers can lose rows.
    - Use small/medium datasets; large ones should use the `datasets`
      library instead.

    Returns a status dict describing the outcome.
    """
    if HfApi is None:
        return {"status": "hf_unavailable", "reason": "missing_packages"}

    token = token or os.environ.get("HF_TOKEN")
    repo_id = repo_id or os.environ.get("HF_DATASET_ID")
    if not token or not repo_id:
        return {"status": "hf_skipped", "reason": "missing_token_or_repo_env"}

    api = HfApi(token=token)
    path_in_repo = "data/responses.csv"  # fixed CSV location in repo
    repo_err = None

    # Ensure the dataset repo exists (best effort; remember any error).
    try:
        api.create_repo(repo_id=repo_id, repo_type="dataset", private=True, exist_ok=True)
    except Exception as e:
        repo_err = str(e)

    # Try downloading the existing CSV so new rows are appended, not lost.
    existing_df = pd.DataFrame()
    try:
        local_path = hf_hub_download(
            repo_id=repo_id,
            filename=path_in_repo,
            repo_type="dataset",
            token=token,
        )
        existing_df = pd.read_csv(local_path)
    except Exception as e:
        # File doesn't exist or is unreadable — start fresh.
        print("file", path_in_repo, "couldn't be found / read", str(e))

    # Convert new rows to a DataFrame and append.
    new_df = pd.DataFrame(rows)
    combined_df = pd.concat([existing_df, new_df], ignore_index=True)

    # Serialize to CSV in memory; upload_file accepts raw bytes.
    csv_buffer = io.StringIO()
    combined_df.to_csv(csv_buffer, index=False)
    csv_bytes = csv_buffer.getvalue().encode("utf-8")

    # Upload the updated CSV.
    try:
        api.upload_file(
            path_or_fileobj=csv_bytes,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
        )
    except Exception as e:
        print(str(e))
        return {"status": "hf_push_error", "error": str(e), "repo_error": repo_err}

    return {"status": "hf_pushed", "rows_added": len(rows),
            "repo": repo_id, "repo_error": repo_err}


def _save_responses_to_hf(rows, repo_id: str | None = None, token: str | None = None):
    """Push a list of dict rows to a private HF dataset, one JSON file per row.

    NOTE: This approach saves each response as an individual file. While this
    prevents data loss from overwriting a single file, be aware of the
    following:
    - Performance: Uploading many small files can be slower than a single
      large one.
    - Scalability: A very large number of files (e.g., millions) can make the
      dataset repository unwieldy to browse or clone.
    - Loading Data: To load this data back into a `datasets.Dataset` object,
      you will need to point to the specific files, for example:
      `load_dataset('json', data_files='path/to/your/repo/data/*.json')`

    Returns a status dict describing the outcome.
    """
    if HfApi is None:
        return {"status": "hf_unavailable", "reason": "missing_packages"}

    token = token or os.environ.get("HF_TOKEN")
    repo_id = repo_id or os.environ.get("HF_DATASET_ID")
    if not token or not repo_id:
        return {"status": "hf_skipped", "reason": "missing_token_or_repo_env"}

    api = HfApi(token=token)
    repo_err = None
    try:
        api.create_repo(repo_id=repo_id, repo_type="dataset", private=True, exist_ok=True)
    except Exception as e:
        repo_err = str(e)

    # Process each row, uploading it as a separate JSON file.
    num_pushed = 0
    errors = []
    for row_dict in rows:
        try:
            # Create a unique filename. Using a UUID is the most robust method.
            filename = f"{uuid.uuid4()}.json"
            # Place files in a 'data' subdirectory to keep the repo root clean.
            # BUGFIX: the generated filename must actually be used here,
            # otherwise every row overwrites the same path.
            path_in_repo = f"data/{filename}"

            # Convert the dictionary to JSON bytes for uploading.
            json_bytes = json.dumps(row_dict, indent=2).encode("utf-8")

            # BUGFIX: the huggingface_hub keyword is `path_or_fileobj`,
            # not `path_or_obj` (the latter raises TypeError).
            api.upload_file(
                path_or_fileobj=json_bytes,
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type="dataset",
            )
            num_pushed += 1
        except Exception as e:
            errors.append(str(e))

    if errors:
        print("json errors", errors, "repo errors", repo_err)
        return {"status": "hf_push_error", "pushed": num_pushed,
                "total": len(rows), "errors": errors, "repo_error": repo_err}

    return {"status": "hf_pushed", "rows": len(rows),
            "repo": repo_id, "repo_error": repo_err}


def save_response(sample, audio_path, annotator, session_id, user_email,
                  comment, scores, config):
    """Save a response row locally and attempt to push it to the HF Hub.

    Writes one CSV row to OUTPUT_CSV (creating the header on first write),
    then, unless running in local mode, mirrors the row to the configured
    Hugging Face dataset. Returns a status dict including the HF outcome.
    """
    os.makedirs(os.path.dirname(OUTPUT_CSV) or '.', exist_ok=True)

    criteria_labels = [c['label'] for c in config['criteria']]
    header = (["timestamp", "sample", "audio_path", "annotator",
               "session_id", "user_email"] + criteria_labels + ["comment"])
    # Only the sliders that map to configured criteria are recorded.
    active_scores = list(scores)[:len(criteria_labels)]
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
    row = ([datetime.now(timezone.utc).isoformat(), sample, audio_path,
            annotator, session_id, user_email] + active_scores + [comment])

    write_header = not os.path.exists(OUTPUT_CSV)
    with open(OUTPUT_CSV, "a", newline='', encoding='utf-8') as f:
        # Best-effort advisory lock to serialize concurrent annotators
        # (no-op on platforms without fcntl).
        if fcntl is not None:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            except Exception:
                pass
        writer = csv.writer(f)
        if write_header:
            writer.writerow(header)
        writer.writerow(row)
        if fcntl is not None:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            except Exception:
                pass

    # --- Hugging Face Push Logic ---
    hf_result = None
    if not IS_LOCAL_MODE:
        try:
            hf_record = dict(zip(header, row))
            hf_result = save_responses_to_hf([hf_record])
        except Exception as e:
            print(e)
            hf_result = {"status": "hf_error", "error": str(e)}

    return {"status": "saved", "sample": sample, "hf": hf_result}


# --- Gradio UI Definition ---
def make_ui():
    """Build and return the Gradio Blocks app for the evaluation UI."""

    def make_explainer_fn(criterion_index):
        """Return a callback that renders the explanation for one slider."""
        def explainer(value, config):
            if not config or criterion_index >= len(config.get('criteria', [])):
                return ""
            criterion = config['criteria'][criterion_index]
            try:
                iv = int(value)
            except (ValueError, TypeError):
                iv = value
            text = criterion['explanations'].get(iv, "No description for this score.")
            return f"**{criterion['label']} ({iv}/{criterion['max']}):** {text}"
        return explainer

    with gr.Blocks() as demo:
        # --- STATE MANAGEMENT --------------------------------------------
        samples_list = gr.State()
        current_index = gr.State(0)
        config_state = gr.State()
        session_id_global = gr.State()

        # --- SETUP UI (visible at start) ---------------------------------
        with gr.Group() as setup_group:
            gr.Markdown("# Evaluation Setup")
            gr.Markdown("Please provide your details and select the evaluation setup to begin.")
            config_dropdown = gr.Dropdown(choices=find_config_files(),
                                          label="Select Evaluation", value=None)
            instructions_md = gr.Markdown(visible=False, elem_classes="instructions")
            with gr.Accordion("Annotator Info", open=True):
                annotator_global = gr.Textbox(
                    label="Annotator ID (automatically generated for you)", lines=1)
                user_email_global = gr.Textbox(label="User email (optional)", lines=1)
            start_button = gr.Button("Start Evaluation", variant="primary")
            config_error_md = gr.Markdown("", visible=False)

        # --- MAIN EVALUATION UI (initially hidden) -----------------------
        with gr.Group(visible=False) as main_group:
            title_md = gr.Markdown("# Evaluation UI")
            header_md = gr.Markdown("")
            progress_md = gr.Markdown("Sample 1 of X")
            progress_bar = gr.Progress(track_tqdm=False)
            with gr.Row():
                with gr.Column(scale=1, variant='panel'):
                    sample_name_md = gr.Markdown("### Audio File")
                    gr.Markdown("---")
                    evaluation_audio = gr.Audio(label="Audio for Evaluation")
                    gr.Markdown("---")
                    submit_btn = gr.Button("Save & Next", variant="primary",
                                           interactive=False)
                    status = gr.Textbox(label="Status", interactive=False)
                with gr.Column(scale=2, variant='panel'):
                    gr.Markdown("### Scoring Criteria")
                    slider_explanation_md = gr.Markdown(
                        "_Move a slider to see the description for each score._")
                    gr.Markdown("---")
                    # Pre-allocate the maximum number of sliders; load_sample
                    # shows/configures only those the config defines.
                    sliders = [gr.Slider(visible=False, interactive=True)
                               for _ in range(MAX_CRITERIA)]
                    gr.Markdown("---")
                    comment = gr.Textbox(label="Comments (optional)", lines=4,
                                         placeholder="Enter any additional feedback here...")

        # --- UI ELEMENT LISTS --------------------------------------------
        # Order matters: save_and_next emits updates positionally in this order.
        main_ui_elements = [
            title_md, header_md, progress_md, sample_name_md, evaluation_audio,
            slider_explanation_md, comment, submit_btn, status, *sliders
        ]

        # --- LOGIC & EVENTS ----------------------------------------------
        def load_sample(samples, index, config):
            """Build the component updates for sample `index` (or completion)."""
            total_samples = len(samples)
            updates = {}
            if index >= total_samples:
                # All done: hide the evaluation widgets, show a thank-you note.
                completion_msg = f"**All {total_samples} samples completed! Thank you!**"
                for el in main_ui_elements:
                    updates[el] = gr.update(visible=False)
                updates[progress_md] = gr.update(value=completion_msg, visible=True)
                updates[status] = gr.update(value="Finished.", visible=True)
                return updates

            sample = samples[index]
            samples_dir = config.get('samples_directory', 'sample-audios')
            sample_path = os.path.join(samples_dir, sample)
            sample_exists = os.path.exists(sample_path)
            updates = {
                progress_md: gr.update(
                    value=f"Sample **{index + 1}** of **{total_samples}**", visible=True),
                sample_name_md: gr.update(value=f"### File: `{sample}`", visible=True),
                evaluation_audio: gr.update(
                    value=sample_path if sample_exists else None, visible=sample_exists),
                slider_explanation_md: gr.update(
                    value="_Move a slider to see the description for each score._",
                    visible=True),
                comment: gr.update(value="", visible=True),
                # Submission stays disabled until the annotator plays the audio.
                submit_btn: gr.update(value="Play audio to enable",
                                      interactive=False, visible=True),
                status: gr.update(value="Ready.", visible=True),
            }
            num_criteria = len(config['criteria'])
            for i in range(MAX_CRITERIA):
                if i < num_criteria:
                    criterion = config['criteria'][i]
                    updates[sliders[i]] = gr.update(
                        label=criterion['label'],
                        minimum=criterion['min'],
                        maximum=criterion['max'],
                        step=criterion['step'],
                        value=criterion['default'],
                        visible=True,
                    )
                else:
                    updates[sliders[i]] = gr.update(visible=False, value=0)
            return updates

        def enable_submit_button():
            """Re-enable the submit button once the audio has been played."""
            return gr.update(value="Save & Next", interactive=True)

        def update_instructions(config_path):
            """Show the config's instructions markdown, if any."""
            if not config_path:
                return gr.update(value="", visible=False)
            config = load_config(config_path)
            if config and 'instructions_markdown' in config:
                return gr.update(value=config['instructions_markdown'], visible=True)
            return gr.update(value="", visible=False)

        def start_session(config_path, annotator_input=None):
            """Validate the config, build the sample list, and open the main UI."""
            if not config_path or not os.path.exists(config_path):
                return {config_error_md: gr.update(
                    value="**Error:** Please select a valid configuration file.",
                    visible=True)}
            config = load_config(config_path)
            if config is None:
                return {config_error_md: gr.update(
                    value=f"**Error:** Could not load or parse `{config_path}`. Check console for details.",
                    visible=True)}

            samples_dir = config.get('samples_directory', 'sample-audios')
            should_randomize = config.get('randomize_samples', False)
            s_list = list_samples(samples_dir)
            if not s_list:
                return {config_error_md: gr.update(
                    value=f"**Error:** No audio files found in directory: `{samples_dir}`",
                    visible=True)}
            if should_randomize:
                random.shuffle(s_list)

            session_id = str(uuid.uuid4())
            index = 0
            updates = {
                setup_group: gr.update(visible=False),
                main_group: gr.update(visible=True),
                config_error_md: gr.update(visible=False),
                title_md: gr.update(value=f"# {config.get('title', 'Evaluation UI')}"),
                header_md: gr.update(value=config.get('header_markdown', '')),
                config_state: config,
                session_id_global: session_id,
                samples_list: s_list,
                current_index: index,
            }

            # Determine annotator ID: use provided value or generate a random one.
            if annotator_input and str(annotator_input).strip():
                annotator = str(annotator_input).strip()
            else:
                annotator = f"anon-{uuid.uuid4().hex[:8]}"
            # Update annotator textbox in the setup UI so the user sees their
            # assigned ID.
            updates[annotator_global] = gr.update(value=annotator)

            sample_updates = load_sample(s_list, index, config)
            updates.update(sample_updates)
            return updates

        def save_and_next(index, samples, annotator, sid, email, comment,
                          config, *scores):
            """Persist the current sample's scores and advance to the next one."""
            sample = samples[index]
            samples_dir = config.get('samples_directory', 'sample-audios')
            sample_path = os.path.join(samples_dir, sample)
            save_status = save_response(sample, sample_path, annotator, sid,
                                        email, comment, scores, config)

            next_index = index + 1
            total_samples = len(samples)
            # Update progress bar.
            progress_value = (next_index) / total_samples if total_samples > 0 else 0
            progress_bar(progress_value)

            updates_dict = load_sample(samples, next_index, config)

            # Provide more detailed status, including HF info if available.
            status_message = f"Saved {sample} locally."
            if save_status.get('hf'):
                hf_stat = save_status['hf'].get('status', 'hf_unknown')
                status_message += f" HF status: {hf_stat}."
            updates_dict[status] = gr.update(value=status_message)

            # Emit updates positionally, matching main_ui_elements order.
            ordered_updates = [updates_dict.get(el) for el in main_ui_elements]
            return [next_index] + ordered_updates

        # --- Event Wiring -------------------------------------------------
        config_dropdown.change(
            update_instructions,
            inputs=[config_dropdown],
            outputs=[instructions_md]
        ).then(None, None, None,
               js="() => { document.getElementById('component-0').scrollIntoView(); }")

        start_button.click(
            start_session,
            inputs=[config_dropdown, annotator_global],
            outputs=[
                setup_group, main_group, config_error_md, annotator_global,
                *main_ui_elements, config_state, session_id_global,
                samples_list, current_index
            ]
        )

        submit_btn.click(
            save_and_next,
            inputs=[current_index, samples_list, annotator_global,
                    session_id_global, user_email_global, comment,
                    config_state, *sliders],
            outputs=[current_index, *main_ui_elements]
        )

        for i, slider in enumerate(sliders):
            slider.change(make_explainer_fn(i), inputs=[slider, config_state],
                          outputs=[slider_explanation_md])

        evaluation_audio.play(fn=enable_submit_button, inputs=None,
                              outputs=[submit_btn])

        demo.load(update_instructions, inputs=config_dropdown,
                  outputs=instructions_md)

    return demo


if __name__ == "__main__":
    app = make_ui()
    app.launch(server_name="0.0.0.0", server_port=7860)