# create_granular_chunks.py
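# Typical usage (assumes combined_context.jsonl is in the working directory and
# that the NLTK Punkt data can be downloaded on the first run):
#   python create_granular_chunks.py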
import json
from typing import Any, Dict, List, Optional

import nltk

# Download the Punkt tokenizer data if not already present (nltk.download skips packages that are already up to date)
nltk.download('punkt')
nltk.download('punkt_tab')  # punkt_tab is also required by recent NLTK versions to avoid a LookupError

# --- Configuration ---
INPUT_FILE = "combined_context.jsonl"
OUTPUT_FILE = "granular_chunks_final.jsonl"  # Keep filename consistent
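
# Illustrative input record (hypothetical; the actual schema of combined_context.jsonl
# may differ). Each input line is expected to be one JSON object, for example:
#   {"section": "3", "clause": "3.2", "title": "Purchase of Stores",
#    "sub_items": [{"description": "Open tender",
#                   "delegation": {"MD": "Full Powers", "Director": "Rs. 50 lakh"},
#                   "remarks": ["Subject to budget provision"]}]}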


# --- Global State ---
chunk_counter = 0


def get_unique_id() -> str:
    """Returns a unique, incrementing ID for each chunk."""
    global chunk_counter
    chunk_counter += 1
    return f"chunk-{chunk_counter}"


def create_chunk(context: Dict, text: str) -> Dict:
    """Creates a standardized chunk dictionary with rich metadata."""
    metadata = {
        "section": context.get("section"),
        "clause": context.get("clause") or context.get("Clause"),
        "title": context.get("title"),
        "source_description": context.get("description"),
    }
    # Add any other primitive metadata keys that were not already captured above
    already_captured = set(metadata) | {"description", "Clause"}
    for key, value in context.items():
        if key not in already_captured and isinstance(value, (str, int, float, bool)):
            metadata[key] = value

    return {
        "id": get_unique_id(),
        "text": text.strip(),
        "metadata": {k: v for k, v in metadata.items() if v is not None}
    }
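
# Illustrative shape of a chunk produced by create_chunk() (the id, text and
# metadata values here are hypothetical and shown only to document the format):
#   {"id": "chunk-1",
#    "text": "Regarding the policy 'Purchase of Stores' ...",
#    "metadata": {"section": "3", "clause": "3.2", "title": "Purchase of Stores"}}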


def format_delegation_text(delegation: Any) -> str:
    """
    Formats a delegation dictionary or string into a readable string.
    Explicitly includes "NIL" or "---" to capture no power cases.
    """
    if not isinstance(delegation, dict):
        return str(delegation)
    parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}" for auth, limit in delegation.items()]
    return ", ".join(parts) if parts else "No specific delegation provided."


def format_remarks(remarks: Any) -> str:
    """Safely formats the 'remarks' field, handling various data types."""
    if isinstance(remarks, list):
        remark_parts = []
        for item in remarks:
            if isinstance(item, dict):
                for key, value in item.items():
                    remark_parts.append(f"{key}: {value}")
            else:
                remark_parts.append(str(item))
        return " ".join(remark_parts)
    return str(remarks)


def smart_chunk_size(context: Dict) -> int:
    """
    Adaptive chunk sizing based on content type.
    Smaller chunks for dense information, larger for descriptive.
    """
    if "delegation" in context:
        return 1000  # Smaller for dense financial/delegation info
    elif "composition" in context:
        return 800   # Smaller for structural/hierarchical info
    elif "items" in context or "exclusions" in context:
        return 600   # Smaller for list-based info
    else:
        return 1500  # Default for descriptive content


def build_descriptive_text(context: Dict) -> str:
    """
    Builds a clear, descriptive, natural language text by combining fields.
    Focused for best relevance and contextual richness.
    """
    text_parts = []

    if context.get("title"):
        text_parts.append(f"Regarding the policy '{context['title']}'")

    specific_desc = context.get('description') or context.get('method')
    if specific_desc and specific_desc != context.get('title'):
        text_parts.append(f"specifically for '{specific_desc}'")

    if "delegation" in context:
        delegation_text = format_delegation_text(context["delegation"])
        text_parts.append(f", financial delegations are: {delegation_text}.")
    elif "composition" in context:
        composition_parts = []
        for item in context["composition"]:
            if isinstance(item, dict):
                for role, members in item.items():
                    member_text = (f"the {role} is {members}" if isinstance(members, str)
                                   else f"the {role} are: {', '.join(members)}")
                    composition_parts.append(member_text)
        text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")

    if "remarks" in context and context["remarks"]:
        remarks_text = format_remarks(context["remarks"])
        text_parts.append(f" Important remarks include: {remarks_text}")

    # Join all parts into a flowing sentence
    return " ".join(text_parts).strip()


def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
    """
    Splits a long text into smaller chunks with controlled overlap.
    Uses sentence tokenization for natural splits.
    """
    text = text.strip()
    if len(text) <= max_char_length:
        return [text]

    # Explicitly specify language to avoid punkt_tab error
    sentences = nltk.tokenize.sent_tokenize(text, language='english')
    chunks = []
    current_chunk = ""

    for sentence in sentences:
        # +1 for the space added between sentences
        if len(current_chunk) + len(sentence) + 1 <= max_char_length:
            current_chunk += (" " + sentence) if current_chunk else sentence
        else:
            # Avoid emitting an empty chunk when the very first sentence already
            # exceeds max_char_length (such a sentence is kept intact).
            if current_chunk:
                chunks.append(current_chunk.strip())
            # Start the next chunk with a character-level overlap from the end of the previous one
            if overlap < len(current_chunk):
                current_chunk = current_chunk[-overlap:] + " " + sentence
            else:
                current_chunk = sentence

    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
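
# Illustrative behaviour of split_text_into_chunks (lengths are hypothetical):
# a ~3,000-character passage with max_char_length=1500 and overlap=200 typically
# yields about three chunks, each chunk after the first starting with roughly
# the last 200 characters of the previous one.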


def process_entry(data: Dict, parent_context: Optional[Dict] = None) -> List[Dict]:
    """
    Processes a JSON policy entry and returns granular, context-rich chunks.
    Applies recursive traversal and implements chunk size limiting.
    """
    context = {**(parent_context or {}), **data}
    chunks = []

    # Handler 1: simple string lists under 'items' or 'exclusions' (e.g. lists of rules)
    list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
    if list_key:
        base_title = context.get('title', 'a policy')
        for item in data[list_key]:
            if isinstance(item, str):
                # Build chunk text with clear descriptive prefix for relevance
                text = f"A rule regarding '{base_title}' is: {item}."
                # Split if too long
                for sub_chunk in split_text_into_chunks(text):
                    chunks.append(create_chunk(context, sub_chunk))
        return chunks

    # Handler 2: Recursive traversal for nested dictionaries/lists
    has_recursed = False
    for key, value in data.items():
        if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
            for item in value:
                chunks.extend(process_entry(item, context))
            has_recursed = True

    # Handler 3: Leaf nodes with delegation, composition or description
    if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
        text = build_descriptive_text(context)
        # Split long descriptive text intelligently with adaptive chunk size
        max_size = smart_chunk_size(data)
        for chunk_text in split_text_into_chunks(text, max_char_length=max_size):
            chunks.append(create_chunk(context, chunk_text))

    return chunks


def main():
    """Main orchestration to read input, process, and write chunks."""
    print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
    all_chunks = []

    try:
        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
            for i, line in enumerate(f):
                try:
                    data = json.loads(line)
                    processed = process_entry(data)
                    if processed:
                        all_chunks.extend(processed)
                except json.JSONDecodeError:
                    print(f"Warning: Skipping malformed JSON on line {i+1}")
                    continue
    except FileNotFoundError:
        print(f"Error: Input file '{INPUT_FILE}' not found.")
        return

    print(f"Generated {len(all_chunks)} chunks before deduplication.")

    # Deduplicate by text content (retaining last occurrences)
    unique_chunks_map = {}
    for chunk in all_chunks:
        unique_chunks_map[chunk['text']] = chunk

    unique_chunks = list(unique_chunks_map.values())
    print(f"{len(unique_chunks)} unique chunks after deduplication.")

    # Write output in JSONL format for later vector DB ingestion
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
        for chunk in unique_chunks:
            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")

    print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")


if __name__ == "__main__":
    main()