davidtran999's picture
Upload backend/core/tasks.py with huggingface_hub
f468a7c verified
from __future__ import annotations
import os
from pathlib import Path
from django.utils import timezone
from hue_portal.core.models import IngestionJob
# Optional celery import - may not be available in all environments
try:
from celery import shared_task
CELERY_AVAILABLE = True
except (ImportError, AttributeError, Exception):
CELERY_AVAILABLE = False
# Create a dummy decorator if celery is not available
def shared_task(*args, **kwargs):
def decorator(func):
return func
return decorator
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=30, retry_kwargs={"max_retries": 3})
def process_ingestion_job(self, job_id: str) -> None:
job = IngestionJob.objects.filter(id=job_id).first()
if not job:
return
job.status = IngestionJob.STATUS_RUNNING
job.started_at = timezone.now()
job.progress = 10
job.save(update_fields=["status", "started_at", "progress", "updated_at"])
try:
storage_path = Path(job.storage_path)
if not storage_path.exists():
raise FileNotFoundError(f"Job file missing: {storage_path}")
from hue_portal.core.services.legal_ingestion import ingest_uploaded_document
with storage_path.open("rb") as handle:
result = ingest_uploaded_document(
file_obj=handle,
filename=job.filename,
metadata=job.metadata or {},
)
job.status = IngestionJob.STATUS_COMPLETED
job.document = result.document
job.finished_at = timezone.now()
job.progress = 100
job.stats = {"sections": result.sections_count, "images": result.images_count}
job.save(
update_fields=[
"status",
"document",
"finished_at",
"progress",
"stats",
"updated_at",
]
)
if os.getenv("DELETE_JOB_FILES_ON_SUCCESS", "false").lower() == "true":
storage_path.unlink(missing_ok=True)
except Exception as exc: # pragma: no cover - logging path
job.status = IngestionJob.STATUS_FAILED
job.error_message = str(exc)
job.finished_at = timezone.now()
job.progress = 100
job.save(
update_fields=[
"status",
"error_message",
"finished_at",
"progress",
"updated_at",
]
)
raise