# create_granular_chunks.py (place this in root directory)

import json
import re
import hashlib
from typing import List, Dict

import tiktoken
def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    """Count tokens using tiktoken."""
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception:
        # Fallback: rough word-based estimate (~1.3 tokens per word)
        return int(len(text.split()) * 1.3)
def extract_financial_keywords(text: str) -> List[str]:
    """Extract financial keywords (amounts, budget/procurement terms) from text."""
    financial_patterns = [
        r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
        r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
        r'\b(?:tender|contract|purchase|award)\b',
        r'\b(?:crore|lakh|thousand)\b'
    ]
    keywords = set()
    for pattern in financial_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        keywords.update(matches)
    return list(keywords)[:10]  # Limit to 10 keywords
def extract_authority_keywords(text: str) -> List[str]:
    """Extract authority/designation keywords from text."""
    authority_patterns = [
        r'\b(?:D\([TPF]\)|ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
        r'\b(?:Director|Manager|Chief|Head)\b',
        r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
        r'\b(?:approval|sanction|delegation|authority|power)\b'
    ]
    keywords = set()
    for pattern in authority_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        keywords.update(matches)
    return list(keywords)[:10]  # Limit to 10 keywords
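
# Example (illustrative, not from the source data):
#   extract_financial_keywords("Procurement up to ₹50 lakh requires tender approval")
# returns entries such as "Procurement", "₹50 lakh", "tender" and "lakh" (order varies
# because a set is used internally); "approval" would instead be picked up by
# extract_authority_keywords.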
def create_chunk_text_from_item(item: Dict) -> str:
    """Create comprehensive chunk text from a single item."""
    parts = []

    # Add section and title context
    if item.get('section'):
        parts.append(f"Regarding the policy '{item.get('title', 'Unknown')}' under section '{item['section']}':")

    # Add main description
    if item.get('description'):
        parts.append(item['description'])

    # Add items if present
    if item.get('items'):
        if len(item['items']) == 1:
            parts.append(f"This covers: {item['items'][0]}")
        else:
            parts.append("This covers the following:")
            for i, sub_item in enumerate(item['items'], 1):
                parts.append(f"{i}. {sub_item}")

    # Add delegation information
    if item.get('delegation'):
        parts.append("Authority delegation:")
        for role, limit in item['delegation'].items():
            if limit and limit != "NIL":
                parts.append(f"- {role}: {limit}")

    # Add subclauses
    if item.get('subclauses'):
        parts.append("This includes:")
        for subclause in item['subclauses']:
            if subclause.get('description'):
                parts.append(f"• {subclause['description']}")
            if subclause.get('delegation'):
                for role, limit in subclause['delegation'].items():
                    if limit and limit != "NIL":
                        parts.append(f" - {role}: {limit}")

    # Add methods (for complex delegation structures)
    if item.get('methods'):
        for method in item['methods']:
            if method.get('delegation'):
                parts.append(f"For {method.get('method', 'this method')}:")
                for role, limit in method['delegation'].items():
                    if limit and limit != "NIL":
                        parts.append(f"- {role}: {limit}")

    # Add remarks
    if item.get('remarks'):
        parts.append("Important notes:")
        if isinstance(item['remarks'], list):
            for remark in item['remarks']:
                if isinstance(remark, str):
                    parts.append(f"• {remark}")
        elif isinstance(item['remarks'], str):
            parts.append(f"• {item['remarks']}")

    return " ".join(parts)
def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
    """Split text into chunks based on token count."""
    sentences = re.split(r'[.!?]\s+', text)
    chunks = []
    current_chunk = ""
    current_tokens = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        sentence_tokens = count_tokens(sentence)

        # If adding this sentence would exceed max_tokens, finalize the current chunk
        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            chunks.append(current_chunk.strip())
            # Start the new chunk with a character-based overlap (roughly 5 characters per token)
            if overlap_tokens > 0 and chunks:
                overlap_text = current_chunk[-overlap_tokens * 5:]
                current_chunk = overlap_text + " " + sentence
            else:
                current_chunk = sentence
            current_tokens = count_tokens(current_chunk)
        else:
            current_chunk += (" " if current_chunk else "") + sentence
            current_tokens += sentence_tokens

    # Add the last chunk if it has content
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    return chunks
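
# Example (illustrative): split_into_token_chunks(long_policy_text, max_tokens=400, overlap_tokens=50)
# returns a list of sentence-aligned chunks of at most roughly 400 tokens each, where every chunk
# after the first starts with about 250 characters carried over from the end of the previous chunk.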
def create_chunk_hash(text: str) -> str:
    """Create a hash of the chunk text for deduplication."""
    return hashlib.md5(text.encode('utf-8')).hexdigest()[:12]
def process_jsonl_file(file_path: str, output_path: str):
    """Process the JSONL file and create granular chunks."""
    print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")

    all_chunks = []
    chunk_hashes = set()  # For deduplication
    chunk_id_counter = 1
    total_generated = 0  # Chunks produced before deduplication

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_num, line in enumerate(file, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Warning: Invalid JSON on line {line_num}: {e}")
                    continue

                # Create comprehensive text from the item
                chunk_text = create_chunk_text_from_item(item)
                if not chunk_text.strip():
                    continue

                # Split into token-based chunks
                text_chunks = split_into_token_chunks(chunk_text)
                for i, chunk in enumerate(text_chunks):
                    if not chunk.strip():
                        continue
                    total_generated += 1

                    # Skip duplicates
                    chunk_hash = create_chunk_hash(chunk)
                    if chunk_hash in chunk_hashes:
                        continue
                    chunk_hashes.add(chunk_hash)

                    # Extract keywords
                    financial_keywords = extract_financial_keywords(chunk)
                    authority_keywords = extract_authority_keywords(chunk)

                    # Create chunk object
                    chunk_obj = {
                        'id': f'chunk-{chunk_id_counter}',
                        'text': chunk,
                        'metadata': {
                            'section': item.get('section', ''),
                            'clause': item.get('clause', ''),
                            'title': item.get('title', ''),
                            'chunk_index': i,
                            'source_line': line_num,
                            'financial_keywords': financial_keywords,
                            'authority_keywords': authority_keywords,
                            'token_count': count_tokens(chunk)
                        }
                    }
                    all_chunks.append(chunk_obj)
                    chunk_id_counter += 1
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return
    except Exception as e:
        print(f"Error reading file: {e}")
        return

    print(f"Generated {total_generated} chunks before deduplication.")
    print(f"{len(all_chunks)} unique chunks after deduplication.")

    # Write chunks to output file
    try:
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for chunk in all_chunks:
                json.dump(chunk, output_file, ensure_ascii=False)
                output_file.write('\n')
        print(f"Successfully wrote improved granular chunks to '{output_path}'.")

        if all_chunks:
            print("Sample chunk structure:")
            sample = all_chunks[0]
            print(f"  ID: {sample['id']}")
            print(f"  Text length: {len(sample['text'])} chars")
            print(f"  Section: {sample['metadata']['section']}")
            print(f"  Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
            print(f"  Token count: {sample['metadata']['token_count']}")
    except Exception as e:
        print(f"Error writing output file: {e}")
if __name__ == "__main__":
    input_file = "combined_context.jsonl"
    output_file = "granular_chunks_final.jsonl"
    process_jsonl_file(input_file, output_file)
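
# A minimal sketch of the data this script assumes (field names are taken from the accessors
# above; the values below are hypothetical, not from the real policy file).
#
# One input line in combined_context.jsonl might look like:
#   {"section": "Procurement", "clause": "3.1", "title": "Open Tender",
#    "description": "Procurement of goods through open tender.",
#    "delegation": {"CMD": "₹50 crore", "ED": "₹10 crore", "GM": "NIL"},
#    "remarks": ["Subject to budget availability."]}
#
# The corresponding line in granular_chunks_final.jsonl is a JSON object with an "id", the
# flattened "text", and a "metadata" block carrying section/clause/title, chunk_index,
# source_line, the extracted keyword lists, and the token count.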