SAGE-Bench / src /oss /oss_submission_handler.py
“pangjh3”
modified: app.py
944f8a1
#!/usr/bin/env python3
"""
OSS提交处理器 - 替换原有的git/http提交方式
在HuggingFace Spaces中直接将提交文件上传到OSS
"""
import os
import sys
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Tuple
# 导入同目录下的oss_file_manager
from .oss_file_manager import OSSFileManager
class OSSSubmissionHandler:
"""OSS提交处理器 - 将用户提交直接上传到OSS"""
def __init__(self, oss_submission_path: str = "atlas_eval/submissions/"):
"""
初始化OSS提交处理器
Args:
oss_submission_path: OSS中存储提交文件的路径
"""
self.oss_path = oss_submission_path
self.oss_manager = OSSFileManager()
print(f"📁 OSS提交路径: oss://opencompass/{oss_submission_path}")
def format_error(self, msg: str) -> str:
"""格式化错误消息"""
return f"<p style='color: red; font-size: 16px;'>{msg}</p>"
def format_success(self, msg: str) -> str:
"""格式化成功消息"""
return f"<p style='color: green; font-size: 16px;'>{msg}</p>"
def format_warning(self, msg: str) -> str:
"""格式化警告消息"""
return f"<p style='color: orange; font-size: 16px;'>{msg}</p>"
def validate_sage_submission(self, submission_data: Dict[str, Any]) -> Tuple[bool, str]:
"""验证SAGE基准提交格式"""
# 检查必需的顶级字段
required_fields = ["submission_org", "submission_email", "predictions"]
for field in required_fields:
if field not in submission_data:
return False, f"缺少必需字段: {field}"
# 验证邮箱格式(基本验证)
email = submission_data["submission_email"]
if "@" not in email or "." not in email:
return False, "邮箱格式无效"
# 验证predictions
predictions = submission_data["predictions"]
if not isinstance(predictions, list) or len(predictions) == 0:
return False, "predictions必须是非空列表"
for i, prediction in enumerate(predictions):
# 检查必需的prediction字段
pred_required_fields = ["original_question_id", "content", "reasoning_content"]
for field in pred_required_fields:
if field not in prediction:
return False, f"预测{i}中缺少字段: {field}"
# 验证content数组
content = prediction["content"]
reasoning_content = prediction["reasoning_content"]
if not isinstance(content, list) or len(content) != 4:
return False, f"预测{i}的content必须是包含4个项目的列表"
if not isinstance(reasoning_content, list):
return False, f"预测{i}的reasoning_content必须是列表类型"
# # reasoning_content可以为空列表,或者包含4个项目
# if len(reasoning_content) != 0 and len(reasoning_content) != 4:
# return False, f"预测{i}的reasoning_content必须是空列表或包含4个项目的列表"
# 验证question ID
if not isinstance(prediction["original_question_id"], int):
return False, f"预测{i}的question ID必须是整数"
return True, "提交格式有效"
def generate_submission_filename(self, submission_data: Dict[str, Any]) -> str:
"""生成提交文件名"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
org_name = submission_data["submission_org"].replace(" ", "_").replace("/", "_").replace("\\", "_")
return f"submission_{org_name}_{timestamp}.json"
def upload_to_oss(self, submission_data: Dict[str, Any], filename: str) -> Tuple[bool, str]:
"""上传提交文件到OSS"""
try:
# 创建临时本地文件
temp_file = f"/tmp/{filename}"
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(submission_data, f, indent=2, ensure_ascii=False)
# 上传到OSS
oss_file_path = f"{self.oss_path}{filename}"
print(f"⬆️ 上传到OSS: {oss_file_path}")
self.oss_manager.upload_file_to_object(
local_file_path=temp_file,
oss_file_path=oss_file_path,
replace=True
)
# 清理临时文件
os.remove(temp_file)
print(f"✅ OSS上传成功: {oss_file_path}")
return True, f"oss://opencompass/{oss_file_path}"
except Exception as e:
print(f"❌ OSS上传失败: {e}")
return False, str(e)
def process_sage_submission(self, submission_file_or_data, org_name=None, email=None) -> str:
"""
处理SAGE基准提交文件 - OSS模式
替换原有的git/http方式,直接上传到OSS
"""
try:
# 处理输入参数 - 可能是文件路径或者已经的数据
if submission_file_or_data is None:
return self.format_error("❌ 没有提供提交数据。")
# 如果是字符串,认为是文件路径
if isinstance(submission_file_or_data, str):
try:
with open(submission_file_or_data, 'r', encoding='utf-8') as f:
content = f.read()
# 解析JSON
submission_data = json.loads(content)
except Exception as e:
return self.format_error(f"❌ 读取文件时出错: {str(e)}")
# 如果是字典,直接使用
elif isinstance(submission_file_or_data, dict):
submission_data = submission_file_or_data
else:
return self.format_error("❌ 无效的提交数据格式。")
# 如果表单提供了组织名和邮箱,使用表单数据
if org_name and email:
submission_data["submission_org"] = org_name.strip()
submission_data["submission_email"] = email.strip()
# 验证提交格式
is_valid, message = self.validate_sage_submission(submission_data)
if not is_valid:
return self.format_error(f"❌ 提交验证失败: {message}")
# 生成文件名
filename = self.generate_submission_filename(submission_data)
# 上传到OSS
success, result = self.upload_to_oss(submission_data, filename)
if not success:
return self.format_error(f"❌ 上传到OSS失败: {result}")
# 生成成功消息
org = submission_data["submission_org"]
email_addr = submission_data["submission_email"]
num_predictions = len(submission_data["predictions"])
success_msg = self.format_success(f"""
🎉 <strong>提交成功!</strong><br><br>
📋 <strong>提交信息:</strong><br>
• 组织: {org}<br>
• 邮箱: {email_addr}<br>
• 预测数量: {num_predictions} 个问题<br>
• 文件名: {filename}<br><br>
🚀 <strong>存储位置:</strong><br>
{result}<br><br>
⚡ <strong>评测状态:</strong><br>
您的提交已成功上传到云存储,自动评测系统将在5-15分钟内开始处理。<br><br>
⏳ <strong>评测流程:</strong><br>
1. 🔍 系统自动检测到新提交<br>
2. ⬇️ 下载并验证提交格式<br>
3. 🔬 使用LLM-as-Judge进行全面评估<br>
4. 📊 计算各科目及总体准确率<br>
5. 🏆 自动更新到排行榜<br><br>
🕐 <strong>预计时间:</strong><br>
评测完成时间约5-15分钟,取决于当前队列长度。<br>
请稍后刷新排行榜查看结果。<br><br>
🧪 感谢您参与SAGE科学推理基准测试!
""")
return success_msg
except Exception as e:
return self.format_error(f"❌ 提交处理失败: {str(e)}")
# 兼容性函数 - 保持与原有代码的接口一致
def process_sage_submission_simple(submission_file, org_name=None, email=None) -> str:
"""
处理SAGE基准提交文件 - OSS模式
这是一个兼容性函数,保持与原有submit.py的接口一致
"""
handler = OSSSubmissionHandler()
return handler.process_sage_submission(submission_file, org_name, email)
def format_error(msg):
return f"<p style='color: red; font-size: 16px;'>{msg}</p>"
def format_success(msg):
return f"<p style='color: green; font-size: 16px;'>{msg}</p>"
def format_warning(msg):
return f"<p style='color: orange; font-size: 16px;'>{msg}</p>"
if __name__ == "__main__":
# 测试代码
print("🧪 测试OSS提交处理器")
# 检查环境变量
required_env_vars = ["OSS_ACCESS_KEY_ID", "OSS_ACCESS_KEY_SECRET"]
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
print(f"❌ 缺少必需的环境变量: {missing_vars}")
exit(1)
handler = OSSSubmissionHandler()
print("✅ OSS提交处理器初始化成功")