warbler-cda / warbler_cda /pack_loader.py
Bellok's picture
Upload folder using huggingface_hub
0ccf2f0 verified
raw
history blame
11.5 kB
"""Load Warbler pack data into the RetrievalAPI."""
import json
import logging
from pathlib import Path
from typing import List, Dict, Any
logger = logging.getLogger(__name__)
class PackLoader:
"""Load Warbler pack data into the system."""
def __init__(self, packs_dir: Path = None):
"""Initialize the pack loader."""
if packs_dir is None:
packs_dir = Path(__file__).parent.parent / "packs"
self.packs_dir = Path(packs_dir)
self.documents = []
def discover_documents(self) -> List[Dict[str, Any]]:
"""Discover all documents across all packs."""
if not self.packs_dir.exists():
logger.warning(f"Packs directory not found: {self.packs_dir}")
return []
documents = []
for pack_dir in sorted(self.packs_dir.iterdir()):
if not pack_dir.is_dir():
continue
pack_name = pack_dir.name
logger.info(f"Loading pack: {pack_name}")
pack_docs = self._load_pack(pack_dir, pack_name)
documents.extend(pack_docs)
logger.info(f"✓ Loaded {len(pack_docs)} documents from {pack_name}")
self.documents = documents
return documents
def _load_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
"""Load documents from a specific pack."""
documents = []
jsonl_file = pack_dir / f"{pack_name}.jsonl"
# Validate this is actually a Warbler pack before loading
if self._is_valid_warbler_pack(pack_dir, pack_name, jsonl_file):
docs = self._load_jsonl_pack(pack_dir, pack_name)
documents.extend(docs)
else:
# Fall back to structured pack format
docs = self._load_structured_pack(pack_dir, pack_name)
documents.extend(docs)
return documents
def _is_valid_warbler_pack(self, pack_dir: Path, pack_name: str, jsonl_file: Path) -> bool:
"""Validate that a directory is a valid Warbler pack.
A valid Warbler pack must have:
1. Either:
- A JSONL file matching the pack name (single-file pack), OR
- Chunk files matching the pattern (chunked pack)
- OR structured templates that can be converted to JSONL
2. AND either:
- A package.json metadata file, OR
- The pack name starts with 'warbler-pack-hf-' (HuggingFace packs)
"""
# Check for package.json metadata first
package_json = pack_dir / "package.json"
has_valid_metadata = False
is_chunked = False
if package_json.exists():
try:
with open(package_json, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Validate it has required fields
if "name" in metadata and "version" in metadata:
has_valid_metadata = True
is_chunked = metadata.get("chunked", False)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Invalid package.json in {pack_dir}: {e}")
# Allow HuggingFace packs even without package.json (for backward compatibility)
if pack_name.startswith("warbler-pack-hf-"):
has_valid_metadata = True
if not has_valid_metadata:
return False
# Check for appropriate JSONL files based on chunked status
if is_chunked:
# For chunked packs, look for chunk files
chunk_files = list(pack_dir.glob(f"{pack_name}-chunk-*.jsonl"))
if chunk_files:
logger.debug(f"Found {len(chunk_files)} chunk files for {pack_name}")
return True
else:
logger.warning(f"Chunked pack {pack_name} has no chunk files")
return False
else:
# For single-file packs, check if JSONL file exists
if jsonl_file.exists():
return True
# Check for structured pack templates that can be converted
templates_file = pack_dir / "pack" / "templates.json"
if templates_file.exists():
logger.debug(f"Single-file pack {pack_name} missing JSONL, but templates.json exists")
return True
logger.warning(f"Single-file pack {pack_name} missing JSONL file: {jsonl_file}")
return False
def _load_jsonl_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
"""Load JSONL-based pack (supports both single-file and chunked packs)."""
documents = []
# Check if this is a chunked pack by reading package.json
package_json = pack_dir / "package.json"
is_chunked = False
if package_json.exists():
try:
with open(package_json, "r", encoding="utf-8") as f:
metadata = json.load(f)
is_chunked = metadata.get("chunked", False)
except (json.JSONDecodeError, IOError) as err:
logger.warning(f"Could not read package.json for {pack_name}: {err}")
if is_chunked:
# Load chunked pack
logger.info(f"Loading chunked pack: {pack_name}")
# Find all chunk files matching the pattern
# Pattern is like "warbler-pack-hf-arxiv-chunk-*.jsonl"
# We need to find files like "warbler-pack-hf-arxiv-chunk-001.jsonl", etc.
chunk_files = sorted(pack_dir.glob(f"{pack_name}-chunk-*.jsonl"))
if not chunk_files:
logger.warning(f"No chunk files found for chunked pack {pack_name}")
return documents
logger.info(f"Found {len(chunk_files)} chunk files for {pack_name}")
# Load each chunk file in order
for chunk_file in chunk_files:
logger.debug(f"Loading chunk: {chunk_file.name}")
chunk_docs = self._load_jsonl_file(chunk_file, pack_name)
documents.extend(chunk_docs)
logger.info(f"Loaded {len(documents)} total documents from {len(chunk_files)} chunks")
else:
# Load single-file pack (backward compatibility)
jsonl_file = pack_dir / f"{pack_name}.jsonl"
if not jsonl_file.exists():
logger.warning(f"JSONL file not found: {jsonl_file}")
return documents
documents = self._load_jsonl_file(jsonl_file, pack_name)
return documents
def _load_jsonl_file(self, jsonl_file: Path, pack_name: str) -> List[Dict[str, Any]]:
"""Load a single JSONL file with robust error handling."""
documents = []
error_count = 0
max_errors_to_log = 5
try:
with open(jsonl_file, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
if not line.strip():
continue
try:
entry = json.loads(line)
doc = self._format_document(
entry, pack_name, f"{jsonl_file.stem}_line_{line_num}"
)
documents.append(doc)
except json.JSONDecodeError as e:
error_count += 1
# Only log first few errors to avoid spam
if error_count <= max_errors_to_log:
logger.warning(
f"Error parsing line {line_num} in {jsonl_file.name}: {e}"
)
# Continue processing other lines instead of failing
continue
if error_count > 0:
logger.info(
f"Loaded {len(documents)} documents from {jsonl_file.name} "
f"({error_count} lines skipped due to errors)"
)
except Exception as e:
logger.error(f"Error loading JSONL file {jsonl_file}: {e}")
return documents
def _load_structured_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
"""Load structured pack with templates."""
documents = []
templates_file = pack_dir / "pack" / "templates.json"
if not templates_file.exists():
logger.debug(f"No templates.json found in {pack_dir}")
return documents
try:
with open(templates_file, "r", encoding="utf-8") as f:
data = json.load(f)
templates = data if isinstance(data, list) else data.get("templates", [])
for template in templates:
doc = {
"id": f"{pack_name}/{template.get('id', 'unknown')}",
"content": template.get("content", json.dumps(template)),
"metadata": {
"pack": pack_name,
"type": "template",
"template_id": template.get("id"),
"realm_type": self._infer_realm(pack_name),
"realm_label": pack_name.replace("warbler-pack-", ""),
"lifecycle_stage": "peak",
"activity_level": 0.8,
},
}
documents.append(doc)
self._generate_jsonl_from_templates(pack_dir, pack_name, documents)
except Exception as e:
logger.error(f"Error loading templates from {pack_name}: {e}")
return documents
def _generate_jsonl_from_templates(
self, pack_dir: Path, pack_name: str, documents: List[Dict[str, Any]]
) -> None:
"""Generate JSONL file from templates for future loads."""
try:
jsonl_file = pack_dir / f"{pack_name}.jsonl"
if jsonl_file.exists():
return
with open(jsonl_file, "w", encoding="utf-8") as f:
for doc in documents:
f.write(json.dumps(doc, ensure_ascii=False) + "\n")
logger.info(f"Generated JSONL file for {pack_name}: {jsonl_file.name}")
except Exception as e:
logger.debug(f"Could not generate JSONL for {pack_name}: {e}")
def _format_document(
self, entry: Dict[str, Any], pack_name: str, doc_id: str
) -> Dict[str, Any]:
"""Format a pack entry into a document."""
content = entry.get("content") or entry.get("text") or json.dumps(entry)
return {
"id": f"{pack_name}/{doc_id}",
"content": str(content),
"metadata": {
"pack": pack_name,
"type": entry.get("type", "dialogue"),
"realm_type": self._infer_realm(pack_name),
"realm_label": pack_name.replace("warbler-pack-", ""),
"lifecycle_stage": "emergence",
"activity_level": 0.7,
**{k: v for k, v in entry.items() if k not in ["content", "text"]},
},
}
def _infer_realm(self, pack_name: str) -> str:
"""Infer realm type from pack name."""
if "wisdom" in pack_name:
return "wisdom"
elif "faction" in pack_name or "politics" in pack_name:
return "faction"
elif "dialogue" in pack_name or "npc" in pack_name:
return "narrative"
else:
return "narrative"