import io
import json
import logging
import os
import re
import tempfile
import time
import uuid
import wave
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

import joblib
import librosa
import matplotlib
matplotlib.use('Agg')  # headless backend; select it before pyplot is imported
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import spacy
import torch
import google.generativeai as genai
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
from pydub import AudioSegment
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak, Image, HRFlowable
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
from transformers import AutoTokenizer, AutoModel, pipeline

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logger").setLevel(logging.WARNING)  # silence NeMo's verbose startup logs

OUTPUT_DIR = "./processed_audio"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# API keys are read from the environment; the fallback strings are placeholders only.
PINECONE_KEY = os.getenv("PINECONE_KEY", "your-pinecone-key")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY", "your-assemblyai-key")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "your-gemini-key")

def validate_url(url: str) -> bool:
    """Check that a URL is reachable before attempting a download."""
    try:
        # allow_redirects=True so URLs behind 301/302 redirects are not rejected
        # (requests.head() does not follow redirects by default).
        response = requests.head(url, timeout=5, allow_redirects=True)
        return response.status_code == 200
    except requests.RequestException as e:
        logger.error(f"URL validation failed for {url}: {str(e)}")
        return False

def download_audio_from_url(url: str) -> str:
    """Stream a remote audio file to a uniquely named temp file and return its path."""
    if not validate_url(url):
        raise ValueError(f"Audio file not found or inaccessible at {url}")
    try:
        temp_dir = tempfile.gettempdir()
        temp_path = os.path.join(temp_dir, f"{uuid.uuid4()}.tmp_audio")
        logger.info(f"Downloading audio from {url} to {temp_path}")
        with requests.get(url, stream=True, timeout=10) as r:
            r.raise_for_status()
            with open(temp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return temp_path
    except Exception as e:
        logger.error(f"Failed to download audio from URL {url}: {str(e)}")
        raise

def initialize_services():
    """Connect to Pinecone (creating the embedding index if needed) and configure Gemini."""
    try:
        pc = Pinecone(api_key=PINECONE_KEY)
        index_name = "interview-speaker-embeddings"
        if index_name not in pc.list_indexes().names():
            # 192 dimensions matches the TitaNet-Large speaker embedding size.
            pc.create_index(
                name=index_name,
                dimension=192,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
        index = pc.Index(index_name)
        genai.configure(api_key=GEMINI_API_KEY)
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        return index, gemini_model
    except Exception as e:
        logger.error(f"Error initializing services: {str(e)}")
        raise

# Services and models are initialized once, at import time.
index, gemini_model = initialize_services()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")

def load_models():
    """Load the TitaNet speaker-verification model and the spaCy English pipeline."""
    speaker_model = EncDecSpeakerLabelModel.from_pretrained(
        "nvidia/speakerverification_en_titanet_large", map_location=device)
    speaker_model.eval()
    nlp = spacy.load("en_core_web_sm")
    return speaker_model, nlp

speaker_model, nlp = load_models()

def convert_to_wav(audio_path: str, output_dir: str = OUTPUT_DIR) -> str:
    """Convert any input audio to 16 kHz mono WAV, the format the ASR and speaker models expect."""
    try:
        audio = AudioSegment.from_file(audio_path)
        if audio.channels > 1:
            audio = audio.set_channels(1)
        audio = audio.set_frame_rate(16000)
        wav_file = os.path.join(output_dir, f"{uuid.uuid4()}.wav")
        audio.export(wav_file, format="wav")
        return wav_file
    except Exception as e:
        logger.error(f"Audio conversion failed: {str(e)}")
        raise

def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
    """Extract pitch and intensity statistics for one utterance segment."""
    try:
        audio = AudioSegment.from_file(audio_path)
        segment = audio[start_ms:end_ms]
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            segment.export(tmp.name, format="wav")
            y, sr = librosa.load(tmp.name, sr=16000)
        os.remove(tmp.name)
        pitches, _ = librosa.piptrack(y=y, sr=sr)
        pitches = pitches[pitches > 0]  # keep only voiced (non-zero) pitch estimates
        rms = librosa.feature.rms(y=y)[0]  # compute the intensity envelope once
        has_pitch = len(pitches) > 0
        return {
            'duration': (end_ms - start_ms) / 1000.0,
            'mean_pitch': float(np.mean(pitches)) if has_pitch else 0.0,
            'min_pitch': float(np.min(pitches)) if has_pitch else 0.0,
            'max_pitch': float(np.max(pitches)) if has_pitch else 0.0,
            'pitch_sd': float(np.std(pitches)) if has_pitch else 0.0,
            'intensityMean': float(np.mean(rms)),
            'intensityMin': float(np.min(rms)),
            'intensityMax': float(np.max(rms)),
            'intensitySD': float(np.std(rms)),
        }
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        return {}

def transcribe(audio_path: str) -> Dict:
    """Upload audio to AssemblyAI, request a diarized transcript, and poll until done."""
    try:
        headers = {"authorization": ASSEMBLYAI_KEY}
        with open(audio_path, 'rb') as f:
            # Generous read timeout: uploads of long interviews can be large.
            upload_response = requests.post(
                "https://api.assemblyai.com/v2/upload", headers=headers, data=f, timeout=300)
        audio_url = upload_response.json()['upload_url']
        transcript_response = requests.post(
            "https://api.assemblyai.com/v2/transcript",
            headers=headers,
            json={"audio_url": audio_url, "speaker_labels": True, "filter_profanity": True},
            timeout=30)
        transcript_id = transcript_response.json()['id']
        while True:
            result = requests.get(
                f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
                headers=headers, timeout=30).json()
            if result['status'] == 'completed':
                return result
            elif result['status'] == 'error':
                raise Exception(f"AssemblyAI Error: {result.get('error')}")
            time.sleep(5)
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        raise

def process_utterance(utterance: Dict, full_audio: AudioSegment) -> Dict:
    """Embed one utterance with TitaNet and match it against known speakers in Pinecone."""
    try:
        start, end = utterance['start'], utterance['end']
        segment = full_audio[start:end]
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            segment.export(tmp.name, format="wav")
            with torch.no_grad():
                embedding = speaker_model.get_embedding(tmp.name).cpu().numpy()
        os.remove(tmp.name)
        embedding_list = embedding.flatten().tolist()
        query_result = index.query(vector=embedding_list, top_k=1, include_metadata=True)
        # Cosine similarity above 0.75 is treated as a match with a known speaker;
        # otherwise a new speaker entry is registered in the index.
        if query_result['matches'] and query_result['matches'][0]['score'] > 0.75:
            speaker_id = query_result['matches'][0]['id']
            speaker_name = query_result['matches'][0]['metadata']['speaker_name']
        else:
            speaker_id = f"speaker_{uuid.uuid4().hex[:6]}"
            speaker_name = f"Speaker_{speaker_id[-4:].upper()}"
            index.upsert([(speaker_id, embedding_list, {"speaker_name": speaker_name})])
        return {**utterance, 'speaker': speaker_name, 'speaker_id': speaker_id}
    except Exception as e:
        logger.error(f"Utterance processing failed: {str(e)}")
        return {**utterance, 'speaker': 'Unknown', 'speaker_id': 'unknown'}

def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
    """Run speaker identification over all utterances in parallel.

    Note: concurrent utterances can race on Pinecone upserts, so a brand-new
    speaker may occasionally be registered under two IDs.
    """
    try:
        full_audio = AudioSegment.from_wav(wav_file)
        utterances = transcript.get('utterances', [])
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(process_utterance, u, full_audio) for u in utterances]
            results = [f.result() for f in futures]
        return results
    except Exception as e:
        logger.error(f"Speaker identification failed: {str(e)}")
        raise

def classify_roles(utterances: List[Dict]) -> List[Dict]:
    """Assign roles by turn alternation: even turns become 'Interviewer', odd
    turns 'Interviewee'. A naive placeholder that assumes a strict two-party
    question/answer exchange; see the content-based sketch below."""
    results = []
    for i, utterance in enumerate(utterances):
        utterance['role'] = 'Interviewer' if i % 2 == 0 else 'Interviewee'
        results.append(utterance)
    return results
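
# classify_roles() alternates roles blindly, which mislabels consecutive turns
# by the same party. The helper below is a minimal content-based sketch (a
# hypothetical alternative, not wired into the pipeline): the speaker whose
# utterances most often look like questions is labeled the interviewer.
def classify_roles_by_questions(utterances: List[Dict]) -> List[Dict]:
    question_counts: Dict[str, int] = {}
    for u in utterances:
        text = u.get('text', '').strip()
        # An utterance counts as a question if it ends with '?' or opens with a
        # common interrogative word.
        is_question = text.endswith('?') or bool(
            re.match(r'(?i)^(who|what|when|where|why|how|can|could|would|do|did|tell)\b', text))
        speaker = u.get('speaker', 'Unknown')
        question_counts[speaker] = question_counts.get(speaker, 0) + int(is_question)
    interviewer = max(question_counts, key=question_counts.get) if question_counts else None
    for u in utterances:
        u['role'] = 'Interviewer' if u.get('speaker') == interviewer else 'Interviewee'
    return utterances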

def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
    """Compute speaking rate, filler usage, and pitch/intensity stability for the
    interviewee, then roll them up into heuristic anxiety and confidence scores."""
    try:
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u.get('role') == 'Interviewee']
        if not interviewee_utterances:
            return {'error': 'No interviewee utterances found'}
        segments = [y[int(u['start'] * sr / 1000):int(u['end'] * sr / 1000)]
                    for u in interviewee_utterances if u['end'] > u['start']]
        if not segments:
            return {'error': 'No valid audio segments found'}
        # Fall back to the utterance timestamps if prosodic feature extraction failed.
        total_duration = sum(
            u.get('prosodic_features', {}).get('duration', (u['end'] - u['start']) / 1000.0)
            for u in interviewee_utterances)
        total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0
        filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
        filler_count = sum(sum(u['text'].lower().count(fw) for fw in filler_words)
                           for u in interviewee_utterances)
        filler_ratio = filler_count / total_words if total_words > 0 else 0
        pitches, intensities = [], []
        for segment in segments:
            if len(segment) == 0:
                continue
            f0, voiced_flag, _ = librosa.pyin(
                segment, fmin=librosa.note_to_hz('C2'), fmax=librosa.note_to_hz('C7'), sr=sr)
            pitches.extend(f0[voiced_flag])  # keep pitch values from voiced frames only
            intensities.extend(librosa.feature.rms(y=segment)[0])
        pitch_mean = float(np.mean(pitches)) if len(pitches) > 0 else 0.0
        intensity_std = float(np.std(intensities)) if len(intensities) > 0 else 0.0
        # Jitter and shimmer here are frame-to-frame variability proxies, not the
        # cycle-level measures used in clinical voice analysis.
        jitter = float(np.mean(np.abs(np.diff(pitches))) / pitch_mean) if len(pitches) > 1 and pitch_mean > 0 else 0.0
        shimmer = float(np.mean(np.abs(np.diff(intensities))) / np.mean(intensities)) if len(intensities) > 1 and np.mean(intensities) > 0 else 0.0
        # Heuristic composites: weights are hand-tuned, not learned.
        anxiety_score = 0.6 * (np.std(pitches) / pitch_mean if pitch_mean > 0 else 0) + 0.4 * (jitter + shimmer)
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 - filler_ratio)
        return {
            'speaking_rate': round(speaking_rate, 2),
            'filler_ratio': round(filler_ratio, 3),
            'composite_scores': {'anxiety': round(anxiety_score, 3), 'confidence': round(confidence_score, 3)},
            'interpretation': {
                'anxiety_level': 'High' if anxiety_score > 0.15 else 'Moderate' if anxiety_score > 0.07 else 'Low',
                'confidence_level': 'High' if confidence_score > 0.75 else 'Moderate' if confidence_score > 0.5 else 'Low',
                # 0.15 is an assumed cutoff for 'Disfluent' so that all three levels
                # consumed by calculate_acceptance_probability are reachable.
                'fluency_level': 'Fluent' if filler_ratio < 0.05 else 'Moderate' if filler_ratio < 0.15 else 'Disfluent'
            }
        }
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}")
        return {'error': str(e)}

def calculate_acceptance_probability(analysis_data: Dict) -> float:
    """Combine the voice metrics into a single 0-100 suitability percentage using
    fixed heuristic weights. Returns a neutral 50.0 if voice analysis failed."""
    voice = analysis_data.get('voice_analysis', {})
    if 'error' in voice:
        return 50.0
    # Hand-tuned weights; negative weights mark traits that lower the score.
    w_confidence, w_anxiety, w_fluency, w_speaking_rate, w_filler_repetition, w_content_strengths = 0.35, -0.25, 0.2, 0.15, -0.15, 0.25
    confidence_score = voice.get('composite_scores', {}).get('confidence', 0.0)
    anxiety_score = voice.get('composite_scores', {}).get('anxiety', 0.0)
    fluency_level = voice.get('interpretation', {}).get('fluency_level', 'Disfluent')
    speaking_rate = voice.get('speaking_rate', 0.0)
    filler_ratio = voice.get('filler_ratio', 0.0)
    # Reserved for a repetition metric; analyze_interviewee_voice does not
    # currently set this key, so it defaults to 0.0.
    repetition_score = voice.get('repetition_score', 0.0)
    fluency_map = {'Fluent': 1.0, 'Moderate': 0.6, 'Disfluent': 0.2}
    fluency_val = fluency_map.get(fluency_level, 0.2)
    ideal_speaking_rate = 2.5  # words per second
    speaking_rate_deviation = abs(speaking_rate - ideal_speaking_rate)
    speaking_rate_score = max(0, 1 - (speaking_rate_deviation / ideal_speaking_rate))
    filler_repetition_composite = (filler_ratio + repetition_score) / 2
    filler_repetition_score = max(0, 1 - filler_repetition_composite)
    # Crude content proxy: long-form answers (over 60 s of speech) score higher.
    content_strength_val = 0.85 if analysis_data.get('text_analysis', {}).get('total_duration', 0) > 60 else 0.4
    raw_score = (confidence_score * w_confidence
                 + (1 - anxiety_score) * abs(w_anxiety)
                 + fluency_val * w_fluency
                 + speaking_rate_score * w_speaking_rate
                 + filler_repetition_score * abs(w_filler_repetition)
                 + content_strength_val * w_content_strengths)
    max_possible_score = (w_confidence + abs(w_anxiety) + w_fluency + w_speaking_rate
                          + abs(w_filler_repetition) + w_content_strengths)
    normalized_score = raw_score / max_possible_score if max_possible_score > 0 else 0.5
    acceptance_probability = max(0.0, min(1.0, normalized_score))
    return round(acceptance_probability * 100, 2)
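
# Worked example with hypothetical inputs: confidence 0.8, anxiety 0.1,
# fluency 'Fluent', speaking rate 2.5 w/s, filler ratio 0.02, repetition 0.0,
# and more than 60 s of speech:
#   raw  = 0.8*0.35 + 0.9*0.25 + 1.0*0.20 + 1.0*0.15 + 0.99*0.15 + 0.85*0.25 = 1.2160
#   norm = 1.2160 / 1.35 = 0.9007  ->  returned as 90.07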

def convert_to_serializable(obj):
    """Recursively convert numpy scalars and arrays into plain Python types for JSON."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj

def generate_report(analysis_data: Dict, user_id: str) -> str:
    """Ask Gemini to draft an HR evaluation memo from the computed metrics."""
    try:
        voice = analysis_data.get('voice_analysis', {})
        voice_interpretation = "Voice analysis data was not available."
        if voice and 'error' not in voice:
            voice_interpretation = (
                f"The candidate's voice profile indicates a '{voice.get('interpretation', {}).get('confidence_level', 'N/A').upper()}' confidence level "
                f"and a '{voice.get('interpretation', {}).get('anxiety_level', 'N/A').upper()}' anxiety level. "
                f"Fluency was rated as '{voice.get('interpretation', {}).get('fluency_level', 'N/A').upper()}'."
            )

        # Default of 0.0 keeps the :.2f format below safe if scoring was skipped.
        prob = analysis_data.get('acceptance_probability', 0.0)

        prompt = f"""
**Persona:** You are a Senior HR Partner writing a candidate evaluation memo for the hiring manager.
**Task:** Write a professional, objective, and concise evaluation based on the data below.
**Tone:** Analytical and formal.

**CANDIDATE EVALUATION MEMORANDUM**
**CONFIDENTIAL**

**Candidate ID:** {user_id}
**Analysis Date:** {time.strftime('%Y-%m-%d')}
**Estimated Suitability Score:** {prob:.2f}%

**1. Overall Recommendation:**
Provide a clear, one-sentence recommendation (e.g., "Highly recommend proceeding to the final round," "Recommend with reservations," or "Do not recommend at this time."). Briefly justify the recommendation.

**2. Communication & Presentation Style:**
- Evaluate the candidate's communication style based on vocal delivery (confidence, clarity, potential nervousness).
- **Data for Analysis:** {voice_interpretation}

**3. Actionable Next Steps:**
- Suggest specific questions or topics for the next interviewer to focus on.
- If not recommending, provide a concise, constructive reason.
"""
        response = gemini_model.generate_content(prompt)
        return response.text
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return f"Error generating report: {str(e)}"

def create_pdf_report(analysis_data: Dict, output_path: str, gemini_report_text: str):
    """Render the Gemini memo text into a styled PDF with a page footer."""
    try:
        doc = SimpleDocTemplate(output_path, pagesize=letter,
                                rightMargin=0.75 * inch, leftMargin=0.75 * inch,
                                topMargin=1.2 * inch, bottomMargin=1 * inch)
        styles = getSampleStyleSheet()
        h1 = ParagraphStyle(name='Heading1', fontSize=18, leading=22, spaceAfter=12, alignment=1, textColor=colors.HexColor('#00205B'), fontName='Helvetica-Bold')
        h2 = ParagraphStyle(name='Heading2', fontSize=14, leading=18, spaceBefore=12, spaceAfter=8, textColor=colors.HexColor('#003366'), fontName='Helvetica-Bold')
        body_text = ParagraphStyle(name='BodyText', parent=styles['Normal'], fontSize=10, leading=14, spaceAfter=6, fontName='Helvetica')

        story = []

        def header_footer(canvas, doc):
            canvas.saveState()
            canvas.setFont('Helvetica', 9)
            canvas.setFillColor(colors.grey)
            canvas.drawString(doc.leftMargin, 0.5 * inch, f"Page {doc.page} | EvalBot Confidential Report")
            canvas.restoreState()

        # Convert markdown-style **bold** to ReportLab's inline <b> tags per line,
        # so bold markup never spans a line break and produces unbalanced tags.
        formatted_text = re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', gemini_report_text)

        for line in formatted_text.split('\n'):
            line = line.strip()
            if not line:
                story.append(Spacer(1, 8))
                continue
            # Short lines that begin with bold markup are treated as section headings.
            if line.startswith('<b>') and len(line) < 100:
                story.append(Paragraph(line, h2))
            else:
                story.append(Paragraph(line, body_text))

        doc.build(story, onFirstPage=header_footer, onLaterPages=header_footer)
        return True
    except Exception as e:
        logger.error(f"PDF creation failed: {str(e)}", exc_info=True)
        return False

def process_interview(audio_url: str, user_id: str) -> Dict:
    """End-to-end pipeline: download, convert, transcribe, diarize, analyze,
    score, and write the PDF and JSON artifacts. Returns the output paths."""
    local_audio_path = None
    wav_file = None
    is_downloaded = False
    try:
        logger.info(f"Starting processing for user '{user_id}' URL: {audio_url}")

        local_audio_path = download_audio_from_url(audio_url)
        is_downloaded = True

        wav_file = convert_to_wav(local_audio_path)
        transcript = transcribe(wav_file)

        if 'utterances' not in transcript or not transcript['utterances']:
            raise ValueError("Transcription returned no utterances.")

        for u in transcript['utterances']:
            u['prosodic_features'] = extract_prosodic_features(wav_file, u['start'], u['end'])

        utterances_with_speakers = identify_speakers(transcript, wav_file)
        classified_utterances = classify_roles(utterances_with_speakers)

        voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)

        analysis_data = {
            'user_id': user_id,
            'transcript': classified_utterances,
            'speakers': list(set(u['speaker'] for u in classified_utterances if u['speaker'] != 'Unknown')),
            'voice_analysis': voice_analysis,
            'text_analysis': {
                'total_duration': sum(u.get('prosodic_features', {}).get('duration', 0) for u in classified_utterances),
                'speaker_turns': len(classified_utterances)
            }
        }

        analysis_data['acceptance_probability'] = calculate_acceptance_probability(analysis_data)
        gemini_report_text = generate_report(analysis_data, user_id)

        base_name = str(uuid.uuid4())
        company_pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}_company_report.pdf")
        json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")

        create_pdf_report(analysis_data, company_pdf_path, gemini_report_text)

        with open(json_path, 'w') as f:
            json.dump(convert_to_serializable(analysis_data), f, indent=2)

        logger.info(f"Processing completed for {audio_url}")

        return {
            'company_pdf_path': company_pdf_path,
            'json_path': json_path,
            'pdf_filename': os.path.basename(company_pdf_path),
            'json_filename': os.path.basename(json_path)
        }

    except Exception as e:
        logger.error(f"Processing failed for {audio_url}: {str(e)}", exc_info=True)
        raise

    finally:
        # Always clean up intermediate audio files, even on failure.
        if wav_file and os.path.exists(wav_file):
            try:
                os.remove(wav_file)
            except Exception as e:
                logger.error(f"Failed to clean up wav file {wav_file}: {str(e)}")
        if is_downloaded and local_audio_path and os.path.exists(local_audio_path):
            try:
                os.remove(local_audio_path)
                logger.info(f"Cleaned up temporary file: {local_audio_path}")
            except Exception as e:
                logger.error(f"Failed to clean up local audio file {local_audio_path}: {str(e)}")
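
# Minimal usage sketch. Assumptions: the URL and user id below are placeholders,
# and PINECONE_KEY, ASSEMBLYAI_KEY, and GEMINI_API_KEY are set in the environment.
if __name__ == "__main__":
    example_url = "https://example.com/interview.mp3"  # hypothetical audio URL
    try:
        outputs = process_interview(example_url, user_id="demo-user")
        print(f"Report written to {outputs['company_pdf_path']}")
        print(f"Raw analysis written to {outputs['json_path']}")
    except Exception as exc:
        logger.error(f"Pipeline failed: {exc}")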