"""File-backed registry of active sessions and cleanup of per-session temp data.

The registry is a single JSON object stored in ``ACTIVE_SESSIONS_FILE`` that maps
a session id to a small record (``session_hash``, ``file``, ``start_time``,
``status``). Per-session artifacts (progress/flag files and a ``chunks_<id>``
folder) live next to it inside ``TMP_DIR``.
"""
import os
import json
import uuid
import shutil
from datetime import datetime, timezone
from app.logger_config import logger as logging
import gradio as gr

# Base directory for all temporary artifacts; overridable via the TMP_DIR env var.
TMP_DIR = os.getenv("TMP_DIR", "/tmp/canary_aed_streaming")
ACTIVE_SESSIONS_FILE = os.path.join(TMP_DIR, "active_sessions.json")


# ---------------------------
# Helper to manage the JSON
# ---------------------------
def _read_sessions():
    """Return the registry as a dict; an unreadable or invalid file yields ``{}``.

    A missing file is the normal "no sessions yet" case and is not logged.
    Any other read/parse problem is logged so that a corrupted registry does
    not go unnoticed.
    """
    try:
        with open(ACTIVE_SESSIONS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        logging.warning(f"Could not read {ACTIVE_SESSIONS_FILE}: {e}")
        return {}

    if not isinstance(data, dict):
        # Callers index the registry by session id, so anything else is unusable.
        logging.warning(f"Ignoring {ACTIVE_SESSIONS_FILE}: top-level JSON value is not an object.")
        return {}
    return data


def _write_sessions(data):
    """Persist ``data`` as the registry, replacing the previous file atomically.

    The JSON is written to a uniquely named sibling file and then moved over
    ``ACTIVE_SESSIONS_FILE`` with ``os.replace`` so that a concurrent reader
    never observes a half-written file.

    Raises:
        OSError: if the directory or file cannot be written.
        TypeError: if ``data`` is not JSON-serializable.
    """
    directory = os.path.dirname(ACTIVE_SESSIONS_FILE)
    if directory:  # os.makedirs("") raises, so skip it for a bare file name
        os.makedirs(directory, exist_ok=True)

    tmp_path = f"{ACTIVE_SESSIONS_FILE}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, ACTIVE_SESSIONS_FILE)
    except Exception:
        # Do not leave a stray temp file behind when serialization or the move fails.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError as cleanup_error:
                logging.debug(f"Could not remove temp file {tmp_path}: {cleanup_error}")
        raise


def _new_session_entry(session_id, filepath=""):
    """Build the registry record stored for a session."""
    return {
        "session_hash": session_id,
        "file": filepath,
        # Time of day only (UTC), same format the registry has always used.
        "start_time": datetime.now(timezone.utc).strftime("%H:%M:%S"),
        "status": "active",
    }


def _pop_session(session_id):
    """Remove ``session_id`` from the registry; return True if it was present.

    The file is rewritten only when an entry was actually removed.
    """
    sessions = _read_sessions()
    if session_id not in sessions:
        return False
    del sessions[session_id]
    _write_sessions(sessions)
    return True


# ---------------------------
# LOAD
# ---------------------------
def on_load(request: gr.Request):
    """Called when a new visitor opens the app.

    Registers ``request.session_hash`` in the registry with an empty ``file``
    and returns the hash twice (the original comment suggests one value feeds a
    ``gr.State`` and the other a display component).
    """
    session_hash = request.session_hash  # used directly as the unique session id

    sessions = _read_sessions()
    sessions[session_hash] = _new_session_entry(session_hash)
    _write_sessions(sessions)

    logging.info(f"[{session_hash}] Session registered (on_load).")
    return session_hash, session_hash  # can be used as gr.State + display


# ---------------------------
# UNLOAD
# ---------------------------
def on_unload(request: gr.Request):
    """Called when the visitor closes or refreshes the app.

    If the session is registered, its registry entry and temp data are removed;
    otherwise only an informational message is logged.
    """
    sid = request.session_hash

    if sid not in _read_sessions():
        logging.info(f"[{sid}] No active session found to remove.")
        return

    # remove_session_data() also drops the registry entry, so no separate
    # pop/unregister step is needed here.
    remove_session_data(sid)
    logging.info(f"[{sid}] Session removed (on_unload).")


def ensure_tmp_dir():
    """Ensures the base temporary directory exists (failure is logged, not raised)."""
    try:
        os.makedirs(TMP_DIR, exist_ok=True)
    except OSError as e:
        logging.error(f"Failed to create tmp directory {TMP_DIR}: {e}")


def reset_all_active_sessions():
    """Deletes the active-sessions registry file (intended for startup).

    Only ``ACTIVE_SESSIONS_FILE`` is removed; per-session files and chunk
    folders are left untouched. Errors are logged, not raised.
    """
    ensure_tmp_dir()

    try:
        os.remove(ACTIVE_SESSIONS_FILE)
    except FileNotFoundError:
        logging.debug("No active sessions file found to reset.")
    except Exception as e:
        logging.error(f"Error resetting active sessions: {e}")
    else:
        logging.info("Active sessions file reset at startup.")


def remove_session_data(session_id: str):
    """Removes all temporary files and data related to a specific session.

    Steps, each best-effort (failures are logged and the next step still runs):
      1. drop the session from the registry,
      2. delete its progress/flag files in ``TMP_DIR``,
      3. delete its ``chunks_<session_id>`` folder.

    A falsy ``session_id`` is rejected with a warning. A ``session_id`` that
    contains a path separator is removed from the registry but no filesystem
    deletion is attempted for it.
    """
    if not session_id:
        logging.warning("remove_session_data() called without a valid session_id.")
        return

    try:
        # --- Remove session from active_sessions.json ---
        try:
            if _pop_session(session_id):
                logging.debug(f"[{session_id}] Removed from active_sessions.json.")
        except Exception as e:
            logging.warning(f"[{session_id}] Failed to update active_sessions.json: {e}")

        # session_id is interpolated into the paths deleted below; a value with a
        # path separator (e.g. "/../x") would point outside TMP_DIR, so refuse it.
        if os.path.basename(session_id) != session_id:
            logging.warning(f"[{session_id}] Unsafe session_id; skipping file cleanup.")
            return

        # --- Define all possible session file patterns ---
        files_to_remove = [
            f"progress_{session_id}.json",
            f"transcribe_stop_flag_{session_id}.txt",
            f"transcribe_active_{session_id}.txt",
        ]

        # --- Remove all temporary files ---
        for fname in files_to_remove:
            try:
                os.remove(os.path.join(TMP_DIR, fname))
            except FileNotFoundError:
                continue  # nothing to clean up for this pattern
            except OSError as e:
                logging.warning(f"[{session_id}] Failed to remove file {fname}: {e}")
            else:
                logging.debug(f"[{session_id}] Removed file: {fname}")

        # --- Remove chunk folder if exists ---
        chunk_dir = os.path.join(TMP_DIR, f"chunks_{session_id}")
        if os.path.isdir(chunk_dir):
            try:
                shutil.rmtree(chunk_dir)
                logging.debug(f"[{session_id}] Removed chunk folder: chunks_{session_id}")
            except Exception as e:
                logging.warning(f"[{session_id}] Failed to remove chunk folder: {e}")

        logging.info(f"[{session_id}] Session fully reset.")

    except Exception as e:
        # Last-resort guard: cleanup must never take the caller down.
        logging.error(f"[{session_id}] Error during remove_session_data: {e}")


def generate_session_id() -> str:
    """Generates a unique session ID (a random UUID4 string)."""
    sid = str(uuid.uuid4())
    logging.debug(f"[{sid}] New session created.")
    return sid


def register_session(session_id: str, filepath: str):
    """Registers a new session, overwriting any existing entry with the same id.

    Raises:
        OSError: if the registry file cannot be written.
    """
    ensure_tmp_dir()

    sessions = _read_sessions()
    sessions[session_id] = _new_session_entry(session_id, filepath)
    _write_sessions(sessions)

    logging.debug(f"[{session_id}] Session registered in active_sessions.json.")


def unregister_session(session_id: str):
    """Removes a session from the registry; errors are logged, not raised."""
    try:
        if _pop_session(session_id):
            logging.debug(f"[{session_id}] Session unregistered.")
    except Exception as e:
        logging.error(f"[{session_id}] Error unregistering session: {e}")


def get_active_sessions():
    """Returns active sessions as a list of rows for the DataFrame.

    Each row is ``[session_hash, file, start_time, status]``; missing fields
    become empty strings. Registry entries that are not JSON objects are
    skipped. Returns ``[]`` when the registry is missing or unreadable.
    """
    columns = ("session_hash", "file", "start_time", "status")
    return [
        [entry.get(column, "") for column in columns]
        for entry in _read_sessions().values()
        if isinstance(entry, dict)
    ]