"""ETL loader for the hue_portal chatbot datasets.

Reads the source CSV files, validates each row with pydantic models, and
upserts the valid records into the Django ORM. Supports incremental loads
(--since), validation-only runs (--dry-run), and per-dataset selection
(--datasets). Validation failures are appended to a log file instead of
aborting the run.
"""
import argparse
import csv
import os
import sys
from datetime import datetime, date
from pathlib import Path
from typing import Dict, Optional

import django
from pydantic import BaseModel, ValidationError, field_validator


ROOT_DIR = Path(__file__).resolve().parents[2]
BACKEND_DIR = ROOT_DIR / "backend"
HUE_PORTAL_DIR = BACKEND_DIR / "hue_portal"
DEFAULT_DATA_DIR = ROOT_DIR / "tài nguyên"
DATA_DIR = Path(os.environ.get("ETL_DATA_DIR", DEFAULT_DATA_DIR))
LOG_DIR = ROOT_DIR / "backend" / "logs" / "data_quality"
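
# The data directory defaults to ROOT_DIR / "tài nguyên"; a run can point
# elsewhere via the ETL_DATA_DIR environment variable, e.g. (illustrative
# path and script name):
#   ETL_DATA_DIR=/tmp/csv_drop python etl.py --dry-run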


# Make the project packages importable, then bootstrap Django; the model
# import below must come after django.setup().
for path in (HUE_PORTAL_DIR, BACKEND_DIR, ROOT_DIR):
    if str(path) not in sys.path:
        sys.path.insert(0, str(path))

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hue_portal.hue_portal.settings")
django.setup()

from hue_portal.core.models import Fine, Office, Procedure, Advisory  # noqa: E402

LOG_DIR.mkdir(parents=True, exist_ok=True)


# Under pydantic v2 an Optional annotation alone does not make a field
# optional, so nullable fields default to None explicitly.
class OfficeRecord(BaseModel):
    unit_name: str
    address: Optional[str] = ""
    district: Optional[str] = ""
    working_hours: Optional[str] = ""
    phone: Optional[str] = ""
    email: Optional[str] = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    service_scope: Optional[str] = ""
    updated_at: Optional[datetime] = None

    @field_validator("unit_name")
    @classmethod
    def unit_name_not_blank(cls, value: str) -> str:
        if not value:
            raise ValueError("unit_name is required")
        return value


class FineRecord(BaseModel):
    violation_code: str
    violation_name: Optional[str] = ""
    article: Optional[str] = ""
    decree: Optional[str] = ""
    min_fine: Optional[float] = None
    max_fine: Optional[float] = None
    license_points: Optional[str] = ""
    remedial_measures: Optional[str] = ""
    source_url: Optional[str] = ""
    updated_at: Optional[datetime] = None

    @field_validator("violation_code")
    @classmethod
    def code_not_blank(cls, value: str) -> str:
        if not value:
            raise ValueError("violation_code is required")
        return value


class ProcedureRecord(BaseModel):
    title: str
    domain: Optional[str] = ""
    level: Optional[str] = ""
    conditions: Optional[str] = ""
    dossier: Optional[str] = ""
    fee: Optional[str] = ""
    duration: Optional[str] = ""
    authority: Optional[str] = ""
    source_url: Optional[str] = ""
    updated_at: Optional[datetime] = None

    @field_validator("title")
    @classmethod
    def title_not_blank(cls, value: str) -> str:
        if not value:
            raise ValueError("title is required")
        return value


class AdvisoryRecord(BaseModel):
    title: str
    summary: str
    source_url: Optional[str] = ""
    published_at: Optional[date] = None

    @field_validator("title")
    @classmethod
    def title_not_blank(cls, value: str) -> str:
        if not value:
            raise ValueError("title is required")
        return value

    @field_validator("summary")
    @classmethod
    def summary_not_blank(cls, value: str) -> str:
        if not value:
            raise ValueError("summary is required")
        return value
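
# Each model rejects a blank key field: OfficeRecord(unit_name="") raises
# ValidationError, while OfficeRecord(unit_name="Tram A") validates with all
# optional fields left at their defaults ("Tram A" is an example value).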


def parse_datetime(value: Optional[str]) -> Optional[datetime]:
    """Parse a datetime string, trying common formats before falling back to ISO 8601."""
    if not value:
        return None
    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return None


def parse_date(value: Optional[str]) -> Optional[date]:
    """Parse a date string to a datetime.date (for Advisory.published_at)."""
    if not value:
        return None
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    return None
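
# Both parsers return None rather than raising, so one malformed timestamp
# never aborts a load: parse_date("31/12/2024") -> date(2024, 12, 31), but
# parse_date("31-12-2024") -> None (no matching format).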


def log_error(file_handle, dataset: str, row: Dict[str, str], error: str) -> None:
    """Append one validation/IO error to the data-quality log."""
    file_handle.write(
        f"[{datetime.utcnow().isoformat()}Z] dataset={dataset} error={error} row={row}\n"
    )


def should_skip(updated_at: Optional[datetime], since: Optional[datetime]) -> bool:
    """Skip records older than the --since cutoff; records without a timestamp are kept."""
    if not since or not updated_at:
        return False
    return updated_at < since


def load_offices(since: Optional[datetime], dry_run: bool, log_file) -> int:
    path = DATA_DIR / "danh_ba_diem_tiep_dan.csv"
    if not path.exists():
        log_error(log_file, "offices", {}, f"File not found: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            row = {k: (v or "").strip() for k, v in row.items()}
            # Empty numeric cells must become None, not "", to pass validation.
            for key in ["latitude", "longitude"]:
                if row.get(key) == "":
                    row[key] = None
            row["updated_at"] = parse_datetime(row.get("updated_at"))
            try:
                record = OfficeRecord(**row)
            except ValidationError as exc:
                log_error(log_file, "offices", row, str(exc))
                continue

            if should_skip(record.updated_at, since):
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on the unit name.
            Office.objects.update_or_create(
                unit_name=record.unit_name,
                defaults={
                    "address": record.address or "",
                    "district": record.district or "",
                    "working_hours": record.working_hours or "",
                    "phone": record.phone or "",
                    "email": record.email or "",
                    "latitude": record.latitude,
                    "longitude": record.longitude,
                    "service_scope": record.service_scope or "",
                },
            )
    return processed


def load_fines(since: Optional[datetime], dry_run: bool, log_file) -> int:
    path = DATA_DIR / "muc_phat_theo_hanh_vi.csv"
    if not path.exists():
        log_error(log_file, "fines", {}, f"File not found: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            row = {k: (v or "").strip() for k, v in row.items()}
            # Empty numeric cells must become None, not "", to pass validation.
            for key in ["min_fine", "max_fine"]:
                if row.get(key) == "":
                    row[key] = None
            row["updated_at"] = parse_datetime(row.get("updated_at"))
            try:
                record = FineRecord(**row)
            except ValidationError as exc:
                log_error(log_file, "fines", row, str(exc))
                continue

            if should_skip(record.updated_at, since):
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on the violation code.
            Fine.objects.update_or_create(
                code=record.violation_code,
                defaults={
                    "name": record.violation_name or "",
                    "article": record.article or "",
                    "decree": record.decree or "",
                    "min_fine": record.min_fine,
                    "max_fine": record.max_fine,
                    "license_points": record.license_points or "",
                    "remedial": record.remedial_measures or "",
                    "source_url": record.source_url or "",
                },
            )
    return processed


def load_procedures(since: Optional[datetime], dry_run: bool, log_file) -> int:
    path = DATA_DIR / "thu_tuc_hanh_chinh.csv"
    if not path.exists():
        log_error(log_file, "procedures", {}, f"File not found: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            # Normalize headers and cells; DictReader may yield None keys/values.
            clean_row = {}
            for k, v in row.items():
                key = str(k).strip() if k else ""
                value = v.strip() if isinstance(v, str) else ("" if not v else str(v))
                clean_row[key] = value
            clean_row["updated_at"] = parse_datetime(clean_row.get("updated_at"))
            try:
                record = ProcedureRecord(**clean_row)
            except ValidationError as exc:
                log_error(log_file, "procedures", clean_row, str(exc))
                continue

            if should_skip(record.updated_at, since):
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on (title, domain).
            Procedure.objects.update_or_create(
                title=record.title,
                domain=record.domain or "",
                defaults={
                    "level": record.level or "",
                    "conditions": record.conditions or "",
                    "dossier": record.dossier or "",
                    "fee": record.fee or "",
                    "duration": record.duration or "",
                    "authority": record.authority or "",
                    "source_url": record.source_url or "",
                },
            )
    return processed


def load_advisories(since: Optional[datetime], dry_run: bool, log_file) -> int:
    path = DATA_DIR / "canh_bao_lua_dao.csv"
    if not path.exists():
        log_error(log_file, "advisories", {}, f"File not found: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            # Normalize headers and cells; DictReader may yield None keys/values.
            clean_row = {}
            for k, v in row.items():
                key = str(k).strip() if k else ""
                value = v.strip() if isinstance(v, str) else ("" if not v else str(v))
                clean_row[key] = value
            clean_row["published_at"] = parse_date(clean_row.get("published_at"))
            try:
                record = AdvisoryRecord(**clean_row)
            except ValidationError as exc:
                log_error(log_file, "advisories", clean_row, str(exc))
                continue

            # Advisories carry a date rather than a datetime, so compare
            # against the date part of the --since cutoff.
            if since and record.published_at:
                if record.published_at < since.date():
                    continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on the title.
            Advisory.objects.update_or_create(
                title=record.title,
                defaults={
                    "summary": record.summary or "",
                    "source_url": record.source_url or "",
                    "published_at": record.published_at,
                },
            )
    return processed


def parse_args():
    parser = argparse.ArgumentParser(description="Chatbot data ETL")
    parser.add_argument(
        "--since",
        help="Only process records with updated_at >= this value (ISO date)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate the data only; do not write to the DB",
    )
    parser.add_argument(
        "--datasets",
        nargs="*",
        default=["offices", "fines"],
        choices=["offices", "fines", "procedures", "advisories"],
        help="Select which datasets to load",
    )
    return parser.parse_args()
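
# Example invocations (the script filename is illustrative):
#   python etl.py --dry-run                         # validate the default datasets (offices, fines)
#   python etl.py --datasets procedures advisories  # load two specific datasets
#   python etl.py --since 2024-01-01                # incremental load by updated_at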


def main():
    args = parse_args()
    since = parse_datetime(args.since) if args.since else None
    log_path = LOG_DIR / f"etl_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}.log"

    with log_path.open("a", encoding="utf-8") as log_file:
        if "offices" in args.datasets:
            total = load_offices(since, args.dry_run, log_file)
            print(f"Offices processed: {total}")
        if "fines" in args.datasets:
            total = load_fines(since, args.dry_run, log_file)
            print(f"Fines processed: {total}")
        if "procedures" in args.datasets:
            total = load_procedures(since, args.dry_run, log_file)
            print(f"Procedures processed: {total}")
        if "advisories" in args.datasets:
            total = load_advisories(since, args.dry_run, log_file)
            print(f"Advisories processed: {total}")

    print(f"Log written to {log_path}")


if __name__ == "__main__":
    main()