File size: 8,336 Bytes
de863da
 
 
 
 
 
6ebef0b
de863da
6ebef0b
de863da
9745155
 
de863da
 
 
 
da8386d
 
 
6ebef0b
 
 
 
 
 
 
 
da8386d
 
 
 
 
 
 
2b822a9
da8386d
 
 
 
 
 
 
 
 
2b822a9
da8386d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
de863da
da8386d
 
de863da
 
da8386d
 
 
 
 
 
 
 
 
 
de863da
da8386d
de863da
da8386d
de863da
da8386d
6ebef0b
7581420
 
 
 
 
 
 
 
256331a
de863da
256331a
 
 
7581420
 
9745155
2b822a9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
Module for analyzing chat history and extracting useful data for training
"""

import json
import logging
from typing import List, Dict, Any, Tuple, Optional
from collections import Counter, defaultdict
import re
from datetime import datetime
from src.knowledge_base.dataset import DatasetManager

# Module-level logger named after this module, plus a default root
# configuration at INFO level (basicConfig does nothing if the root logger
# already has handlers installed).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class ChatAnalyzer:
    """Analyze stored chat history and derive statistics / training data.

    All data is obtained through ``dataset_manager.get_chat_history()``, which
    is expected to return a ``(success, payload)`` tuple. On success the
    payload is treated as a list of chat dicts, each with a ``"messages"``
    list of ``{"role": ..., "content": ...}`` dicts.
    """

    # Substrings that mark an assistant reply as a non-answer. They MUST be
    # lowercase because they are matched against the lowercased answer text.
    _FAILURE_INDICATORS = (
        "don't know",
        "cannot answer",
        "unable to answer",
        "i don't have information",
        "no data available",
    )

    # Patterns used to normalize questions before counting duplicates.
    _PUNCTUATION_RE = re.compile(r'[^\w\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, dataset_manager: Optional['DatasetManager'] = None):
        """
        Initialize chat analyzer

        Args:
            dataset_manager: Dataset manager for getting chat history. When
                omitted, a default ``DatasetManager()`` is created.
        """
        self.dataset_manager = dataset_manager or DatasetManager()
        self.history = []

    @staticmethod
    def _message_text(message: Dict[str, Any]) -> str:
        """Return the stripped ``content`` of a message ('' if missing or None)."""
        return (message.get("content") or "").strip()

    @classmethod
    def _question_answer_exchanges(cls, messages: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
        """Return (question, answer) text for every user message directly followed by an assistant message."""
        exchanges = []
        for current, following in zip(messages, messages[1:]):
            if current.get("role") == "user" and following.get("role") == "assistant":
                exchanges.append((cls._message_text(current), cls._message_text(following)))
        return exchanges

    @staticmethod
    def _conversation_duration(messages: List[Dict[str, Any]]) -> float:
        """Return seconds elapsed between the first and last message, or 0.0 if unknown.

        NOTE(review): assumes messages may carry an ISO-8601 ``"timestamp"``
        string -- the stored schema is not visible in this module; confirm
        against DatasetManager. Missing or unparsable timestamps yield 0.0.
        """
        if len(messages) < 2:
            return 0.0

        started_raw = messages[0].get("timestamp")
        ended_raw = messages[-1].get("timestamp")
        if not started_raw or not ended_raw:
            return 0.0

        try:
            elapsed = (
                datetime.fromisoformat(ended_raw) - datetime.fromisoformat(started_raw)
            ).total_seconds()
        except (TypeError, ValueError):
            # Non-string values, malformed strings, or naive/aware mismatch.
            return 0.0

        # Out-of-order timestamps should not reduce the total.
        return max(elapsed, 0.0)

    def analyze_chats(self) -> str:
        """
        Analyzes chat history and returns a report

        Returns:
            A markdown-formatted summary (conversation count, message count,
            average messages per conversation), or a short explanatory
            message when the history cannot be loaded, is empty, or an
            error occurs during analysis.
        """
        try:
            success, history = self.dataset_manager.get_chat_history()

            if not success:
                return "Failed to load chat history"

            if not history:
                return "No chat history available for analysis"

            # Basic analysis
            total_chats = len(history)
            total_messages = sum(len(chat.get("messages") or []) for chat in history)
            avg_messages = total_messages / total_chats

            # The trailing padding line reproduces the report text that has
            # always been emitted, so consumers see identical output.
            return (
                "\n### Chat Analysis Report\n\n"
                f"- Total conversations: {total_chats}\n"
                f"- Total messages: {total_messages}\n"
                f"- Average messages per conversation: {avg_messages:.1f}\n"
                "            "
            )

        except Exception as e:
            # Report boundary: callers expect a string, never an exception.
            logger.exception("Error during chat analysis")
            return f"Error during analysis: {str(e)}"

    def extract_question_answer_pairs(self, min_question_length: int = 10) -> List[Dict[str, str]]:
        """
        Extract question-answer pairs from chat history

        Args:
            min_question_length: Minimum question length to include in the sample

        Returns:
            List of question-answer pairs in format [{"question": "...", "answer": "..."}].
            Pairs with an empty answer or a too-short question are skipped.
        """
        qa_pairs = []

        for chat in self.get_chat_data():
            messages = chat.get("messages") or []
            for question, answer in self._question_answer_exchanges(messages):
                # Filter by question length and require a non-empty answer
                if len(question) >= min_question_length and answer:
                    qa_pairs.append({"question": question, "answer": answer})

        return qa_pairs

    def analyze_common_questions(self, top_n: int = 10) -> List[Tuple[str, int]]:
        """
        Analysis of most frequently asked questions

        Questions are normalized (lowercased, punctuation replaced by spaces,
        whitespace collapsed) so that trivially different spellings are
        counted together.

        Args:
            top_n: Number of most popular questions to return

        Returns:
            List of tuples (normalized question, count), most frequent first
        """
        question_counter = Counter()

        for pair in self.extract_question_answer_pairs():
            normalized = self._PUNCTUATION_RE.sub(' ', pair["question"].lower())
            normalized = self._WHITESPACE_RE.sub(' ', normalized).strip()
            question_counter[normalized] += 1

        return question_counter.most_common(top_n)

    def analyze_user_satisfaction(self) -> Dict[str, Any]:
        """
        Analysis of user satisfaction based on chat history

        Returns:
            Dictionary with satisfaction metrics:
              - total_conversations: number of chats analyzed
              - avg_messages_per_conversation: mean message count per chat
              - avg_conversation_duration: total measured duration in seconds
                divided by the number of chats (chats without usable
                timestamps contribute 0)
              - follow_up_questions_rate: percentage of chats that contain
                more than one user message
        """
        chat_data = self.get_chat_data()

        # Initialize metrics
        metrics = {
            "total_conversations": len(chat_data),
            "avg_messages_per_conversation": 0,
            "avg_conversation_duration": 0,  # in seconds
            "follow_up_questions_rate": 0,   # percentage of dialogs with follow-up questions
        }

        if not chat_data:
            return metrics

        total_messages = 0
        conversations_with_followups = 0
        total_duration = 0.0

        for chat in chat_data:
            messages = chat.get("messages") or []
            total_messages += len(messages)

            # A second user message is treated as a follow-up question.
            user_turns = sum(1 for message in messages if message.get("role") == "user")
            if user_turns > 1:
                conversations_with_followups += 1

            total_duration += self._conversation_duration(messages)

        # Calculate averages
        conversation_count = len(chat_data)
        metrics["avg_messages_per_conversation"] = total_messages / conversation_count
        metrics["follow_up_questions_rate"] = conversations_with_followups / conversation_count * 100

        # Calculate average duration if data exists
        if total_duration > 0:
            metrics["avg_conversation_duration"] = total_duration / conversation_count

        return metrics

    def extract_failed_questions(self) -> List[str]:
        """
        Extract questions that the bot failed to answer satisfactorily

        An answer counts as a failure when it contains (case-insensitively)
        one of the phrases in ``_FAILURE_INDICATORS``.

        Returns:
            List of questions that need improvement
        """
        failed_questions = []

        for chat in self.get_chat_data():
            messages = chat.get("messages") or []
            for question, answer in self._question_answer_exchanges(messages):
                lowered_answer = answer.lower()
                if any(indicator in lowered_answer for indicator in self._FAILURE_INDICATORS):
                    failed_questions.append(question)

        return failed_questions

    def export_training_data(self, output_file: str) -> Tuple[bool, str]:
        """
        Export training data in JSONL format

        Each line is a JSON object ``{"messages": [user, assistant]}`` built
        from one extracted question-answer pair.

        Args:
            output_file: Path to output file

        Returns:
            (success, message)
        """
        try:
            qa_pairs = self.extract_question_answer_pairs()

            if not qa_pairs:
                logger.warning("Not enough data for export")
                return False, "Not enough data for export"

            logger.info(f"Found {len(qa_pairs)} question-answer pairs for export")

            with open(output_file, "w", encoding="utf-8") as f:
                for pair in qa_pairs:
                    training_example = {
                        "messages": [
                            {"role": "user", "content": pair["question"]},
                            {"role": "assistant", "content": pair["answer"]},
                        ]
                    }
                    f.write(json.dumps(training_example, ensure_ascii=False) + "\n")

            logger.info(f"Data successfully exported to {output_file}")
            return True, f"Training data successfully exported to {output_file}. Exported {len(qa_pairs)} examples."

        except Exception as e:
            # Callers rely on the (False, message) result instead of an exception.
            logger.error(f"Error during data export: {str(e)}")
            return False, f"Error exporting training data: {str(e)}"

    def get_chat_data(self) -> List[Dict[str, Any]]:
        """
        Get all chat data from dataset

        Returns:
            List of chat histories; an empty list when loading fails or
            there is no data (both cases are logged).
        """
        success, chat_data = self.dataset_manager.get_chat_history()
        if not success:
            # On failure the second element is logged as the error detail.
            logger.error(f"Failed to get chat history: {chat_data}")
        if not chat_data:
            logger.warning("Chat data is empty")
        return chat_data if success and chat_data else []