#!/usr/bin/env python
# repair_conversation_ids.py
"""
Script to restore empty conversation_ids in chat history files.
One-time operation with hardcoded paths.
"""
import os
import sys
import json
import datetime
import logging
import tempfile
import time

from dotenv import load_dotenv
from huggingface_hub import HfApi
from tenacity import retry, stop_after_attempt, wait_exponential
# Load environment variables
load_dotenv()
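# Expected .env entry (illustrative placeholder value, not a real token):
#   HUGGINGFACE_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx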
# PATHS AND PARAMETERS CONFIGURATION
# ===================================
# Modify these values according to your configuration
CHAT_HISTORY_PATH = './chat_history' # Path to local chat history files
DATASET_ID = 'Rulga/status-law-knowledge-base' # HuggingFace dataset ID
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN") # HuggingFace API access token
if not HF_TOKEN:
    raise ValueError("HUGGINGFACE_TOKEN not found in environment variables")
# Dataset paths
DATASET_CHAT_HISTORY_PATH = "chat_history"
DATASET_VECTOR_STORE_PATH = "vector_store"
DATASET_FINE_TUNED_PATH = "fine_tuned_models"
DATASET_ANNOTATIONS_PATH = "annotations"
DATASET_ERROR_LOGS_PATH = "error_logs"
DATASET_PREFERENCES_PATH = "preferences/user_preferences.json"
# If True, script won't make actual changes (test mode)
DRY_RUN = False
# If True, script will update only local files
LOCAL_ONLY = False
# Add temporary directory for downloads
TEMP_DIR = tempfile.mkdtemp()
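# Optional housekeeping (a sketch, not required for the repair itself): remove
# the temporary download directory on interpreter exit so repeated runs do not
# leave files behind under the system temp location.
import atexit
import shutil
atexit.register(shutil.rmtree, TEMP_DIR, ignore_errors=True)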
# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("repair_conversation_ids.log", encoding='utf-8'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
# Force UTF-8 on stdout so console logging of non-ASCII content cannot fail;
# reconfigure() keeps the same stream object the StreamHandler above already holds
sys.stdout.reconfigure(encoding='utf-8')
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=60, min=60, max=180)
)
def safe_api_call(func, *args, **kwargs):
    """Wrapper for API calls with retry logic"""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        if "429 Client Error: Too Many Requests" in str(e):
            logger.warning("Rate limit hit, waiting before retry...")
        raise  # Re-raise so tenacity can retry (rate limits and other errors alike)
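# Illustrative usage (mirrors the real calls below; DATASET_ID as configured above):
#   files = safe_api_call(api.list_repo_files, repo_id=DATASET_ID, repo_type="dataset")
# tenacity decorates safe_api_call itself, so a raised exception re-runs the whole
# wrapped call, waiting between 60 and 180 seconds, for at most 3 attempts.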
def repair_conversation_ids():
    """
    Restore conversation_ids in chat history files directly in the HuggingFace dataset
    """
    try:
        api = HfApi(token=HF_TOKEN)
        # List all files with retry
        files = safe_api_call(
            api.list_repo_files,
            repo_id=DATASET_ID,
            repo_type="dataset"
        )
        chat_files = [f for f in files
                      if f.startswith(DATASET_CHAT_HISTORY_PATH) and
                      f.endswith('.json') and
                      os.path.basename(f).startswith('None_')]
        logger.info(f"Found {len(chat_files)} files with 'None_' prefix in dataset")

        repaired_count = 0
        error_count = 0
        for file_path in chat_files:
            try:
                # Throttle: 2-second delay between files to ease rate limits
                time.sleep(2)

                # Download file content with retry
                file_content = safe_api_call(
                    api.hf_hub_download,
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=file_path,
                    local_dir=TEMP_DIR,
                    local_dir_use_symlinks=False
                )
                with open(file_content, 'r', encoding='utf-8') as f:
                    chat_data = json.load(f)
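                # Each chat file is assumed to be a single JSON object; only the
                # 'timestamp' field is read and 'conversation_id' is (re)written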
                # Generate new ID based on timestamp and file details
                timestamp_str = chat_data.get('timestamp', '')
                try:
                    timestamp_dt = datetime.datetime.fromisoformat(timestamp_str)
                    time_part = timestamp_dt.strftime('%Y%m%d%H%M%S')
                except (ValueError, TypeError):
                    time_part = datetime.datetime.now().strftime('%Y%m%d%H%M%S')

                filename = os.path.basename(file_path)
                filename_part = os.path.splitext(filename)[0].replace('None_', '')
                if len(filename_part) > 10:
                    filename_part = filename_part[:10]
                new_id = f"conv_{time_part}_{filename_part}"
                chat_data['conversation_id'] = new_id
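                # Worked example with hypothetical values: "None_a1b2c3d4e5f6.json"
                # with timestamp "2024-03-01T12:30:45" gets conversation_id
                # "conv_20240301123045_a1b2c3d4e5" (filename part truncated to 10 chars)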
                # New filename without the None_ prefix; repo paths always use "/",
                # so build them with string formatting rather than os.path.join
                new_filename = filename.replace('None_', '')
                new_path = f"{os.path.dirname(file_path)}/{new_filename}"
                archive_timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
                archive_filename = f"archive/None_{archive_timestamp}_{filename}"
                archive_path = f"{DATASET_CHAT_HISTORY_PATH}/{archive_filename}"

                if not DRY_RUN:
                    # Ensure the archive "directory" exists (HF datasets have no
                    # real directories; an empty .gitkeep file marks one)
                    try:
                        api.upload_file(
                            path_or_fileobj=b"",
                            path_in_repo=f"{DATASET_CHAT_HISTORY_PATH}/archive/.gitkeep",
                            repo_id=DATASET_ID,
                            repo_type="dataset"
                        )
                    except Exception:
                        pass  # .gitkeep may already exist
                    # Move old file to archive with retry
                    safe_api_call(
                        api.upload_file,
                        path_or_fileobj=file_content,
                        path_in_repo=archive_path,
                        repo_id=DATASET_ID,
                        repo_type="dataset"
                    )
                    # Upload updated content with retry
                    json_content = json.dumps(chat_data, ensure_ascii=False, indent=2)
                    safe_api_call(
                        api.upload_file,
                        path_or_fileobj=json_content.encode('utf-8'),
                        path_in_repo=new_path,
                        repo_id=DATASET_ID,
                        repo_type="dataset"
                    )
                    # Only after both uploads succeed, delete the original with retry
                    safe_api_call(
                        api.delete_file,
                        path_in_repo=file_path,
                        repo_id=DATASET_ID,
                        repo_type="dataset"
                    )
                    logger.info(f"Repaired: {filename} -> {new_filename} (archived as {archive_filename}) - New ID: {new_id}")
                else:
                    logger.info(f"[DRY RUN] Would repair: {filename} -> {new_filename} - New ID: {new_id}")
                repaired_count += 1
            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                error_count += 1
                continue  # Skip to next file on error

        logger.info(f"Repair completed: {repaired_count} files repaired, {error_count} errors")
        return repaired_count

    except Exception as e:
        logger.error(f"Error accessing dataset: {str(e)}")
        return 0
if __name__ == "__main__":
# Display configuration information
logger.info("=== CONFIGURATION ===")
logger.info(f"Chat history path: {CHAT_HISTORY_PATH}")
logger.info(f"Dataset ID: {DATASET_ID}")
logger.info(f"Test mode: {'Yes' if DRY_RUN else 'No'}")
logger.info(f"Local only: {'Yes' if LOCAL_ONLY else 'No'}")
logger.info("==================")
# Start repair process
repaired = repair_conversation_ids()
if DRY_RUN:
logger.info(f"TEST MODE: Would have repaired {repaired} files")
else:
logger.info(f"Successfully repaired {repaired} files")