|
|
import os |
|
|
import subprocess |
|
|
import csv |
|
|
import shutil |
|
|
import threading |
|
|
import logging |
|
|
import signal |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from datetime import datetime |
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
|
|
|
|
|
|
|
|
|
# --- Configuration -----------------------------------------------------------

# SECURITY FIX: an OpenAI API key was previously hard-coded here. A key
# committed to source is leaked and must be rotated; read it from the
# environment instead and never store it in the file.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

# Directory containing the raw repositories to document (one sub-dir each).
SOURCE_REPOS_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_raw").resolve()

# Root directory for all generated output (per-project sub-dirs, logs, CSV).
BASE_OUTPUT_DIR = Path("~/chemrepo").expanduser().resolve()

# Aggregated log holding the details of every failed / interrupted project.
GLOBAL_ERROR_LOG = BASE_OUTPUT_DIR / "failures.log"

# Run summary CSV (one row per project: name, status, start/end time).
CSV_FILE = BASE_OUTPUT_DIR / "run.csv"

# Thread-pool size. Each worker spends most of its time blocked on a
# `repoagent` subprocess, so a large pool is acceptable for this I/O-bound job.
MAX_WORKERS = 256

# Propagate the key to child processes (repoagent presumably reads it from
# the environment -- confirm against repoagent's docs). Only set it when a
# value was actually provided, so an existing env var is never clobbered
# with an empty string.
if OPENAI_API_KEY:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

BASE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Serializes appends to GLOBAL_ERROR_LOG across worker threads.
error_log_lock = threading.Lock()

# Names of projects currently being processed; consulted on Ctrl+C so that
# half-finished output directories can be cleaned up.
active_projects = set()
active_projects_lock = threading.Lock()
|
|
|
|
|
def add_active_project(name):
    """Register *name* as a project currently being processed (thread-safe)."""
    with active_projects_lock:
        active_projects.add(name)
|
|
|
|
|
def remove_active_project(name):
    """Unregister *name*; a no-op if it was never registered (thread-safe)."""
    with active_projects_lock:
        active_projects.discard(name)
|
|
|
|
|
def log_failure_globally(project_name, content, extra_info=""):
    """Append a failure record for *project_name* to the shared global log.

    *content* is the project's captured process log; *extra_info* (optional)
    carries the Python-side exception text. Writes are serialized with
    ``error_log_lock`` so concurrent workers cannot interleave records.
    """
    banner = "=" * 40
    with error_log_lock, open(GLOBAL_ERROR_LOG, "a", encoding="utf-8") as g_log:
        g_log.write(f"\n{banner}\n")
        g_log.write(f"PROJECT: {project_name}\n")
        g_log.write(f"TIME: {datetime.now()}\n")
        g_log.write("STATUS: Failed/Interrupted\n")
        g_log.write(f"{banner}\n")
        g_log.write(content)
        if extra_info:
            g_log.write(f"\n[Details]: {extra_info}\n")
        g_log.write(f"\n{banner}\n")
|
|
|
|
|
def cleanup_project_folder(project_name):
    """Delete the output directory of a failed/interrupted project, if present."""
    project_out_dir = BASE_OUTPUT_DIR / project_name
    if not project_out_dir.exists():
        return
    try:
        shutil.rmtree(project_out_dir)
    except OSError as e:
        print(f"⚠️ Failed to delete directory {project_out_dir}: {e}")
    else:
        print(f"🗑️ Deleted failed/interrupted directory: {project_out_dir}")
|
|
|
|
|
def process_single_project(project_path):
    """Run RepoAgent over one repository directory.

    Executes `repoagent` as a subprocess, capturing its output in
    ``<out>/process.log``, and returns a result dict with keys
    ``project``, ``status``, ``start_time``, ``end_time``, where status is
    one of ``Success``, ``EmptyProject``, ``Skipped``, ``Failed``.

    On failure, the per-project log is copied into the global failure log
    and the partial output directory is deleted so a re-run starts clean.
    """
    project_name = project_path.name
    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    project_out_dir = BASE_OUTPUT_DIR / project_name
    hp_dir = project_out_dir / "hp"
    mdp_dir = project_out_dir / "mdp"
    local_log_file = project_out_dir / "process.log"

    # Resume support: both output folders present means a previous run
    # already completed this project.
    if hp_dir.exists() and mdp_dir.exists():
        return {
            "project": project_name,
            "status": "Skipped",
            "start_time": start_time,
            "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    add_active_project(project_name)
    project_out_dir.mkdir(parents=True, exist_ok=True)

    status = "Failed"
    python_error = None

    try:
        with open(local_log_file, "w", encoding="utf-8") as log_f:
            try:
                log_f.write(f"[{datetime.now()}] Processing project: {project_name}\n")

                # Ensure a .gitignore exists in the target repo -- presumably
                # repoagent requires one; confirm against repoagent's docs.
                gitignore_path = project_path / ".gitignore"
                if not gitignore_path.exists():
                    gitignore_path.touch()
                    log_f.write(f"[{datetime.now()}] Created .gitignore file.\n")

                cmd = [
                    "repoagent", "run",
                    "-m", "gpt-5.1-2025-11-13",
                    "-r", "1",
                    "-tp", str(project_path.absolute()),
                    "--print-hierarchy",
                    "-hp", str(hp_dir),
                    "-mdp", str(mdp_dir)
                ]

                log_f.write(f"[{datetime.now()}] Command: {' '.join(cmd)}\n")
                log_f.write(f"[{datetime.now()}] Starting RepoAgent...\n")
                log_f.flush()

                # check=True raises CalledProcessError on a non-zero exit,
                # which the handler below turns into status "Failed".
                subprocess.run(cmd, stdout=log_f, stderr=subprocess.STDOUT, check=True)

                # A run that produced no markdown docs is not a real success.
                has_docs = mdp_dir.exists() and any(mdp_dir.iterdir())

                if has_docs:
                    status = "Success"
                    log_f.write(f"\n[{datetime.now()}] Completed successfully.\n")
                else:
                    status = "EmptyProject"
                    log_f.write(f"\n[{datetime.now()}] Finished, but mdp folder is EMPTY. Marked as EmptyProject.\n")

            except Exception as e:
                status = "Failed"
                python_error = str(e)
                # Best effort only -- the log file itself may be unwritable.
                try:
                    log_f.write(f"\n[{datetime.now()}] ERROR: {python_error}\n")
                except OSError:
                    pass
                print(f"❌ Error processing {project_name}: {python_error}")

        if status == "Failed":
            # Preserve the per-project log in the global failure log, then
            # drop the partial output so the project is retried next run.
            failed_log_content = ""
            if local_log_file.exists():
                try:
                    with open(local_log_file, "r", encoding="utf-8", errors='ignore') as f:
                        failed_log_content = f.read()
                except OSError:
                    failed_log_content = "Read Error"

            log_failure_globally(project_name, failed_log_content, python_error)
            cleanup_project_folder(project_name)

    except Exception as e:
        # Bookkeeping around the run failed (e.g. process.log could not be
        # opened). This was previously swallowed silently; report it so
        # failures are at least visible on the console.
        print(f"❌ Unexpected error while handling {project_name}: {e}")
    finally:
        remove_active_project(project_name)

    return {
        "project": project_name,
        "status": status,
        "start_time": start_time,
        "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
|
|
|
|
|
def main():
    """Drive the batch: discover projects, fan out workers, record results.

    Results are appended to CSV_FILE as they complete; the header is written
    only on the very first run. Ctrl+C cancels queued work and cleans up the
    output directories of projects that were mid-flight.
    """
    if not SOURCE_REPOS_DIR.exists():
        print(f"Error: Source directory {SOURCE_REPOS_DIR} does not exist.")
        return

    csv_headers = ["project", "status", "start_time", "end_time"]

    # Write the CSV header only when the file does not exist yet; later runs
    # just append rows (supports resuming an interrupted batch).
    file_exists = CSV_FILE.exists()
    with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=csv_headers)
        if not file_exists:
            writer.writeheader()

    projects = sorted([p for p in SOURCE_REPOS_DIR.iterdir() if p.is_dir()], key=lambda x: x.name)

    print(f"Found {len(projects)} projects (Sorted A-Z).\nOutput Dir: {BASE_OUTPUT_DIR}")
    print(f"Failures Log: {GLOBAL_ERROR_LOG}")
    print(f"Starting concurrent processing with {MAX_WORKERS} workers...\n")
    print(f"💡 Press Ctrl+C to stop. Interrupted projects will be cleaned up automatically.\n")

    # Not used as a context manager: on Ctrl+C we need a non-blocking
    # shutdown(cancel_futures=True) instead of the context manager's wait.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    try:
        future_to_project = {executor.submit(process_single_project, p): p for p in projects}

        with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=csv_headers)

            for future in as_completed(future_to_project):
                result = future.result()
                writer.writerow(result)
                f.flush()  # keep the CSV usable even if the run dies later

                if result["status"] == "Success":
                    print(f"✅ {result['project']} Finished.")
                elif result["status"] == "EmptyProject":
                    print(f"⚠️ {result['project']} Finished (Empty - No Docs Generated).")
                elif result["status"] == "Skipped":
                    print(f"⏭️ {result['project']} Skipped.")
                else:
                    print(f"❌ {result['project']} Failed.")

        # BUG FIX: the pool was never shut down on the normal path; release
        # the worker threads before reporting completion.
        executor.shutdown(wait=True)

    except KeyboardInterrupt:
        print("\n\n🛑 KeyboardInterrupt detected! Stopping workers...")

        # cancel_futures drops queued tasks (Python 3.9+). NOTE(review):
        # already-running repoagent subprocesses are NOT killed here and may
        # linger briefly after exit.
        executor.shutdown(wait=False, cancel_futures=True)

        print("🧹 Cleaning up active incomplete projects...")
        with active_projects_lock:
            projects_to_clean = list(active_projects)

        for proj_name in projects_to_clean:
            log_failure_globally(proj_name, "Process terminated by User (KeyboardInterrupt).")
            cleanup_project_folder(proj_name)

        print("Done. Exiting.")
        sys.exit(0)

    print(f"\nAll tasks completed. \nCSV: {CSV_FILE}")
|
|
|
|
|
# Entry point: run the batch pipeline only when executed as a script.
if __name__ == "__main__":
    main()