Spaces:
Running
Running
File size: 8,336 Bytes
de863da 6ebef0b de863da 6ebef0b de863da 9745155 de863da da8386d 6ebef0b da8386d 2b822a9 da8386d 2b822a9 da8386d de863da da8386d de863da da8386d de863da da8386d de863da da8386d de863da da8386d 6ebef0b 7581420 256331a de863da 256331a 7581420 9745155 2b822a9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
"""
Module for analyzing chat history and extracting useful data for training
"""
import json
import logging
from typing import List, Dict, Any, Tuple, Optional
from collections import Counter, defaultdict
import re
from datetime import datetime
from src.knowledge_base.dataset import DatasetManager
# Configure logging: importing this module sets up the root logger at INFO
# level (basicConfig is a no-op if the root logger already has handlers),
# then creates the module-level logger used by ChatAnalyzer below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChatAnalyzer:
    """Chat history analyzer.

    Reads conversations from a dataset manager and derives summary reports,
    question/answer pairs, satisfaction metrics and JSONL training data.

    Each conversation is expected to be a dict with a "messages" list whose
    items are dicts carrying "role" and "content" keys.
    """

    # Lower-case substrings that mark an assistant reply as a non-answer.
    # They are matched against the lower-cased answer, so every entry here
    # must itself be lower case.
    _FAILURE_INDICATORS = (
        "don't know", "cannot answer", "unable to answer",
        "i don't have information", "no data available",
    )

    # Patterns used to normalise questions before counting duplicates.
    _PUNCTUATION_RE = re.compile(r'[^\w\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, dataset_manager: Optional['DatasetManager'] = None):
        """
        Initialize chat analyzer

        Args:
            dataset_manager: Dataset manager for getting chat history.
                A default DatasetManager is created when omitted.
        """
        if dataset_manager is None:
            dataset_manager = DatasetManager()
        self.dataset_manager = dataset_manager
        # Not used by the analyzer itself; kept for backward compatibility.
        self.history = []

    @staticmethod
    def _messages_of(chat: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the message list of a conversation ([] when missing/None)."""
        return chat.get("messages") or []

    @classmethod
    def _iter_exchanges(cls, chat_data: List[Dict[str, Any]]):
        """Yield (question, answer) for each user message directly followed
        by an assistant message. Both texts are stripped; a missing or None
        content is treated as an empty string."""
        for chat in chat_data:
            messages = cls._messages_of(chat)
            for current, following in zip(messages, messages[1:]):
                if current.get("role") != "user":
                    continue
                if following.get("role") != "assistant":
                    continue
                question = (current.get("content") or "").strip()
                answer = (following.get("content") or "").strip()
                yield question, answer

    @staticmethod
    def _conversation_duration(messages: List[Dict[str, Any]]) -> float:
        """Return seconds between the first and last message, 0.0 if unknown.

        NOTE(review): assumes messages may carry an ISO-8601 "timestamp"
        key -- TODO confirm against what DatasetManager actually stores.
        Missing or unparsable timestamps simply count as unknown.
        """
        if len(messages) < 2:
            return 0.0
        try:
            started = datetime.fromisoformat(messages[0].get("timestamp"))
            ended = datetime.fromisoformat(messages[-1].get("timestamp"))
            elapsed = (ended - started).total_seconds()
        except (TypeError, ValueError):
            # None/non-string value, bad format, or naive/aware mismatch.
            return 0.0
        # Out-of-order timestamps must not produce a negative duration.
        return max(elapsed, 0.0)

    def analyze_chats(self) -> str:
        """
        Analyzes chat history and returns a report

        Returns:
            A markdown report, or a short message when the history cannot
            be loaded, is empty, or the analysis raises.
        """
        # Any failure is turned into a message because the result is shown
        # to the user as-is.
        try:
            success, history = self.dataset_manager.get_chat_history()
            if not success:
                return "Failed to load chat history"
            if not history:
                return "No chat history available for analysis"

            # Basic analysis (history is non-empty here, so no zero division)
            total_chats = len(history)
            total_messages = sum(len(self._messages_of(chat)) for chat in history)
            avg_messages = total_messages / total_chats

            return (
                "\n### Chat Analysis Report\n"
                f"- Total conversations: {total_chats}\n"
                f"- Total messages: {total_messages}\n"
                f"- Average messages per conversation: {avg_messages:.1f}\n"
            )
        except Exception as e:
            return f"Error during analysis: {str(e)}"

    def extract_question_answer_pairs(self, min_question_length: int = 10) -> List[Dict[str, str]]:
        """
        Extract question-answer pairs from chat history

        Args:
            min_question_length: Minimum question length to include in the sample

        Returns:
            List of question-answer pairs in format [{"question": "...", "answer": "..."}]
        """
        chat_data = self.get_chat_data()
        return [
            {"question": question, "answer": answer}
            for question, answer in self._iter_exchanges(chat_data)
            # Drop too-short questions and exchanges with an empty answer
            if len(question) >= min_question_length and answer
        ]

    def analyze_common_questions(self, top_n: int = 10) -> List[Tuple[str, int]]:
        """
        Analysis of most frequently asked questions

        Questions are lower-cased, punctuation is replaced by spaces and
        whitespace is collapsed so that near-identical wordings are
        counted together.

        Args:
            top_n: Number of most popular questions to return

        Returns:
            List of tuples (question, count)
        """
        question_counter = Counter()
        for pair in self.extract_question_answer_pairs():
            text = self._PUNCTUATION_RE.sub(' ', pair["question"].lower())
            text = self._WHITESPACE_RE.sub(' ', text).strip()
            question_counter[text] += 1
        return question_counter.most_common(top_n)

    def analyze_user_satisfaction(self) -> Dict[str, Any]:
        """
        Analysis of user satisfaction based on chat history

        Returns:
            Dictionary with satisfaction metrics:
            total_conversations, avg_messages_per_conversation,
            avg_conversation_duration (seconds) and
            follow_up_questions_rate (percentage of conversations that
            contain more than one user message).
        """
        chat_data = self.get_chat_data()

        # Initialize metrics
        metrics = {
            "total_conversations": len(chat_data),
            "avg_messages_per_conversation": 0,
            "avg_conversation_duration": 0,  # in seconds
            "follow_up_questions_rate": 0,  # percentage of dialogs with follow-up questions
        }
        if not chat_data:
            return metrics

        # Accumulate the totals the averages below are based on
        total_messages = 0
        conversations_with_followups = 0
        total_duration = 0.0
        for chat in chat_data:
            messages = self._messages_of(chat)
            total_messages += len(messages)
            user_turns = sum(1 for msg in messages if msg.get("role") == "user")
            if user_turns > 1:
                conversations_with_followups += 1
            total_duration += self._conversation_duration(messages)

        # Calculate averages
        conversation_count = len(chat_data)
        metrics["avg_messages_per_conversation"] = total_messages / conversation_count
        metrics["follow_up_questions_rate"] = conversations_with_followups / conversation_count * 100

        # Calculate average duration if data exists
        if total_duration > 0:
            metrics["avg_conversation_duration"] = total_duration / conversation_count
        return metrics

    def extract_failed_questions(self) -> List[str]:
        """
        Extract questions that the bot failed to answer satisfactorily

        An answer counts as failed when, compared case-insensitively, it
        contains one of the known failure phrases.

        Returns:
            List of questions that need improvement
        """
        chat_data = self.get_chat_data()
        failed_questions = []
        for question, answer in self._iter_exchanges(chat_data):
            lowered = answer.lower()
            if any(indicator in lowered for indicator in self._FAILURE_INDICATORS):
                failed_questions.append(question)
        return failed_questions

    def export_training_data(self, output_file: str) -> Tuple[bool, str]:
        """
        Export training data in JSONL format

        Each line is one JSON object of the form
        {"messages": [{"role": "user", ...}, {"role": "assistant", ...}]}.

        Args:
            output_file: Path to output file

        Returns:
            (success, message)
        """
        try:
            qa_pairs = self.extract_question_answer_pairs()
            if not qa_pairs:
                logger.warning("Not enough data for export")
                return False, "Not enough data for export"

            logger.info("Found %d question-answer pairs for export", len(qa_pairs))
            with open(output_file, "w", encoding="utf-8") as f:
                for pair in qa_pairs:
                    training_example = {
                        "messages": [
                            {"role": "user", "content": pair["question"]},
                            {"role": "assistant", "content": pair["answer"]},
                        ]
                    }
                    f.write(json.dumps(training_example, ensure_ascii=False) + "\n")

            logger.info("Data successfully exported to %s", output_file)
            return True, f"Training data successfully exported to {output_file}. Exported {len(qa_pairs)} examples."
        except Exception as e:
            # Reported to the caller through the (success, message) tuple.
            logger.error(f"Error during data export: {str(e)}")
            return False, f"Error exporting training data: {str(e)}"

    def get_chat_data(self) -> List[Dict[str, Any]]:
        """
        Get all chat data from dataset

        Returns:
            List of chat histories; an empty list when loading fails or
            there is no data.
        """
        success, chat_data = self.dataset_manager.get_chat_history()
        if not success:
            # On failure the second element is logged as the error detail.
            logger.error(f"Failed to get chat history: {chat_data}")
            return []
        if not chat_data:
            logger.warning("Chat data is empty")
            return []
        return chat_data
|