# canary_aed_streaming/app/old_session_utils.py
# Author: Archime — commit 6f523af ("impl walkthrough")
import json
import os
import shutil
import tempfile
import uuid
from datetime import datetime, timezone

import gradio as gr

from app.logger_config import logger as logging
# Root folder for every temporary session artefact; the TMP_DIR environment
# variable overrides the default location.
TMP_DIR = os.environ.get("TMP_DIR", "/tmp/canary_aed_streaming")
# JSON registry of the currently active sessions, keyed by session ID.
ACTIVE_SESSIONS_FILE = os.path.join(TMP_DIR, "active_sessions.json")
# ---------------------------
# Helper to manage the JSON
# ---------------------------
def _read_sessions():
if not os.path.exists(ACTIVE_SESSIONS_FILE):
return {}
try:
with open(ACTIVE_SESSIONS_FILE, "r") as f:
return json.load(f)
except Exception:
return {}
def _write_sessions(data):
os.makedirs(os.path.dirname(ACTIVE_SESSIONS_FILE), exist_ok=True)
with open(ACTIVE_SESSIONS_FILE, "w") as f:
json.dump(data, f, indent=2)
# ---------------------------
# LOAD
# ---------------------------
def on_load(request: gr.Request):
    """Register a new visitor's session when the app loads.

    Gradio's ``request.session_hash`` is used directly as the session ID.
    A record with an empty ``file``, the current UTC time (``HH:MM:SS``)
    and ``status="active"`` is stored in the active-sessions registry.

    Args:
        request: The incoming Gradio request for the visitor.

    Returns:
        ``(sid, sid)``: the session ID twice, so it can feed a ``gr.State``
        and a display component.
    """
    sid = request.session_hash
    sessions = _read_sessions()
    sessions[sid] = {
        "session_id": sid,
        "file": "",
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
        "start_time": datetime.now(timezone.utc).strftime("%H:%M:%S"),
        "status": "active",
    }
    _write_sessions(sessions)
    logging.info(f"[{sid}] Session registered (on_load).")
    return sid, sid
# ---------------------------
# UNLOAD
# ---------------------------
def on_unload(request: gr.Request):
    """Tear down a visitor's session when they close or refresh the app.

    If the session is in the registry, a stream stop flag is written first
    and then ``remove_session_data`` drops the registry entry and deletes
    the session's temporary files. Unknown sessions are only logged.

    Args:
        request: The Gradio request whose ``session_hash`` identifies the session.
    """
    sid = request.session_hash
    if sid not in _read_sessions():
        logging.info(f"[{sid}] No active session found to remove.")
        return
    # Signal any running stream for this session before its data is removed.
    create_stop_flag(sid)
    # remove_session_data() also deletes the registry entry. The old separate
    # pop/_write_sessions and unregister_session calls rewrote the same file
    # three times for one removal.
    remove_session_data(sid)
    logging.info(f"[{sid}] Session removed (on_unload).")
def ensure_tmp_dir():
    """Make sure TMP_DIR (and any missing parents) exists.

    Creation failures are logged as errors, not raised.
    """
    base_dir = TMP_DIR
    try:
        os.makedirs(base_dir, exist_ok=True)
    except Exception as exc:
        logging.error(f"Failed to create tmp directory {base_dir}: {exc}")
def _classify_leftover(name, is_dir):
    """Return the log label for a leftover TMP_DIR entry, or None to keep it."""
    if is_dir:
        return "chunk folder" if name.startswith("chunks_") else None
    if name.startswith("progress_") and name.endswith(".json"):
        return "progress file"
    flag_prefixes = ("stream_stop_flag_", "transcribe_stop_flag_", "transcribe_active_")
    if name.startswith(flag_prefixes) and name.endswith(".txt"):
        return "flag file"
    return None


def _remove_leftover(path, name, label):
    """Delete one leftover entry; log (never raise) on failure."""
    try:
        if label == "chunk folder":
            shutil.rmtree(path)
        else:
            os.remove(path)
    except OSError as exc:
        logging.warning(f"Failed to remove {label} {name}: {exc}")
    else:
        logging.debug(f"Removed leftover {label}: {name}")


def reset_all_active_sessions():
    """Remove all temporary session files and folders at startup.

    Deletes the active-sessions registry, ``progress_*.json`` files, stream /
    transcribe flag files (``*.txt``) and ``chunks_*`` directories inside
    TMP_DIR. Each deletion is attempted on its own. A failure is logged and
    the rest of the cleanup still runs, whereas previously one error on the
    registry file skipped everything else.
    """
    ensure_tmp_dir()

    # --- Active sessions registry ---
    try:
        os.remove(ACTIVE_SESSIONS_FILE)
        logging.info("Active sessions file reset at startup.")
    except FileNotFoundError:
        logging.debug("No active sessions file found to reset.")
    except OSError as exc:
        logging.error(f"Error resetting active sessions: {exc}")

    # --- Progress files, flag files and chunk folders (single directory scan) ---
    try:
        entries = os.listdir(TMP_DIR)
    except OSError as exc:
        logging.error(f"Error resetting active sessions: {exc}")
        return
    for name in entries:
        path = os.path.join(TMP_DIR, name)
        label = _classify_leftover(name, os.path.isdir(path))
        if label is not None:
            _remove_leftover(path, name, label)
    logging.info("Temporary session cleanup completed successfully.")
def remove_session_data(session_id: str):
    """Remove a session's registry entry and its temporary files.

    Drops ``session_id`` from the active-sessions registry, then deletes
    ``progress_<id>.json``, ``transcribe_stop_flag_<id>.txt``,
    ``transcribe_active_<id>.txt`` and the ``chunks_<id>`` folder from
    TMP_DIR. Every failure is logged; nothing is raised.

    Args:
        session_id: The session to clean up. Falsy values are rejected with
            a warning.
    """
    if not session_id:
        logging.warning("remove_session_data() called without a valid session_id.")
        return
    try:
        # --- Remove session from active_sessions.json (shared helpers keep the format consistent) ---
        try:
            sessions = _read_sessions()
            if session_id in sessions:
                del sessions[session_id]
                _write_sessions(sessions)
                logging.debug(f"[{session_id}] Removed from active_sessions.json.")
        except (OSError, TypeError, ValueError) as e:
            logging.warning(f"[{session_id}] Failed to update active_sessions.json: {e}")

        # NOTE: stream_stop_flag_<id>.txt is deliberately NOT removed here.
        # on_unload() creates it right before calling this function, presumably
        # so a still-running stream can observe it. reset_all_active_sessions()
        # and clear_stop_flag() are the code paths that delete it.
        session_files = (
            f"progress_{session_id}.json",
            f"transcribe_stop_flag_{session_id}.txt",
            f"transcribe_active_{session_id}.txt",
        )
        for fname in session_files:
            # EAFP: avoids the exists()/remove() race with concurrent cleanup.
            try:
                os.remove(os.path.join(TMP_DIR, fname))
            except FileNotFoundError:
                continue
            except OSError as e:
                logging.warning(f"[{session_id}] Failed to remove file {fname}: {e}")
            else:
                logging.debug(f"[{session_id}] Removed file: {fname}")

        # --- Remove chunk folder if it exists ---
        chunk_dir = os.path.join(TMP_DIR, f"chunks_{session_id}")
        if os.path.isdir(chunk_dir):
            try:
                shutil.rmtree(chunk_dir)
                logging.debug(f"[{session_id}] Removed chunk folder: chunks_{session_id}")
            except OSError as e:
                logging.warning(f"[{session_id}] Failed to remove chunk folder: {e}")

        logging.info(f"[{session_id}] Session fully reset.")
    except Exception as e:
        # Top-level boundary: cleanup is best effort and must never raise to the caller.
        logging.error(f"[{session_id}] Error during remove_session_data: {e}")
def generate_session_id() -> str:
    """Create and return a new random (UUID4) session ID in canonical string form."""
    new_id = f"{uuid.uuid4()}"
    logging.debug(f"[{new_id}] New session created.")
    return new_id
def register_session(session_id: str, filepath: str):
    """Add (or overwrite) a session in the active-sessions registry.

    The record stores ``session_id``, ``filepath`` (as ``file``), the current
    UTC time as ``HH:MM:SS`` and ``status="active"``. Reading and writing go
    through ``_read_sessions`` / ``_write_sessions``, so an unreadable
    registry is handled the same way everywhere and the on-disk format
    matches the other writers.

    Args:
        session_id: Key under which the session is stored.
        filepath: File associated with the session.
    """
    ensure_tmp_dir()
    sessions = _read_sessions()
    sessions[session_id] = {
        "session_id": session_id,
        "file": filepath,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
        "start_time": datetime.now(timezone.utc).strftime("%H:%M:%S"),
        "status": "active",
    }
    _write_sessions(sessions)
    logging.debug(f"[{session_id}] Session registered in active_sessions.json.")
def unregister_session(session_id: str):
    """Remove a session from the active-sessions registry, if present.

    Uses the shared ``_read_sessions`` / ``_write_sessions`` helpers so the
    registry format stays consistent. Errors are logged, not raised.

    Args:
        session_id: The session to remove.
    """
    try:
        sessions = _read_sessions()
        if session_id in sessions:
            del sessions[session_id]
            _write_sessions(sessions)
            logging.debug(f"[{session_id}] Session unregistered.")
    except (OSError, TypeError, ValueError) as e:
        logging.error(f"[{session_id}] Error unregistering session: {e}")
def get_active_sessions():
    """Return the active sessions as DataFrame rows.

    Each row is ``[session_id, file, start_time, status]``, and any missing
    field becomes ``""``. The result is ``[]`` when the registry file does
    not exist or cannot be parsed; a parse error is logged.
    """
    columns = ("session_id", "file", "start_time", "status")
    if os.path.exists(ACTIVE_SESSIONS_FILE):
        try:
            with open(ACTIVE_SESSIONS_FILE, "r") as fh:
                registry = json.load(fh)
            return [
                [record.get(column, "") for column in columns]
                for record in registry.values()
            ]
        except Exception as exc:
            logging.error(f"Error reading active sessions: {exc}")
            return []
    return []
def stop_file_path(session_id: str) -> str:
    """Return the path of the stream stop-flag file for ``session_id``.

    Calls ``ensure_tmp_dir()`` first so the containing directory is created
    when missing.
    """
    ensure_tmp_dir()
    flag_name = f"stream_stop_flag_{session_id}.txt"
    return os.path.join(TMP_DIR, flag_name)
def create_stop_flag(session_id: str):
    """Write the stream stop-flag file (content ``"1"``) for ``session_id``."""
    flag_path = stop_file_path(session_id)
    with open(flag_path, "w") as flag_file:
        flag_file.write("1")
    logging.info(f"[{session_id}] Stop flag file created at {flag_path}.")
def clear_stop_flag(session_id: str):
    """Delete the stream stop-flag file for ``session_id`` if it exists.

    Uses EAFP instead of ``exists()`` + ``remove()``, so a flag deleted
    concurrently (e.g. by another cleanup path) no longer raises
    ``FileNotFoundError``.
    """
    path = stop_file_path(session_id)
    try:
        os.remove(path)
    except FileNotFoundError:
        return  # already cleared, nothing to do
    logging.debug(f"[{session_id}] Stop flag cleared.")