Spaces:
Running
Running
| import hmac | |
| from hashlib import sha256 | |
| from typing import Dict, Any | |
| from fastapi import FastAPI, Header, HTTPException, Request, Response | |
| from huggingface_hub import HfApi | |
| import os | |
# --- Configuration ---
# Values are read from environment variables (populated from the Space secrets).
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET")
HF_TOKEN = os.environ.get("HF_TOKEN")
RESOURCE_GROUP_ID = os.environ.get("RESOURCE_GROUP_ID")

# Fail fast at import time: an unset *or empty* value for any of the three
# stops the app from starting instead of failing on the first request.
if not all([WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID]):
    raise ValueError("Missing one or more required environment variables: WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID")

app = FastAPI()
# Reuse the token validated above rather than reading the environment again.
hf_api = HfApi(token=HF_TOKEN)
| # --- Webhook validation --- | |
async def validate_webhook(request: Request, x_webhook_secret: str = Header(...)):
    """Reject the request unless it carries the configured webhook secret.

    Args:
        request: The incoming request; not inspected by this function.
        x_webhook_secret: Secret supplied by the caller in a request header
            (declared as a required ``Header`` parameter).

    Raises:
        HTTPException: 400 when the supplied value is empty, 401 when it does
            not match ``WEBHOOK_SECRET``.
    """
    if not x_webhook_secret:
        raise HTTPException(status_code=400, detail="X-Webhook-Secret header is missing")

    # Compare in constant time so response timing does not reveal how much of
    # the secret matched. Both sides are encoded to bytes because
    # hmac.compare_digest raises TypeError for non-ASCII str arguments.
    supplied = x_webhook_secret.encode("utf-8")
    expected = WEBHOOK_SECRET.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid webhook secret")
| # --- Webhook endpoint --- | |
def webhook_endpoint():
    """Fetch a Hub user overview, print it, and report the event as unhandled.

    Returns:
        A dict with the single key ``"status"`` set to ``"event_not_handled"``.
    """
    # TODO(review): this body is built but never sent. An earlier draft pointed
    # at "/api/models/{MODEL_REPO_ID}/resource-group" -- confirm the intended call.
    payload = {"resourceGroupId": RESOURCE_GROUP_ID}

    overview = hf_api.get_user_overview(username="CharlieBoyer", token=HF_TOKEN)
    print(overview)

    result = {"status": "event_not_handled"}
    return result
def health_check():
    """Return the fixed message indicating the listener is up."""
    status_message = "Webhook listener is running."
    return status_message