WMB2Backened / api /update_static.py
42Cummer's picture
Upload update_static.py
94fe315 verified
import requests # type: ignore
import zipfile
import io
import os
import shutil
from pathlib import Path
class GTFSSyncManager:
def __init__(self):
self.CKAN_BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca/api/3/action/package_show"
self.PACKAGE_ID = "merged-gtfs-ttc-routes-and-schedules"
self.STATIC_DIR = Path(__file__).parent.parent / "static"
self.DB_PATH = Path(__file__).parent.parent / "src" / "ttc_gtfs.duckdb"
def get_remote_metadata(self):
"""Queries CKAN API for the latest ZIP URL and its modification date."""
try:
params = {"id": self.PACKAGE_ID}
response = requests.get(self.CKAN_BASE_URL, params=params, timeout=10)
data = response.json()
# Extract the metadata_modified date and the resource URL
last_modified = data["result"]["metadata_modified"]
resources = data["result"]["resources"]
download_url = next((r["url"] for r in resources if r["format"].lower() == "zip"), None)
return {"url": download_url, "updated_at": last_modified}
except Exception as e:
print(f"Metadata fetch failed: {e}")
return None
def perform_full_sync(self, download_url):
"""Downloads, extracts, and clears the old DB to force a rebuild."""
print(f"--- Downloading New GTFS Bundle from {download_url} ---")
if self.STATIC_DIR.exists():
shutil.rmtree(self.STATIC_DIR)
self.STATIC_DIR.mkdir(parents=True, exist_ok=True)
r = requests.get(download_url)
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
z.extractall(self.STATIC_DIR)
print(f"✓ Extracted {len(z.namelist())} files to {self.STATIC_DIR}.")
# Nuke the old DB so init_db triggers a fresh import
if self.DB_PATH.exists():
os.remove(self.DB_PATH)
print("✓ Old database deleted for rebuild.")