# Source: graphwiz-ireland / src/text_processor.py (uploaded by hirthickraj2015)
# Commit 9679fcd: "GraphWiz Ireland - Complete HF Spaces deployment" (9.47 kB)
"""
Advanced Text Chunking and Preprocessing Pipeline
Intelligently chunks Wikipedia articles while preserving context and semantic coherence.
"""
import json
import os
import re
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import spacy
from tqdm import tqdm
@dataclass
class TextChunk:
    """Represents a chunk of text with metadata.

    Attributes:
        chunk_id: Identifier built from the source title and a running index.
        text: The chunk's text.
        source_title: Title of the article the chunk was taken from.
        source_url: URL of the article the chunk was taken from.
        section: Label of the article part the chunk belongs to.
        chunk_index: Position of the chunk within its section.
        total_chunks: Number of chunks produced for the same section.
        char_start: Start offset of the chunk.
        char_end: End offset of the chunk.
        word_count: Number of tokens in the chunk.
        has_entities: Whether any named entity has been attached.
        entities: Named entities found in ``text``; ``None`` until entity
            extraction has been run on the chunk.
    """

    chunk_id: str
    text: str
    source_title: str
    source_url: str
    section: str
    chunk_index: int
    total_chunks: int
    char_start: int
    char_end: int
    word_count: int
    has_entities: bool = False
    # Optional because the default really is None (not an empty list).
    entities: Optional[List[Dict]] = None
class AdvancedTextProcessor:
    """Advanced text processing with intelligent chunking.

    Cleans wiki markup, splits the cleaned text into overlapping chunks made
    of whole spaCy sentences, and attaches named entities to every chunk.
    """

    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 128, spacy_model: str = "en_core_web_sm"):
        """Load the spaCy pipeline used for sentence splitting and NER.

        Args:
            chunk_size: Target maximum number of spaCy tokens per chunk.
            chunk_overlap: Number of trailing tokens repeated at the start of
                the next chunk.
            spacy_model: Name of the spaCy model to load. If it is not
                installed, a download is attempted once.

        Raises:
            OSError: If the model still cannot be loaded after the download
                attempt.
        """
        self.chunk_size = chunk_size  # tokens
        self.chunk_overlap = chunk_overlap  # tokens

        # Load spaCy model for sentence segmentation and entity recognition
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            print(f"[INFO] Downloading spaCy model: {spacy_model}")
            import subprocess

            # Use the running interpreter so the model is installed into the
            # same environment that is about to load it (a bare "python" may
            # resolve to a different installation). A failed download is
            # reported by the retry below as OSError.
            subprocess.run([sys.executable, "-m", "spacy", "download", spacy_model])
            self.nlp = spacy.load(spacy_model)

        # Disable unnecessary components for speed
        self.nlp.select_pipes(enable=["tok2vec", "tagger", "parser", "ner"])

    def clean_text(self, text: str) -> str:
        """Strip wiki markup, citations and HTML tags from ``text``.

        Args:
            text: Raw article text; may be empty or ``None``.

        Returns:
            The cleaned text with all whitespace runs collapsed to single
            spaces, or ``""`` for empty input.
        """
        if not text:
            return ""

        # Remove templates innermost-first and repeat until none are left, so
        # nested templates such as {{a {{b}} c}} disappear completely instead
        # of leaving a dangling " c}}" behind. Single braces are allowed
        # inside a template; only "{{" / "}}" delimit it.
        template = r'\{\{(?:[^{}]|\{(?!\{)|\}(?!\}))*\}\}'
        removed = 1
        while removed:
            text, removed = re.subn(template, '', text)

        text = re.sub(r'\[\[File:[^\]]+\]\]', '', text)  # Remove file links
        text = re.sub(r'\[\[Image:[^\]]+\]\]', '', text)  # Remove image links

        # Clean internal links but keep text
        text = re.sub(r'\[\[([^|\]]+)\|([^\]]+)\]\]', r'\2', text)  # [[Link|Text]] -> Text
        text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text)  # [[Link]] -> Link

        # Remove external links
        text = re.sub(r'\[http[s]?://[^\]]+\]', '', text)

        # Remove citations. Self-closing <ref .../> tags must go first:
        # otherwise the paired pattern treats one as an opening tag and
        # deletes all article text up to the next </ref>.
        text = re.sub(r'<ref\b[^>]*/\s*>', '', text, flags=re.IGNORECASE)
        text = re.sub(r'<ref\b[^>]*>.*?</ref>', '', text, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<ref\b[^>]*>', '', text, flags=re.IGNORECASE)  # unmatched openers

        # Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _make_chunk(self, tokens, doc_text: str, source_title: str, source_url: str,
                    section: str, chunk_index: int) -> "TextChunk":
        """Build a ``TextChunk`` from a contiguous, non-empty run of spaCy tokens."""
        # Token.idx is the character offset of the token in the document, so
        # the chunk text is a verbatim slice and the offsets index into it.
        start = tokens[0].idx
        end = tokens[-1].idx + len(tokens[-1].text)
        return TextChunk(
            chunk_id=f"{source_title.replace(' ', '_')}_{chunk_index}",
            text=doc_text[start:end],
            source_title=source_title,
            source_url=source_url,
            section=section,
            chunk_index=chunk_index,
            total_chunks=0,  # filled in once all chunks are known
            char_start=start,
            char_end=end,
            word_count=len(tokens),
        )

    def chunk_by_sentences(self, text: str, source_title: str, source_url: str,
                           section: str = "main") -> List["TextChunk"]:
        """Chunk text by sentences with overlap.

        The text is cleaned with ``clean_text`` and split into sentences by
        spaCy. Whole sentences are packed into a chunk until adding the next
        one would exceed ``chunk_size`` tokens; a single sentence longer than
        ``chunk_size`` is kept intact. Each following chunk starts with the
        last ``chunk_overlap`` tokens of the previous one (no overlap is added
        when the previous chunk is not longer than ``chunk_overlap``).

        Args:
            text: Raw text to chunk.
            source_title: Article title, used for metadata and chunk ids.
            source_url: Article URL stored on every chunk.
            section: Label stored on every chunk.

        Returns:
            The chunks in document order. ``char_start``/``char_end`` are
            offsets into the cleaned text and ``word_count`` is the number of
            spaCy tokens. Empty input yields an empty list.
        """
        if not text:
            return []

        # Clean text first
        doc = self.nlp(self.clean_text(text))
        sentences = list(doc.sents)
        if not sentences:
            return []

        doc_text = doc.text
        chunks = []
        window = []  # tokens of the chunk being built; always contiguous in doc

        for sent in sentences:
            sent_tokens = list(sent)

            if window and len(window) + len(sent_tokens) > self.chunk_size:
                chunks.append(
                    self._make_chunk(window, doc_text, source_title, source_url, section, len(chunks))
                )
                # Guard against chunk_overlap == 0: window[-0:] would be the
                # whole window, not an empty overlap.
                if 0 < self.chunk_overlap < len(window):
                    carry = window[-self.chunk_overlap:]
                else:
                    carry = []
                window = carry + sent_tokens
            else:
                window.extend(sent_tokens)

        # Add final chunk
        if window:
            chunks.append(
                self._make_chunk(window, doc_text, source_title, source_url, section, len(chunks))
            )

        # Update total_chunks
        for chunk in chunks:
            chunk.total_chunks = len(chunks)

        return chunks

    def extract_entities(self, chunk: "TextChunk") -> "TextChunk":
        """Extract named entities from chunk.

        Runs the spaCy pipeline on ``chunk.text`` and stores the result on the
        chunk itself.

        Args:
            chunk: Chunk to annotate; modified in place.

        Returns:
            The same chunk, with ``entities`` set to a list of dicts
            (``text``, ``label``, ``start``, ``end`` as character offsets into
            ``chunk.text``) and ``has_entities`` updated accordingly.
        """
        doc = self.nlp(chunk.text)
        entities = [
            {
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char,
            }
            for ent in doc.ents
        ]
        chunk.has_entities = bool(entities)
        chunk.entities = entities
        return chunk

    def process_article(self, article: Dict) -> List["TextChunk"]:
        """Process a single article into chunks.

        Args:
            article: Mapping with a ``"title"`` and optionally ``"url"``,
                ``"summary"`` and ``"full_text"``.

        Returns:
            Summary chunks (section ``"summary"``) followed by body chunks
            (section ``"full_article"``), each with entities attached.
            ``chunk_index``/``total_chunks`` are per section; ``chunk_id`` is
            unique across the whole article.

        Raises:
            KeyError: If the article has no ``"title"``.
        """
        title = article["title"]
        url = article.get("url", "")
        summary = article.get("summary")
        full_text = article.get("full_text")

        chunks = []

        # Process main summary
        if summary:
            chunks.extend(self.chunk_by_sentences(summary, title, url, section="summary"))

        # Process full text (skip summary part to avoid duplication)
        if full_text:
            # Only cut when the whole summary really is the prefix. Matching
            # just the first characters and then dropping len(summary)
            # characters could remove unrelated body text.
            if summary and full_text.startswith(summary):
                full_text = full_text[len(summary):]
            chunks.extend(self.chunk_by_sentences(full_text, title, url, section="full_article"))

        # Both sections number their chunks from 0, which would give the same
        # chunk_id to a summary chunk and a body chunk. Re-issue ids from one
        # running counter (same "<Title>_<n>" format) so they are unique.
        id_prefix = title.replace(' ', '_')
        for position, chunk in enumerate(chunks):
            chunk.chunk_id = f"{id_prefix}_{position}"

        # Extract entities for all chunks
        return [self.extract_entities(chunk) for chunk in chunks]

    def process_all_articles(self, articles: List[Dict]) -> List[Dict]:
        """Process all articles into chunks.

        Args:
            articles: Article mappings as accepted by ``process_article``.

        Returns:
            One JSON-serializable dict per chunk, in processing order.
        """
        print(f"[INFO] Processing {len(articles)} articles into chunks...")

        all_chunks = []
        for article in tqdm(articles, desc="Processing articles"):
            all_chunks.extend(self.process_article(article))

        print(f"[SUCCESS] Created {len(all_chunks)} chunks from {len(articles)} articles")

        # Convert to dict for JSON serialization
        return [
            {
                "chunk_id": chunk.chunk_id,
                "text": chunk.text,
                "source_title": chunk.source_title,
                "source_url": chunk.source_url,
                "section": chunk.section,
                "chunk_index": chunk.chunk_index,
                "total_chunks": chunk.total_chunks,
                "char_start": chunk.char_start,
                "char_end": chunk.char_end,
                "word_count": chunk.word_count,
                "has_entities": chunk.has_entities,
                "entities": chunk.entities if chunk.entities else [],
            }
            for chunk in all_chunks
        ]

    def save_chunks(self, chunks: List[Dict], output_path: str = "dataset/wikipedia_ireland/chunks.json"):
        """Save chunks to JSON file, plus a small statistics file next to it.

        The statistics file is written to the same directory. Its name is the
        output file name with ``chunks.json`` replaced by ``chunk_stats.json``;
        for any other file name ``_stats.json`` is appended to the stem, so
        the statistics never overwrite the chunk file.

        Args:
            chunks: Chunk dicts as returned by ``process_all_articles``.
            output_path: Destination of the chunk JSON file. Missing parent
                directories are created.

        Returns:
            ``output_path``.
        """
        directory, filename = os.path.split(output_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(chunks, f, ensure_ascii=False, indent=2)

        # Save statistics
        total = len(chunks)
        stats = {
            "total_chunks": total,
            # An empty run must not crash with ZeroDivisionError.
            "avg_chunk_length": sum(c["word_count"] for c in chunks) / total if total else 0.0,
            "chunks_with_entities": sum(1 for c in chunks if c["has_entities"]),
            "total_entities": sum(len(c["entities"]) for c in chunks),
        }

        if "chunks.json" in filename:
            stats_name = filename.replace("chunks.json", "chunk_stats.json")
        else:
            stats_name = f"{os.path.splitext(filename)[0]}_stats.json"
        stats_path = os.path.join(directory, stats_name)

        with open(stats_path, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2)

        print(f"[SUCCESS] Saved {len(chunks)} chunks to {output_path}")
        print(f"[INFO] Statistics saved to {stats_path}")
        return output_path
if __name__ == "__main__":
    # Test with sample articles: chunk the article dump and write the chunks
    # and their statistics to the default output location.
    # The encoding is given explicitly because the articles contain non-ASCII
    # text and the platform default encoding is not always UTF-8.
    with open("dataset/wikipedia_ireland/ireland_articles.json", 'r', encoding='utf-8') as f:
        articles = json.load(f)

    processor = AdvancedTextProcessor(chunk_size=512, chunk_overlap=128)
    chunks = processor.process_all_articles(articles)
    processor.save_chunks(chunks)