Kalpokoch committed
Commit 4fe8577 · verified · 1 Parent(s): a179120

Update create_granular_chunks.py

Files changed (1)
  1. create_granular_chunks.py +157 -354
create_granular_chunks.py CHANGED
@@ -1,414 +1,217 @@
- # create_granular_chunks.py - Enhanced Version for NEEPCO DOP Policies
-
  import os
  import json
  import re
- from typing import List, Dict, Any, Set
  import nltk

- # Download required NLTK data
- nltk.download('punkt', quiet=True)
- nltk.download('punkt_tab', quiet=True)

  # --- Configuration ---
  INPUT_FILE = "combined_context.jsonl"
- OUTPUT_FILE = "granular_chunks_final.jsonl"

  # --- Global State ---
  chunk_counter = 0

  def get_unique_id() -> str:
      """Returns a unique, incrementing ID for each chunk."""
      global chunk_counter
      chunk_counter += 1
      return f"chunk-{chunk_counter}"

- # --- Key Enhancement: NEEPCO-specific entity extraction ---
- def extract_key_entities(text: str) -> Dict[str, Set[str]]:
-     """Extract key entities specific to NEEPCO DOP policies."""
-     entities = {
-         'positions': set(),
-         'amounts': set(),
-         'sections': set(),
-         'procedures': set(),
-         'authorities': set()
      }
-
-     # Position patterns (E-1 to E-9, specific roles)
-     position_patterns = [
-         r'\b(?:Director|CMD|ED|CGM|GM|DGM|Sr\.?\s*M(?:gr)?|Manager|HOP|HOD)\b',
-         r'\bE-[1-9]\b',
-         r'\b(?:Chairman|Secretary|Chief|Head)\b'
-     ]
-
-     # Amount patterns (₹, crore, lakh)
-     amount_patterns = [
-         r'₹\s*\d+(?:[.,]\d+)*\s*(?:crore|lakh|thousand)?',
-         r'\d+(?:[.,]\d+)*\s*(?:crore|lakh|thousand)',
-         r'Full\s+Power[s]?'
-     ]
-
-     # Section patterns
-     section_patterns = [r'\b(?:Section|Annexure|Clause)\s*[IVX]+\b', r'\b(?:clause|sub-clause)\s*\d+\b']
-
-     # Extract entities
-     for pattern in position_patterns:
-         entities['positions'].update(re.findall(pattern, text, re.IGNORECASE))
-
-     for pattern in amount_patterns:
-         entities['amounts'].update(re.findall(pattern, text, re.IGNORECASE))
-
-     for pattern in section_patterns:
-         entities['sections'].update(re.findall(pattern, text, re.IGNORECASE))
-
-     return entities
-
- def create_question_answer_chunks(context: Dict) -> List[Dict]:
-     """Create targeted Q&A style chunks that anticipate user questions."""
-     chunks = []
-
-     section = context.get("section", "")
-     title = context.get("title", "")
-     clause = context.get("clause") or context.get("Clause")
-
-     # Generate approval authority questions
-     if "delegation" in context:
-         delegation = context["delegation"]
-         if isinstance(delegation, dict):
-             for authority, limit in delegation.items():
-                 if limit and str(limit) not in ["---", "NIL"]:
-                     qa_text = (f"Question: Who can approve {title.lower()} and what is their limit? "
-                                f"Answer: {authority} can approve {title.lower()} up to {limit}. "
-                                f"This is covered under {section} clause {clause}.")
-
-                     entities = extract_key_entities(qa_text)
-
-                     chunk = {
-                         "id": get_unique_id(),
-                         "text": qa_text,
-                         "metadata": {
-                             "section": section,
-                             "clause": clause,
-                             "title": title,
-                             "chunk_type": "approval_authority",
-                             "authority": authority,
-                             "limit": str(limit),
-                             "entities": {k: list(v) for k, v in entities.items() if v}
-                         }
-                     }
-                     chunks.append(chunk)
-
-     # Generate procedure-specific chunks
-     if "items" in context:
-         for item in context["items"]:
-             if isinstance(item, str):
-                 qa_text = (f"Question: What are the requirements for {title.lower()}? "
-                            f"Answer: For {title.lower()}, one requirement is: {item}. "
-                            f"This is specified in {section} clause {clause}.")
-
-                 entities = extract_key_entities(qa_text)
-
-                 chunk = {
-                     "id": get_unique_id(),
-                     "text": qa_text,
-                     "metadata": {
-                         "section": section,
-                         "clause": clause,
-                         "title": title,
-                         "chunk_type": "requirement",
-                         "requirement": item,
-                         "entities": {k: list(v) for k, v in entities.items() if v}
-                     }
-                 }
-                 chunks.append(chunk)
-
-     return chunks

- def create_context_rich_chunks(context: Dict) -> List[Dict]:
-     """Create chunks with rich contextual information."""
-     chunks = []
-     section = context.get("section", "")
-     title = context.get("title", "")
-     clause = context.get("clause") or context.get("Clause")
-
-     # Handle delegation information with full context
-     if "delegation" in context:
-         delegation = context["delegation"]
-         if isinstance(delegation, dict):
-             # Create a comprehensive delegation summary
-             delegation_items = []
-             for auth, limit in delegation.items():
-                 if limit and str(limit) not in ["---", "NIL"]:
-                     delegation_items.append(f"{auth}: {limit}")
-                 elif str(limit) in ["---", "NIL"]:
-                     delegation_items.append(f"{auth}: No authority")
-
-             if delegation_items:
-                 delegation_text = (f"In {section} clause {clause} regarding '{title}', "
-                                    f"the delegation of powers is as follows: {'; '.join(delegation_items)}. ")
-
-                 # Add remarks if available
-                 if "remarks" in context:
-                     remarks = format_remarks(context["remarks"])
-                     delegation_text += f"Important notes: {remarks}"
-
-                 entities = extract_key_entities(delegation_text)
-
-                 chunk = {
-                     "id": get_unique_id(),
-                     "text": delegation_text,
-                     "metadata": {
-                         "section": section,
-                         "clause": clause,
-                         "title": title,
-                         "chunk_type": "delegation_summary",
-                         "delegation_count": len(delegation_items),
-                         "entities": {k: list(v) for k, v in entities.items() if v}
-                     }
-                 }
-                 chunks.append(chunk)
-
-     # Handle composition information (for committees)
-     if "composition" in context:
-         composition = context["composition"]
-         if isinstance(composition, list):
-             comp_text = f"The composition for '{title}' in {section} clause {clause} includes: "
-             comp_details = []
-
-             for item in composition:
-                 if isinstance(item, dict):
-                     for role, members in item.items():
-                         if isinstance(members, list):
-                             comp_details.append(f"{role}: {', '.join(members)}")
-                         else:
-                             comp_details.append(f"{role}: {members}")
-
-             comp_text += "; ".join(comp_details) + "."
-
-             if "approving_authority" in context:
-                 comp_text += f" The approving authority is: {context['approving_authority']}."
-
-             entities = extract_key_entities(comp_text)
-
-             chunk = {
-                 "id": get_unique_id(),
-                 "text": comp_text,
-                 "metadata": {
-                     "section": section,
-                     "clause": clause,
-                     "title": title,
-                     "chunk_type": "composition",
-                     "entities": {k: list(v) for k, v in entities.items() if v}
-                 }
-             }
-             chunks.append(chunk)
-
-     return chunks

- def create_method_specific_chunks(context: Dict) -> List[Dict]:
-     """Handle method-specific information (like different tender types)."""
-     chunks = []
-
-     if "methods" in context:
-         for method in context["methods"]:
-             if isinstance(method, dict) and "method" in method:
-                 method_name = method["method"]
-                 delegation = method.get("delegation", {})
-
-                 if isinstance(delegation, dict):
-                     method_text = (f"For {context.get('title', 'procurement')} using {method_name}, "
-                                    f"the approval limits are: ")
-
-                     limits = []
-                     for auth, limit in delegation.items():
-                         if limit and str(limit) not in ["---", "NIL"]:
-                             limits.append(f"{auth} can approve up to {limit}")
-
-                     method_text += "; ".join(limits) + f". This is covered under {context.get('section')} clause {context.get('clause')}."
-
-                     entities = extract_key_entities(method_text)
-
-                     chunk = {
-                         "id": get_unique_id(),
-                         "text": method_text,
-                         "metadata": {
-                             "section": context.get("section"),
-                             "clause": context.get("clause"),
-                             "title": context.get("title"),
-                             "method": method_name,
-                             "chunk_type": "method_specific",
-                             "entities": {k: list(v) for k, v in entities.items() if v}
-                         }
-                     }
-                     chunks.append(chunk)
-
-     return chunks

  def format_remarks(remarks: Any) -> str:
-     """Enhanced remarks formatting with better structure."""
      if isinstance(remarks, list):
-         formatted_remarks = []
          for item in remarks:
              if isinstance(item, dict):
                  for key, value in item.items():
-                     formatted_remarks.append(f"{key}: {value}")
              else:
-                 formatted_remarks.append(str(item))
-         return " | ".join(formatted_remarks)
-     return str(remarks) if remarks else ""

- def process_subclauses(subclauses: List[Dict], parent_context: Dict) -> List[Dict]:
-     """Process subclauses with enhanced context preservation."""
      chunks = []
-
-     for subclause in subclauses:
-         if isinstance(subclause, dict):
-             # Merge parent context
-             full_context = {**parent_context, **subclause}
-
-             # Generate different types of chunks
-             chunks.extend(create_question_answer_chunks(full_context))
-             chunks.extend(create_context_rich_chunks(full_context))
-             chunks.extend(create_method_specific_chunks(full_context))
-
-             # Recursively process nested structures
-             if "subclauses" in subclause:
-                 chunks.extend(process_subclauses(subclause["subclauses"], full_context))
-
      return chunks

  def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
-     """Enhanced entry processing with multiple chunking strategies."""
      context = {**(parent_context or {}), **data}
      chunks = []
-
-     # Process subclauses first (most important for DOP policies)
-     if "subclauses" in data:
-         chunks.extend(process_subclauses(data["subclauses"], context))
-
-     # Generate various chunk types for the main entry
-     chunks.extend(create_question_answer_chunks(context))
-     chunks.extend(create_context_rich_chunks(context))
-     chunks.extend(create_method_specific_chunks(context))
-
-     # Handle special cases for financial concurrence
-     if context.get("section") == "Financial Concurrence":
-         fc_text = (f"Financial Concurrence requirements for {context.get('title', 'this matter')}: "
-                    f"{context.get('description', 'See policy details')}. ")
-
-         if "exclusions" in context:
-             fc_text += f"Exclusions from financial concurrence: {'; '.join(context['exclusions'])}."
-
-         entities = extract_key_entities(fc_text)
-
-         chunk = {
-             "id": get_unique_id(),
-             "text": fc_text,
-             "metadata": {
-                 "section": context.get("section"),
-                 "clause": context.get("clause"),
-                 "title": context.get("title"),
-                 "chunk_type": "financial_concurrence",
-                 "entities": {k: list(v) for k, v in entities.items() if v}
-             }
-         }
-         chunks.append(chunk)
-
-     # Handle Annexure items (Board-level approvals)
-     if context.get("section") == "Annexure A":
-         annexure_text = (f"Board of Directors approval is required for {context.get('title')}: "
-                          f"{context.get('description', 'various matters')}. ")
-
-         if "items" in context:
-             annexure_text += f"Specific items include: {'; '.join(context['items'])}."
-
-         entities = extract_key_entities(annexure_text)
-
-         chunk = {
-             "id": get_unique_id(),
-             "text": annexure_text,
-             "metadata": {
-                 "section": context.get("section"),
-                 "clause": context.get("clause"),
-                 "title": context.get("title"),
-                 "chunk_type": "board_approval",
-                 "entities": {k: list(v) for k, v in entities.items() if v}
-             }
-         }
-         chunks.append(chunk)
-
      return chunks

  def main():
-     """Enhanced main function with better logging and deduplication."""
-     print(f"Processing '{INPUT_FILE}' with enhanced NEEPCO DOP chunking...")
-
      all_chunks = []
-     line_count = 0
-
      try:
          with open(INPUT_FILE, 'r', encoding='utf-8') as f:
              for i, line in enumerate(f):
-                 line_count += 1
                  try:
                      data = json.loads(line)
                      processed = process_entry(data)
                      if processed:
                          all_chunks.extend(processed)
-                     if line_count % 10 == 0:
-                         print(f"Processed {line_count} lines, generated {len(all_chunks)} chunks so far...")
                  except json.JSONDecodeError:
                      print(f"Warning: Skipping malformed JSON on line {i+1}")
                      continue
-
      except FileNotFoundError:
          print(f"Error: Input file '{INPUT_FILE}' not found.")
          return
-
-     print(f"Generated {len(all_chunks)} total chunks from {line_count} lines.")
-
-     # Enhanced deduplication by text similarity
-     unique_chunks = []
-     seen_texts = set()
-
      for chunk in all_chunks:
-         # Create a normalized version for comparison
-         normalized_text = re.sub(r'\s+', ' ', chunk['text'].lower().strip())
-         if normalized_text not in seen_texts:
-             seen_texts.add(normalized_text)
-             unique_chunks.append(chunk)
-
-     print(f"After deduplication: {len(unique_chunks)} unique chunks.")
-
-     # Sort chunks by section and clause for better organization
-     def sort_key(chunk):
-         section = chunk['metadata'].get('section', 'ZZZ')
-         clause = chunk['metadata'].get('clause', 999)
-         if isinstance(clause, str):
-             try:
-                 clause = int(re.search(r'\d+', clause).group())
-             except:
-                 clause = 999
-         return (section, clause)
-
-     unique_chunks.sort(key=sort_key)
-
-     # Write output
      with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
          for chunk in unique_chunks:
              outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
-
-     print(f"Successfully wrote enhanced chunks to '{OUTPUT_FILE}'.")
-
-     # Print some statistics
-     chunk_types = {}
-     for chunk in unique_chunks:
-         chunk_type = chunk['metadata'].get('chunk_type', 'unknown')
-         chunk_types[chunk_type] = chunk_types.get(chunk_type, 0) + 1
-
-     print("\nChunk type distribution:")
-     for chunk_type, count in sorted(chunk_types.items()):
-         print(f" {chunk_type}: {count}")

  if __name__ == "__main__":
      main()
 
+ # create_granular_chunks.py
  import os
  import json
  import re
+ from typing import List, Dict, Any
  import nltk

+ # Download punkt tokenizer if not already done (Ensure this runs once in your environment setup)
+ nltk.download('punkt')
+ nltk.download('punkt_tab')  # Also download punkt_tab to avoid LookupError

  # --- Configuration ---
  INPUT_FILE = "combined_context.jsonl"
+ OUTPUT_FILE = "granular_chunks_final.jsonl"  # Keep filename consistent

  # --- Global State ---
  chunk_counter = 0

  def get_unique_id() -> str:
      """Returns a unique, incrementing ID for each chunk."""
      global chunk_counter
      chunk_counter += 1
      return f"chunk-{chunk_counter}"

+ def create_chunk(context: Dict, text: str) -> Dict:
+     """Creates a standardized chunk dictionary with rich metadata."""
+     metadata = {
+         "section": context.get("section"),
+         "clause": context.get("clause") or context.get("Clause"),
+         "title": context.get("title"),
+         "source_description": context.get("description"),
      }
+     # Add other primitive metadata keys
+     for key, value in context.items():
+         if key not in metadata and isinstance(value, (str, int, float, bool)):
+             metadata[key] = value
+
+     return {
+         "id": get_unique_id(),
+         "text": text.strip(),
+         "metadata": {k: v for k, v in metadata.items() if v is not None}
+     }
+
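For illustration only (not part of the commit): assuming the file above is importable as a module named create_granular_chunks (importing it will trigger the nltk.download calls at the top), a chunk built by create_chunk for a made-up context would look roughly like this.

# Hypothetical input; field names and values are invented, not actual NEEPCO DOP data.
from create_granular_chunks import create_chunk

sample_context = {
    "section": "Section I",
    "clause": "2.1",
    "title": "Award of Works Contracts",
    "description": "Open tender route",
}
chunk = create_chunk(sample_context, "Regarding the policy 'Award of Works Contracts' ...")
# Expected shape (the id depends on the global counter):
# {"id": "chunk-1", "text": "Regarding the policy 'Award of Works Contracts' ...",
#  "metadata": {"section": "Section I", "clause": "2.1", "title": "Award of Works Contracts", ...}}
print(chunk["metadata"]["title"])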
+ def format_delegation_text(delegation: Any) -> str:
+     """
+     Formats a delegation dictionary or string into a readable string.
+     Explicitly includes "NIL" or "---" to capture no power cases.
+     """
+     if not isinstance(delegation, dict):
+         return str(delegation)
+     parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}" for auth, limit in delegation.items()]
+     return ", ".join(parts) if parts else "No specific delegation provided."
+
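For reference (again not part of the diff), a quick check of how the new helper renders a delegation mapping; the authorities and limits below are invented for illustration.

from create_granular_chunks import format_delegation_text

delegation = {"CMD": "₹50 crore", "Director": "₹10 crore", "GM": "---"}
print(format_delegation_text(delegation))
# -> "the limit for CMD is ₹50 crore, the limit for Director is ₹10 crore, the limit for GM is NIL"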
 
  def format_remarks(remarks: Any) -> str:
+     """Safely formats the 'remarks' field, handling various data types."""
      if isinstance(remarks, list):
+         remark_parts = []
          for item in remarks:
              if isinstance(item, dict):
                  for key, value in item.items():
+                     remark_parts.append(f"{key}: {value}")
              else:
+                 remark_parts.append(str(item))
+         return " ".join(remark_parts)
+     return str(remarks)
+
+ def build_descriptive_text(context: Dict) -> str:
+     """
+     Builds a clear, descriptive, natural language text by combining fields.
+     Focused for best relevance and contextual richness.
+     """
+     text_parts = []
+
+     if context.get("title"):
+         text_parts.append(f"Regarding the policy '{context['title']}'")
+
+     specific_desc = context.get('description') or context.get('method')
+     if specific_desc and specific_desc != context.get('title'):
+         text_parts.append(f"specifically for '{specific_desc}'")
+
+     if "delegation" in context:
+         delegation_text = format_delegation_text(context["delegation"])
+         text_parts.append(f", financial delegations are: {delegation_text}.")
+     elif "composition" in context:
+         composition_parts = []
+         for item in context["composition"]:
+             if isinstance(item, dict):
+                 for role, members in item.items():
+                     member_text = (f"the {role} is {members}" if isinstance(members, str)
+                                    else f"the {role} are: {', '.join(members)}")
+                     composition_parts.append(member_text)
+         text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
+
+     if "remarks" in context and context["remarks"]:
+         remarks_text = format_remarks(context["remarks"])
+         text_parts.append(f" Important remarks include: {remarks_text}")
+
+     # Join all parts into a flowing sentence
+     return " ".join(text_parts).strip()
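A small sketch (hypothetical values, not real policy data) of the kind of sentence build_descriptive_text assembles; note the parts are joined with a plain space, so the delegation fragment keeps its leading ", ".

from create_granular_chunks import build_descriptive_text

context = {"title": "Award of Works Contracts", "delegation": {"CMD": "₹50 crore"}}
print(build_descriptive_text(context))
# -> roughly: "Regarding the policy 'Award of Works Contracts' , financial delegations are:
#    the limit for CMD is ₹50 crore."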
 
+ def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
+     """
+     Splits a long text into smaller chunks with controlled overlap.
+     Uses sentence tokenization for natural splits.
+     """
+     text = text.strip()
+     if len(text) <= max_char_length:
+         return [text]
+
+     # Explicitly specify language to avoid punkt_tab error
+     sentences = nltk.tokenize.sent_tokenize(text, language='english')
      chunks = []
+     current_chunk = ""
+
+     for sentence in sentences:
+         # +1 for space/newline likely added between sentences
+         if len(current_chunk) + len(sentence) + 1 <= max_char_length:
+             current_chunk += (" " + sentence) if current_chunk else sentence
+         else:
+             chunks.append(current_chunk.strip())
+             # Start next chunk with overlap from end of previous chunk (by characters)
+             if overlap < len(current_chunk):
+                 current_chunk = current_chunk[-overlap:] + " " + sentence
+             else:
+                 current_chunk = sentence
+
+     if current_chunk:
+         chunks.append(current_chunk.strip())
      return chunks
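A hypothetical run of split_text_into_chunks with deliberately small limits so the character overlap is visible (the script itself defaults to 1500/200 characters).

from create_granular_chunks import split_text_into_chunks

long_text = ("Clause one applies to works. Clause two applies to services. "
             "Clause three applies to consultancy.")
for piece in split_text_into_chunks(long_text, max_char_length=60, overlap=20):
    print(repr(piece))
# Each piece stays around the 60-character cap, and the second piece starts with the
# last 20 characters of the first, so neighbouring chunks share context.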
 
+
  def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
+     """
+     Processes a JSON policy entry and returns granular, context-rich chunks.
+     Applies recursive traversal and implements chunk size limiting.
+     """
      context = {**(parent_context or {}), **data}
      chunks = []
+
+     # Handler 1: Simple Item Lists (ex: rules, exclusions)
+     list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
+     if list_key:
+         base_title = context.get('title', 'a policy')
+         for item in data[list_key]:
+             if isinstance(item, str):
+                 # Build chunk text with clear descriptive prefix for relevance
+                 text = f"A rule regarding '{base_title}' is: {item}."
+                 # Split if too long
+                 for sub_chunk in split_text_into_chunks(text):
+                     chunks.append(create_chunk(context, sub_chunk))
+         return chunks
+
+     # Handler 2: Recursive traversal for nested dictionaries/lists
+     has_recursed = False
+     for key, value in data.items():
+         if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
+             for item in value:
+                 chunks.extend(process_entry(item, context))
+             has_recursed = True
+
+     # Handler 3: Leaf nodes with delegation, composition or description
+     if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
+         text = build_descriptive_text(context)
+         # Split long descriptive text intelligently
+         for chunk_text in split_text_into_chunks(text):
+             chunks.append(create_chunk(context, chunk_text))
+
      return chunks
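To make the three handlers concrete, here is an invented policy entry (not taken from the real combined_context.jsonl) traced through process_entry.

from create_granular_chunks import process_entry

entry = {
    "section": "Section II",
    "title": "Works Contracts",
    "subclauses": [
        {"clause": "2.1", "title": "Open Tender", "delegation": {"CMD": "₹50 crore"}},
        {"clause": "2.2", "title": "Exclusions", "items": ["Emergency works are excluded"]},
    ],
}
chunks = process_entry(entry)
# Handler 2 recurses into "subclauses" (a list of dicts); the first child is a delegation
# leaf handled by Handler 3, the second has an "items" list handled by Handler 1,
# so this example yields two chunks.
print(len(chunks))  # -> 2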
 
+
  def main():
+     """Main orchestration to read input, process, and write chunks."""
+     print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
      all_chunks = []
+
      try:
          with open(INPUT_FILE, 'r', encoding='utf-8') as f:
              for i, line in enumerate(f):
                  try:
                      data = json.loads(line)
                      processed = process_entry(data)
                      if processed:
                          all_chunks.extend(processed)
                  except json.JSONDecodeError:
                      print(f"Warning: Skipping malformed JSON on line {i+1}")
                      continue
      except FileNotFoundError:
          print(f"Error: Input file '{INPUT_FILE}' not found.")
          return
+
+     print(f"Generated {len(all_chunks)} chunks before deduplication.")
+
+     # Deduplicate by text content (retaining last occurrences)
+     unique_chunks_map = {}
      for chunk in all_chunks:
+         unique_chunks_map[chunk['text']] = chunk
+
+     unique_chunks = list(unique_chunks_map.values())
+     print(f"{len(unique_chunks)} unique chunks after deduplication.")
+
+     # Write output in JSONL format for later vector DB ingestion
      with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
          for chunk in unique_chunks:
              outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
+
+     print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")

  if __name__ == "__main__":
      main()
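For completeness, an assumed sanity check after running the script (the file name matches OUTPUT_FILE above; the keys follow the shape produced by create_chunk).

import json

# Read back the first few generated chunks to eyeball the output.
with open("granular_chunks_final.jsonl", encoding="utf-8") as f:
    for _ in range(3):
        line = f.readline()
        if not line:
            break
        chunk = json.loads(line)
        print(chunk["id"], chunk["metadata"].get("section"), chunk["text"][:80])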