""" |
|
|
Advanced Text Chunking and Preprocessing Pipeline |
|
|
Intelligently chunks Wikipedia articles while preserving context and semantic coherence. |
|
|
""" |
|
|
|
|
|
import re |
|
|
import json |
|
|
from typing import List, Dict, Tuple |
|
|
from dataclasses import dataclass |
|
|
import spacy |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
@dataclass
class TextChunk:
    """Represents a chunk of text with metadata"""
    chunk_id: str
    text: str
    source_title: str
    source_url: str
    section: str
    chunk_index: int
    total_chunks: int
    char_start: int
    char_end: int
    word_count: int
    has_entities: bool = False
    entities: Optional[List[Dict]] = None
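
# Illustrative example of a populated TextChunk (hypothetical values, shown only to
# document the intended field semantics):
#   TextChunk(
#       chunk_id="Dublin_0",            # "<title with spaces replaced by _>_<chunk_index>"
#       text="Dublin is the capital and largest city of Ireland ...",
#       source_title="Dublin",
#       source_url="https://en.wikipedia.org/wiki/Dublin",
#       section="summary",              # "summary" or "full_article" in this pipeline
#       chunk_index=0,
#       total_chunks=3,                 # backfilled once the whole text is chunked
#       char_start=0,
#       char_end=1873,
#       word_count=412,
#   )
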
class AdvancedTextProcessor:
    """Advanced text processing with intelligent chunking"""

    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 128, spacy_model: str = "en_core_web_sm"):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            # Model not installed: download it with the current interpreter, then retry
            print(f"[INFO] Downloading spaCy model: {spacy_model}")
            import subprocess
            import sys
            subprocess.run([sys.executable, "-m", "spacy", "download", spacy_model], check=True)
            self.nlp = spacy.load(spacy_model)

        # Keep only the components needed for sentence segmentation and NER
        self.nlp.select_pipes(enable=["tok2vec", "tagger", "parser", "ner"])
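
        # Note (optional alternative, not part of the original pipeline): if dependency
        # parses are not needed, the lighter "senter" component shipped (disabled) with
        # the en_core_web_* models can supply sentence boundaries faster than the parser,
        # e.g. via nlp.enable_pipe("senter") together with disabling "parser".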

    def clean_text(self, text: str) -> str:
        """Clean Wikipedia text"""
        if not text:
            return ""

        # Remove templates and file/image links
        text = re.sub(r'\{\{[^}]+\}\}', '', text)
        text = re.sub(r'\[\[File:[^\]]+\]\]', '', text)
        text = re.sub(r'\[\[Image:[^\]]+\]\]', '', text)

        # Replace wiki links with their display text: [[Target|label]] -> label, [[Target]] -> Target
        text = re.sub(r'\[\[([^|\]]+)\|([^\]]+)\]\]', r'\2', text)
        text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text)

        # Remove bracketed external links
        text = re.sub(r'\[http[s]?://[^\]]+\]', '', text)

        # Remove <ref> tags and their contents
        text = re.sub(r'<ref[^>]*>.*?</ref>', '', text, flags=re.DOTALL)
        text = re.sub(r'<ref[^>]*/?>', '', text)

        # Strip any remaining HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()

        return text
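
    # Illustrative behaviour of clean_text on a made-up snippet (hypothetical input,
    # not taken from the dataset):
    #   clean_text("{{Infobox country}} [[Dublin|The capital]] city.<ref>cite</ref>")
    #   -> "The capital city."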

    def chunk_by_sentences(self, text: str, source_title: str, source_url: str, section: str = "main") -> List[TextChunk]:
        """Chunk text by sentences with overlap"""
        if not text:
            return []

        text = self.clean_text(text)

        # Sentence segmentation via spaCy
        doc = self.nlp(text)
        sentences = list(doc.sents)

        if not sentences:
            return []

        chunks = []
        current_chunk_tokens = []
        current_chunk_start = 0
        chunk_index = 0

        for sent in sentences:
            sent_tokens = [token.text for token in sent]

            # Flush the current chunk when adding this sentence would exceed the size limit
            if len(current_chunk_tokens) + len(sent_tokens) > self.chunk_size and current_chunk_tokens:
                chunk_text = " ".join(current_chunk_tokens)
                chunk = TextChunk(
                    chunk_id=f"{source_title.replace(' ', '_')}_{chunk_index}",
                    text=chunk_text,
                    source_title=source_title,
                    source_url=source_url,
                    section=section,
                    chunk_index=chunk_index,
                    total_chunks=0,  # backfilled below once all chunks exist
                    char_start=current_chunk_start,
                    char_end=current_chunk_start + len(chunk_text),
                    word_count=len(current_chunk_tokens)
                )
                chunks.append(chunk)
                chunk_index += 1

                # Carry the last chunk_overlap tokens into the next chunk for context
                overlap_tokens = current_chunk_tokens[-self.chunk_overlap:] if len(current_chunk_tokens) > self.chunk_overlap else []
                current_chunk_tokens = overlap_tokens + sent_tokens
                # Character offsets are approximate: chunk text is re-joined with single spaces
                current_chunk_start = current_chunk_start + len(chunk_text) - len(" ".join(overlap_tokens))
            else:
                current_chunk_tokens.extend(sent_tokens)

        # Emit the final, possibly short, chunk
        if current_chunk_tokens:
            chunk_text = " ".join(current_chunk_tokens)
            chunk = TextChunk(
                chunk_id=f"{source_title.replace(' ', '_')}_{chunk_index}",
                text=chunk_text,
                source_title=source_title,
                source_url=source_url,
                section=section,
                chunk_index=chunk_index,
                total_chunks=0,
                char_start=current_chunk_start,
                char_end=current_chunk_start + len(chunk_text),
                word_count=len(current_chunk_tokens)
            )
            chunks.append(chunk)

        # Backfill the total chunk count now that it is known
        for chunk in chunks:
            chunk.total_chunks = len(chunks)

        return chunks
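
    # Rough sizing intuition (approximate, since chunk boundaries snap to whole
    # sentences): with chunk_size=512 and chunk_overlap=128 the effective stride is
    # about 512 - 128 = 384 tokens, so a ~1,200-token article yields roughly
    # ceil((1200 - 128) / 384) = 3 chunks.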

    def extract_entities(self, chunk: TextChunk) -> TextChunk:
        """Extract named entities from chunk"""
        doc = self.nlp(chunk.text)
        entities = []

        for ent in doc.ents:
            entities.append({
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char
            })

        chunk.has_entities = len(entities) > 0
        chunk.entities = entities
        return chunk
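
    # Example of the entity records this produces (hypothetical chunk text): for
    # "Dublin hosted the games in 2003", spaCy would typically yield
    #   {"text": "Dublin", "label": "GPE", "start": 0, "end": 6}
    #   {"text": "2003", "label": "DATE", "start": 27, "end": 31}
    # where labels follow spaCy's OntoNotes scheme (PERSON, ORG, GPE, DATE, ...).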

    def process_article(self, article: Dict) -> List[TextChunk]:
        """Process a single article into chunks"""
        chunks = []

        # Chunk the summary separately so it can be retrieved as a concise overview
        if article.get("summary"):
            summary_chunks = self.chunk_by_sentences(
                article["summary"],
                article["title"],
                article["url"],
                section="summary"
            )
            chunks.extend(summary_chunks)

        # Chunk the article body
        if article.get("full_text"):
            full_text = article["full_text"]

            # Avoid duplicating the summary if the full text begins with it
            if article.get("summary") and full_text.startswith(article["summary"][:100]):
                full_text = full_text[len(article["summary"]):]

            main_chunks = self.chunk_by_sentences(
                full_text,
                article["title"],
                article["url"],
                section="full_article"
            )
            chunks.extend(main_chunks)

        # Annotate every chunk with named entities
        chunks = [self.extract_entities(chunk) for chunk in chunks]

        return chunks
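
    # The input article dicts are assumed to look roughly like this (keys inferred
    # from the accesses above; values are hypothetical):
    #   {
    #       "title": "Dublin",
    #       "url": "https://en.wikipedia.org/wiki/Dublin",
    #       "summary": "Dublin is the capital and largest city of Ireland...",
    #       "full_text": "Dublin is the capital and largest city of Ireland. ..."
    #   }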

    def process_all_articles(self, articles: List[Dict]) -> List[Dict]:
        """Process all articles into chunks"""
        print(f"[INFO] Processing {len(articles)} articles into chunks...")
        all_chunks = []

        for article in tqdm(articles, desc="Processing articles"):
            chunks = self.process_article(article)
            all_chunks.extend(chunks)

        print(f"[SUCCESS] Created {len(all_chunks)} chunks from {len(articles)} articles")

        chunks_dict = []
        for chunk in all_chunks:
            chunk_dict = {
                "chunk_id": chunk.chunk_id,
                "text": chunk.text,
                "source_title": chunk.source_title,
                "source_url": chunk.source_url,
                "section": chunk.section,
                "chunk_index": chunk.chunk_index,
                "total_chunks": chunk.total_chunks,
                "char_start": chunk.char_start,
                "char_end": chunk.char_end,
                "word_count": chunk.word_count,
                "has_entities": chunk.has_entities,
                "entities": chunk.entities if chunk.entities else []
            }
            chunks_dict.append(chunk_dict)

        return chunks_dict

    def save_chunks(self, chunks: List[Dict], output_path: str = "dataset/wikipedia_ireland/chunks.json"):
        """Save chunks to JSON file"""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(chunks, f, ensure_ascii=False, indent=2)

        # Summary statistics for a quick sanity check of the chunking run
        stats = {
            "total_chunks": len(chunks),
            "avg_chunk_length": sum(c["word_count"] for c in chunks) / len(chunks) if chunks else 0,
            "chunks_with_entities": sum(1 for c in chunks if c["has_entities"]),
            "total_entities": sum(len(c["entities"]) for c in chunks)
        }

        stats_path = output_path.replace("chunks.json", "chunk_stats.json")
        with open(stats_path, 'w') as f:
            json.dump(stats, f, indent=2)

        print(f"[SUCCESS] Saved {len(chunks)} chunks to {output_path}")
        print(f"[INFO] Statistics saved to {stats_path}")

        return output_path


if __name__ == "__main__":
    # Load the scraped articles produced by the collection step
    with open("dataset/wikipedia_ireland/ireland_articles.json", 'r', encoding='utf-8') as f:
        articles = json.load(f)

    processor = AdvancedTextProcessor(chunk_size=512, chunk_overlap=128)
    chunks = processor.process_all_articles(articles)
    processor.save_chunks(chunks)
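
# Downstream usage sketch (assumption: a later indexing/retrieval step consumes the
# saved chunks; nothing below comes from this file):
#   with open("dataset/wikipedia_ireland/chunks.json", encoding="utf-8") as f:
#       chunks = json.load(f)
#   texts = [c["text"] for c in chunks if c["word_count"] >= 50]  # hypothetical filter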