|
|
""" |
|
|
Utilities to ingest uploaded legal documents into persistent storage. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations

import hashlib
import re
from dataclasses import dataclass
from datetime import datetime, date
from io import BytesIO
from pathlib import Path
from typing import BinaryIO, Dict, Optional

from django.conf import settings
from django.core.files.base import ContentFile
from django.db import transaction
from django.utils import timezone

from hue_portal.core.models import (
    LegalDocument,
    LegalSection,
    LegalDocumentImage,
    IngestionJob,
)
from hue_portal.core.etl.legal_document_loader import load_legal_document
from hue_portal.core.tasks import process_ingestion_job
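

# Lightweight summary returned to the caller once a document has been ingested.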
@dataclass
class LegalIngestionResult:
    document: LegalDocument
    created: bool
    sections_count: int
    images_count: int


def _parse_date(value: Optional[str | date]) -> Optional[date]:
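    """Coerce a date object or a "YYYY-MM-DD" / "DD/MM/YYYY" string to a date; return None if parsing fails."""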
    if isinstance(value, date):
        return value
    if not value:
        return None
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    return None


def _sha256(data: bytes) -> str:
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
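

# Strip all whitespace and lowercase before hashing so the content checksum
# (used for duplicate detection in ingest_uploaded_document) ignores spacing differences.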
def _normalize_text(text: str) -> str:
    cleaned = re.sub(r"\s+", "", text or "")
    return cleaned.lower()
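

# Vietnamese keywords used to infer doc_type when the caller leaves it as "other":
# "quyết định" = decision, "thông tư" = circular, "hướng dẫn" = guideline, "kế hoạch" = plan.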
DOC_TYPE_KEYWORDS = {
    "decision": ["quyết định"],
    "circular": ["thông tư"],
    "guideline": ["hướng dẫn"],
    "plan": ["kế hoạch"],
}


def _auto_fill_metadata(
    *, text: str, title: str, issued_by: str, issued_at: Optional[date], doc_type: str
) -> tuple[str, str, Optional[date], str]:
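    """
    Best-effort extraction of missing metadata from the first 2,000 characters of the text:
    the issuing body (ministry or People's Committee header), the issue date (numeric
    "DD/MM/YYYY" or the spelled-out "ngày ... tháng ... năm ..." form), the doc_type
    (matched against DOC_TYPE_KEYWORDS), and a title taken from the first recognised
    heading line. Only empty or default values are replaced.
    """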
    head = (text or "")[:2000]
    if not issued_by:
        match = re.search(r"(BỘ\s+[A-ZÂĂÊÔƠƯ\s]+|ỦY BAN\s+NHÂN DÂN\s+[^\n]+)", head, re.IGNORECASE)
        if match:
            issued_by = match.group(0).strip()

    if not issued_at:
        match = re.search(r"(\d{1,2})[\/\-](\d{1,2})[\/\-](\d{4})", head)
        if match:
            day, month, year = match.groups()
            issued_at = _parse_date(f"{year}-{int(month):02d}-{int(day):02d}")
        else:
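            # Fall back to the spelled-out Vietnamese form "ngày D tháng M năm YYYY" (day D month M year YYYY).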
            match = re.search(
                r"ngày\s+(\d{1,2})\s+tháng\s+(\d{1,2})\s+năm\s+(\d{4})",
                head,
                re.IGNORECASE,
            )
            if match:
                day, month, year = match.groups()
                issued_at = _parse_date(f"{year}-{int(month):02d}-{int(day):02d}")

    if doc_type == "other":
        lower = head.lower()
        for dtype, keywords in DOC_TYPE_KEYWORDS.items():
            if any(keyword in lower for keyword in keywords):
                doc_type = dtype
                break

    if not title or title == (DOC_TYPE_KEYWORDS.get(doc_type, [title])[0] if doc_type != "other" else ""):
        match = re.search(r"(QUYẾT ĐỊNH|THÔNG TƯ|HƯỚNG DẪN|KẾ HOẠCH)[^\n]+", head, re.IGNORECASE)
        if match:
            title = match.group(0).strip().title()

    return title, issued_by, issued_at, doc_type


def ingest_uploaded_document(
    *,
    file_obj: BinaryIO,
    filename: str,
    metadata: Dict,
) -> LegalIngestionResult:
    """
    Ingest an uploaded PDF/DOCX file, storing the raw file, parsed sections, and extracted images.

    Args:
        file_obj: Binary file-like object positioned at the start.
        filename: Original filename of the upload.
        metadata: Dict with keys code (required), title, doc_type, summary, issued_by,
            issued_at, source_url, mime_type, and an optional nested "metadata" dict of
            extra metadata.

    Raises:
        ValueError: If no document code is supplied, or if the normalized content
            duplicates an existing document.
    """
    code = (metadata.get("code") or "").strip()
    if not code:
        raise ValueError("Document code is required.")

    title = metadata.get("title") or code
    doc_type = metadata.get("doc_type", "other")
    issued_at = _parse_date(metadata.get("issued_at"))
    summary = metadata.get("summary", "")
    issued_by = metadata.get("issued_by", "")
    source_url = metadata.get("source_url", "")
    extra_metadata = metadata.get("metadata") or {}
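
    # Read the upload once: the raw bytes are hashed, measured, and reused for storage
    # and text extraction; the original stream is rewound afterwards.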
    file_bytes = file_obj.read()
    if hasattr(file_obj, "seek"):
        file_obj.seek(0)
    checksum = _sha256(file_bytes)
    mime_type = metadata.get("mime_type") or getattr(file_obj, "content_type", "")
    size = len(file_bytes)
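
    # Extract text, sections, and images, fill in missing metadata from the document body,
    # and compute a whitespace- and case-insensitive content checksum for duplicate detection.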
    extracted = load_legal_document(BytesIO(file_bytes), filename=filename)
    title, issued_by, issued_at, doc_type = _auto_fill_metadata(
        text=extracted.text, title=title, issued_by=issued_by, issued_at=issued_at, doc_type=doc_type
    )
    normalized_text = _normalize_text(extracted.text)
    content_checksum = _sha256(normalized_text.encode("utf-8"))

    duplicate = (
        LegalDocument.objects.filter(content_checksum=content_checksum)
        .exclude(code=code)
        .first()
    )
    if duplicate:
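        # Vietnamese message: "Content duplicates an existing document: <code>".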
        raise ValueError(f"Nội dung trùng với văn bản hiện có: {duplicate.code}")
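
    # Create or update the LegalDocument and rebuild its sections and images atomically.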
    with transaction.atomic():
        doc, created = LegalDocument.objects.get_or_create(
            code=code,
            defaults={
                "title": title,
                "doc_type": doc_type,
                "summary": summary,
                "issued_by": issued_by,
                "issued_at": issued_at,
                "source_url": source_url,
                "metadata": extra_metadata,
            },
        )
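
        # Refresh metadata and extraction results even when the document already existed,
        # so re-uploading the same code updates it in place.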
        doc.title = title
        doc.doc_type = doc_type
        doc.summary = summary
        doc.issued_by = issued_by
        doc.issued_at = issued_at
        doc.source_url = source_url
        doc.metadata = extra_metadata
        doc.page_count = extracted.page_count
        doc.raw_text = extracted.text
        doc.raw_text_ocr = extracted.ocr_text or ""
        doc.file_checksum = checksum
        doc.content_checksum = content_checksum
        doc.file_size = size
        doc.mime_type = mime_type
        doc.original_filename = filename
        doc.updated_at = timezone.now()
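
        # Store the original upload under "<code>/<filename>" and persist the document row.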
        content = ContentFile(file_bytes)
        storage_name = f"{code}/{filename}"
        doc.uploaded_file.save(storage_name, content, save=False)
        doc.source_file = doc.uploaded_file.name
        doc.save()
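
        # Rebuild sections from scratch so re-ingestion never leaves stale rows behind.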
        doc.sections.all().delete()
        sections = []
        for idx, section in enumerate(extracted.sections, start=1):
            sections.append(
                LegalSection(
                    document=doc,
                    section_code=section.code,
                    section_title=section.title,
                    level=section.level,
                    order=idx,
                    content=section.content,
                    excerpt=section.content[:400],
                    page_start=section.page_start,
                    page_end=section.page_end,
                    is_ocr=section.is_ocr,
                    metadata=section.metadata or {},
                )
            )
        LegalSection.objects.bulk_create(sections, batch_size=200)
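
        # Likewise replace the extracted images; files are stored as "<code>/img_<n>.<ext>".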
        doc.images.all().delete()
        images = []
        for idx, image in enumerate(extracted.images, start=1):
            image_content = ContentFile(image.data)
            image_name = f"{code}/img_{idx}.{image.extension}"
            img_instance = LegalDocumentImage(
                document=doc,
                page_number=image.page_number,
                description=image.description,
                width=image.width,
                height=image.height,
                checksum=_sha256(image.data),
            )
            img_instance.image.save(image_name, image_content, save=False)
            images.append(img_instance)
        LegalDocumentImage.objects.bulk_create(images, batch_size=100)

    return LegalIngestionResult(
        document=doc,
        created=created,
        sections_count=len(sections),
        images_count=len(images),
    )


def enqueue_ingestion_job(*, file_obj, filename: str, metadata: Dict) -> IngestionJob:
    """
    Persist uploaded file to a temporary job folder and enqueue Celery processing.
    """
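
    # Create the job row first so its database id can name the temporary folder.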
    job = IngestionJob.objects.create(
        code=metadata.get("code", ""),
        filename=filename,
        metadata=metadata,
        status=IngestionJob.STATUS_PENDING,
    )

    temp_dir = Path(settings.MEDIA_ROOT) / "ingestion_jobs" / str(job.id)
    temp_dir.mkdir(parents=True, exist_ok=True)
    temp_path = temp_dir / filename
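
    # Copy the upload to disk, streaming in chunks when Django's UploadedFile API is
    # available and falling back to a single read() otherwise.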
    if hasattr(file_obj, "seek"):
        file_obj.seek(0)
    if hasattr(file_obj, "chunks"):
        with temp_path.open("wb") as dest:
            for chunk in file_obj.chunks():
                dest.write(chunk)
    else:
        data = file_obj.read()
        with temp_path.open("wb") as dest:
            dest.write(data)
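
    # Record where the file was stored, then hand the job id to the Celery worker.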
    job.storage_path = str(temp_path)
    job.save(update_fields=["storage_path"])
    process_ingestion_job.delay(str(job.id))
    return job