Kalpokoch committed on
Commit 74a1e84 · verified · 1 Parent(s): ea6be63

Update create_granular_chunks.py

Files changed (1)
  1. create_granular_chunks.py +195 -219
create_granular_chunks.py CHANGED
@@ -1,241 +1,217 @@
-# create_granular_chunks.py (place this in root directory)
 import json
 import re
-import hashlib
-from typing import List, Dict, Any, Set
-import tiktoken
 
-def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
-    """Count tokens using tiktoken."""
-    try:
-        encoding = tiktoken.encoding_for_model(model)
-        return len(encoding.encode(text))
-    except Exception:
-        # Fallback to simple word-based estimation
-        return len(text.split()) * 1.3
-
-def extract_financial_keywords(text: str) -> List[str]:
-    """Extract financial keywords from text."""
-    financial_patterns = [
-        r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
-        r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
-        r'\b(?:tender|contract|purchase|award)\b',
-        r'\b(?:crore|lakh|thousand)\b'
-    ]
-
-    keywords = set()
-    for pattern in financial_patterns:
-        matches = re.findall(pattern, text, re.IGNORECASE)
-        keywords.update(matches)
-
-    return list(keywords)[:10] # Limit to 10 keywords
-
-def extract_authority_keywords(text: str) -> List[str]:
-    """Extract authority/designation keywords from text."""
-    authority_patterns = [
-        r'\b(?:D\([TPF]\)|ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
-        r'\b(?:Director|Manager|Chief|Head)\b',
-        r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
-        r'\b(?:approval|sanction|delegation|authority|power)\b'
-    ]
-
-    keywords = set()
-    for pattern in authority_patterns:
-        matches = re.findall(pattern, text, re.IGNORECASE)
-        keywords.update(matches)
-
-    return list(keywords)[:10] # Limit to 10 keywords
-
-def create_chunk_text_from_item(item: Dict) -> str:
-    """Create comprehensive chunk text from a single item."""
-    parts = []
-
-    # Add section and title context
-    if item.get('section'):
-        parts.append(f"Regarding the policy '{item.get('title', 'Unknown')}' under section '{item['section']}':")
-
-    # Add main description
-    if item.get('description'):
-        parts.append(item['description'])
-
-    # Add items if present
-    if item.get('items'):
-        if len(item['items']) == 1:
-            parts.append(f"This covers: {item['items'][0]}")
-        else:
-            parts.append("This covers the following:")
-            for i, sub_item in enumerate(item['items'], 1):
-                parts.append(f"{i}. {sub_item}")
-
-    # Add delegation information
-    if item.get('delegation'):
-        parts.append("Authority delegation:")
-        for role, limit in item['delegation'].items():
-            if limit and limit != "NIL":
-                parts.append(f"- {role}: {limit}")
-
-    # Add subclauses
-    if item.get('subclauses'):
-        parts.append("This includes:")
-        for subclause in item['subclauses']:
-            if subclause.get('description'):
-                parts.append(f"• {subclause['description']}")
-            if subclause.get('delegation'):
-                for role, limit in subclause['delegation'].items():
-                    if limit and limit != "NIL":
-                        parts.append(f" - {role}: {limit}")
-
-    # Add methods (for complex delegation structures)
-    if item.get('methods'):
-        for method in item['methods']:
-            if method.get('delegation'):
-                parts.append(f"For {method.get('method', 'this method')}:")
-                for role, limit in method['delegation'].items():
-                    if limit and limit != "NIL":
-                        parts.append(f"- {role}: {limit}")
-
-    # Add remarks
-    if item.get('remarks'):
-        parts.append("Important notes:")
-        if isinstance(item['remarks'], list):
-            for remark in item['remarks']:
-                if isinstance(remark, str):
-                    parts.append(f"• {remark}")
-        elif isinstance(item['remarks'], str):
-            parts.append(f" {item['remarks']}")
-
-    return " ".join(parts)
-
-def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
-    """Split text into chunks based on token count."""
-    sentences = re.split(r'[.!?]\s+', text)
     chunks = []
     current_chunk = ""
-    current_tokens = 0
-
     for sentence in sentences:
-        sentence = sentence.strip()
-        if not sentence:
-            continue
-
-        sentence_tokens = count_tokens(sentence)
-
-        # If adding this sentence would exceed max_tokens, finalize current chunk
-        if current_tokens + sentence_tokens > max_tokens and current_chunk:
             chunks.append(current_chunk.strip())
-
-            # Start new chunk with overlap
-            if overlap_tokens > 0 and chunks:
-                overlap_text = current_chunk[-overlap_tokens*5:] # Rough overlap estimation
-                current_chunk = overlap_text + " " + sentence
             else:
                 current_chunk = sentence
-            current_tokens = count_tokens(current_chunk)
-        else:
-            current_chunk += (" " if current_chunk else "") + sentence
-            current_tokens += sentence_tokens
-
-    # Add the last chunk if it has content
-    if current_chunk.strip():
         chunks.append(current_chunk.strip())
-
     return chunks
 
-def create_chunk_hash(text: str) -> str:
-    """Create a hash of the chunk text for deduplication."""
-    return hashlib.md5(text.encode('utf-8')).hexdigest()[:12]
 
-def process_jsonl_file(file_path: str, output_path: str):
-    """Process the JSONL file and create granular chunks."""
-    print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")
-
     all_chunks = []
-    chunk_hashes = set() # For deduplication
-    chunk_id_counter = 1
-
     try:
-        with open(file_path, 'r', encoding='utf-8') as file:
-            for line_num, line in enumerate(file, 1):
                 try:
-                    item = json.loads(line.strip())
-
-                    # Create comprehensive text from the item
-                    chunk_text = create_chunk_text_from_item(item)
-
-                    if not chunk_text.strip():
-                        continue
-
-                    # Split into token-based chunks
-                    text_chunks = split_into_token_chunks(chunk_text)
-
-                    for i, chunk in enumerate(text_chunks):
-                        if not chunk.strip():
-                            continue
-
-                        # Check for duplicates
-                        chunk_hash = create_chunk_hash(chunk)
-                        if chunk_hash in chunk_hashes:
-                            continue
-                        chunk_hashes.add(chunk_hash)
-
-                        # Extract keywords
-                        financial_keywords = extract_financial_keywords(chunk)
-                        authority_keywords = extract_authority_keywords(chunk)
-
-                        # Create chunk object
-                        chunk_obj = {
-                            'id': f'chunk-{chunk_id_counter}',
-                            'text': chunk,
-                            'metadata': {
-                                'section': item.get('section', ''),
-                                'clause': item.get('clause', ''),
-                                'title': item.get('title', ''),
-                                'chunk_index': i,
-                                'source_line': line_num,
-                                'financial_keywords': financial_keywords,
-                                'authority_keywords': authority_keywords,
-                                'token_count': count_tokens(chunk)
-                            }
-                        }
-
-                        all_chunks.append(chunk_obj)
-                        chunk_id_counter += 1
-
-                except json.JSONDecodeError as e:
-                    print(f"Warning: Invalid JSON on line {line_num}: {e}")
                     continue
-
     except FileNotFoundError:
-        print(f"Error: File '{file_path}' not found.")
         return
-    except Exception as e:
-        print(f"Error reading file: {e}")
-        return
-
     print(f"Generated {len(all_chunks)} chunks before deduplication.")
-    print(f"{len(chunk_hashes)} unique chunks after deduplication.")
-
-    # Write chunks to output file
-    try:
-        with open(output_path, 'w', encoding='utf-8') as output_file:
-            for chunk in all_chunks:
-                json.dump(chunk, output_file, ensure_ascii=False)
-                output_file.write('\n')
-
-        print(f"Successfully wrote improved granular chunks to '{output_path}'.")
-        print(f"Sample chunk structure:")
-        if all_chunks:
-            sample = all_chunks[0]
-            print(f" ID: {sample['id']}")
-            print(f" Text length: {len(sample['text'])} chars")
-            print(f" Section: {sample['metadata']['section']}")
-            print(f" Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
-            print(f" Token count: {sample['metadata']['token_count']}")
-
-    except Exception as e:
-        print(f"Error writing output file: {e}")
 
 if __name__ == "__main__":
-    input_file = "combined_context.jsonl"
-    output_file = "granular_chunks_final.jsonl"
-    process_jsonl_file(input_file, output_file)
+# create_granular_chunks.py
+import os
 import json
 import re
+from typing import List, Dict, Any
+import nltk
 
+# Download punkt tokenizer if not already done (Ensure this runs once in your environment setup)
+nltk.download('punkt')
+nltk.download('punkt_tab') # Also download punkt_tab to avoid LookupError
+
+# --- Configuration ---
+INPUT_FILE = "combined_context.jsonl"
+OUTPUT_FILE = "granular_chunks_final.jsonl" # Keep filename consistent
+
+
+# --- Global State ---
+chunk_counter = 0
+
+
+def get_unique_id() -> str:
+    """Returns a unique, incrementing ID for each chunk."""
+    global chunk_counter
+    chunk_counter += 1
+    return f"chunk-{chunk_counter}"
+
+
+def create_chunk(context: Dict, text: str) -> Dict:
+    """Creates a standardized chunk dictionary with rich metadata."""
+    metadata = {
+        "section": context.get("section"),
+        "clause": context.get("clause") or context.get("Clause"),
+        "title": context.get("title"),
+        "source_description": context.get("description"),
+    }
+    # Add other primitive metadata keys
+    for key, value in context.items():
+        if key not in metadata and isinstance(value, (str, int, float, bool)):
+            metadata[key] = value
+
+    return {
+        "id": get_unique_id(),
+        "text": text.strip(),
+        "metadata": {k: v for k, v in metadata.items() if v is not None}
+    }
+
+
+def format_delegation_text(delegation: Any) -> str:
+    """
+    Formats a delegation dictionary or string into a readable string.
+    Explicitly includes "NIL" or "---" to capture no power cases.
+    """
+    if not isinstance(delegation, dict):
+        return str(delegation)
+    parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}" for auth, limit in delegation.items()]
+    return ", ".join(parts) if parts else "No specific delegation provided."
+
+
+def format_remarks(remarks: Any) -> str:
+    """Safely formats the 'remarks' field, handling various data types."""
+    if isinstance(remarks, list):
+        remark_parts = []
+        for item in remarks:
+            if isinstance(item, dict):
+                for key, value in item.items():
+                    remark_parts.append(f"{key}: {value}")
+            else:
+                remark_parts.append(str(item))
+        return " ".join(remark_parts)
+    return str(remarks)
+
+
+def build_descriptive_text(context: Dict) -> str:
+    """
+    Builds a clear, descriptive, natural language text by combining fields.
+    Focused for best relevance and contextual richness.
+    """
+    text_parts = []
+
+    if context.get("title"):
+        text_parts.append(f"Regarding the policy '{context['title']}'")
+
+    specific_desc = context.get('description') or context.get('method')
+    if specific_desc and specific_desc != context.get('title'):
+        text_parts.append(f"specifically for '{specific_desc}'")
+
+    if "delegation" in context:
+        delegation_text = format_delegation_text(context["delegation"])
+        text_parts.append(f", financial delegations are: {delegation_text}.")
+    elif "composition" in context:
+        composition_parts = []
+        for item in context["composition"]:
+            if isinstance(item, dict):
+                for role, members in item.items():
+                    member_text = (f"the {role} is {members}" if isinstance(members, str)
+                                   else f"the {role} are: {', '.join(members)}")
+                    composition_parts.append(member_text)
+        text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
+
+    if "remarks" in context and context["remarks"]:
+        remarks_text = format_remarks(context["remarks"])
+        text_parts.append(f" Important remarks include: {remarks_text}")
+
+    # Join all parts into a flowing sentence
+    return " ".join(text_parts).strip()
+
+
+def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
+    """
+    Splits a long text into smaller chunks with controlled overlap.
+    Uses sentence tokenization for natural splits.
+    """
+    text = text.strip()
+    if len(text) <= max_char_length:
+        return [text]
+
+    # Explicitly specify language to avoid punkt_tab error
+    sentences = nltk.tokenize.sent_tokenize(text, language='english')
     chunks = []
     current_chunk = ""
+
     for sentence in sentences:
+        # +1 for space/newline likely added between sentences
+        if len(current_chunk) + len(sentence) + 1 <= max_char_length:
+            current_chunk += (" " + sentence) if current_chunk else sentence
+        else:
             chunks.append(current_chunk.strip())
+            # Start next chunk with overlap from end of previous chunk (by characters)
+            if overlap < len(current_chunk):
+                current_chunk = current_chunk[-overlap:] + " " + sentence
             else:
                 current_chunk = sentence
+
+    if current_chunk:
         chunks.append(current_chunk.strip())
     return chunks
 
 
+def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
+    """
+    Processes a JSON policy entry and returns granular, context-rich chunks.
+    Applies recursive traversal and implements chunk size limiting.
+    """
+    context = {**(parent_context or {}), **data}
+    chunks = []
+
+    # Handler 1: Simple Item Lists (ex: rules, exclusions)
+    list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
+    if list_key:
+        base_title = context.get('title', 'a policy')
+        for item in data[list_key]:
+            if isinstance(item, str):
+                # Build chunk text with clear descriptive prefix for relevance
+                text = f"A rule regarding '{base_title}' is: {item}."
+                # Split if too long
+                for sub_chunk in split_text_into_chunks(text):
+                    chunks.append(create_chunk(context, sub_chunk))
+        return chunks
+
+    # Handler 2: Recursive traversal for nested dictionaries/lists
+    has_recursed = False
+    for key, value in data.items():
+        if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
+            for item in value:
+                chunks.extend(process_entry(item, context))
+            has_recursed = True
+
+    # Handler 3: Leaf nodes with delegation, composition or description
+    if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
+        text = build_descriptive_text(context)
+        # Split long descriptive text intelligently
+        for chunk_text in split_text_into_chunks(text):
+            chunks.append(create_chunk(context, chunk_text))
+
+    return chunks
+
+
+def main():
+    """Main orchestration to read input, process, and write chunks."""
+    print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
     all_chunks = []
+
     try:
+        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
+            for i, line in enumerate(f):
                 try:
+                    data = json.loads(line)
+                    processed = process_entry(data)
+                    if processed:
+                        all_chunks.extend(processed)
+                except json.JSONDecodeError:
+                    print(f"Warning: Skipping malformed JSON on line {i+1}")
                     continue
     except FileNotFoundError:
+        print(f"Error: Input file '{INPUT_FILE}' not found.")
         return
+
     print(f"Generated {len(all_chunks)} chunks before deduplication.")
+
+    # Deduplicate by text content (retaining last occurrences)
+    unique_chunks_map = {}
+    for chunk in all_chunks:
+        unique_chunks_map[chunk['text']] = chunk
+
+    unique_chunks = list(unique_chunks_map.values())
+    print(f"{len(unique_chunks)} unique chunks after deduplication.")
+
+    # Write output in JSONL format for later vector DB ingestion
+    with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
+        for chunk in unique_chunks:
+            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
+
+    print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")
+
 
 if __name__ == "__main__":
+    main()
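
The updated script writes one JSON object per line to granular_chunks_final.jsonl, each carrying an "id", the chunk "text", and a "metadata" dictionary. Below is a minimal sketch of how that output could be inspected after running the script; it is an illustrative addition, not part of the commit, and assumes granular_chunks_final.jsonl is present in the working directory.

# inspect_chunks.py: illustrative sketch only; the file name and usage are assumptions
import json

with open("granular_chunks_final.jsonl", "r", encoding="utf-8") as f:
    chunks = [json.loads(line) for line in f]

print(f"Loaded {len(chunks)} chunks")
# Each record has 'id', 'text', and a 'metadata' dict (section, clause, title, ...)
sample = chunks[0]
print(sample["id"], sample["metadata"].get("section"), sample["text"][:120])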