# Page header captured together with this file: "Spaces: Running on Zero".
import json
import os
import shutil
import uuid
from datetime import datetime, timezone

import gradio as gr

from app.logger_config import logger as logging
# Base directory for all temporary session artifacts; the TMP_DIR environment
# variable overrides the built-in default.
TMP_DIR = os.environ.get("TMP_DIR", "/tmp/canary_aed_streaming")
# JSON registry of sessions, keyed by session id, stored inside TMP_DIR.
ACTIVE_SESSIONS_FILE = os.path.join(TMP_DIR, "active_sessions.json")
# ---------------------------
# Helpers to read and write the active-sessions JSON registry
# ---------------------------
def _read_sessions():
    """Load the session registry stored in ``ACTIVE_SESSIONS_FILE``.

    Returns:
        dict: Mapping of session id to session record. An empty dict is
        returned when the file is missing, cannot be read or parsed, or holds
        JSON that is not an object, so callers can always index and mutate
        the result.
    """
    try:
        with open(ACTIVE_SESSIONS_FILE, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No registry has been written yet; this is the normal startup state.
        return {}
    except Exception as e:
        # Deliberately broad: this helper is best-effort and must never raise,
        # but the failure is now logged instead of being hidden.
        logging.warning(f"Failed to read active sessions file: {e}")
        return {}
    if not isinstance(data, dict):
        # Valid JSON such as `null` or `[]` would make `sessions[sid] = ...`
        # in callers raise TypeError.
        logging.warning("Active sessions file does not contain a JSON object; ignoring it.")
        return {}
    return data
def _write_sessions(data):
    """Write *data* to ``ACTIVE_SESSIONS_FILE`` as 2-space-indented JSON.

    The parent directory of the registry file is created first if needed.
    """
    parent_dir = os.path.dirname(ACTIVE_SESSIONS_FILE)
    os.makedirs(parent_dir, exist_ok=True)
    with open(ACTIVE_SESSIONS_FILE, "w") as registry:
        json.dump(data, registry, indent=2)
# ---------------------------
# LOAD: register the session when a visitor opens the app
# ---------------------------
def on_load(request: gr.Request):
    """Register the visitor's session when the app is opened.

    Args:
        request: Gradio request of the page load; its ``session_hash`` is
            used directly as the session id.

    Returns:
        tuple: ``(sid, sid)`` — the same session id twice (per the original
        note: usable as a ``gr.State`` value and for display).
    """
    sid = request.session_hash
    sessions = _read_sessions()
    sessions[sid] = {
        "session_id": sid,
        "file": "",
        # datetime.utcnow() is deprecated since Python 3.12; the timezone-aware
        # call formats to the same HH:MM:SS string.
        "start_time": datetime.now(timezone.utc).strftime("%H:%M:%S"),
        "status": "active",
    }
    _write_sessions(sessions)
    logging.info(f"[{sid}] Session registered (on_load).")
    return sid, sid
# ---------------------------
# UNLOAD: clean up the session when a visitor leaves or refreshes
# ---------------------------
def on_unload(request: gr.Request):
    """Tear down the visitor's session when the page is closed or refreshed.

    If the session is present in the registry, its stop flag is created, the
    entry is dropped from the registry, and its temporary data is removed.
    Otherwise only an informational message is logged.
    """
    sid = request.session_hash
    registry = _read_sessions()
    if sid not in registry:
        logging.info(f"[{sid}] No active session found to remove.")
        return

    # Write the stop flag before any cleanup starts.
    create_stop_flag(sid)
    del registry[sid]
    _write_sessions(registry)
    remove_session_data(sid)
    unregister_session(sid)
    logging.info(f"[{sid}] Session removed (on_unload).")
def ensure_tmp_dir():
    """Create ``TMP_DIR`` when it is missing.

    Any ``Exception`` raised while creating it is logged as an error rather
    than propagated.
    """
    try:
        os.makedirs(TMP_DIR, exist_ok=True)
    except Exception as err:
        logging.error(f"Failed to create tmp directory {TMP_DIR}: {err}")
def reset_all_active_sessions():
    """Remove every leftover session artifact from ``TMP_DIR`` at startup.

    Deletes the active-sessions registry file, ``progress_*.json`` files,
    the stream/transcribe flag ``*.txt`` files and ``chunks_*`` directories.
    A failure to remove one entry is logged as a warning and the scan
    continues; any other ``Exception`` is logged as an error and swallowed.
    """
    ensure_tmp_dir()
    flag_prefixes = (
        "stream_stop_flag_",
        "transcribe_stop_flag_",
        "transcribe_active_",
    )
    try:
        # --- Remove active sessions file ---
        if os.path.exists(ACTIVE_SESSIONS_FILE):
            os.remove(ACTIVE_SESSIONS_FILE)
            logging.info("Active sessions file reset at startup.")
        else:
            logging.debug("No active sessions file found to reset.")

        # --- Clean progress files, flag files and chunk directories ---
        # The three categories have distinct name prefixes, so one directory
        # scan can classify every entry instead of listing TMP_DIR three times.
        for name in os.listdir(TMP_DIR):
            path = os.path.join(TMP_DIR, name)
            if name.startswith("progress_") and name.endswith(".json"):
                _remove_leftover(os.remove, path, name, "progress file")
            elif name.startswith(flag_prefixes) and name.endswith(".txt"):
                _remove_leftover(os.remove, path, name, "flag file")
            elif name.startswith("chunks_") and os.path.isdir(path):
                _remove_leftover(shutil.rmtree, path, name, "chunk folder")

        logging.info("Temporary session cleanup completed successfully.")
    except Exception as e:
        logging.error(f"Error resetting active sessions: {e}")


def _remove_leftover(remover, path, name, label):
    """Delete one leftover entry with *remover* and log the outcome."""
    try:
        remover(path)
        logging.debug(f"Removed leftover {label}: {name}")
    except Exception as e:
        logging.warning(f"Failed to remove {label} {name}: {e}")
def remove_session_data(session_id: str):
    """Delete the temporary files and registry entry of one session.

    Removes the session from the active-sessions registry, then deletes its
    progress file, its transcribe stop/active flag files and its
    ``chunks_<session_id>`` directory. Every step is best-effort: failures
    are logged and never raised.

    Args:
        session_id: Id of the session to clean up. A falsy value is rejected
            with a warning and nothing is removed.
    """
    if not session_id:
        logging.warning("remove_session_data() called without a valid session_id.")
        return
    try:
        # --- Remove session from active_sessions.json ---
        # Uses the shared registry helpers rather than a private copy of the
        # read/modify/write logic.
        try:
            sessions = _read_sessions()
            if session_id in sessions:
                sessions.pop(session_id)
                _write_sessions(sessions)
                logging.debug(f"[{session_id}] Removed from active_sessions.json.")
        except Exception as e:
            logging.warning(f"[{session_id}] Failed to update active_sessions.json: {e}")

        # --- Per-session files ---
        # NOTE(review): stream_stop_flag_<id>.txt is not in this list (it was
        # commented out in the original). on_unload() creates that flag right
        # before calling this function, so it is presumably meant to survive
        # the cleanup — confirm.
        files_to_remove = (
            f"progress_{session_id}.json",
            f"transcribe_stop_flag_{session_id}.txt",
            f"transcribe_active_{session_id}.txt",
        )
        for fname in files_to_remove:
            path = os.path.join(TMP_DIR, fname)
            try:
                os.remove(path)
            except FileNotFoundError:
                # Nothing to delete; avoids an exists()/remove() race.
                continue
            except Exception as e:
                logging.warning(f"[{session_id}] Failed to remove file {fname}: {e}")
            else:
                logging.debug(f"[{session_id}] Removed file: {fname}")

        # --- Remove chunk folder if exists ---
        chunk_dir = os.path.join(TMP_DIR, f"chunks_{session_id}")
        if os.path.isdir(chunk_dir):
            try:
                shutil.rmtree(chunk_dir)
                logging.debug(f"[{session_id}] Removed chunk folder: chunks_{session_id}")
            except Exception as e:
                logging.warning(f"[{session_id}] Failed to remove chunk folder: {e}")

        logging.info(f"[{session_id}] Session fully reset.")
    except Exception as e:
        logging.error(f"[{session_id}] Error during remove_session_data: {e}")
def generate_session_id() -> str:
    """Return a new random UUID4 string and log its creation at debug level."""
    new_id = f"{uuid.uuid4()}"
    logging.debug(f"[{new_id}] New session created.")
    return new_id
def register_session(session_id: str, filepath: str):
    """Add a session to the registry, overwriting any entry with the same id.

    Args:
        session_id: Key under which the session is stored.
        filepath: Value stored in the record's ``"file"`` field.
    """
    ensure_tmp_dir()
    # Go through the shared helpers so this function reads and writes the
    # registry the same way as the load/unload handlers.
    sessions = _read_sessions()
    sessions[session_id] = {
        "session_id": session_id,
        "file": filepath,
        # datetime.utcnow() is deprecated since Python 3.12; the timezone-aware
        # call formats to the same HH:MM:SS string.
        "start_time": datetime.now(timezone.utc).strftime("%H:%M:%S"),
        "status": "active",
    }
    _write_sessions(sessions)
    logging.debug(f"[{session_id}] Session registered in active_sessions.json.")
def unregister_session(session_id: str):
    """Drop *session_id* from the registry file.

    Does nothing when the registry file is missing or the id is not
    registered. Any error while reading or rewriting the file is logged.
    """
    if os.path.exists(ACTIVE_SESSIONS_FILE):
        try:
            with open(ACTIVE_SESSIONS_FILE, "r") as src:
                registry = json.load(src)
            if session_id not in registry:
                return
            del registry[session_id]
            with open(ACTIVE_SESSIONS_FILE, "w") as dst:
                json.dump(registry, dst)
            logging.debug(f"[{session_id}] Session unregistered.")
        except Exception as err:
            logging.error(f"[{session_id}] Error unregistering session: {err}")
def get_active_sessions():
    """Return the registered sessions as rows for the DataFrame.

    Each row is ``[session_id, file, start_time, status]``, with ``""`` for
    any missing field. An empty list is returned when the registry file is
    missing or cannot be read.
    """
    if not os.path.exists(ACTIVE_SESSIONS_FILE):
        return []
    columns = ("session_id", "file", "start_time", "status")
    try:
        with open(ACTIVE_SESSIONS_FILE, "r") as src:
            registry = json.load(src)
        return [
            [entry.get(column, "") for column in columns]
            for entry in registry.values()
        ]
    except Exception as err:
        logging.error(f"Error reading active sessions: {err}")
        return []
def stop_file_path(session_id: str) -> str:
    """Build the path of the stream stop-flag file for *session_id*.

    ``ensure_tmp_dir()`` is called first, so the base directory is created
    if it can be.
    """
    ensure_tmp_dir()
    flag_name = f"stream_stop_flag_{session_id}.txt"
    return os.path.join(TMP_DIR, flag_name)
def create_stop_flag(session_id: str):
    """Write the stop-flag file (containing ``"1"``) for *session_id*."""
    flag_path = stop_file_path(session_id)
    with open(flag_path, "w") as flag:
        flag.write("1")
    logging.info(f"[{session_id}] Stop flag file created at {flag_path}.")
def clear_stop_flag(session_id: str):
    """Remove the stop-flag file of *session_id*; a missing file is a no-op."""
    flag_path = stop_file_path(session_id)
    if not os.path.exists(flag_path):
        return
    os.remove(flag_path)
    logging.debug(f"[{session_id}] Stop flag cleared.")