Spaces:
Running
Running
File size: 9,527 Bytes
1201e66 a9681af 1201e66 1e6e534 1201e66 1e6e534 b91ce4b 1201e66 1e6e534 1201e66 b91ce4b 1e6e534 1201e66 1e6e534 1201e66 1e6e534 1201e66 1e6e534 1201e66 448f148 1201e66 a9681af 1201e66 a9681af 1201e66 a9681af 1201e66 a9681af 1201e66 a9681af 1201e66 1e6e534 1201e66 a9681af 1201e66 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# create_granular_chunks.py (place this in root directory)
import json
import re
import hashlib
from typing import List, Dict, Any, Set
import tiktoken
def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    """Count the tokens in ``text`` using the tiktoken encoding for ``model``.

    Args:
        text: Text to measure.
        model: Model name passed to ``tiktoken.encoding_for_model``.

    Returns:
        The token count as an ``int``. If tiktoken cannot be used for any
        reason, a word-based estimate (about 1.3 tokens per
        whitespace-separated word, rounded to the nearest integer) is
        returned instead.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception:
        # Deliberate best-effort fallback: any tokenizer failure degrades to a
        # rough estimate. The estimate is rounded so the declared ``int``
        # return type holds and stored token counts are never fractional.
        return round(len(text.split()) * 1.3)
def extract_financial_keywords(text: str) -> List[str]:
    """Extract financial keywords from text.

    Matches rupee amounts (optionally followed by crore/lakh/thousand) and a
    fixed vocabulary of budget, procurement and magnitude words,
    case-insensitively.

    Args:
        text: Text to scan.

    Returns:
        Up to 10 distinct matches, exactly as they appear in ``text``, ordered
        by pattern and then by first occurrence. The order is deterministic
        across runs.
    """
    financial_patterns = [
        r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
        r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
        r'\b(?:tender|contract|purchase|award)\b',
        r'\b(?:crore|lakh|thousand)\b'
    ]
    # A dict is used as an ordered set: slicing a plain ``set`` would keep an
    # arbitrary, hash-seed-dependent subset when more than 10 keywords match.
    keywords: Dict[str, None] = {}
    for pattern in financial_patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            keywords.setdefault(match, None)
    return list(keywords)[:10]  # Limit to 10 keywords
def extract_authority_keywords(text: str) -> List[str]:
    """Extract authority/designation keywords from text.

    Matches designation abbreviations (e.g. ``D(T)``, ``ED``, ``CGM``,
    ``Sr. Manager``), generic role words and approval-related terms,
    case-insensitively.

    Args:
        text: Text to scan.

    Returns:
        Up to 10 distinct matches, exactly as they appear in ``text``, ordered
        by pattern and then by first occurrence. The order is deterministic
        across runs.
    """
    authority_patterns = [
        # ``D(T)``-style designations end in ")", a non-word character, so a
        # trailing ``\b`` would only match when a letter follows immediately
        # and would miss the normal "D(T) " form. That alternative therefore
        # carries no trailing boundary.
        r'\bD\([TPF]\)|\b(?:ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
        r'\b(?:Director|Manager|Chief|Head)\b',
        r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
        r'\b(?:approval|sanction|delegation|authority|power)\b'
    ]
    # A dict is used as an ordered set: slicing a plain ``set`` would keep an
    # arbitrary, hash-seed-dependent subset when more than 10 keywords match.
    keywords: Dict[str, None] = {}
    for pattern in authority_patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            keywords.setdefault(match, None)
    return list(keywords)[:10]  # Limit to 10 keywords
def create_chunk_text_from_item(item: Dict) -> str:
    """Flatten one policy record into a single space-joined passage.

    The passage is assembled, in order, from the record's section/title
    header, description, ``items`` list, ``delegation`` mapping,
    ``subclauses``, ``methods`` and ``remarks``. Absent or empty fields are
    skipped, and delegation limits that are empty or ``"NIL"`` are omitted.

    Args:
        item: Parsed record; every key is optional.

    Returns:
        All generated fragments joined by single spaces, or ``""`` when the
        record yields no fragments.
    """

    def delegation_lines(mapping, prefix):
        # Empty and "NIL" limits are left out of the text.
        return [
            f"{prefix}{role}: {limit}"
            for role, limit in mapping.items()
            if limit and limit != "NIL"
        ]

    pieces = []

    # Header: only emitted when a section is present.
    section = item.get('section')
    if section:
        pieces.append(f"Regarding the policy '{item.get('title', 'Unknown')}' under section '{section}':")

    # Main description.
    description = item.get('description')
    if description:
        pieces.append(description)

    # Covered items: one inline entry, or a numbered list.
    covered = item.get('items')
    if covered:
        if len(covered) == 1:
            pieces.append(f"This covers: {covered[0]}")
        else:
            pieces.append("This covers the following:")
            pieces.extend(f"{n}. {entry}" for n, entry in enumerate(covered, 1))

    # Top-level delegation table.
    delegation = item.get('delegation')
    if delegation:
        pieces.append("Authority delegation:")
        pieces.extend(delegation_lines(delegation, "- "))

    # Subclauses, each with an optional description and delegation table.
    subclauses = item.get('subclauses')
    if subclauses:
        pieces.append("This includes:")
        for clause in subclauses:
            clause_text = clause.get('description')
            if clause_text:
                pieces.append(f"• {clause_text}")
            clause_delegation = clause.get('delegation')
            if clause_delegation:
                pieces.extend(delegation_lines(clause_delegation, " - "))

    # Methods: only those that carry a delegation table are mentioned.
    for method in item.get('methods') or ():
        method_delegation = method.get('delegation')
        if not method_delegation:
            continue
        pieces.append(f"For {method.get('method', 'this method')}:")
        pieces.extend(delegation_lines(method_delegation, "- "))

    # Remarks: a list of strings (non-strings ignored) or a single string.
    remarks = item.get('remarks')
    if remarks:
        pieces.append("Important notes:")
        if isinstance(remarks, list):
            pieces.extend(f"• {note}" for note in remarks if isinstance(note, str))
        elif isinstance(remarks, str):
            pieces.append(f"• {remarks}")

    return " ".join(pieces)
def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
    """Split text into chunks of roughly ``max_tokens`` tokens on sentence boundaries.

    Sentences are accumulated until adding the next one would exceed
    ``max_tokens``. The current chunk is then emitted and the next chunk
    starts with a tail of the previous one as overlap context.

    Args:
        text: Text to split.
        max_tokens: Soft upper bound on tokens per chunk. A single sentence
            longer than this is still emitted as one chunk.
        overlap_tokens: Approximate number of trailing tokens of a finished
            chunk to repeat at the start of the next one. The tail is taken
            by characters, assuming about 5 characters per token. ``0``
            disables overlap.

    Returns:
        The non-empty chunks, whitespace-stripped, in document order.
    """
    # Split on the whitespace *after* a sentence terminator. The lookbehind
    # keeps the terminator attached to its sentence, so the chunk text does
    # not lose punctuation (e.g. "1. Foo" stays "1. Foo").
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = ""
    current_tokens = 0
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        sentence_tokens = count_tokens(sentence)
        # If adding this sentence would exceed max_tokens, finalize current chunk
        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            chunks.append(current_chunk.strip())
            overlap_text = ""
            if overlap_tokens > 0:
                overlap_chars = overlap_tokens * 5  # Rough overlap estimation
                overlap_text = current_chunk[-overlap_chars:]
                cut_mid_word = (
                    len(current_chunk) > overlap_chars
                    and not current_chunk[-overlap_chars - 1].isspace()
                    and not overlap_text[:1].isspace()
                )
                if cut_mid_word:
                    # Drop the partial leading word so the overlap starts on
                    # a whole word.
                    overlap_text = overlap_text.partition(" ")[2]
                overlap_text = overlap_text.strip()
            # Start new chunk, with overlap when there is any
            current_chunk = f"{overlap_text} {sentence}" if overlap_text else sentence
            current_tokens = count_tokens(current_chunk)
        else:
            current_chunk += (" " if current_chunk else "") + sentence
            current_tokens += sentence_tokens
    # Add the last chunk if it has content
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    return chunks
def create_chunk_hash(text: str) -> str:
    """Return a short fingerprint of ``text`` for duplicate detection.

    The fingerprint is the first 12 hexadecimal characters of the MD5 digest
    of the UTF-8 encoded text.
    """
    digest = hashlib.md5()
    digest.update(text.encode('utf-8'))
    fingerprint = digest.hexdigest()
    return fingerprint[:12]
def process_jsonl_file(file_path: str, output_path: str):
    """Process the JSONL file and create granular chunks.

    Each non-blank line of ``file_path`` is parsed as a JSON object, turned
    into text with ``create_chunk_text_from_item``, split with
    ``split_into_token_chunks``, deduplicated by ``create_chunk_hash`` and
    enriched with keyword and token-count metadata. The resulting chunks are
    written to ``output_path`` as one JSON object per line.

    Args:
        file_path: Path of the input JSONL file.
        output_path: Path of the JSONL file to write.

    Returns:
        None. Progress and errors are reported with ``print``. A missing or
        unreadable input file, or a failure while writing, is reported and
        not raised.
    """
    print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")
    all_chunks = []
    chunk_hashes = set()  # For deduplication
    chunk_id_counter = 1
    # Counts every non-empty chunk, including ones later dropped as
    # duplicates, so the "before deduplication" figure is meaningful.
    generated_count = 0
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_num, line in enumerate(file, 1):
                line = line.strip()
                if not line:
                    # Blank lines are not malformed records; skip them quietly.
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Warning: Invalid JSON on line {line_num}: {e}")
                    continue
                if not isinstance(item, dict):
                    # A bare list, string or number has no fields to read.
                    # Skip it rather than aborting the whole file.
                    print(f"Warning: Skipping non-object JSON on line {line_num}.")
                    continue
                # Create comprehensive text from the item
                chunk_text = create_chunk_text_from_item(item)
                if not chunk_text.strip():
                    continue
                # Split into token-based chunks
                text_chunks = split_into_token_chunks(chunk_text)
                for i, chunk in enumerate(text_chunks):
                    if not chunk.strip():
                        continue
                    generated_count += 1
                    # Check for duplicates
                    chunk_hash = create_chunk_hash(chunk)
                    if chunk_hash in chunk_hashes:
                        continue
                    chunk_hashes.add(chunk_hash)
                    # Create chunk object with extracted keywords
                    chunk_obj = {
                        'id': f'chunk-{chunk_id_counter}',
                        'text': chunk,
                        'metadata': {
                            'section': item.get('section', ''),
                            'clause': item.get('clause', ''),
                            'title': item.get('title', ''),
                            'chunk_index': i,
                            'source_line': line_num,
                            'financial_keywords': extract_financial_keywords(chunk),
                            'authority_keywords': extract_authority_keywords(chunk),
                            'token_count': count_tokens(chunk)
                        }
                    }
                    all_chunks.append(chunk_obj)
                    chunk_id_counter += 1
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return
    except Exception as e:
        print(f"Error reading file: {e}")
        return
    print(f"Generated {generated_count} chunks before deduplication.")
    print(f"{len(chunk_hashes)} unique chunks after deduplication.")
    # Write chunks to output file
    try:
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for chunk in all_chunks:
                json.dump(chunk, output_file, ensure_ascii=False)
                output_file.write('\n')
        print(f"Successfully wrote improved granular chunks to '{output_path}'.")
        print("Sample chunk structure:")
        if all_chunks:
            sample = all_chunks[0]
            print(f" ID: {sample['id']}")
            print(f" Text length: {len(sample['text'])} chars")
            print(f" Section: {sample['metadata']['section']}")
            print(f" Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
            print(f" Token count: {sample['metadata']['token_count']}")
    except Exception as e:
        print(f"Error writing output file: {e}")
if __name__ == "__main__":
    # Script entry point: chunk the combined context file in the working
    # directory and write the result alongside it.
    input_file, output_file = "combined_context.jsonl", "granular_chunks_final.jsonl"
    process_jsonl_file(input_file, output_file)
|