# create_granular_chunks.py
import os
import json
import re
from typing import List, Dict, Any

import nltk

# Download punkt tokenizer if not already done (Ensure this runs once in your environment setup)
nltk.download('punkt')
nltk.download('punkt_tab')  # Also download punkt_tab to avoid LookupError
# --- Configuration ---
INPUT_FILE = "combined_context.jsonl"
OUTPUT_FILE = "granular_chunks_final.jsonl"  # Keep filename consistent
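# Illustrative input record (an assumption inferred from the handlers below, not taken from the
# actual combined_context.jsonl): each line is one JSON object that may carry fields such as
# "section", "clause", "title", "description", a "delegation" mapping of authority -> limit,
# a "composition" list, "items"/"exclusions" lists, and "remarks", e.g.
# {"section": "4", "title": "Purchase of Stationery",
#  "delegation": {"GM": "Rs. 50,000", "DGM": "---"},
#  "remarks": ["Subject to budget provision"]}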
# --- Global State ---
chunk_counter = 0


def get_unique_id() -> str:
    """Returns a unique, incrementing ID for each chunk."""
    global chunk_counter
    chunk_counter += 1
    return f"chunk-{chunk_counter}"

def create_chunk(context: Dict, text: str) -> Dict:
    """Creates a standardized chunk dictionary with rich metadata."""
    metadata = {
        "section": context.get("section"),
        "clause": context.get("clause") or context.get("Clause"),
        "title": context.get("title"),
        "source_description": context.get("description"),
    }
    # Add other primitive metadata keys
    for key, value in context.items():
        if key not in metadata and isinstance(value, (str, int, float, bool)):
            metadata[key] = value
    return {
        "id": get_unique_id(),
        "text": text.strip(),
        "metadata": {k: v for k, v in metadata.items() if v is not None}
    }

def format_delegation_text(delegation: Any) -> str:
    """
    Formats a delegation dictionary or string into a readable string.
    Explicitly includes "NIL" or "---" to capture no-power cases.
    """
    if not isinstance(delegation, dict):
        return str(delegation)
    parts = [
        f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}"
        for auth, limit in delegation.items()
    ]
    return ", ".join(parts) if parts else "No specific delegation provided."

def format_remarks(remarks: Any) -> str:
    """Safely formats the 'remarks' field, handling various data types."""
    if isinstance(remarks, list):
        remark_parts = []
        for item in remarks:
            if isinstance(item, dict):
                for key, value in item.items():
                    remark_parts.append(f"{key}: {value}")
            else:
                remark_parts.append(str(item))
        return " ".join(remark_parts)
    return str(remarks)

def smart_chunk_size(context: Dict) -> int:
    """
    Adaptive chunk sizing based on content type.
    Smaller chunks for dense information, larger for descriptive content.
    """
    if "delegation" in context:
        return 1000  # Smaller for dense financial/delegation info
    elif "composition" in context:
        return 800  # Smaller for structural/hierarchical info
    elif "items" in context or "exclusions" in context:
        return 600  # Smaller for list-based info
    else:
        return 1500  # Default for descriptive content

def build_descriptive_text(context: Dict) -> str:
    """
    Builds a clear, descriptive, natural-language text by combining fields.
    Focused on relevance and contextual richness.
    """
    text_parts = []
    if context.get("title"):
        text_parts.append(f"Regarding the policy '{context['title']}'")
    specific_desc = context.get('description') or context.get('method')
    if specific_desc and specific_desc != context.get('title'):
        text_parts.append(f"specifically for '{specific_desc}'")
    if "delegation" in context:
        delegation_text = format_delegation_text(context["delegation"])
        text_parts.append(f", financial delegations are: {delegation_text}.")
    elif "composition" in context:
        composition_parts = []
        for item in context["composition"]:
            if isinstance(item, dict):
                for role, members in item.items():
                    member_text = (f"the {role} is {members}" if isinstance(members, str)
                                   else f"the {role} are: {', '.join(members)}")
                    composition_parts.append(member_text)
        text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
    if "remarks" in context and context["remarks"]:
        remarks_text = format_remarks(context["remarks"])
        text_parts.append(f" Important remarks include: {remarks_text}")
    # Join all parts into a flowing sentence
    return " ".join(text_parts).strip()

def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
    """
    Splits a long text into smaller chunks with controlled overlap.
    Uses sentence tokenization for natural split points.
    """
    text = text.strip()
    if len(text) <= max_char_length:
        return [text]
    # Explicitly specify the language to avoid the punkt_tab lookup error
    sentences = nltk.tokenize.sent_tokenize(text, language='english')
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # +1 for the space added between sentences
        if len(current_chunk) + len(sentence) + 1 <= max_char_length:
            current_chunk += (" " + sentence) if current_chunk else sentence
        else:
            if current_chunk:  # Guard against emitting an empty chunk when the first sentence is over-length
                chunks.append(current_chunk.strip())
            # Start the next chunk with a character-level overlap from the end of the previous chunk
            if overlap < len(current_chunk):
                current_chunk = current_chunk[-overlap:] + " " + sentence
            else:
                current_chunk = sentence
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
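# Illustrative call (hypothetical text, shown for clarity):
# split_text_into_chunks(long_policy_text, max_char_length=600, overlap=100) yields
# sentence-aligned chunks of roughly 600 characters or fewer, each new chunk seeded
# with about the last 100 characters of the previous one to preserve context.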

def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
    """
    Processes a JSON policy entry and returns granular, context-rich chunks.
    Applies recursive traversal and implements chunk size limiting.
    """
    context = {**(parent_context or {}), **data}
    chunks = []

    # Handler 1: Simple item lists (e.g. rules, exclusions)
    list_key = next((key for key in ["items", "exclusions"]
                     if key in data and isinstance(data.get(key), list)), None)
    if list_key:
        base_title = context.get('title', 'a policy')
        for item in data[list_key]:
            if isinstance(item, str):
                # Build chunk text with a clear descriptive prefix for relevance
                text = f"A rule regarding '{base_title}' is: {item}."
                # Split if too long
                for sub_chunk in split_text_into_chunks(text):
                    chunks.append(create_chunk(context, sub_chunk))
        return chunks

    # Handler 2: Recursive traversal for nested dictionaries/lists
    has_recursed = False
    for key, value in data.items():
        if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
            for item in value:
                chunks.extend(process_entry(item, context))
            has_recursed = True

    # Handler 3: Leaf nodes with delegation, composition or description
    if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
        text = build_descriptive_text(context)
        # Split long descriptive text intelligently with an adaptive chunk size
        max_size = smart_chunk_size(data)
        for chunk_text in split_text_into_chunks(text, max_char_length=max_size):
            chunks.append(create_chunk(context, chunk_text))
    return chunks
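# Illustrative traversal (hypothetical entry, shown for clarity): an entry like
# {"title": "Leave Rules", "items": ["Casual leave requires prior approval."]}
# is handled by Handler 1 and yields one chunk per rule, each prefixed with
# "A rule regarding 'Leave Rules' is: ...". An entry whose values include lists of
# dictionaries is recursed into by Handler 2, with the parent's fields merged into
# each child's context before Handler 3 builds the descriptive leaf text.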

def main():
    """Main orchestration to read input, process, and write chunks."""
    print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
    all_chunks = []
    try:
        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
            for i, line in enumerate(f):
                try:
                    data = json.loads(line)
                    processed = process_entry(data)
                    if processed:
                        all_chunks.extend(processed)
                except json.JSONDecodeError:
                    print(f"Warning: Skipping malformed JSON on line {i+1}")
                    continue
    except FileNotFoundError:
        print(f"Error: Input file '{INPUT_FILE}' not found.")
        return

    print(f"Generated {len(all_chunks)} chunks before deduplication.")

    # Deduplicate by text content (retaining last occurrences)
    unique_chunks_map = {}
    for chunk in all_chunks:
        unique_chunks_map[chunk['text']] = chunk
    unique_chunks = list(unique_chunks_map.values())
    print(f"{len(unique_chunks)} unique chunks after deduplication.")

    # Write output in JSONL format for later vector DB ingestion
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
        for chunk in unique_chunks:
            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
    print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")

if __name__ == "__main__":
    main()