|
|
""" |
|
|
HDX Data Ingestion Script |
|
|
|
|
|
Downloads and processes humanitarian datasets from the Humanitarian Data Exchange (HDX) |
|
|
for Panama, including population, health facilities, and other indicators. |
|
|
""" |
|
|
|
|
|
import httpx |
|
|
import json |
|
|
import os |
|
|
import asyncio |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
# Base URL of the HDX CKAN API (v3 action endpoints).
HDX_API = "https://data.humdata.org/api/3"

# Local dataset key -> HDX dataset identifier (CKAN package id) for Panama.
DATASETS = {
    "population_worldpop": "worldpop-population-counts-for-panama",
    "admin_boundaries": "cod-ab-pan",
    "health_facilities": "panama-healthsites",
}

# Directory layout relative to the repo root: data/raw/hdx for downloads,
# data/processed for derived outputs.
DATA_DIR = Path(__file__).parent.parent.parent / "data"
RAW_DIR = DATA_DIR / "raw" / "hdx"
PROCESSED_DIR = DATA_DIR / "processed"
|
|
|
|
|
def ensure_dirs():
    """Create the raw and processed data directories if they don't exist."""
    RAW_DIR.mkdir(parents=True, exist_ok=True)
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    # Category subfolders under processed/ for downstream pipeline stages.
    for category in ("demographics", "health", "infrastructure"):
        (PROCESSED_DIR / category).mkdir(exist_ok=True)
|
|
|
|
|
async def get_dataset_resources(client: httpx.AsyncClient, dataset_id: str) -> list:
    """Return the downloadable resources listed for an HDX dataset.

    Queries the CKAN ``package_show`` action. On any HTTP or parsing
    error the failure is logged and an empty list is returned, keeping
    the ingestion best-effort.
    """
    try:
        resp = await client.get(
            f"{HDX_API}/action/package_show", params={"id": dataset_id}
        )
        resp.raise_for_status()
        payload = resp.json()
        if not payload.get("success"):
            return []
        return payload["result"].get("resources", [])
    except Exception as e:
        print(f"Error fetching dataset {dataset_id}: {e}")
        return []
|
|
|
|
|
async def download_resource(client: httpx.AsyncClient, resource: dict, output_dir: Path) -> "str | None":
    """Download a single HDX resource file into *output_dir*.

    Only known tabular/geospatial formats are fetched; anything else is
    skipped. Returns the local file path as a string, or None when the
    resource is skipped or the download fails.
    """
    url = resource.get("url")
    name = resource.get("name", "unknown")
    # Renamed from `format` to avoid shadowing the builtin.
    fmt = resource.get("format", "").lower()

    if fmt not in ("csv", "json", "geojson", "xlsx", "xls", "zip"):
        return None

    filename = f"{name}.{fmt}"
    filepath = output_dir / filename

    # Idempotent re-runs: keep previously downloaded files.
    if filepath.exists():
        print(f" Skipping (exists): {filename}")
        return str(filepath)

    print(f" Downloading: {filename}")
    try:
        response = await client.get(url, follow_redirects=True)
        response.raise_for_status()
        # write_bytes opens/closes the file for us (pathlib).
        filepath.write_bytes(response.content)
        return str(filepath)
    except Exception as e:
        print(f" Error downloading {name}: {e}")
        return None
|
|
|
|
|
async def ingest_hdx_datasets():
    """Fetch every configured HDX dataset into the raw data directory."""
    ensure_dirs()

    banner = "=" * 60
    print(banner)
    print("HDX Data Ingestion for Panama")
    print(banner)

    # One shared client for all requests; 60s timeout covers large files.
    async with httpx.AsyncClient(timeout=60.0) as client:
        for name, dataset_id in DATASETS.items():
            print(f"\n📦 Dataset: {name} ({dataset_id})")

            # Each dataset gets its own subfolder under raw/hdx.
            dataset_dir = RAW_DIR / name
            dataset_dir.mkdir(exist_ok=True)

            resources = await get_dataset_resources(client, dataset_id)
            print(f" Found {len(resources)} resources")

            for resource in resources:
                await download_resource(client, resource, dataset_dir)

    print("\n" + banner)
    print("Ingestion complete!")
    print(banner)
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: run the async ingestion pipeline to completion.
    asyncio.run(ingest_hdx_datasets())
|
|
|