Spaces:
Sleeping
Sleeping
| # Help me write a Python script to download a file from a remote server and store it in /tmp. | |
| # If the file already exists in /tmp, skip the download step. | |
| # Next, create the volume and extract the downloaded data into that volume. | |
| # Finally, clean up /tmp. | |
| # http://172.16.15.118:9557/repository/atalink-hf-models/backups/data_cache_backup.tar.gz | |
| # http://172.16.15.118:9557/repository/atalink-hf-models/backups/data_gfpgan_backup.tar.gz | |
| import os | |
| import requests | |
| import subprocess | |
| from tqdm import tqdm | |
| import tempfile | |
# Archives to restore, as (url, strip_count) pairs. main() hands the integer to
# create_volume_and_extract(), where it becomes tar's strip option, i.e. the
# number of leading path components dropped while unpacking.
DOWNLOADS = [
    ("http://172.16.15.118:9557/repository/atalink-hf-models/backups/data_cache_backup.tar.gz", 3),
    ("http://172.16.15.118:9557/repository/atalink-hf-models/backups/data_gfpgan_backup.tar.gz", 4),
    # "https://huggingface.co/hynt/F5-TTS-Vietnamese-ViVoice/resolve/main/config.json"
]

# Use the system temp directory (Linux: /tmp, Windows: %TEMP%).
TMP_DIR = os.path.join(tempfile.gettempdir(), "atalink_tmp")
# Created at import time so later steps can write into it unconditionally.
os.makedirs(TMP_DIR, exist_ok=True)

# Prepended to the archive's base name to form the Docker volume name.
VOLUME_PREFIX = "atalink_"
def download_file(url, dest_folder, *, timeout=(10, 300)):
    """Download ``url`` into ``dest_folder`` unless the file is already there.

    The local file name is the last path component of the URL (any query
    string or fragment is ignored). Data is streamed into a ``.part`` file
    that is renamed to its final name only after the transfer has finished,
    so an interrupted run never leaves a truncated archive that a later run
    would mistake for a complete download and skip.

    Args:
        url: HTTP(S) address of the file to fetch.
        dest_folder: Directory to store the file in; created if missing.
        timeout: ``(connect, read)`` timeout in seconds handed to
            ``requests.get``. ``None`` waits indefinitely.

    Returns:
        A ``(dest_path, downloaded)`` tuple. ``downloaded`` is ``False`` when
        the file already existed and the transfer was skipped.

    Raises:
        ValueError: If no file name can be derived from ``url``.
        requests.RequestException: On connection errors, timeouts or an
            HTTP error status.
        OSError: If the file cannot be written.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlsplit

    filename = os.path.basename(urlsplit(url).path)
    if not filename:
        raise ValueError(f"Cannot derive a file name from URL: {url!r}")

    dest_path = os.path.join(dest_folder, filename)
    if os.path.exists(dest_path):
        print(
            f"✅ [SKIP] {filename} already exists at \033[96m{dest_path}\033[0m, skipping download."
        )
        return dest_path, False  # not downloaded this run

    os.makedirs(dest_folder, exist_ok=True)
    print(
        f"\n📥 [START] Downloading: \033[1;36m{filename}\033[0m → \033[96m{dest_path}\033[0m"
    )

    part_path = dest_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            # tqdm treats total=None as "size unknown"; a literal 0 would
            # render a misleading 0-byte progress bar.
            total = int(r.headers.get("content-length", 0)) or None
            chunk_size = 1024 * 1024  # 1MB
            with open(part_path, "wb") as f, tqdm(
                total=total, unit="B", unit_scale=True, desc=filename
            ) as pbar:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
                        pbar.update(len(chunk))
        # Publish the file under its real name only once it is complete.
        os.replace(part_path, dest_path)
    except BaseException:
        # Covers Ctrl+C as well: discard the partial file, then propagate.
        try:
            os.remove(part_path)
        except OSError:
            pass
        raise

    print(
        f"✅ [DONE] Downloaded: \033[1;32m{filename}\033[0m → \033[96m{dest_path}\033[0m\n"
    )
    return dest_path, True  # downloaded in this run
def volume_exists(volume_name: str) -> bool:
    """Report whether Docker has a volume called ``volume_name``.

    Runs ``docker volume inspect`` with its output captured and treats a
    zero exit status as "exists"; any other exit status means "missing".
    """
    inspect_cmd = ["docker", "volume", "inspect", volume_name]
    outcome = subprocess.run(inspect_cmd, check=False, capture_output=True)
    return outcome.returncode == 0
def create_volume_and_extract(tar_path, volume_name, extract_path=1):
    """Unpack a ``.tar.gz`` archive into a Docker volume.

    The volume is created when it does not exist yet. Extraction runs in a
    throw-away ``busybox`` container that mounts the volume at ``/data`` and
    the archive's directory (read-only) at ``/tmpdata``.

    Args:
        tar_path: Path of the archive on the host.
        volume_name: Name of the Docker volume to fill.
        extract_path: Number of leading path components tar strips from
            every member while extracting (``--strip-components``).

    Raises:
        ValueError: If ``extract_path`` is negative or not an integer value.
        FileNotFoundError: If ``tar_path`` is not an existing file.
        subprocess.CalledProcessError: If a ``docker`` command fails.
    """
    # Validate first so a bad call does not leave an empty volume behind.
    strip_count = int(extract_path)
    if strip_count < 0:
        raise ValueError(f"extract_path must be >= 0, got {extract_path!r}")
    if not os.path.isfile(tar_path):
        raise FileNotFoundError(f"Archive not found: {tar_path}")

    archive_name = os.path.basename(tar_path)

    # Create Docker volume only if it doesn't exist
    if volume_exists(volume_name):
        print(f"✅ [VOLUME] Exists: \033[1;33m{volume_name}\033[0m — skipping create")
    else:
        subprocess.run(["docker", "volume", "create", volume_name], check=True)
        print(f"🚀 [VOLUME] Created: \033[1;33m{volume_name}\033[0m")

    # Normalise the host path so Docker accepts it on both Windows and Linux.
    host_path = os.path.abspath(os.path.dirname(tar_path)).replace("\\", "/")

    # Extract tar.gz into the volume using a temporary container
    print(
        f"\n📦 [EXTRACT] Extracting \033[1;36m{archive_name}\033[0m into Docker volume \033[1;33m{volume_name}\033[0m ..."
    )
    subprocess.run(
        [
            "docker",
            "run",
            "--rm",  # temporary container, removed when it exits
            "-v",
            f"{volume_name}:/data",  # target volume
            "-v",
            f"{host_path}:/tmpdata:ro",  # archive directory; only read from
            "busybox",
            # tar is invoked directly (no `sh -c`), so the archive name is
            # passed as one argument and never interpreted by a shell.
            "tar",
            "-xzvf",
            f"/tmpdata/{archive_name}",
            "--strip-components",
            str(strip_count),
            "-C",
            "/data",
        ],
        check=True,
    )
    print(
        f"✅ [DONE] Extracted \033[1;36m{archive_name}\033[0m into volume \033[1;33m{volume_name}\033[0m\n"
    )
def cleanup_tmp(files):
    """Delete each path in ``files`` on a best-effort basis.

    A path that cannot be removed is reported on stdout together with the
    error, and processing continues with the next path.
    """
    for target in files:
        try:
            os.remove(target)
            print(f"🧹 [CLEANUP] Removed \033[96m{target}\033[0m")
        except Exception as err:
            # Deliberately broad: cleanup must not abort the whole run.
            print(f"⚠️ [CLEANUP] Could not remove \033[96m{target}\033[0m: {err}")
def main():
    """Fetch every archive in DOWNLOADS, restore it into a volume, then clean up.

    For each ``(url, strip_count)`` entry the archive is downloaded into
    TMP_DIR (or reused if already present), a volume named
    ``VOLUME_PREFIX + <archive base name>`` is filled from it, and finally
    all processed archives are deleted from TMP_DIR.
    """
    archives = []
    for url, strip_count in DOWNLOADS:
        # NOTE(review): the "downloaded" flag is not used; archives that were
        # already present are removed at the end as well.
        tar_path, _was_downloaded = download_file(url, TMP_DIR)
        archives.append(tar_path)

        # "data_cache_backup.tar.gz" -> "data_cache_backup": peel off ".gz",
        # then ".tar".
        stem, _ = os.path.splitext(os.path.basename(tar_path))
        stem, _ = os.path.splitext(stem)
        volume_name = VOLUME_PREFIX + stem
        print(f"🚀 [VOLUME] Name: \033[1;33m{volume_name}\033[0m")

        create_volume_and_extract(tar_path, volume_name, strip_count)

    if not archives:
        print("🧹 [CLEANUP] No new files downloaded — nothing to remove.")
    else:
        cleanup_tmp(archives)


if __name__ == "__main__":
    main()
| # if __name__ == "__main__": | |
| # Test create_volume_and_extract with the file test_data_backup.tar.gz | |
| # test_tar = os.path.join(TMP_DIR, "data_backup.tar.gz") | |
| # if os.path.exists(test_tar): | |
| # create_volume_and_extract(test_tar, "atalink_data_backup") | |
| # else: | |
| # print("File tmp/test_data_backup.tar.gz không tồn tại để test.") | |