sudanl committed
Commit 9845917 · 1 Parent(s): ff8fd92

refactor: Replace complex OSS manager with simplified version - remove compassflow dependency

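For reference, a minimal usage sketch of the simplified OSSFileManager introduced by this commit (not part of the commit itself; the import path, bucket, and prefix are illustrative, and credentials are assumed to come from the OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET environment variables):

# Hypothetical usage sketch; import path and object prefix are placeholders.
from src.oss.oss_file_manager import OSSFileManager

manager = OSSFileManager()  # reads credentials and bucket settings from environment variables
files = manager.list_files("atlas_eval/submissions/", file_extension=".json")
for info in files:
    # download_file returns False on failure instead of raising
    manager.download_file(info["key"], f"./downloads/{info['name']}")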
src/oss/oss_file_manager.py CHANGED
@@ -1,517 +1,254 @@
1
- # %%
2
- import datetime as dt
3
- import os
4
- import re
5
- from logging import Logger
6
- from multiprocessing import Pool
7
- from multiprocessing.pool import ThreadPool
8
- from pathlib import Path
9
- from typing import List, Union
10
 
 
11
  import oss2
12
  from loguru import logger
13
- from oss2.credentials import EnvironmentVariableCredentialsProvider
14
- from tqdm import tqdm
15
-
16
- from compassflow.constants import DATADIR
17
- from compassflow.oss.oss import OssBucket
18
- from compassflow.utils import starstarmap
19
 
20
 
21
  class OSSFileManager:
22
  def __init__(
23
  self,
24
  oss_access_key_id: str = None,
25
  oss_access_key_secret: str = None,
26
- region: str = "http://oss-cn-shanghai.aliyuncs.com",
27
- bucket_name: str = "opencompass",
28
- oss_block_name: str = None,
29
- logger: Logger = logger,
30
- ) -> None:
31
- """OSS File Manager
32
-
33
  Args:
34
- oss_access_key_id (str, optional): _description_. Defaults to None.
35
- oss_access_key_secret (str, optional): _description_. Defaults to None.
36
- region (_type_, optional): _description_. Defaults to 'http://oss-cn-shanghai.aliyuncs.com'.
37
- bucket_name (str, optional): _description_. Defaults to 'opencompass'.
38
- oss_block_name: oss_block_name which is defined in the prefect
39
- logger (Logger, optional): _description_. Defaults to logger.
40
  """
41
- self.logger = logger
42
- if oss_block_name is not None:
43
- oss_bucket = OssBucket.load(oss_block_name)
44
- self._bucket = oss_bucket._get_bucket()
45
- return
46
-
47
- # An Alibaba Cloud account AccessKey has access permissions for all APIs and is very risky. It is strongly recommended to create and use a RAM account for API access and day-to-day operations; log in to the RAM console to create one.
48
- if oss_access_key_id is not None and oss_access_key_secret is not None:
49
- os.environ["OSS_ACCESS_KEY_ID"] = oss_access_key_id
50
- os.environ["OSS_ACCESS_KEY_SECRET"] = oss_access_key_secret
51
-
52
- if (
53
- os.getenv("OSS_ACCESS_KEY_ID") is None
54
- or os.getenv("OSS_ACCESS_KEY_SECRET") is None
55
- ):
56
- raise ValueError("Access Key ID and Access Key Secret cannot be empty.")
57
-
58
- auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
59
-
60
- # The endpoint uses Hangzhou as an example; for other regions, fill in the value as appropriate.
61
- # Fill in the bucket name, e.g. examplebucket.
62
- bucket = oss2.Bucket(
63
- auth=auth,
64
- endpoint=region,
65
- bucket_name=bucket_name,
66
- )
67
-
68
- self._bucket = bucket
69
-
70
- def list_latest_files_by_date(
71
- self,
72
- object_dir: str = "",
73
- delimiter: str = "/",
74
- start_date: Union[str, dt.date, dt.datetime] = None,
75
- end_date: Union[str, dt.date, dt.datetime] = None,
76
- max_num_files: int = 10,
77
- date_pattern: str = r"([0-9]{4}-[0-9]{2}-[0-9]{2})",
78
- file_date_format: str = "%Y-%m-%d",
79
- suffix: str = "",
80
- ) -> List[Union[str, Path]]:
81
- """List the latest files by date in an OSS bucket directory
82
-
83
  Args:
84
- object_dir (str, optional): _description_. Defaults to ''.
85
- delimiter (str, optional): _description_. Defaults to '/'.
86
- start_date (Union[str, dt.date], optional): _description_. Defaults to None.
87
- end_date (Union[str, dt.date], optional): _description_. Defaults to None.
88
- max_num_files (int, optional): _description_. Defaults to 10.
89
- date_pattern (str, optional): _description_. Defaults to r'^([0-9]{4}-[0-9]{2}-[0-9]{2})'.
90
- suffix (str, optional): _description_. Defaults to ''.
91
-
92
  Returns:
93
- List[Union[str, Path]]: _description_
94
  """
95
- if start_date is not None:
96
- if isinstance(start_date, str):
97
- start_date = dt.datetime.strptime(start_date.replace("-", ""), "%Y%m%d")
98
-
99
- if isinstance(start_date, dt.date):
100
- start_date = dt.datetime(
101
- start_date.year, start_date.month, start_date.day, 0, 0, 0
102
- )
103
-
104
- if end_date is not None:
105
- if isinstance(end_date, str):
106
- end_date = dt.datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
107
-
108
- if isinstance(end_date, dt.date):
109
- end_date = dt.datetime(
110
- end_date.year, end_date.month, end_date.day, 0, 0, 0
111
- )
112
-
113
- object_iter = oss2.ObjectIterator(
114
- bucket=self._bucket, prefix=object_dir, delimiter=delimiter
115
- )
116
-
117
- root_dir = Path(object_dir)
118
- filenames = []
119
- for filename in object_iter:
120
- filename = filename.key.replace(object_dir, "")
121
- # print(filename)
122
-
123
- if filename.endswith(suffix):
124
- # Match date pattern in filename
125
- date_search = re.search(date_pattern, filename)
126
- if date_search:
127
- file_date = dt.datetime.strptime(
128
- date_search.group(1), file_date_format
129
- )
130
- else:
131
- self.logger.warning(
132
- f"date pattern doesn't match, skipping file {filename}"
133
- )
134
  continue
135
-
136
- # Check if file date within specified range
137
- if start_date is not None:
138
- if start_date > file_date:
139
- continue
140
-
141
- if end_date is not None:
142
- if end_date < file_date:
143
- continue
144
-
145
- filepath = root_dir / filename
146
- # name_tstamp_tuple = (filepath, os.path.getmtime(filepath))
147
- # filenames.append(name_tstamp_tuple)
148
- filenames.append(str(filepath))
149
- # sort by tstamp
150
- # filenames = sorted(filenames, key=lambda x: x[1])
151
- filenames = sorted(filenames)
152
- # filenames = [x[0] for x in filenames]
153
-
154
- max_num_files = max_num_files or len(filenames)
155
- filenames = filenames[-max_num_files:]
156
-
157
- self.logger.info(f"{filenames=}")
158
-
159
- return filenames
160
-
161
- def download_object_to_file(
162
- self,
163
- oss_file_path: str | Path,
164
- local_file_path: str | Path,
165
- replace: bool = False,
166
- make_dir: bool = False,
167
- print_logs: bool = True,
168
- ):
169
- """Download a single OSS object to local file.
170
-
171
  Args:
172
- oss_file_path (str): _description_
173
- local_file_path (str): _description_
174
- replace (bool, optional): _description_. Defaults to False.
175
- make_dir (bool, optional): Whether to create intermediate dirs if they don't exist. Defaults to False.
176
- print_logs (bool, optional): Whether to print logs. Defaults to True.
177
  """
178
- if isinstance(local_file_path, str):
179
- local_file_path = Path(local_file_path)
180
-
181
- if not replace:
182
- if local_file_path.exists():
183
- if print_logs:
184
- err_msg = f"{local_file_path} already exists, skipping file..."
185
- self.logger.info(err_msg)
186
-
187
- return
188
-
189
- if print_logs:
190
- if local_file_path.exists():
191
- err_msg = f"{local_file_path} already exists, replacing file..."
192
- self.logger.info(err_msg)
193
-
194
- if make_dir:
195
- os.makedirs(local_file_path.parent, exist_ok=True)
196
-
197
- self._bucket.get_object_to_file(
198
- key=str(oss_file_path),
199
- filename=local_file_path,
200
- )
201
-
202
- def download_objects_to_files(
203
- self,
204
- file_download_mapping: list[tuple[str | Path, str | Path]],
205
- oss_base_dir: str = None,
206
- local_base_dir: str = None,
207
- replace: bool = True,
208
- num_threads: int = 1,
209
- **kwargs,
210
- ) -> None:
211
- """Download objects from OSS to local storage.
212
-
213
  Args:
214
- file_download_mapping (list[tuple[str | Path, str | Path]]): A list of file path pairs that maps the OSS file path (to download) to the local file path (download location).
215
- oss_base_dir (str): OSS directory path to be prepended to all OSS file paths.
216
- local_base_dir (str, optional): Base directory path to be prepended to all local file paths.
217
- replace (bool, optional): _description_. Defaults to True.
218
- num_threads (int, optional): _description_. Defaults to 1.
219
- **kwargs: Additional keyword arguments passed to `list_latest_files_by_date`
220
  """
221
- if isinstance(oss_base_dir, str):
222
- oss_base_dir = Path(oss_base_dir)
223
-
224
- if isinstance(local_base_dir, str):
225
- local_base_dir = Path(local_base_dir)
226
-
227
- if not isinstance(file_download_mapping, list):
228
- raise TypeError("file_download_mapping must be a list of 2 value tuples.")
229
-
230
- for item in file_download_mapping:
231
- if not isinstance(item, tuple):
232
- raise TypeError(
233
- "Each item in the file_download_mapping list must be a 2 value tuple."
234
- )
235
-
236
- if len(item) != 2:
237
- raise ValueError(
238
- "Each tuple in the file_download_mapping list must be length 2."
239
- )
240
-
241
- if num_threads == 1:
242
- for local_file_path, oss_file_path in file_download_mapping:
243
- if local_base_dir is not None:
244
- local_file_path = local_base_dir / local_file_path
245
-
246
- if oss_base_dir is not None:
247
- oss_file_path = oss_base_dir / oss_file_path
248
-
249
- self.download_object_to_file(
250
- oss_file_path=oss_file_path,
251
- local_file_path=local_file_path,
252
- replace=replace,
253
- )
254
-
255
- return
256
-
257
- # Start multithreaded process if num_threads > 1
258
- with ThreadPool(num_threads) as p:
259
- pool_args_list = []
260
- for local_file_path, oss_file_path in file_download_mapping:
261
- if local_base_dir is not None:
262
- local_file_path = local_base_dir / local_file_path
263
-
264
- if oss_base_dir is not None:
265
- oss_file_path = oss_base_dir / oss_file_path
266
-
267
- args_dict = dict(
268
- oss_file_path=oss_file_path,
269
- local_file_path=local_file_path,
270
- replace=replace,
271
- print_logs=True,
272
- )
273
- pool_args_list.append(args_dict)
274
-
275
- ret_all = list(
276
- tqdm(
277
- starstarmap(
278
- pool=p,
279
- fn=self.download_object_to_file,
280
- kwargs_iter=pool_args_list,
281
- ),
282
- total=len(file_download_mapping),
283
- )
284
- )
285
-
286
- def download_latest_objects_to_dir(
287
- self,
288
- oss_object_dir: str,
289
- local_dir: str,
290
- start_date: Union[str, dt.date, dt.datetime] = None,
291
- end_date: Union[str, dt.date, dt.datetime] = None,
292
- date_pattern: str = r"([0-9]{4}-[0-9]{2}-[0-9]{2})",
293
- file_date_format: str = "%Y-%m-%d",
294
- max_num_files: int = 5,
295
- replace: bool = True,
296
- make_dir: bool = True,
297
- delimiter: str = "/",
298
- num_threads: int = 1,
299
- suffix: str = "",
300
- **kwargs,
301
- ) -> None:
302
- """Download the latest objects from oss to local directory
303
-
304
  Args:
305
- oss_object_dir (str): _description_
306
- local_dir (str): _description_
307
- start_date (Union[str, dt.date, dt.datetime], optional): _description_. Defaults to None.
308
- end_date (Union[str, dt.date, dt.datetime], optional): _description_. Defaults to None.
309
- max_num_files (int, optional): _description_. Defaults to 5.
310
- replace (bool, optional): _description_. Defaults to True.
311
- delimiter (str, optional): _description_. Defaults to '/'.
312
- num_threads (int, optional): _description_. Defaults to 1.
313
- suffix (str, optional): _description_. Defaults to ''.
314
- **kwargs: Additional keyword arguments passed to `list_latest_files_by_date`
315
  """
316
- if isinstance(local_dir, str):
317
- local_dir = Path(local_dir)
318
-
319
- oss_file_list = self.list_latest_files_by_date(
320
- object_dir=oss_object_dir,
321
- delimiter=delimiter,
322
- start_date=start_date,
323
- end_date=end_date,
324
- date_pattern=date_pattern,
325
- file_date_format=file_date_format,
326
- max_num_files=max_num_files,
327
- suffix=suffix,
328
- **kwargs,
329
- )
330
-
331
- if num_threads == 1:
332
- for oss_file_path in oss_file_list:
333
- file_name = Path(oss_file_path).name
334
- local_file_path = local_dir / file_name
335
-
336
- self.download_object_to_file(
337
- oss_file_path=oss_file_path,
338
- local_file_path=local_file_path,
339
- replace=replace,
340
- make_dir=make_dir,
341
- )
342
-
343
- return
344
-
345
- # Start multithreaded process if num_threads > 1
346
- with ThreadPool(num_threads) as p:
347
- pool_args_list = []
348
- for oss_file_path in oss_file_list:
349
- file_name = Path(oss_file_path).name
350
- local_file_path = local_dir / file_name
351
-
352
- args_dict = dict(
353
- oss_file_path=str(oss_file_path),
354
- local_file_path=str(local_file_path),
355
- replace=replace,
356
- make_dir=make_dir,
357
- print_logs=True,
358
- )
359
- pool_args_list.append(args_dict)
360
-
361
- ret_all = list(
362
- tqdm(
363
- starstarmap(
364
- pool=p,
365
- fn=self.download_object_to_file,
366
- kwargs_iter=pool_args_list,
367
- ),
368
- total=len(oss_file_list),
369
- )
370
- )
371
-
372
- def upload_file_to_object(
373
- self,
374
- local_file_path: str,
375
- oss_file_path: str | Path,
376
- replace: bool = False,
377
- print_logs: bool = True,
378
- ):
379
- """Upload a single local file to OSS
380
-
381
  Args:
382
- oss_file_path (str): _description_
383
- local_file_path (str): _description_
384
- replace (bool, optional): _description_. Defaults to False.
 
385
  """
386
- if isinstance(local_file_path, Path):
387
- local_file_path = str(local_file_path)
388
-
389
- if isinstance(oss_file_path, Path):
390
- oss_file_path = str(oss_file_path)
391
-
392
- # Check if file already exists
393
- is_file_exists = self._bucket.object_exists(
394
- key=oss_file_path,
395
- )
396
-
397
- if is_file_exists:
398
- if replace:
399
- if print_logs:
400
- err_msg = f"{oss_file_path} already exists, replacing file..."
401
- self.logger.info(err_msg)
402
-
403
- self._bucket.put_object_from_file(
404
- key=str(oss_file_path),
405
- filename=local_file_path,
406
- )
407
-
408
- else:
409
- if print_logs:
410
- err_msg = f"{oss_file_path} already exists, skipping file..."
411
- self.logger.info(err_msg)
412
-
413
- return
414
-
415
- self._bucket.put_object_from_file(
416
- key=oss_file_path,
417
- filename=local_file_path,
418
- )
419
-
420
- def upload_files_to_objects(
421
- self,
422
- file_upload_mapping: list[tuple[str | Path, str | Path]],
423
- local_base_dir: str = None,
424
- oss_base_dir: str = None,
425
- replace: bool = True,
426
- num_threads: int = 1,
427
- **kwargs,
428
- ) -> None:
429
- """Upload files from local storage to OSS.
430
-
431
  Args:
432
- file_upload_mapping (list[tuple[str | Path, str | Path]]): A list of file path pairs that maps the local file path (to upload) to the OSS file path (upload location).
433
- oss_base_dir (str): OSS directory path to be prepended to all OSS file paths.
434
- local_base_dir (str, optional): Base directory path to be prepended to all local file paths.
435
- replace (bool, optional): _description_. Defaults to True.
436
- num_threads (int, optional): _description_. Defaults to 1.
437
- **kwargs: Additional keyword arguments passed to `list_latest_files_by_date`
438
  """
439
- if isinstance(oss_base_dir, str):
440
- oss_base_dir = Path(oss_base_dir)
441
-
442
- if isinstance(local_base_dir, str):
443
- local_base_dir = Path(local_base_dir)
444
-
445
- if not isinstance(file_upload_mapping, list):
446
- raise TypeError("file_upload_mapping must be a list of 2 value tuples.")
447
-
448
- for item in file_upload_mapping:
449
- if not isinstance(item, tuple):
450
- raise TypeError(
451
- "Each item in the file_upload_mapping list must be a 2 value tuple."
452
- )
453
-
454
- if len(item) != 2:
455
- raise ValueError(
456
- "Each tuple in the file_upload_mapping list must be length 2."
457
- )
458
-
459
- if num_threads == 1:
460
- for local_file_path, oss_file_path in file_upload_mapping:
461
- if local_base_dir is not None:
462
- local_file_path = local_base_dir / local_file_path
463
-
464
- if oss_base_dir is not None:
465
- oss_file_path = oss_base_dir / oss_file_path
466
 
467
- self.upload_file_to_object(
468
- oss_file_path=oss_file_path,
469
- local_file_path=local_file_path,
470
- replace=replace,
471
- )
472
 
473
- return
474
 
475
- # Start multithreaded process if num_threads > 1
476
- with ThreadPool(num_threads) as p:
477
- pool_args_list = []
478
- for local_file_path, oss_file_path in file_upload_mapping:
479
- if local_base_dir is not None:
480
- local_file_path = local_base_dir / local_file_path
481
 
482
- if oss_base_dir is not None:
483
- oss_file_path = oss_base_dir / oss_file_path
484
-
485
- args_dict = dict(
486
- oss_file_path=oss_file_path,
487
- local_file_path=local_file_path,
488
- replace=replace,
489
- print_logs=True,
490
- )
491
- pool_args_list.append(args_dict)
492
-
493
- ret_all = list(
494
- tqdm(
495
- starstarmap(
496
- pool=p,
497
- fn=self.upload_file_to_object,
498
- kwargs_iter=pool_args_list,
499
- ),
500
- total=len(file_upload_mapping),
501
- )
502
- )
503
-
504
- @property
505
- def bucket(self) -> oss2.Bucket:
506
- return self._bucket
507
-
508
-
509
- # %%
510
  if __name__ == "__main__":
511
- # %% Initialize
512
- oss_file_manager = OSSFileManager(logger=logger)
513
-
514
- # %% List the latest files by date (based on file suffix) in an OSS directory
515
- oss_file_manager.list_latest_files_by_date(
516
- "compass-arena/dev/data/conversations/", max_num_files=5
517
- )
+ #!/usr/bin/env python3
+ """
+ Simplified OSS file manager, designed for the SAGE-Bench HuggingFace Space.
+ Removes the dependency on compassflow and keeps only the required OSS functionality.
+ """

+ import os
  import oss2
+ import json
+ from datetime import datetime
+ from pathlib import Path
+ from typing import List, Dict, Optional
  from loguru import logger


  class OSSFileManager:
+     """Simplified OSS file manager"""
+
      def __init__(
          self,
          oss_access_key_id: str = None,
          oss_access_key_secret: str = None,
+         oss_region: str = None,
+         oss_bucket_name: str = None
+     ):
+         """
+         Initialize the OSS file manager
+
          Args:
+             oss_access_key_id: OSS access key ID
+             oss_access_key_secret: OSS access key secret
+             oss_region: OSS region endpoint
+             oss_bucket_name: OSS bucket name
          """
+         # Read the configuration from environment variables
+         self.access_key_id = oss_access_key_id or os.getenv('OSS_ACCESS_KEY_ID')
+         self.access_key_secret = oss_access_key_secret or os.getenv('OSS_ACCESS_KEY_SECRET')
+         self.region = oss_region or os.getenv('OSS_REGION', 'http://oss-cn-shanghai.aliyuncs.com')
+         self.bucket_name = oss_bucket_name or os.getenv('OSS_BUCKET_NAME', 'opencompass')
+
+         if not self.access_key_id or not self.access_key_secret:
+             raise ValueError("OSS access keys are not set. Please set the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables.")
+
+         # Initialize the OSS client
+         auth = oss2.Auth(self.access_key_id, self.access_key_secret)
+         self.bucket = oss2.Bucket(auth, self.region, self.bucket_name)
+
+         logger.info(f"OSS initialized successfully: {self.bucket_name} @ {self.region}")
+
+     def list_files(
+         self,
+         oss_dir: str = "",
+         after_date: datetime = None,
+         file_extension: str = None
+     ) -> List[Dict]:
+         """
+         List the files in an OSS directory
+
          Args:
+             oss_dir: OSS directory path
+             after_date: only return files modified after this date
+             file_extension: file extension filter (e.g. ".json")
+
          Returns:
+             A list of file info dicts
          """
+         try:
+             files = []
+
+             # Make sure the directory path ends with /
+             if oss_dir and not oss_dir.endswith('/'):
+                 oss_dir += '/'
+
+             # List the objects
+             for obj in oss2.ObjectIterator(self.bucket, prefix=oss_dir):
+                 # Skip the directory placeholder itself
+                 if obj.key.endswith('/'):
                      continue
+
+                 # Filter by file extension
+                 if file_extension and not obj.key.endswith(file_extension):
+                     continue
+
+                 # Filter by date
+                 if after_date and obj.last_modified < after_date:
+                     continue
+
+                 file_info = {
+                     'key': obj.key,
+                     'name': os.path.basename(obj.key),
+                     'size': obj.size,
+                     'last_modified': obj.last_modified,
+                     'etag': obj.etag
+                 }
+                 files.append(file_info)
+
+             logger.info(f"Found {len(files)} files in {oss_dir}")
+             return files
+
+         except Exception as e:
+             logger.error(f"Failed to list files: {e}")
+             raise
+
+     def download_file(self, oss_file_path: str, local_file_path: str) -> bool:
+         """
+         Download a file from OSS to local storage
+
          Args:
+             oss_file_path: OSS file path
+             local_file_path: local file path
+
+         Returns:
+             Whether the download succeeded
          """
+         try:
+             # Make sure the local directory exists
+             local_dir = os.path.dirname(local_file_path)
+             if local_dir:
+                 os.makedirs(local_dir, exist_ok=True)
+
+             # Download the file
+             self.bucket.get_object_to_file(oss_file_path, local_file_path)
+
+             logger.info(f"Download succeeded: {oss_file_path} -> {local_file_path}")
+             return True
+
+         except Exception as e:
+             logger.error(f"Failed to download file: {oss_file_path} -> {local_file_path}, error: {e}")
+             return False
+
+     def upload_file_to_object(
+         self,
+         local_file_path: str,
+         oss_file_path: str,
+         replace: bool = False
+     ) -> bool:
+         """
+         Upload a local file to OSS
+
          Args:
+             local_file_path: local file path
+             oss_file_path: OSS file path
+             replace: whether to replace an existing file
+
+         Returns:
+             Whether the upload succeeded
          """
+         try:
+             # Check that the local file exists
+             if not os.path.exists(local_file_path):
+                 logger.error(f"Local file does not exist: {local_file_path}")
+                 return False
+
+             # Check whether the OSS file already exists
+             if not replace and self.bucket.object_exists(oss_file_path):
+                 logger.warning(f"OSS file already exists: {oss_file_path}")
+                 return False
+
+             # Upload the file
+             self.bucket.put_object_from_file(oss_file_path, local_file_path)
+
+             logger.info(f"Upload succeeded: {local_file_path} -> {oss_file_path}")
+             return True
+
+         except Exception as e:
+             logger.error(f"Failed to upload file: {local_file_path} -> {oss_file_path}, error: {e}")
+             return False
+
+     def file_exists(self, oss_file_path: str) -> bool:
+         """
+         Check whether a file exists in OSS
+
          Args:
+             oss_file_path: OSS file path
+
+         Returns:
+             Whether the file exists
          """
+         try:
+             return self.bucket.object_exists(oss_file_path)
+         except Exception as e:
+             logger.error(f"Failed to check file existence: {oss_file_path}, error: {e}")
+             return False
+
+     def get_file_info(self, oss_file_path: str) -> Optional[Dict]:
+         """
+         Get the metadata of an OSS file
+
          Args:
+             oss_file_path: OSS file path
+
+         Returns:
+             A file info dict
          """
+         try:
+             obj = self.bucket.get_object_meta(oss_file_path)
+
+             return {
+                 'key': oss_file_path,
+                 'name': os.path.basename(oss_file_path),
+                 'size': obj.content_length,
+                 'last_modified': obj.last_modified,
+                 'etag': obj.etag,
+                 'content_type': obj.content_type
+             }
+
+         except oss2.exceptions.NoSuchKey:
+             logger.warning(f"File does not exist: {oss_file_path}")
+             return None
+         except Exception as e:
+             logger.error(f"Failed to get file info: {oss_file_path}, error: {e}")
+             return None
+
+     def delete_file(self, oss_file_path: str) -> bool:
+         """
+         Delete an OSS file
+
          Args:
+             oss_file_path: OSS file path
+
+         Returns:
+             Whether the deletion succeeded
          """
+         try:
+             self.bucket.delete_object(oss_file_path)
+             logger.info(f"Deletion succeeded: {oss_file_path}")
+             return True
+
+         except Exception as e:
+             logger.error(f"Failed to delete file: {oss_file_path}, error: {e}")
+             return False
+
+
+ # Compatibility alias - keeps compatibility with the original code
+ class SimpleOSSManager(OSSFileManager):
+     """Compatibility alias"""
+     pass
+
+
  if __name__ == "__main__":
+     # Test code
+     try:
+         manager = OSSFileManager()
+         print("✅ OSS file manager initialized successfully")
+
+         # Test listing files
+         files = manager.list_files("atlas_eval/submissions/", file_extension=".json")
+         print(f"📁 Found {len(files)} submission files")
+
+         for file_info in files[:3]:  # Show only the first 3
+             print(f"  - {file_info['name']} ({file_info['size']} bytes)")
+
+     except Exception as e:
+         print(f"❌ Test failed: {e}")
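The simplified class drops the old ThreadPool/starstarmap helpers and their num_threads option; if parallel downloads are still needed, a small stdlib-only wrapper along these lines could be layered on top (a sketch, not part of the commit; names are illustrative):

# Hypothetical helper: concurrent downloads on top of the simplified manager, stdlib only.
from concurrent.futures import ThreadPoolExecutor

def download_many(manager, file_infos, local_dir, num_threads=4):
    """Download entries returned by list_files() concurrently; returns per-file success flags."""
    def _one(info):
        return manager.download_file(info["key"], f"{local_dir}/{info['name']}")

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(_one, file_infos))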
src/oss/oss_file_manager_old.py ADDED
@@ -0,0 +1,517 @@
1
+ # %%
2
+ import datetime as dt
3
+ import os
4
+ import re
5
+ from logging import Logger
6
+ from multiprocessing import Pool
7
+ from multiprocessing.pool import ThreadPool
8
+ from pathlib import Path
9
+ from typing import List, Union
10
+
11
+ import oss2
12
+ from loguru import logger
13
+ from oss2.credentials import EnvironmentVariableCredentialsProvider
14
+ from tqdm import tqdm
15
+
16
+ from compassflow.constants import DATADIR
17
+ from compassflow.oss.oss import OssBucket
18
+ from compassflow.utils import starstarmap
19
+
20
+
21
+ class OSSFileManager:
22
+ def __init__(
23
+ self,
24
+ oss_access_key_id: str = None,
25
+ oss_access_key_secret: str = None,
26
+ region: str = "http://oss-cn-shanghai.aliyuncs.com",
27
+ bucket_name: str = "opencompass",
28
+ oss_block_name: str = None,
29
+ logger: Logger = logger,
30
+ ) -> None:
31
+ """OSS File Manager
32
+
33
+ Args:
34
+ oss_access_key_id (str, optional): _description_. Defaults to None.
35
+ oss_access_key_secret (str, optional): _description_. Defaults to None.
36
+ region (_type_, optional): _description_. Defaults to 'http://oss-cn-shanghai.aliyuncs.com'.
37
+ bucket_name (str, optional): _description_. Defaults to 'opencompass'.
38
+ oss_block_name: oss_block_name which is defined in the prefect
39
+ logger (Logger, optional): _description_. Defaults to logger.
40
+ """
41
+ self.logger = logger
42
+ if oss_block_name is not None:
43
+ oss_bucket = OssBucket.load(oss_block_name)
44
+ self._bucket = oss_bucket._get_bucket()
45
+ return
46
+
47
+ # An Alibaba Cloud account AccessKey has access permissions for all APIs and is very risky. It is strongly recommended to create and use a RAM account for API access and day-to-day operations; log in to the RAM console to create one.
48
+ if oss_access_key_id is not None and oss_access_key_secret is not None:
49
+ os.environ["OSS_ACCESS_KEY_ID"] = oss_access_key_id
50
+ os.environ["OSS_ACCESS_KEY_SECRET"] = oss_access_key_secret
51
+
52
+ if (
53
+ os.getenv("OSS_ACCESS_KEY_ID") is None
54
+ or os.getenv("OSS_ACCESS_KEY_SECRET") is None
55
+ ):
56
+ raise ValueError("Access Key ID and Access Key Secret cannot be empty.")
57
+
58
+ auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
59
+
60
+ # The endpoint uses Hangzhou as an example; for other regions, fill in the value as appropriate.
61
+ # Fill in the bucket name, e.g. examplebucket.
62
+ bucket = oss2.Bucket(
63
+ auth=auth,
64
+ endpoint=region,
65
+ bucket_name=bucket_name,
66
+ )
67
+
68
+ self._bucket = bucket
69
+
70
+ def list_latest_files_by_date(
71
+ self,
72
+ object_dir: str = "",
73
+ delimiter: str = "/",
74
+ start_date: Union[str, dt.date, dt.datetime] = None,
75
+ end_date: Union[str, dt.date, dt.datetime] = None,
76
+ max_num_files: int = 10,
77
+ date_pattern: str = r"([0-9]{4}-[0-9]{2}-[0-9]{2})",
78
+ file_date_format: str = "%Y-%m-%d",
79
+ suffix: str = "",
80
+ ) -> List[Union[str, Path]]:
81
+ """List the latest files by date in an OSS bucket directory
82
+
83
+ Args:
84
+ object_dir (str, optional): _description_. Defaults to ''.
85
+ delimiter (str, optional): _description_. Defaults to '/'.
86
+ start_date (Union[str, dt.date], optional): _description_. Defaults to None.
87
+ end_date (Union[str, dt.date], optional): _description_. Defaults to None.
88
+ max_num_files (int, optional): _description_. Defaults to 10.
89
+ date_pattern (str, optional): _description_. Defaults to r'^([0-9]{4}-[0-9]{2}-[0-9]{2})'.
90
+ suffix (str, optional): _description_. Defaults to ''.
91
+
92
+ Returns:
93
+ List[Union[str, Path]]: _description_
94
+ """
95
+ if start_date is not None:
96
+ if isinstance(start_date, str):
97
+ start_date = dt.datetime.strptime(start_date.replace("-", ""), "%Y%m%d")
98
+
99
+ if isinstance(start_date, dt.date):
100
+ start_date = dt.datetime(
101
+ start_date.year, start_date.month, start_date.day, 0, 0, 0
102
+ )
103
+
104
+ if end_date is not None:
105
+ if isinstance(end_date, str):
106
+ end_date = dt.datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
107
+
108
+ if isinstance(end_date, dt.date):
109
+ end_date = dt.datetime(
110
+ end_date.year, end_date.month, end_date.day, 0, 0, 0
111
+ )
112
+
113
+ object_iter = oss2.ObjectIterator(
114
+ bucket=self._bucket, prefix=object_dir, delimiter=delimiter
115
+ )
116
+
117
+ root_dir = Path(object_dir)
118
+ filenames = []
119
+ for filename in object_iter:
120
+ filename = filename.key.replace(object_dir, "")
121
+ # print(filename)
122
+
123
+ if filename.endswith(suffix):
124
+ # Match date pattern in filename
125
+ date_search = re.search(date_pattern, filename)
126
+ if date_search:
127
+ file_date = dt.datetime.strptime(
128
+ date_search.group(1), file_date_format
129
+ )
130
+ else:
131
+ self.logger.warning(
132
+ f"date pattern doesn't match, skipping file {filename}"
133
+ )
134
+ continue
135
+
136
+ # Check if file date within specified range
137
+ if start_date is not None:
138
+ if start_date > file_date:
139
+ continue
140
+
141
+ if end_date is not None:
142
+ if end_date < file_date:
143
+ continue
144
+
145
+ filepath = root_dir / filename
146
+ # name_tstamp_tuple = (filepath, os.path.getmtime(filepath))
147
+ # filenames.append(name_tstamp_tuple)
148
+ filenames.append(str(filepath))
149
+ # sort by tstamp
150
+ # filenames = sorted(filenames, key=lambda x: x[1])
151
+ filenames = sorted(filenames)
152
+ # filenames = [x[0] for x in filenames]
153
+
154
+ max_num_files = max_num_files or len(filenames)
155
+ filenames = filenames[-max_num_files:]
156
+
157
+ self.logger.info(f"{filenames=}")
158
+
159
+ return filenames
160
+
161
+ def download_object_to_file(
162
+ self,
163
+ oss_file_path: str | Path,
164
+ local_file_path: str | Path,
165
+ replace: bool = False,
166
+ make_dir: bool = False,
167
+ print_logs: bool = True,
168
+ ):
169
+ """Download a single OSS object to local file.
170
+
171
+ Args:
172
+ oss_file_path (str): _description_
173
+ local_file_path (str): _description_
174
+ replace (bool, optional): _description_. Defaults to False.
175
+ make_dir (bool, optional): Whether to create intermediate dirs if they don't exist. Defaults to False.
176
+ print_logs (bool, optional): Whether to print logs. Defaults to True.
177
+ """
178
+ if isinstance(local_file_path, str):
179
+ local_file_path = Path(local_file_path)
180
+
181
+ if not replace:
182
+ if local_file_path.exists():
183
+ if print_logs:
184
+ err_msg = f"{local_file_path} already exists, skipping file..."
185
+ self.logger.info(err_msg)
186
+
187
+ return
188
+
189
+ if print_logs:
190
+ if local_file_path.exists():
191
+ err_msg = f"{local_file_path} already exists, replacing file..."
192
+ self.logger.info(err_msg)
193
+
194
+ if make_dir:
195
+ os.makedirs(local_file_path.parent, exist_ok=True)
196
+
197
+ self._bucket.get_object_to_file(
198
+ key=str(oss_file_path),
199
+ filename=local_file_path,
200
+ )
201
+
202
+ def download_objects_to_files(
203
+ self,
204
+ file_download_mapping: list[tuple[str | Path, str | Path]],
205
+ oss_base_dir: str = None,
206
+ local_base_dir: str = None,
207
+ replace: bool = True,
208
+ num_threads: int = 1,
209
+ **kwargs,
210
+ ) -> None:
211
+ """Download objects from OSS to local storage.
212
+
213
+ Args:
214
+ file_download_mapping (list[tuple[str | Path, str | Path]]): A list of file path pairs that maps the OSS file path (to download) to the local file path (download location).
215
+ oss_base_dir (str): OSS directory path to be prepended to all OSS file paths.
216
+ local_base_dir (str, optional): Base directory path to be prepended to all local file paths.
217
+ replace (bool, optional): _description_. Defaults to True.
218
+ num_threads (int, optional): _description_. Defaults to 1.
219
+ **kwargs: Additional keyword arguments passed to `list_latest_files_by_date`
220
+ """
221
+ if isinstance(oss_base_dir, str):
222
+ oss_base_dir = Path(oss_base_dir)
223
+
224
+ if isinstance(local_base_dir, str):
225
+ local_base_dir = Path(local_base_dir)
226
+
227
+ if not isinstance(file_download_mapping, list):
228
+ raise TypeError("file_download_mapping must be a list of 2 value tuples.")
229
+
230
+ for item in file_download_mapping:
231
+ if not isinstance(item, tuple):
232
+ raise TypeError(
233
+ "Each item in the file_download_mapping list must be a 2 value tuple."
234
+ )
235
+
236
+ if len(item) != 2:
237
+ raise ValueError(
238
+ "Each tuple in the file_download_mapping list must be length 2."
239
+ )
240
+
241
+ if num_threads == 1:
242
+ for local_file_path, oss_file_path in file_download_mapping:
243
+ if local_base_dir is not None:
244
+ local_file_path = local_base_dir / local_file_path
245
+
246
+ if oss_base_dir is not None:
247
+ oss_file_path = oss_base_dir / oss_file_path
248
+
249
+ self.download_object_to_file(
250
+ oss_file_path=oss_file_path,
251
+ local_file_path=local_file_path,
252
+ replace=replace,
253
+ )
254
+
255
+ return
256
+
257
+ # Start multithreaded process if num_threads > 1
258
+ with ThreadPool(num_threads) as p:
259
+ pool_args_list = []
260
+ for local_file_path, oss_file_path in file_download_mapping:
261
+ if local_base_dir is not None:
262
+ local_file_path = local_base_dir / local_file_path
263
+
264
+ if oss_base_dir is not None:
265
+ oss_file_path = oss_base_dir / oss_file_path
266
+
267
+ args_dict = dict(
268
+ oss_file_path=oss_file_path,
269
+ local_file_path=local_file_path,
270
+ replace=replace,
271
+ print_logs=True,
272
+ )
273
+ pool_args_list.append(args_dict)
274
+
275
+ ret_all = list(
276
+ tqdm(
277
+ starstarmap(
278
+ pool=p,
279
+ fn=self.download_object_to_file,
280
+ kwargs_iter=pool_args_list,
281
+ ),
282
+ total=len(file_download_mapping),
283
+ )
284
+ )
285
+
286
+ def download_latest_objects_to_dir(
287
+ self,
288
+ oss_object_dir: str,
289
+ local_dir: str,
290
+ start_date: Union[str, dt.date, dt.datetime] = None,
291
+ end_date: Union[str, dt.date, dt.datetime] = None,
292
+ date_pattern: str = r"([0-9]{4}-[0-9]{2}-[0-9]{2})",
293
+ file_date_format: str = "%Y-%m-%d",
294
+ max_num_files: int = 5,
295
+ replace: bool = True,
296
+ make_dir: bool = True,
297
+ delimiter: str = "/",
298
+ num_threads: int = 1,
299
+ suffix: str = "",
300
+ **kwargs,
301
+ ) -> None:
302
+ """Download the latest objects from oss to local directory
303
+
304
+ Args:
305
+ oss_object_dir (str): _description_
306
+ local_dir (str): _description_
307
+ start_date (Union[str, dt.date, dt.datetime], optional): _description_. Defaults to None.
308
+ end_date (Union[str, dt.date, dt.datetime], optional): _description_. Defaults to None.
309
+ max_num_files (int, optional): _description_. Defaults to 5.
310
+ replace (bool, optional): _description_. Defaults to True.
311
+ delimiter (str, optional): _description_. Defaults to '/'.
312
+ num_threads (int, optional): _description_. Defaults to 1.
313
+ suffix (str, optional): _description_. Defaults to ''.
314
+ **kwargs: Additional keyword arguments passed to `list_latest_files_by_date`
315
+ """
316
+ if isinstance(local_dir, str):
317
+ local_dir = Path(local_dir)
318
+
319
+ oss_file_list = self.list_latest_files_by_date(
320
+ object_dir=oss_object_dir,
321
+ delimiter=delimiter,
322
+ start_date=start_date,
323
+ end_date=end_date,
324
+ date_pattern=date_pattern,
325
+ file_date_format=file_date_format,
326
+ max_num_files=max_num_files,
327
+ suffix=suffix,
328
+ **kwargs,
329
+ )
330
+
331
+ if num_threads == 1:
332
+ for oss_file_path in oss_file_list:
333
+ file_name = Path(oss_file_path).name
334
+ local_file_path = local_dir / file_name
335
+
336
+ self.download_object_to_file(
337
+ oss_file_path=oss_file_path,
338
+ local_file_path=local_file_path,
339
+ replace=replace,
340
+ make_dir=make_dir,
341
+ )
342
+
343
+ return
344
+
345
+ # Start multithreaded process if num_threads > 1
346
+ with ThreadPool(num_threads) as p:
347
+ pool_args_list = []
348
+ for oss_file_path in oss_file_list:
349
+ file_name = Path(oss_file_path).name
350
+ local_file_path = local_dir / file_name
351
+
352
+ args_dict = dict(
353
+ oss_file_path=str(oss_file_path),
354
+ local_file_path=str(local_file_path),
355
+ replace=replace,
356
+ make_dir=make_dir,
357
+ print_logs=True,
358
+ )
359
+ pool_args_list.append(args_dict)
360
+
361
+ ret_all = list(
362
+ tqdm(
363
+ starstarmap(
364
+ pool=p,
365
+ fn=self.download_object_to_file,
366
+ kwargs_iter=pool_args_list,
367
+ ),
368
+ total=len(oss_file_list),
369
+ )
370
+ )
371
+
372
+ def upload_file_to_object(
373
+ self,
374
+ local_file_path: str,
375
+ oss_file_path: str | Path,
376
+ replace: bool = False,
377
+ print_logs: bool = True,
378
+ ):
379
+ """Upload a single local file to OSS
380
+
381
+ Args:
382
+ oss_file_path (str): _description_
383
+ local_file_path (str): _description_
384
+ replace (bool, optional): _description_. Defaults to False.
385
+ """
386
+ if isinstance(local_file_path, Path):
387
+ local_file_path = str(local_file_path)
388
+
389
+ if isinstance(oss_file_path, Path):
390
+ oss_file_path = str(oss_file_path)
391
+
392
+ # Check if file already exists
393
+ is_file_exists = self._bucket.object_exists(
394
+ key=oss_file_path,
395
+ )
396
+
397
+ if is_file_exists:
398
+ if replace:
399
+ if print_logs:
400
+ err_msg = f"{oss_file_path} already exists, replacing file..."
401
+ self.logger.info(err_msg)
402
+
403
+ self._bucket.put_object_from_file(
404
+ key=str(oss_file_path),
405
+ filename=local_file_path,
406
+ )
407
+
408
+ else:
409
+ if print_logs:
410
+ err_msg = f"{oss_file_path} already exists, skipping file..."
411
+ self.logger.info(err_msg)
412
+
413
+ return
414
+
415
+ self._bucket.put_object_from_file(
416
+ key=oss_file_path,
417
+ filename=local_file_path,
418
+ )
419
+
420
+ def upload_files_to_objects(
421
+ self,
422
+ file_upload_mapping: list[tuple[str | Path, str | Path]],
423
+ local_base_dir: str = None,
424
+ oss_base_dir: str = None,
425
+ replace: bool = True,
426
+ num_threads: int = 1,
427
+ **kwargs,
428
+ ) -> None:
429
+ """Upload files from local storage to OSS.
430
+
431
+ Args:
432
+ file_upload_mapping (list[tuple[str | Path, str | Path]]): A list of file path pairs that maps the local file path (to upload) to the OSS file path (upload location).
433
+ oss_base_dir (str): OSS directory path to be prepended to all OSS file paths.
434
+ local_base_dir (str, optional): Base directory path to be prepended to all local file paths.
435
+ replace (bool, optional): _description_. Defaults to True.
436
+ num_threads (int, optional): _description_. Defaults to 1.
437
+ **kwargs: Additional keyword arguments passed to `list_latest_files_by_date`
438
+ """
439
+ if isinstance(oss_base_dir, str):
440
+ oss_base_dir = Path(oss_base_dir)
441
+
442
+ if isinstance(local_base_dir, str):
443
+ local_base_dir = Path(local_base_dir)
444
+
445
+ if not isinstance(file_upload_mapping, list):
446
+ raise TypeError("file_upload_mapping must be a list of 2 value tuples.")
447
+
448
+ for item in file_upload_mapping:
449
+ if not isinstance(item, tuple):
450
+ raise TypeError(
451
+ "Each item in the file_upload_mapping list must be a 2 value tuple."
452
+ )
453
+
454
+ if len(item) != 2:
455
+ raise ValueError(
456
+ "Each tuple in the file_upload_mapping list must be length 2."
457
+ )
458
+
459
+ if num_threads == 1:
460
+ for local_file_path, oss_file_path in file_upload_mapping:
461
+ if local_base_dir is not None:
462
+ local_file_path = local_base_dir / local_file_path
463
+
464
+ if oss_base_dir is not None:
465
+ oss_file_path = oss_base_dir / oss_file_path
466
+
467
+ self.upload_file_to_object(
468
+ oss_file_path=oss_file_path,
469
+ local_file_path=local_file_path,
470
+ replace=replace,
471
+ )
472
+
473
+ return
474
+
475
+ # Start multithreaded process if num_threads > 1
476
+ with ThreadPool(num_threads) as p:
477
+ pool_args_list = []
478
+ for local_file_path, oss_file_path in file_upload_mapping:
479
+ if local_base_dir is not None:
480
+ local_file_path = local_base_dir / local_file_path
481
+
482
+ if oss_base_dir is not None:
483
+ oss_file_path = oss_base_dir / oss_file_path
484
+
485
+ args_dict = dict(
486
+ oss_file_path=oss_file_path,
487
+ local_file_path=local_file_path,
488
+ replace=replace,
489
+ print_logs=True,
490
+ )
491
+ pool_args_list.append(args_dict)
492
+
493
+ ret_all = list(
494
+ tqdm(
495
+ starstarmap(
496
+ pool=p,
497
+ fn=self.upload_file_to_object,
498
+ kwargs_iter=pool_args_list,
499
+ ),
500
+ total=len(file_upload_mapping),
501
+ )
502
+ )
503
+
504
+ @property
505
+ def bucket(self) -> oss2.Bucket:
506
+ return self._bucket
507
+
508
+
509
+ # %%
510
+ if __name__ == "__main__":
511
+ # %% Initialize
512
+ oss_file_manager = OSSFileManager(logger=logger)
513
+
514
+ # %% List the latest files by date (based on file suffix) in an OSS directory
515
+ oss_file_manager.list_latest_files_by_date(
516
+ "compass-arena/dev/data/conversations/", max_num_files=5
517
+ )
src/oss/oss_file_manager_simple.py ADDED
@@ -0,0 +1,254 @@
+ #!/usr/bin/env python3
+ """
+ Simplified OSS file manager, designed for the SAGE-Bench HuggingFace Space.
+ Removes the dependency on compassflow and keeps only the required OSS functionality.
+ """
+
+ import os
+ import oss2
+ import json
+ from datetime import datetime
+ from pathlib import Path
+ from typing import List, Dict, Optional
+ from loguru import logger
+
+
+ class OSSFileManager:
+     """Simplified OSS file manager"""
+
+     def __init__(
+         self,
+         oss_access_key_id: str = None,
+         oss_access_key_secret: str = None,
+         oss_region: str = None,
+         oss_bucket_name: str = None
+     ):
+         """
+         Initialize the OSS file manager
+
+         Args:
+             oss_access_key_id: OSS access key ID
+             oss_access_key_secret: OSS access key secret
+             oss_region: OSS region endpoint
+             oss_bucket_name: OSS bucket name
+         """
+         # Read the configuration from environment variables
+         self.access_key_id = oss_access_key_id or os.getenv('OSS_ACCESS_KEY_ID')
+         self.access_key_secret = oss_access_key_secret or os.getenv('OSS_ACCESS_KEY_SECRET')
+         self.region = oss_region or os.getenv('OSS_REGION', 'http://oss-cn-shanghai.aliyuncs.com')
+         self.bucket_name = oss_bucket_name or os.getenv('OSS_BUCKET_NAME', 'opencompass')
+
+         if not self.access_key_id or not self.access_key_secret:
+             raise ValueError("OSS access keys are not set. Please set the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables.")
+
+         # Initialize the OSS client
+         auth = oss2.Auth(self.access_key_id, self.access_key_secret)
+         self.bucket = oss2.Bucket(auth, self.region, self.bucket_name)
+
+         logger.info(f"OSS initialized successfully: {self.bucket_name} @ {self.region}")
+
+     def list_files(
+         self,
+         oss_dir: str = "",
+         after_date: datetime = None,
+         file_extension: str = None
+     ) -> List[Dict]:
+         """
+         List the files in an OSS directory
+
+         Args:
+             oss_dir: OSS directory path
+             after_date: only return files modified after this date
+             file_extension: file extension filter (e.g. ".json")
+
+         Returns:
+             A list of file info dicts
+         """
+         try:
+             files = []
+
+             # Make sure the directory path ends with /
+             if oss_dir and not oss_dir.endswith('/'):
+                 oss_dir += '/'
+
+             # List the objects
+             for obj in oss2.ObjectIterator(self.bucket, prefix=oss_dir):
+                 # Skip the directory placeholder itself
+                 if obj.key.endswith('/'):
+                     continue
+
+                 # Filter by file extension
+                 if file_extension and not obj.key.endswith(file_extension):
+                     continue
+
+                 # Filter by date
+                 if after_date and obj.last_modified < after_date:
+                     continue
+
+                 file_info = {
+                     'key': obj.key,
+                     'name': os.path.basename(obj.key),
+                     'size': obj.size,
+                     'last_modified': obj.last_modified,
+                     'etag': obj.etag
+                 }
+                 files.append(file_info)
+
+             logger.info(f"Found {len(files)} files in {oss_dir}")
+             return files
+
+         except Exception as e:
+             logger.error(f"Failed to list files: {e}")
+             raise
+
+     def download_file(self, oss_file_path: str, local_file_path: str) -> bool:
+         """
+         Download a file from OSS to local storage
+
+         Args:
+             oss_file_path: OSS file path
+             local_file_path: local file path
+
+         Returns:
+             Whether the download succeeded
+         """
+         try:
+             # Make sure the local directory exists
+             local_dir = os.path.dirname(local_file_path)
+             if local_dir:
+                 os.makedirs(local_dir, exist_ok=True)
+
+             # Download the file
+             self.bucket.get_object_to_file(oss_file_path, local_file_path)
+
+             logger.info(f"Download succeeded: {oss_file_path} -> {local_file_path}")
+             return True
+
+         except Exception as e:
+             logger.error(f"Failed to download file: {oss_file_path} -> {local_file_path}, error: {e}")
+             return False
+
+     def upload_file_to_object(
+         self,
+         local_file_path: str,
+         oss_file_path: str,
+         replace: bool = False
+     ) -> bool:
+         """
+         Upload a local file to OSS
+
+         Args:
+             local_file_path: local file path
+             oss_file_path: OSS file path
+             replace: whether to replace an existing file
+
+         Returns:
+             Whether the upload succeeded
+         """
+         try:
+             # Check that the local file exists
+             if not os.path.exists(local_file_path):
+                 logger.error(f"Local file does not exist: {local_file_path}")
+                 return False
+
+             # Check whether the OSS file already exists
+             if not replace and self.bucket.object_exists(oss_file_path):
+                 logger.warning(f"OSS file already exists: {oss_file_path}")
+                 return False
+
+             # Upload the file
+             self.bucket.put_object_from_file(oss_file_path, local_file_path)
+
+             logger.info(f"Upload succeeded: {local_file_path} -> {oss_file_path}")
+             return True
+
+         except Exception as e:
+             logger.error(f"Failed to upload file: {local_file_path} -> {oss_file_path}, error: {e}")
+             return False
+
+     def file_exists(self, oss_file_path: str) -> bool:
+         """
+         Check whether a file exists in OSS
+
+         Args:
+             oss_file_path: OSS file path
+
+         Returns:
+             Whether the file exists
+         """
+         try:
+             return self.bucket.object_exists(oss_file_path)
+         except Exception as e:
+             logger.error(f"Failed to check file existence: {oss_file_path}, error: {e}")
+             return False
+
+     def get_file_info(self, oss_file_path: str) -> Optional[Dict]:
+         """
+         Get the metadata of an OSS file
+
+         Args:
+             oss_file_path: OSS file path
+
+         Returns:
+             A file info dict
+         """
+         try:
+             obj = self.bucket.get_object_meta(oss_file_path)
+
+             return {
+                 'key': oss_file_path,
+                 'name': os.path.basename(oss_file_path),
+                 'size': obj.content_length,
+                 'last_modified': obj.last_modified,
+                 'etag': obj.etag,
+                 'content_type': obj.content_type
+             }
+
+         except oss2.exceptions.NoSuchKey:
+             logger.warning(f"File does not exist: {oss_file_path}")
+             return None
+         except Exception as e:
+             logger.error(f"Failed to get file info: {oss_file_path}, error: {e}")
+             return None
+
+     def delete_file(self, oss_file_path: str) -> bool:
+         """
+         Delete an OSS file
+
+         Args:
+             oss_file_path: OSS file path
+
+         Returns:
+             Whether the deletion succeeded
+         """
+         try:
+             self.bucket.delete_object(oss_file_path)
+             logger.info(f"Deletion succeeded: {oss_file_path}")
+             return True
+
+         except Exception as e:
+             logger.error(f"Failed to delete file: {oss_file_path}, error: {e}")
+             return False
+
+
+ # Compatibility alias - keeps compatibility with the original code
+ class SimpleOSSManager(OSSFileManager):
+     """Compatibility alias"""
+     pass
+
+
+ if __name__ == "__main__":
+     # Test code
+     try:
+         manager = OSSFileManager()
+         print("✅ OSS file manager initialized successfully")
+
+         # Test listing files
+         files = manager.list_files("atlas_eval/submissions/", file_extension=".json")
+         print(f"📁 Found {len(files)} submission files")
+
+         for file_info in files[:3]:  # Show only the first 3
+             print(f"  - {file_info['name']} ({file_info['size']} bytes)")
+
+     except Exception as e:
+         print(f"❌ Test failed: {e}")