# create_granular_chunks.py (place this in root directory)
import json
import re
import hashlib
from typing import List, Dict
import tiktoken
def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
"""Count tokens using tiktoken."""
try:
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
    except Exception:
        # Fallback: rough word-based estimate (~1.3 tokens per word), cast to int to match the signature
        return int(len(text.split()) * 1.3)
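
# Illustrative use of count_tokens (a sketch only; the exact count depends on the tiktoken
# encoding, so the behaviour described below is an expectation, not a guaranteed value):
#
#   count_tokens("Powers delegated to CGM for procurement up to ₹5 crore")
#   # -> a small integer (roughly one token per word, a little more for '₹5' and 'CGM');
#   #    falls back to int(word_count * 1.3) if tiktoken cannot resolve the model
#
# Note: tiktoken.encoding_for_model() is re-resolved on every call; caching the encoding
# object would speed up large runs, but the function is correct as written.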
def extract_financial_keywords(text: str) -> List[str]:
"""Extract financial keywords from text."""
financial_patterns = [
r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
r'\b(?:tender|contract|purchase|award)\b',
r'\b(?:crore|lakh|thousand)\b'
]
keywords = set()
for pattern in financial_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
keywords.update(matches)
return list(keywords)[:10] # Limit to 10 keywords
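
# Illustrative example (hypothetical sentence; the matches follow only from the regexes above):
#
#   extract_financial_keywords("Procurement of stores up to ₹50 lakh requires budget approval")
#   # might return something like ['₹50 lakh', 'Procurement', 'budget', 'lakh']
#   # (a set is used internally, so the order of keywords is not guaranteed)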
def extract_authority_keywords(text: str) -> List[str]:
"""Extract authority/designation keywords from text."""
authority_patterns = [
r'\b(?:D\([TPF]\)|ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
r'\b(?:Director|Manager|Chief|Head)\b',
r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
r'\b(?:approval|sanction|delegation|authority|power)\b'
]
keywords = set()
for pattern in authority_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
keywords.update(matches)
return list(keywords)[:10] # Limit to 10 keywords
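
# Illustrative example (hypothetical sentence; the matches follow from the patterns above):
#
#   extract_authority_keywords("CGM may accord approval; beyond this, sanction of the Director is required")
#   # might return something like ['CGM', 'approval', 'sanction', 'Director'], order not guaranteed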
def create_chunk_text_from_item(item: Dict) -> str:
"""Create comprehensive chunk text from a single item."""
parts = []
# Add section and title context
if item.get('section'):
parts.append(f"Regarding the policy '{item.get('title', 'Unknown')}' under section '{item['section']}':")
# Add main description
if item.get('description'):
parts.append(item['description'])
# Add items if present
if item.get('items'):
if len(item['items']) == 1:
parts.append(f"This covers: {item['items'][0]}")
else:
parts.append("This covers the following:")
for i, sub_item in enumerate(item['items'], 1):
parts.append(f"{i}. {sub_item}")
# Add delegation information
if item.get('delegation'):
parts.append("Authority delegation:")
for role, limit in item['delegation'].items():
if limit and limit != "NIL":
parts.append(f"- {role}: {limit}")
# Add subclauses
if item.get('subclauses'):
parts.append("This includes:")
for subclause in item['subclauses']:
if subclause.get('description'):
parts.append(f"• {subclause['description']}")
if subclause.get('delegation'):
for role, limit in subclause['delegation'].items():
if limit and limit != "NIL":
parts.append(f" - {role}: {limit}")
# Add methods (for complex delegation structures)
if item.get('methods'):
for method in item['methods']:
if method.get('delegation'):
parts.append(f"For {method.get('method', 'this method')}:")
for role, limit in method['delegation'].items():
if limit and limit != "NIL":
parts.append(f"- {role}: {limit}")
# Add remarks
if item.get('remarks'):
parts.append("Important notes:")
if isinstance(item['remarks'], list):
for remark in item['remarks']:
if isinstance(remark, str):
parts.append(f"• {remark}")
elif isinstance(item['remarks'], str):
parts.append(f"• {item['remarks']}")
return " ".join(parts)
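
# Sketch of the kind of JSONL record this function expects. The field names mirror the
# lookups above (section, clause, title, description, items, delegation, subclauses,
# methods, remarks); the values shown are purely hypothetical:
#
#   {"section": "Works & Procurement",
#    "clause": "3.1",
#    "title": "Award of contracts",
#    "description": "Powers to award contracts on open tender basis.",
#    "delegation": {"CGM": "₹10 crore", "GM": "₹2 crore", "DGM": "NIL"},
#    "remarks": ["Subject to availability of budget."]}
#
# For a record like this the function returns one flowing passage, e.g.
# "Regarding the policy 'Award of contracts' under section 'Works & Procurement': Powers to
#  award contracts on open tender basis. Authority delegation: - CGM: ₹10 crore - GM: ₹2 crore
#  Important notes: • Subject to availability of budget."
# Entries whose limit is empty or "NIL" (DGM here) are skipped.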
def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
"""Split text into chunks based on token count."""
    # Split on sentence boundaries, keeping the terminal punctuation attached to each sentence
    sentences = re.split(r'(?<=[.!?])\s+', text)
chunks = []
current_chunk = ""
current_tokens = 0
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
sentence_tokens = count_tokens(sentence)
# If adding this sentence would exceed max_tokens, finalize current chunk
if current_tokens + sentence_tokens > max_tokens and current_chunk:
chunks.append(current_chunk.strip())
# Start new chunk with overlap
if overlap_tokens > 0 and chunks:
                # Approximate the overlap by characters (~5 characters per token)
                overlap_text = current_chunk[-overlap_tokens * 5:]
current_chunk = overlap_text + " " + sentence
else:
current_chunk = sentence
current_tokens = count_tokens(current_chunk)
else:
current_chunk += (" " if current_chunk else "") + sentence
current_tokens += sentence_tokens
# Add the last chunk if it has content
if current_chunk.strip():
chunks.append(current_chunk.strip())
return chunks
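
# Rough behaviour sketch (approximate, not an exact trace): with the defaults of
# max_tokens=400 and overlap_tokens=50, a passage of about 1000 tokens would typically
# come back as roughly three chunks, with chunks after the first starting from about the
# last 250 characters (overlap_tokens * 5) of the preceding chunk:
#
#   chunks = split_into_token_chunks(long_text)
#   # each chunk stays near or below 400 tokens, unless a single sentence
#   # is itself longer than the limit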
def create_chunk_hash(text: str) -> str:
"""Create a hash of the chunk text for deduplication."""
return hashlib.md5(text.encode('utf-8')).hexdigest()[:12]
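
# Example: the hash is deterministic for a given string, so duplicate chunk texts map to
# the same 12-character value and are filtered out downstream.
#
#   create_chunk_hash("Authority delegation: - CGM: ₹10 crore")
#   # -> the first 12 hex characters of the MD5 digest of that string (same value on every run)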
def process_jsonl_file(file_path: str, output_path: str):
"""Process the JSONL file and create granular chunks."""
print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")
all_chunks = []
chunk_hashes = set() # For deduplication
chunk_id_counter = 1
try:
with open(file_path, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
try:
item = json.loads(line.strip())
# Create comprehensive text from the item
chunk_text = create_chunk_text_from_item(item)
if not chunk_text.strip():
continue
# Split into token-based chunks
text_chunks = split_into_token_chunks(chunk_text)
for i, chunk in enumerate(text_chunks):
if not chunk.strip():
continue
# Check for duplicates
chunk_hash = create_chunk_hash(chunk)
if chunk_hash in chunk_hashes:
continue
chunk_hashes.add(chunk_hash)
# Extract keywords
financial_keywords = extract_financial_keywords(chunk)
authority_keywords = extract_authority_keywords(chunk)
# Create chunk object
chunk_obj = {
'id': f'chunk-{chunk_id_counter}',
'text': chunk,
'metadata': {
'section': item.get('section', ''),
'clause': item.get('clause', ''),
'title': item.get('title', ''),
'chunk_index': i,
'source_line': line_num,
'financial_keywords': financial_keywords,
'authority_keywords': authority_keywords,
'token_count': count_tokens(chunk)
}
}
all_chunks.append(chunk_obj)
chunk_id_counter += 1
except json.JSONDecodeError as e:
print(f"Warning: Invalid JSON on line {line_num}: {e}")
continue
except FileNotFoundError:
print(f"Error: File '{file_path}' not found.")
return
except Exception as e:
print(f"Error reading file: {e}")
return
    print(f"Generated {len(all_chunks)} unique chunks (exact duplicates were skipped during processing).")
# Write chunks to output file
try:
with open(output_path, 'w', encoding='utf-8') as output_file:
for chunk in all_chunks:
json.dump(chunk, output_file, ensure_ascii=False)
output_file.write('\n')
print(f"Successfully wrote improved granular chunks to '{output_path}'.")
        print("Sample chunk structure:")
if all_chunks:
sample = all_chunks[0]
print(f" ID: {sample['id']}")
print(f" Text length: {len(sample['text'])} chars")
print(f" Section: {sample['metadata']['section']}")
print(f" Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
print(f" Token count: {sample['metadata']['token_count']}")
except Exception as e:
print(f"Error writing output file: {e}")
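
# Each line of the output file is one chunk_obj serialized as JSON. A line might look like
# this (all values are illustrative, not taken from a real run):
#
#   {"id": "chunk-1",
#    "text": "Regarding the policy 'Award of contracts' under section 'Works & Procurement': ...",
#    "metadata": {"section": "Works & Procurement", "clause": "3.1", "title": "Award of contracts",
#                 "chunk_index": 0, "source_line": 1,
#                 "financial_keywords": ["₹10 crore", "tender"],
#                 "authority_keywords": ["CGM", "approval"],
#                 "token_count": 212}}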
if __name__ == "__main__":
input_file = "combined_context.jsonl"
output_file = "granular_chunks_final.jsonl"
process_jsonl_file(input_file, output_file)
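
# To regenerate the chunks, run this script from the repository root (the input file
# combined_context.jsonl is expected alongside it, per the paths above):
#
#   python create_granular_chunks.py
#   # writes granular_chunks_final.jsonl, one JSON chunk per line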