# create_granular_chunks.py
import os
import json
import re
from typing import List, Dict, Any, Optional
import nltk
# Download the punkt sentence tokenizer data if it is not already present
# (nltk skips the download when the data is already up to date).
nltk.download('punkt')
nltk.download('punkt_tab')  # newer NLTK releases also need punkt_tab; avoids a LookupError
# --- Configuration ---
INPUT_FILE = "combined_context.jsonl"
OUTPUT_FILE = "granular_chunks_final.jsonl" # Keep filename consistent
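# Expected input: one JSON policy object per line of combined_context.jsonl.
# Output: one chunk per line of granular_chunks_final.jsonl, each of the form
# {"id": ..., "text": ..., "metadata": {...}}, ready for embedding / vector-DB ingestion.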
# --- Global State ---
chunk_counter = 0
def get_unique_id() -> str:
"""Returns a unique, incrementing ID for each chunk."""
global chunk_counter
chunk_counter += 1
return f"chunk-{chunk_counter}"
def create_chunk(context: Dict, text: str) -> Dict:
"""Creates a standardized chunk dictionary with rich metadata."""
metadata = {
"section": context.get("section"),
"clause": context.get("clause") or context.get("Clause"),
"title": context.get("title"),
"source_description": context.get("description"),
}
# Add other primitive metadata keys
for key, value in context.items():
if key not in metadata and isinstance(value, (str, int, float, bool)):
metadata[key] = value
return {
"id": get_unique_id(),
"text": text.strip(),
"metadata": {k: v for k, v in metadata.items() if v is not None}
}
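# Each chunk produced by create_chunk has the shape (illustrative values only):
#   {"id": "chunk-1", "text": "Regarding the policy '...' ...",
#    "metadata": {"section": "...", "clause": "...", "title": "..."}}
# Metadata entries whose value is None are dropped by the final dict comprehension.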
def format_delegation_text(delegation: Any) -> str:
    """
    Formats a delegation dictionary or string into a readable string.
    Limits that are missing or given as "---" are rendered as "NIL" so that
    "no power" cases are captured explicitly.
    """
    if not isinstance(delegation, dict):
        return str(delegation)
    parts = [
        f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}"
        for auth, limit in delegation.items()
    ]
    return ", ".join(parts) if parts else "No specific delegation provided."
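# A minimal usage sketch (the authority names and limits below are hypothetical):
#   format_delegation_text({"CMD": "Full powers", "Director": "---"})
#   -> "the limit for CMD is Full powers, the limit for Director is NIL"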
def format_remarks(remarks: Any) -> str:
"""Safely formats the 'remarks' field, handling various data types."""
if isinstance(remarks, list):
remark_parts = []
for item in remarks:
if isinstance(item, dict):
for key, value in item.items():
remark_parts.append(f"{key}: {value}")
else:
remark_parts.append(str(item))
return " ".join(remark_parts)
return str(remarks)
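# A minimal usage sketch (hypothetical remark values):
#   format_remarks(["Applies to works contracts", {"Note": "Subject to budget availability"}])
#   -> "Applies to works contracts Note: Subject to budget availability"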
def smart_chunk_size(context: Dict) -> int:
"""
Adaptive chunk sizing based on content type.
Smaller chunks for dense information, larger for descriptive.
"""
if "delegation" in context:
return 1000 # Smaller for dense financial/delegation info
elif "composition" in context:
return 800 # Smaller for structural/hierarchical info
elif "items" in context or "exclusions" in context:
return 600 # Smaller for list-based info
else:
return 1500 # Default for descriptive content
def build_descriptive_text(context: Dict) -> str:
    """
    Builds a clear, descriptive natural-language text by combining fields.
    Tuned for retrieval relevance and contextual richness.
    """
text_parts = []
if context.get("title"):
text_parts.append(f"Regarding the policy '{context['title']}'")
specific_desc = context.get('description') or context.get('method')
if specific_desc and specific_desc != context.get('title'):
text_parts.append(f"specifically for '{specific_desc}'")
if "delegation" in context:
delegation_text = format_delegation_text(context["delegation"])
text_parts.append(f", financial delegations are: {delegation_text}.")
elif "composition" in context:
composition_parts = []
for item in context["composition"]:
if isinstance(item, dict):
for role, members in item.items():
member_text = (f"the {role} is {members}" if isinstance(members, str)
else f"the {role} are: {', '.join(members)}")
composition_parts.append(member_text)
text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
if "remarks" in context and context["remarks"]:
remarks_text = format_remarks(context["remarks"])
text_parts.append(f" Important remarks include: {remarks_text}")
    # Join all parts into a flowing sentence and tidy stray spaces around punctuation.
    text = re.sub(r"\s+([,.])", r"\1", " ".join(text_parts).strip())
    return re.sub(r"\s{2,}", " ", text)
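# A minimal sketch of the sentence this builds (hypothetical policy entry):
#   build_descriptive_text({"title": "Delegation of Powers",
#                           "delegation": {"CMD": "Full powers", "Director": "---"}})
#   produces roughly:
#   "Regarding the policy 'Delegation of Powers', financial delegations are:
#    the limit for CMD is Full powers, the limit for Director is NIL."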
def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
"""
Splits a long text into smaller chunks with controlled overlap.
Uses sentence tokenization for natural splits.
"""
text = text.strip()
if len(text) <= max_char_length:
return [text]
    # Explicitly specify the language; relies on the punkt/punkt_tab data downloaded above
sentences = nltk.tokenize.sent_tokenize(text, language='english')
chunks = []
current_chunk = ""
for sentence in sentences:
# +1 for space/newline likely added between sentences
if len(current_chunk) + len(sentence) + 1 <= max_char_length:
current_chunk += (" " + sentence) if current_chunk else sentence
else:
chunks.append(current_chunk.strip())
# Start next chunk with overlap from end of previous chunk (by characters)
if overlap < len(current_chunk):
current_chunk = current_chunk[-overlap:] + " " + sentence
else:
current_chunk = sentence
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
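# A minimal usage sketch (hypothetical text, illustrative sizes):
#   split_text_into_chunks("Sentence one. Sentence two. Sentence three.",
#                          max_char_length=30, overlap=10)
# returns a list of chunks of roughly <= 30 characters each, where every chunk
# after the first starts with the trailing ~10 characters of the previous chunk
# (the character-level overlap can cut mid-word).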
def process_entry(data: Dict, parent_context: Optional[Dict] = None) -> List[Dict]:
    """
    Processes a JSON policy entry and returns granular, context-rich chunks.
    Recurses through nested structures and enforces per-chunk size limits.
    """
context = {**(parent_context or {}), **data}
chunks = []
    # Handler 1: simple item lists (e.g. rules, exclusions)
list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
if list_key:
base_title = context.get('title', 'a policy')
for item in data[list_key]:
if isinstance(item, str):
# Build chunk text with clear descriptive prefix for relevance
text = f"A rule regarding '{base_title}' is: {item}."
# Split if too long
for sub_chunk in split_text_into_chunks(text):
chunks.append(create_chunk(context, sub_chunk))
return chunks
# Handler 2: Recursive traversal for nested dictionaries/lists
has_recursed = False
for key, value in data.items():
if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
for item in value:
chunks.extend(process_entry(item, context))
has_recursed = True
# Handler 3: Leaf nodes with delegation, composition or description
if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
text = build_descriptive_text(context)
# Split long descriptive text intelligently with adaptive chunk size
max_size = smart_chunk_size(data)
for chunk_text in split_text_into_chunks(text, max_char_length=max_size):
chunks.append(create_chunk(context, chunk_text))
return chunks
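# A minimal end-to-end sketch (hypothetical JSONL entry, illustrative id):
#   process_entry({"section": "3", "title": "Delegation of Powers",
#                  "delegation": {"CMD": "Full powers", "Director": "---"}})
#   -> [{"id": "chunk-1",
#        "text": "Regarding the policy 'Delegation of Powers', financial delegations are: ...",
#        "metadata": {"section": "3", "title": "Delegation of Powers"}}]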
def main():
"""Main orchestration to read input, process, and write chunks."""
print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
all_chunks = []
try:
with open(INPUT_FILE, 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
try:
data = json.loads(line)
processed = process_entry(data)
if processed:
all_chunks.extend(processed)
except json.JSONDecodeError:
print(f"Warning: Skipping malformed JSON on line {i+1}")
continue
except FileNotFoundError:
print(f"Error: Input file '{INPUT_FILE}' not found.")
return
print(f"Generated {len(all_chunks)} chunks before deduplication.")
# Deduplicate by text content (retaining last occurrences)
unique_chunks_map = {}
for chunk in all_chunks:
unique_chunks_map[chunk['text']] = chunk
unique_chunks = list(unique_chunks_map.values())
print(f"{len(unique_chunks)} unique chunks after deduplication.")
# Write output in JSONL format for later vector DB ingestion
with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
for chunk in unique_chunks:
outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")
if __name__ == "__main__":
main()
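# A minimal run sketch (assumes combined_context.jsonl sits alongside this script):
#   $ python create_granular_chunks.py
#   Starting to process 'combined_context.jsonl' for improved granular chunking...
#   Generated <N> chunks before deduplication.
#   <M> unique chunks after deduplication.
#   Successfully wrote improved granular chunks to 'granular_chunks_final.jsonl'.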