# create_granular_chunks.py import os import json import re from typing import List, Dict, Any import nltk # Download punkt tokenizer if not already done (Ensure this runs once in your environment setup) nltk.download('punkt') nltk.download('punkt_tab') # Also download punkt_tab to avoid LookupError # --- Configuration --- INPUT_FILE = "combined_context.jsonl" OUTPUT_FILE = "granular_chunks_final.jsonl" # Keep filename consistent # --- Global State --- chunk_counter = 0 def get_unique_id() -> str: """Returns a unique, incrementing ID for each chunk.""" global chunk_counter chunk_counter += 1 return f"chunk-{chunk_counter}" def create_chunk(context: Dict, text: str) -> Dict: """Creates a standardized chunk dictionary with rich metadata.""" metadata = { "section": context.get("section"), "clause": context.get("clause") or context.get("Clause"), "title": context.get("title"), "source_description": context.get("description"), } # Add other primitive metadata keys for key, value in context.items(): if key not in metadata and isinstance(value, (str, int, float, bool)): metadata[key] = value return { "id": get_unique_id(), "text": text.strip(), "metadata": {k: v for k, v in metadata.items() if v is not None} } def format_delegation_text(delegation: Any) -> str: """ Formats a delegation dictionary or string into a readable string. Explicitly includes "NIL" or "---" to capture no power cases. """ if not isinstance(delegation, dict): return str(delegation) parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}" for auth, limit in delegation.items()] return ", ".join(parts) if parts else "No specific delegation provided." def format_remarks(remarks: Any) -> str: """Safely formats the 'remarks' field, handling various data types.""" if isinstance(remarks, list): remark_parts = [] for item in remarks: if isinstance(item, dict): for key, value in item.items(): remark_parts.append(f"{key}: {value}") else: remark_parts.append(str(item)) return " ".join(remark_parts) return str(remarks) def smart_chunk_size(context: Dict) -> int: """ Adaptive chunk sizing based on content type. Smaller chunks for dense information, larger for descriptive. """ if "delegation" in context: return 1000 # Smaller for dense financial/delegation info elif "composition" in context: return 800 # Smaller for structural/hierarchical info elif "items" in context or "exclusions" in context: return 600 # Smaller for list-based info else: return 1500 # Default for descriptive content def build_descriptive_text(context: Dict) -> str: """ Builds a clear, descriptive, natural language text by combining fields. Focused for best relevance and contextual richness. """ text_parts = [] if context.get("title"): text_parts.append(f"Regarding the policy '{context['title']}'") specific_desc = context.get('description') or context.get('method') if specific_desc and specific_desc != context.get('title'): text_parts.append(f"specifically for '{specific_desc}'") if "delegation" in context: delegation_text = format_delegation_text(context["delegation"]) text_parts.append(f", financial delegations are: {delegation_text}.") elif "composition" in context: composition_parts = [] for item in context["composition"]: if isinstance(item, dict): for role, members in item.items(): member_text = (f"the {role} is {members}" if isinstance(members, str) else f"the {role} are: {', '.join(members)}") composition_parts.append(member_text) text_parts.append(f", the composition is: {'; '.join(composition_parts)}.") if "remarks" in context and context["remarks"]: remarks_text = format_remarks(context["remarks"]) text_parts.append(f" Important remarks include: {remarks_text}") # Join all parts into a flowing sentence return " ".join(text_parts).strip() def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]: """ Splits a long text into smaller chunks with controlled overlap. Uses sentence tokenization for natural splits. """ text = text.strip() if len(text) <= max_char_length: return [text] # Explicitly specify language to avoid punkt_tab error sentences = nltk.tokenize.sent_tokenize(text, language='english') chunks = [] current_chunk = "" for sentence in sentences: # +1 for space/newline likely added between sentences if len(current_chunk) + len(sentence) + 1 <= max_char_length: current_chunk += (" " + sentence) if current_chunk else sentence else: chunks.append(current_chunk.strip()) # Start next chunk with overlap from end of previous chunk (by characters) if overlap < len(current_chunk): current_chunk = current_chunk[-overlap:] + " " + sentence else: current_chunk = sentence if current_chunk: chunks.append(current_chunk.strip()) return chunks def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]: """ Processes a JSON policy entry and returns granular, context-rich chunks. Applies recursive traversal and implements chunk size limiting. """ context = {**(parent_context or {}), **data} chunks = [] # Handler 1: Simple Item Lists (ex: rules, exclusions) list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None) if list_key: base_title = context.get('title', 'a policy') for item in data[list_key]: if isinstance(item, str): # Build chunk text with clear descriptive prefix for relevance text = f"A rule regarding '{base_title}' is: {item}." # Split if too long for sub_chunk in split_text_into_chunks(text): chunks.append(create_chunk(context, sub_chunk)) return chunks # Handler 2: Recursive traversal for nested dictionaries/lists has_recursed = False for key, value in data.items(): if isinstance(value, list) and value and all(isinstance(item, dict) for item in value): for item in value: chunks.extend(process_entry(item, context)) has_recursed = True # Handler 3: Leaf nodes with delegation, composition or description if not has_recursed and ("delegation" in data or "composition" in data or "description" in data): text = build_descriptive_text(context) # Split long descriptive text intelligently with adaptive chunk size max_size = smart_chunk_size(data) for chunk_text in split_text_into_chunks(text, max_char_length=max_size): chunks.append(create_chunk(context, chunk_text)) return chunks def main(): """Main orchestration to read input, process, and write chunks.""" print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...") all_chunks = [] try: with open(INPUT_FILE, 'r', encoding='utf-8') as f: for i, line in enumerate(f): try: data = json.loads(line) processed = process_entry(data) if processed: all_chunks.extend(processed) except json.JSONDecodeError: print(f"Warning: Skipping malformed JSON on line {i+1}") continue except FileNotFoundError: print(f"Error: Input file '{INPUT_FILE}' not found.") return print(f"Generated {len(all_chunks)} chunks before deduplication.") # Deduplicate by text content (retaining last occurrences) unique_chunks_map = {} for chunk in all_chunks: unique_chunks_map[chunk['text']] = chunk unique_chunks = list(unique_chunks_map.values()) print(f"{len(unique_chunks)} unique chunks after deduplication.") # Write output in JSONL format for later vector DB ingestion with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf: for chunk in unique_chunks: outf.write(json.dumps(chunk, ensure_ascii=False) + "\n") print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.") if __name__ == "__main__": main()