# create_granular_chunks.py (place this in root directory)
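"""Build granular, token-bounded chunks from a policy JSONL file.

Pipeline, as implemented below: read combined_context.jsonl line by line,
flatten each record (section, title, description, items, delegation,
subclauses, methods, remarks) into a single narrative string, split that
string into chunks of roughly 400 tokens with a small character-based
overlap, attach extracted financial and authority keywords, drop exact
duplicates via an MD5 hash, and write the results to
granular_chunks_final.jsonl.
"""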
import json
import re
import hashlib
from typing import List, Dict
import tiktoken

def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    """Count tokens using tiktoken."""
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception:
        # Fallback: rough word-based estimate (~1.3 tokens per word)
        return int(len(text.split()) * 1.3)

def extract_financial_keywords(text: str) -> List[str]:
    """Extract financial keywords from text."""
    financial_patterns = [
        r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
        r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
        r'\b(?:tender|contract|purchase|award)\b',
        r'\b(?:crore|lakh|thousand)\b'
    ]
    
    keywords = set()
    for pattern in financial_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        keywords.update(matches)
    
    return list(keywords)[:10]  # Limit to 10 keywords

def extract_authority_keywords(text: str) -> List[str]:
    """Extract authority/designation keywords from text."""
    authority_patterns = [
        r'\b(?:D\([TPF]\)|ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
        r'\b(?:Director|Manager|Chief|Head)\b',
        r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
        r'\b(?:approval|sanction|delegation|authority|power)\b'
    ]
    
    keywords = set()
    for pattern in authority_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        keywords.update(matches)
    
    return list(keywords)[:10]  # Limit to 10 keywords
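
# Illustrative example (hypothetical sentence, not taken from the source data):
# for "The GM may grant approval for procurement of stores up to ₹5 crore",
# extract_financial_keywords() would return e.g. ["₹5 crore", "procurement", "crore"]
# and extract_authority_keywords() e.g. ["GM", "approval"] (ordering varies,
# since matches are collected in a set before being trimmed to 10 entries).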

def create_chunk_text_from_item(item: Dict) -> str:
    """Create comprehensive chunk text from a single item."""
    parts = []
    
    # Add section and title context
    if item.get('section'):
        parts.append(f"Regarding the policy '{item.get('title', 'Unknown')}' under section '{item['section']}':")
    
    # Add main description
    if item.get('description'):
        parts.append(item['description'])
    
    # Add items if present
    if item.get('items'):
        if len(item['items']) == 1:
            parts.append(f"This covers: {item['items'][0]}")
        else:
            parts.append("This covers the following:")
            for i, sub_item in enumerate(item['items'], 1):
                parts.append(f"{i}. {sub_item}")
    
    # Add delegation information
    if item.get('delegation'):
        parts.append("Authority delegation:")
        for role, limit in item['delegation'].items():
            if limit and limit != "NIL":
                parts.append(f"- {role}: {limit}")
    
    # Add subclauses
    if item.get('subclauses'):
        parts.append("This includes:")
        for subclause in item['subclauses']:
            if subclause.get('description'):
                parts.append(f"• {subclause['description']}")
            if subclause.get('delegation'):
                for role, limit in subclause['delegation'].items():
                    if limit and limit != "NIL":
                        parts.append(f"  - {role}: {limit}")
    
    # Add methods (for complex delegation structures)
    if item.get('methods'):
        for method in item['methods']:
            if method.get('delegation'):
                parts.append(f"For {method.get('method', 'this method')}:")
                for role, limit in method['delegation'].items():
                    if limit and limit != "NIL":
                        parts.append(f"- {role}: {limit}")
    
    # Add remarks
    if item.get('remarks'):
        parts.append("Important notes:")
        if isinstance(item['remarks'], list):
            for remark in item['remarks']:
                if isinstance(remark, str):
                    parts.append(f"• {remark}")
        elif isinstance(item['remarks'], str):
            parts.append(f"• {item['remarks']}")
    
    return " ".join(parts)

def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
    """Split text into sentence-aligned chunks of at most ~max_tokens tokens,
    carrying a small amount of trailing overlap text into each new chunk."""
    sentences = re.split(r'[.!?]\s+', text)
    chunks = []
    current_chunk = ""
    current_tokens = 0
    
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
            
        sentence_tokens = count_tokens(sentence)
        
        # If adding this sentence would exceed max_tokens, finalize current chunk
        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            chunks.append(current_chunk.strip())
            
            # Start new chunk with overlap
            if overlap_tokens > 0 and chunks:
                overlap_text = current_chunk[-overlap_tokens * 5:]  # Approximate overlap: ~5 characters per token
                current_chunk = overlap_text + " " + sentence
            else:
                current_chunk = sentence
            current_tokens = count_tokens(current_chunk)
        else:
            current_chunk += (" " if current_chunk else "") + sentence
            current_tokens += sentence_tokens
    
    # Add the last chunk if it has content
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    
    return chunks

def create_chunk_hash(text: str) -> str:
    """Create a hash of the chunk text for deduplication."""
    return hashlib.md5(text.encode('utf-8')).hexdigest()[:12]

def process_jsonl_file(file_path: str, output_path: str):
    """Process the JSONL file and create granular chunks."""
    print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")
    
    all_chunks = []
    chunk_hashes = set()  # For deduplication
    chunk_id_counter = 1
    
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_num, line in enumerate(file, 1):
                try:
                    line = line.strip()
                    if not line:
                        continue  # Skip blank lines (e.g. a trailing newline)
                    item = json.loads(line)
                    
                    # Create comprehensive text from the item
                    chunk_text = create_chunk_text_from_item(item)
                    
                    if not chunk_text.strip():
                        continue
                    
                    # Split into token-based chunks
                    text_chunks = split_into_token_chunks(chunk_text)
                    
                    for i, chunk in enumerate(text_chunks):
                        if not chunk.strip():
                            continue
                            
                        # Check for duplicates
                        chunk_hash = create_chunk_hash(chunk)
                        if chunk_hash in chunk_hashes:
                            continue
                        chunk_hashes.add(chunk_hash)
                        
                        # Extract keywords
                        financial_keywords = extract_financial_keywords(chunk)
                        authority_keywords = extract_authority_keywords(chunk)
                        
                        # Create chunk object
                        chunk_obj = {
                            'id': f'chunk-{chunk_id_counter}',
                            'text': chunk,
                            'metadata': {
                                'section': item.get('section', ''),
                                'clause': item.get('clause', ''),
                                'title': item.get('title', ''),
                                'chunk_index': i,
                                'source_line': line_num,
                                'financial_keywords': financial_keywords,
                                'authority_keywords': authority_keywords,
                                'token_count': count_tokens(chunk)
                            }
                        }
                        
                        all_chunks.append(chunk_obj)
                        chunk_id_counter += 1
                        
                except json.JSONDecodeError as e:
                    print(f"Warning: Invalid JSON on line {line_num}: {e}")
                    continue
    
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return
    except Exception as e:
        print(f"Error reading file: {e}")
        return
    
    print(f"Generated {len(all_chunks)} chunks before deduplication.")
    print(f"{len(chunk_hashes)} unique chunks after deduplication.")
    
    # Write chunks to output file
    try:
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for chunk in all_chunks:
                json.dump(chunk, output_file, ensure_ascii=False)
                output_file.write('\n')
        
        print(f"Successfully wrote improved granular chunks to '{output_path}'.")
        print(f"Sample chunk structure:")
        if all_chunks:
            sample = all_chunks[0]
            print(f"  ID: {sample['id']}")
            print(f"  Text length: {len(sample['text'])} chars")
            print(f"  Section: {sample['metadata']['section']}")
            print(f"  Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
            print(f"  Token count: {sample['metadata']['token_count']}")
        
    except Exception as e:
        print(f"Error writing output file: {e}")

if __name__ == "__main__":
    input_file = "combined_context.jsonl"
    output_file = "granular_chunks_final.jsonl"
    process_jsonl_file(input_file, output_file)
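
# Usage note (assumes combined_context.jsonl sits next to this script):
#   python create_granular_chunks.py
# Each input line is expected to be a JSON object using some subset of the keys
# read above (section, clause, title, description, items, delegation,
# subclauses, methods, remarks). Each output line in granular_chunks_final.jsonl
# carries an "id", the chunk "text", and a "metadata" object with the
# section/clause/title, chunk_index, source_line, both keyword lists, and
# token_count.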