"""Batch-run the ``repoagent`` CLI over every repository in SOURCE_REPOS_DIR.

For each project folder this script:
  * skips projects whose output folders already exist (resume support),
  * runs ``repoagent run`` with per-project stdout/stderr captured to a log,
  * classifies the result as Success / EmptyProject / Failed / Skipped,
  * appends one row per project to a CSV ledger,
  * on failure or Ctrl+C, copies the log into a global failure log and
    deletes the partial output directory.
"""

import os
import subprocess
import csv
import shutil
import threading
import logging
import signal
import sys
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

# ================= Configuration =================
# OpenAI API key.
# SECURITY NOTE(review): a live-looking key was hard-coded here and committed to
# source control; it should be rotated. The environment variable now takes
# precedence, with the original value kept only as a backward-compatible fallback.
OPENAI_API_KEY = os.environ.get(
    "OPENAI_API_KEY",
    "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",
)

# Directory containing the source repositories (one project per sub-folder).
SOURCE_REPOS_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_raw").resolve()

# Base directory for all generated output.
BASE_OUTPUT_DIR = Path("~/chemrepo").expanduser().resolve()

# Global failure log (one entry appended per failed/interrupted project).
GLOBAL_ERROR_LOG = BASE_OUTPUT_DIR / "failures.log"

# CSV ledger recording one row per processed project.
CSV_FILE = BASE_OUTPUT_DIR / "run.csv"

# Number of concurrent worker threads.
MAX_WORKERS = 256
# ===========================================

# Export the key for the child `repoagent` processes.
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

# Make sure the base output directory exists before any worker starts.
BASE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# --- Global locks and state tracking ---
# Serializes appends to failures.log across worker threads.
error_log_lock = threading.Lock()

# Names of projects currently being processed (used for cleanup on Ctrl+C).
active_projects = set()
active_projects_lock = threading.Lock()


def add_active_project(name):
    """Mark *name* as currently in progress (thread-safe)."""
    with active_projects_lock:
        active_projects.add(name)


def remove_active_project(name):
    """Remove *name* from the in-progress set; a no-op if absent (thread-safe)."""
    with active_projects_lock:
        active_projects.discard(name)


def log_failure_globally(project_name, content, extra_info=""):
    """Append a failure record for *project_name* to the global failure log.

    ``content`` is the captured per-project log text; ``extra_info`` is an
    optional short diagnostic (e.g. the Python exception message).
    """
    with error_log_lock:
        with open(GLOBAL_ERROR_LOG, "a", encoding="utf-8") as g_log:
            g_log.write(f"\n{'='*40}\n")
            g_log.write(f"PROJECT: {project_name}\n")
            g_log.write(f"TIME: {datetime.now()}\n")
            g_log.write(f"STATUS: Failed/Interrupted\n")
            g_log.write(f"{'='*40}\n")
            g_log.write(content)
            if extra_info:
                g_log.write(f"\n[Details]: {extra_info}\n")
            g_log.write(f"\n{'='*40}\n")


def cleanup_project_folder(project_name):
    """Delete the output folder of a failed or interrupted project, if any."""
    project_out_dir = BASE_OUTPUT_DIR / project_name
    if project_out_dir.exists():
        try:
            shutil.rmtree(project_out_dir)
            print(f"🗑️ Deleted failed/interrupted directory: {project_out_dir}")
        except OSError as e:
            # Best-effort cleanup: report but do not propagate.
            print(f"⚠️ Failed to delete directory {project_out_dir}: {e}")


def process_single_project(project_path):
    """Process one project folder; returns a CSV-row dict.

    Returns ``{"project", "status", "start_time", "end_time"}`` where status
    is one of ``Skipped`` / ``Success`` / ``EmptyProject`` / ``Failed``.
    """
    project_name = project_path.name
    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Output layout for this project.
    project_out_dir = BASE_OUTPUT_DIR / project_name
    hp_dir = project_out_dir / "hp"
    mdp_dir = project_out_dir / "mdp"
    local_log_file = project_out_dir / "process.log"

    # --- 1. Resume support: skip projects whose output already exists ---
    # NOTE: the mere presence of both folders counts as done, even if mdp is
    # empty; adjust here if empty projects should be retried.
    if hp_dir.exists() and mdp_dir.exists():
        return {
            "project": project_name,
            "status": "Skipped",
            "start_time": start_time,
            "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    # Register for cleanup in case of Ctrl+C.
    add_active_project(project_name)

    project_out_dir.mkdir(parents=True, exist_ok=True)

    status = "Failed"
    python_error = None

    # try/finally guarantees removal from the active set even if this thread
    # raises unexpectedly.
    try:
        with open(local_log_file, "w", encoding="utf-8") as log_f:
            try:
                log_f.write(f"[{datetime.now()}] Processing project: {project_name}\n")

                # --- 2. Ensure a .gitignore exists (repoagent expects one) ---
                gitignore_path = project_path / ".gitignore"
                if not gitignore_path.exists():
                    gitignore_path.touch()
                    log_f.write(f"[{datetime.now()}] Created .gitignore file.\n")

                # --- 3. Build the command ---
                cmd = [
                    "repoagent", "run",
                    "-m", "gpt-5.1-2025-11-13",
                    "-r", "1",
                    "-tp", str(project_path.absolute()),
                    "--print-hierarchy",
                    "-hp", str(hp_dir),
                    "-mdp", str(mdp_dir),
                ]

                log_f.write(f"[{datetime.now()}] Command: {' '.join(cmd)}\n")
                log_f.write(f"[{datetime.now()}] Starting RepoAgent...\n")
                log_f.flush()

                # --- 4. Run repoagent; raises CalledProcessError on non-zero exit ---
                subprocess.run(cmd, stdout=log_f, stderr=subprocess.STDOUT, check=True)

                # --- 5. Verify documentation was actually generated ---
                has_docs = mdp_dir.exists() and any(mdp_dir.iterdir())

                if has_docs:
                    status = "Success"
                    log_f.write(f"\n[{datetime.now()}] Completed successfully.\n")
                else:
                    status = "EmptyProject"
                    log_f.write(f"\n[{datetime.now()}] Finished, but mdp folder is EMPTY. Marked as EmptyProject.\n")

            except Exception as e:
                status = "Failed"
                python_error = str(e)
                try:
                    log_f.write(f"\n[{datetime.now()}] ERROR: {python_error}\n")
                except Exception:
                    # The log file itself may be unwritable; the console line
                    # below is the fallback record.
                    pass
                print(f"❌ Error processing {project_name}: {python_error}")

        # --- 6. Failure handling: archive the log, then delete the output ---
        if status == "Failed":
            failed_log_content = ""
            if local_log_file.exists():
                try:
                    with open(local_log_file, "r", encoding="utf-8", errors='ignore') as f:
                        failed_log_content = f.read()
                except Exception:
                    failed_log_content = "Read Error"

            log_failure_globally(project_name, failed_log_content, python_error)
            cleanup_project_folder(project_name)

    except Exception:
        # Last-resort guard so one broken project never kills the worker pool.
        pass
    finally:
        remove_active_project(project_name)

    return {
        "project": project_name,
        "status": status,
        "start_time": start_time,
        "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def main():
    """Entry point: fan projects out to the thread pool and record results."""
    if not SOURCE_REPOS_DIR.exists():
        print(f"Error: Source directory {SOURCE_REPOS_DIR} does not exist.")
        return

    csv_headers = ["project", "status", "start_time", "end_time"]

    # Create the CSV with a header row only on first run; later runs append.
    file_exists = CSV_FILE.exists()
    with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=csv_headers)
        if not file_exists:
            writer.writeheader()

    # --- 1. Collect project folders, sorted alphabetically ---
    projects = sorted(
        [p for p in SOURCE_REPOS_DIR.iterdir() if p.is_dir()],
        key=lambda x: x.name,
    )

    print(f"Found {len(projects)} projects (Sorted A-Z).\nOutput Dir: {BASE_OUTPUT_DIR}")
    print(f"Failures Log: {GLOBAL_ERROR_LOG}")
    print(f"Starting concurrent processing with {MAX_WORKERS} workers...\n")
    print(f"💡 Press Ctrl+C to stop. Interrupted projects will be cleaned up automatically.\n")

    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    try:
        future_to_project = {executor.submit(process_single_project, p): p for p in projects}

        with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=csv_headers)
            for future in as_completed(future_to_project):
                result = future.result()
                writer.writerow(result)
                f.flush()  # persist each row immediately in case of a crash

                if result["status"] == "Success":
                    print(f"✅ {result['project']} Finished.")
                elif result["status"] == "EmptyProject":
                    print(f"⚠️ {result['project']} Finished (Empty - No Docs Generated).")
                elif result["status"] == "Skipped":
                    print(f"⏭️ {result['project']} Skipped.")
                else:
                    print(f"❌ {result['project']} Failed.")

        # All futures done: release the pool's threads cleanly.
        executor.shutdown(wait=True)

    except KeyboardInterrupt:
        print("\n\n🛑 KeyboardInterrupt detected! Stopping workers...")
        # Cancel anything not yet started; don't wait for running tasks.
        executor.shutdown(wait=False, cancel_futures=True)

        print("🧹 Cleaning up active incomplete projects...")
        with active_projects_lock:
            projects_to_clean = list(active_projects)

        for proj_name in projects_to_clean:
            log_failure_globally(proj_name, "Process terminated by User (KeyboardInterrupt).")
            cleanup_project_folder(proj_name)

        print("Done. Exiting.")
        sys.exit(0)

    print(f"\nAll tasks completed. \nCSV: {CSV_FILE}")


if __name__ == "__main__":
    main()