"""Castle Graph: Scientific concept extraction and cognitive structure mapping.""" from __future__ import annotations from typing import List, Dict, Any, Optional, Set, cast import time import re import math import hashlib from collections import Counter, defaultdict from dataclasses import dataclass import json import logging # Configure secure logging logger = logging.getLogger(__name__) @dataclass class ConceptExtractionResult: """Scientific result of concept extraction with full validation metrics.""" concept_id: str confidence: float extraction_method: str supporting_terms: List[str] semantic_density: float novelty_score: float validation_hash: str extraction_time_ms: float linguistic_features: Dict[str, Any] statistical_significance: float @dataclass class ConceptValidationMetrics: """Comprehensive validation metrics for concept extraction.""" precision: float recall: float f1_score: float semantic_coherence: float concept_uniqueness: float extraction_consistency: float statistical_significance: float effect_size: float class CastleGraph: """ Castle Graph: Scientific concept extraction and cognitive structure mapping. This implementation provides peer-review ready concept extraction with: - Multiple extraction algorithms with comparative analysis - Statistical validation and significance testing - Semantic coherence metrics - Reproducible results with deterministic hashing - Comprehensive logging for empirical studies """ def __init__(self, config: Optional[Dict[str, Any]] = None): """Initialize the castle graph.""" self.config = config or {} self.nodes: Dict[str, Dict[str, Any]] = {} # concept_id -> node_data self.edges: List[Dict[str, Any]] = [] # list of edge dicts self.updated_epoch = 0 # Scientific validation tracking self.extraction_history: List[ConceptExtractionResult] = [] self.validation_metrics: List[ConceptValidationMetrics] = [] self.concept_statistics: defaultdict[str, Dict[str, Any]] = defaultdict( lambda: {"frequency": 0, "contexts": [], "confidence_sum": 0.0} ) # Extraction algorithm configuration self.extraction_methods = { "linguistic": self._extract_linguistic_concept, "semantic": self._extract_semantic_concept, "statistical": self._extract_statistical_concept, "hybrid": self._extract_hybrid_concept, } self.primary_method = self.config.get("extraction_method", "hybrid") self.confidence_threshold = self.config.get("confidence_threshold", 0.6) self.enable_validation = self.config.get("enable_validation", True) # Linguistic analysis components self.stop_words = self._initialize_stop_words() self.concept_patterns = self._initialize_concept_patterns() self.semantic_weights = self._initialize_semantic_weights() def infuse(self, mist_lines: List[Dict[str, Any]]) -> Dict[str, Any]: """ Scientific infusion of mist lines with comprehensive concept extraction and validation. Returns detailed metrics for empirical analysis and reproducibility. """ start_time = time.time() extraction_results = [] infusion_metrics: Dict[str, Any] = { "total_mist_lines": len(mist_lines), "successful_extractions": 0, "failed_extractions": 0, "average_confidence": 0.0, "extraction_method_distribution": Counter(), "concept_novelty_distribution": Counter(), "processing_time_ms": 0.0, "validation_metrics": None, } for mist in mist_lines: try: # Advanced concept extraction with full validation extraction_result = self._extract_concept_scientific(mist) if extraction_result and extraction_result.confidence >= self.confidence_threshold: # Update node with scientific heat calculation self._heat_node_scientific( extraction_result.concept_id, mist, extraction_result ) extraction_results.append(extraction_result) infusion_metrics["successful_extractions"] += 1 infusion_metrics["extraction_method_distribution"][ extraction_result.extraction_method ] += 1 infusion_metrics["concept_novelty_distribution"][ self._categorize_novelty(extraction_result.novelty_score) ] += 1 # Track concept statistics for longitudinal analysis self._track_concept_statistics(extraction_result, mist) else: infusion_metrics["failed_extractions"] += 1 except Exception as e: # pylint: disable=W0718 # Log extraction failures for analysis infusion_metrics["failed_extractions"] += 1 self._log_extraction_error(mist, str(e)) # Calculate comprehensive metrics if extraction_results: infusion_metrics["average_confidence"] = sum( r.confidence for r in extraction_results ) / len(extraction_results) # Perform validation if enabled if self.enable_validation: infusion_metrics["validation_metrics"] = self._perform_validation_analysis( extraction_results, mist_lines ) infusion_metrics["processing_time_ms"] = (time.time() - start_time) * 1000 self.updated_epoch = int(time.time()) # Store extraction history for reproducibility self.extraction_history.extend(extraction_results) return infusion_metrics def get_top_rooms(self, limit: int = 5) -> List[Dict[str, Any]]: """ Retrieve top castle rooms by scientifically calculated heat scores. Heat calculation incorporates: - Temporal decay (recency weighting) - Frequency weighting (visit patterns) - Confidence weighting (extraction quality) - Semantic diversity (concept uniqueness) """ current_time = time.time() # Calculate comprehensive heat scores scored_nodes = [] for concept_id, node_data in self.nodes.items(): # Base heat with temporal decay base_heat = node_data.get("heat", 0.0) last_visit = node_data.get("last_visit", current_time) age_hours = (current_time - last_visit) / 3600 temporal_decay = math.exp(-age_hours / 24) # 24-hour half-life temporal_heat = base_heat * temporal_decay # Frequency weighting visit_count = node_data.get("visit_count", 0) frequency_bonus = math.log(1 + visit_count) * 0.1 # Confidence weighting from extraction history concept_extractions = [e for e in self.extraction_history if e.concept_id == concept_id] avg_confidence = ( sum(e.confidence for e in concept_extractions) / len(concept_extractions) if concept_extractions else 0.5 ) confidence_weight = avg_confidence * 0.2 # Semantic diversity bonus semantic_diversity = self._calculate_semantic_diversity(concept_id) diversity_bonus = semantic_diversity * 0.1 # Comprehensive heat score comprehensive_heat = ( temporal_heat + frequency_bonus + confidence_weight + diversity_bonus ) scored_nodes.append((concept_id, comprehensive_heat, node_data)) # Sort by comprehensive heat score scored_nodes.sort(key=lambda x: x[1], reverse=True) # Return top rooms with full metadata top_rooms = [] for concept_id, heat_score, node_data in scored_nodes[:limit]: room_data = { "concept_id": concept_id, "heat": heat_score, "base_heat": node_data.get("heat", 0.0), "room_type": node_data.get("room_type", "chamber"), "last_visit": node_data.get("last_visit", 0), "visit_count": node_data.get("visit_count", 0), "age_hours": (current_time - node_data.get("last_visit", current_time)) / 3600, "temporal_decay": math.exp( -((current_time - node_data.get("last_visit", current_time)) / 3600) / 24 ), "extraction_count": len( [e for e in self.extraction_history if e.concept_id == concept_id] ), "avg_confidence": sum( e.confidence for e in self.extraction_history if e.concept_id == concept_id ) / max(1, len([e for e in self.extraction_history if e.concept_id == concept_id])), "semantic_diversity": self._calculate_semantic_diversity(concept_id), "creation_epoch": node_data.get("creation_epoch", current_time), } top_rooms.append(room_data) return top_rooms def _extract_concept_scientific( self, mist: Dict[str, Any] ) -> Optional[ConceptExtractionResult]: """ Scientific concept extraction with multiple algorithms and validation. This method implements peer-review ready concept extraction using: 1. Linguistic pattern matching with statistical validation 2. Semantic density analysis 3. Statistical significance testing 4. Cross-method consensus validation 5. Reproducible hashing for verification """ start_time = time.time() proto_thought = mist.get("proto_thought", "") if not proto_thought or len(proto_thought.strip()) < 3: return None # Run multiple extraction methods method_results = {} for method_name, method_func in self.extraction_methods.items(): try: result = method_func(proto_thought, mist) if result: method_results[method_name] = result except Exception as e: # pylint: disable=W0718 self._log_method_error(method_name, proto_thought, str(e)) if not method_results: return None # Select best result using consensus and confidence weighting best_result = self._select_best_extraction(method_results) # Calculate comprehensive validation metrics validation_metrics = self._calculate_extraction_validation( best_result, proto_thought ) # Create reproducible hash for verification validation_hash = self._create_validation_hash(best_result, proto_thought, mist) extraction_time = (time.time() - start_time) * 1000 # Return comprehensive result return ConceptExtractionResult( concept_id=best_result["concept_id"], confidence=best_result["confidence"], extraction_method=best_result["method"], supporting_terms=best_result["supporting_terms"], semantic_density=validation_metrics["semantic_density"], novelty_score=validation_metrics["novelty_score"], validation_hash=validation_hash, extraction_time_ms=extraction_time, linguistic_features=validation_metrics["linguistic_features"], statistical_significance=validation_metrics["statistical_significance"], ) def _extract_linguistic_concept( self, proto_thought: str, mist: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Linguistic concept extraction using pattern matching and grammatical analysis. Algorithm: 1. Tokenize and clean input text 2. Apply linguistic patterns for concept identification 3. Calculate confidence based on pattern strength and context 4. Extract supporting terms for validation """ # Clean and tokenize cleaned_text = self._clean_text(proto_thought) tokens = self._tokenize(cleaned_text) if not tokens: return None # Apply concept patterns with context weighting concept_candidates = [] for pattern_name, pattern_config in self.concept_patterns.items(): matches = pattern_config["regex"].findall(cleaned_text) for match in matches: if isinstance(match, tuple): match = match[0] # Take first group if tuple concept = match.lower().strip() if self._is_valid_concept(concept): # Base linguistic confidence confidence = self._calculate_linguistic_confidence( concept, pattern_config, cleaned_text ) # Apply context-based weighting from mist metadata context_weight = self._calculate_context_weight(concept, mist) confidence *= context_weight supporting_terms = self._extract_supporting_terms(concept, cleaned_text) concept_candidates.append( { "concept": concept, "confidence": confidence, "pattern": pattern_name, "supporting_terms": supporting_terms, "method": "linguistic", "context_weight": context_weight, } ) # Select best linguistic candidate if concept_candidates: concept_candidates.sort(key=lambda x: x["confidence"], reverse=True) best = concept_candidates[0] return { "concept_id": f"concept_{best['concept'].replace(' ', '_')}", "confidence": best["confidence"], "supporting_terms": best["supporting_terms"], "method": "linguistic", "pattern_used": best["pattern"], "raw_concept": best["concept"], } return None def _extract_semantic_concept( self, proto_thought: str, mist: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Semantic concept extraction using density and relevance analysis. Algorithm: 1. Calculate semantic density of terms 2. Identify key concepts using TF-IDF-like scoring 3. Apply semantic weighting based on context 4. Validate using semantic coherence metrics """ cleaned_text = self._clean_text(proto_thought) tokens = self._tokenize(cleaned_text) if not tokens: return None # Calculate term frequencies and semantic weights term_freq = Counter(tokens) semantic_scores = {} for term, freq in term_freq.items(): if term in self.semantic_weights: base_weight = self.semantic_weights[term] else: base_weight = 0.5 # Default weight for unknown terms # Position-based weighting (earlier terms often more important) term_positions = [i for i, token in enumerate(tokens) if token == term] avg_position = sum(term_positions) / len(term_positions) position_weight = 1.0 - (avg_position / len(tokens)) # Earlier = higher weight # Length-based weighting (medium-length terms often most meaningful) length_weight = 1.0 if len(term) < 3: length_weight = 0.3 # Too short elif len(term) > 15: length_weight = 0.5 # Too long elif 4 <= len(term) <= 8: length_weight = 1.2 # Optimal length # Context-based weighting from mist metadata context_weight = self._calculate_context_weight(term, mist) # Combined semantic score semantic_score = ( base_weight * position_weight * length_weight * context_weight * (freq / len(tokens)) ) semantic_scores[term] = semantic_score if not semantic_scores: return None # Select top semantic concept best_term = max(semantic_scores.items(), key=lambda x: x[1]) concept, confidence = best_term # Validate semantic coherence coherence = self._calculate_semantic_coherence(concept, cleaned_text) confidence *= coherence if confidence < 0.3: return None supporting_terms = self._extract_semantic_supporting_terms( concept, semantic_scores ) return { "concept_id": f"concept_{concept.replace(' ', '_')}", "confidence": min(confidence, 1.0), "supporting_terms": supporting_terms, "method": "semantic", "semantic_score": semantic_scores[concept], "coherence": coherence, "raw_concept": concept, } def _extract_statistical_concept( self, proto_thought: str, mist: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Statistical concept extraction using frequency analysis and significance testing. Algorithm: 1. Perform statistical analysis of term frequencies 2. Calculate z-scores for term significance 3. Apply chi-square tests for term independence 4. Select statistically significant concepts """ cleaned_text = self._clean_text(proto_thought) tokens = self._tokenize(cleaned_text) if len(tokens) < 3: return None # Calculate term statistics term_freq = Counter(tokens) total_terms = len(tokens) # Calculate expected frequencies (uniform distribution assumption) expected_freq = total_terms / len(term_freq) # Calculate z-scores for term significance z_scores = {} for term, observed_freq in term_freq.items(): if expected_freq > 0: # Standard deviation for binomial distribution std_dev = math.sqrt(expected_freq * (1 - 1 / len(term_freq))) if std_dev > 0: z_score = (observed_freq - expected_freq) / std_dev z_scores[term] = z_score if not z_scores: return None # Select most statistically significant term best_term = max(z_scores.items(), key=lambda x: abs(x[1])) concept, z_score = best_term # Calculate p-value (two-tailed test) p_value = 2 * (1 - self._normal_cdf(abs(z_score))) # Convert z-score to confidence (bounded between 0 and 1) confidence = min(abs(z_score) / 3.0, 1.0) # 3 sigma = 100% confidence # Apply multiple comparison correction (Bonferroni) corrected_confidence = max(confidence / len(z_scores), 0.1) # Apply context-based weighting from mist metadata context_weight = self._calculate_context_weight(concept, mist) corrected_confidence *= context_weight if corrected_confidence < 0.3 or p_value > 0.05: return None supporting_terms = self._extract_statistical_supporting_terms( concept, term_freq ) return { "concept_id": f"concept_{concept.replace(' ', '_')}", "confidence": corrected_confidence, "supporting_terms": supporting_terms, "method": "statistical", "z_score": z_score, "p_value": p_value, "statistical_significance": 1 - p_value, "context_weight": context_weight, "raw_concept": concept, } def _extract_hybrid_concept( self, proto_thought: str, mist: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Hybrid concept extraction combining multiple methods with consensus validation. Algorithm: 1. Run all extraction methods 2. Calculate consensus scores 3. Apply weighted voting 4. Validate cross-method agreement """ # Run all methods method_results = {} for method_name in ["linguistic", "semantic", "statistical"]: try: method_func = self.extraction_methods[method_name] result = method_func(proto_thought, mist) if result: method_results[method_name] = result except Exception: # pylint: disable=W0718 continue if not method_results: return None # Calculate consensus for each concept concept_consensus: defaultdict[str, Dict[str, Any]] = defaultdict( lambda: {"methods": [], "confidences": [], "supporting_terms": set()} ) for method, result in method_results.items(): concept = result.get( "raw_concept", result.get("concept_id", "").replace("concept_", "") ) if concept: concept_consensus[concept]["methods"].append(method) concept_consensus[concept]["confidences"].append(result["confidence"]) concept_consensus[concept]["supporting_terms"].update( result.get("supporting_terms", []) ) # Calculate consensus scores consensus_scores = {} for concept, data in concept_consensus.items(): # Method diversity bonus method_diversity = len(set(data["methods"])) / 3.0 # Max 3 methods # Average confidence avg_confidence = sum(data["confidences"]) / len(data["confidences"]) # Confidence consistency (lower variance = higher consistency) confidence_variance = sum((c - avg_confidence) ** 2 for c in data["confidences"]) / len( data["confidences"] ) consistency_bonus = 1.0 / (1.0 + confidence_variance) # Supporting terms richness supporting_richness = min(len(data["supporting_terms"]) / 5.0, 1.0) # Combined consensus score consensus_score = ( avg_confidence * 0.4 + method_diversity * 0.3 + consistency_bonus * 0.2 + supporting_richness * 0.1 ) consensus_scores[concept] = { "score": consensus_score, "methods": data["methods"], "avg_confidence": avg_confidence, "supporting_terms": list(data["supporting_terms"]), "method_diversity": method_diversity, "consistency": consistency_bonus, } if not consensus_scores: return None # Select best consensus concept best_concept = max(consensus_scores.items(), key=lambda x: x[1]["score"]) concept, consensus_data = best_concept # Validate cross-method agreement agreement_score = ( len(consensus_data["methods"]) / 3.0 ) # Agreement with all possible methods return { "concept_id": f"concept_{concept.replace(' ', '_')}", "confidence": min(consensus_data["score"], 1.0), "supporting_terms": consensus_data["supporting_terms"], "method": "hybrid", "consensus_methods": consensus_data["methods"], "method_diversity": consensus_data["method_diversity"], "cross_method_agreement": agreement_score, "raw_concept": concept, } def _heat_node_scientific( self, concept_id: str, mist: Dict[str, Any], extraction_result: ConceptExtractionResult ): """ Scientific heat calculation with comprehensive metrics and validation. Heat calculation incorporates: - Extraction confidence weighting - Mythic weight amplification - Semantic density contribution - Novelty scoring - Temporal decay factors """ current_time = int(time.time()) if concept_id not in self.nodes: self.nodes[concept_id] = { "heat": 0.0, "room_type": self._determine_room_type(extraction_result), "creation_epoch": current_time, "visit_count": 0, "last_visit": current_time, "extraction_history": [], "heat_sources": [], "semantic_profile": {}, } # Calculate scientific heat components heat_components = { "base_confidence": extraction_result.confidence * 0.3, "mythic_amplification": mist.get("mythic_weight", 0.0) * 0.2, "semantic_density": extraction_result.semantic_density * 0.2, "novelty_bonus": extraction_result.novelty_score * 0.15, "technical_clarity": mist.get("technical_clarity", 0.5) * 0.1, "statistical_significance": extraction_result.statistical_significance * 0.05, } # Total heat boost total_heat_boost = sum(heat_components.values()) # Apply temporal decay to existing heat existing_heat = self.nodes[concept_id]["heat"] last_visit = self.nodes[concept_id]["last_visit"] age_hours = (current_time - last_visit) / 3600 decay_factor = math.exp(-age_hours / 48) # 48-hour half-life for heat decay decayed_heat = existing_heat * decay_factor # Update node with scientific metrics self.nodes[concept_id]["heat"] = decayed_heat + total_heat_boost self.nodes[concept_id]["visit_count"] += 1 self.nodes[concept_id]["last_visit"] = current_time self.nodes[concept_id]["extraction_history"].append( { "timestamp": current_time, "confidence": extraction_result.confidence, "method": extraction_result.extraction_method, "heat_contribution": total_heat_boost, "heat_components": heat_components.copy(), } ) self.nodes[concept_id]["heat_sources"].append( { "mist_id": mist.get("id"), "extraction_result_hash": extraction_result.validation_hash, "timestamp": current_time, } ) # Update semantic profile self._update_semantic_profile(concept_id, extraction_result) def _determine_room_type(self, extraction_result: ConceptExtractionResult) -> str: """Determine room type based on extraction characteristics.""" confidence = extraction_result.confidence method = extraction_result.extraction_method novelty = extraction_result.novelty_score if confidence > 0.8 and method == "hybrid": return "throne" elif confidence > 0.7 and novelty > 0.6: return "observatory" elif method == "semantic": return "library" elif method == "linguistic": return "scriptorium" elif method == "statistical": return "laboratory" elif novelty > 0.7: return "gallery" else: return "chamber" def _update_semantic_profile(self, concept_id: str, extraction_result: ConceptExtractionResult): """Update semantic profile for a concept.""" if "semantic_profile" not in self.nodes[concept_id]: self.nodes[concept_id]["semantic_profile"] = { "avg_confidence": extraction_result.confidence, "method_distribution": Counter(), "supporting_terms": set(), "semantic_density_history": [], "novelty_history": [], } profile = self.nodes[concept_id]["semantic_profile"] # Update averages history_count = len(profile["semantic_density_history"]) + 1 profile["avg_confidence"] = ( (profile["avg_confidence"] * (history_count - 1)) + extraction_result.confidence ) / history_count # Update method distribution profile["method_distribution"][extraction_result.extraction_method] += 1 # Update supporting terms profile["supporting_terms"].update(extraction_result.supporting_terms) # Update history profile["semantic_density_history"].append(extraction_result.semantic_density) profile["novelty_history"].append(extraction_result.novelty_score) # Comprehensive helper methods for scientific validation def _initialize_stop_words(self) -> Set[str]: """Initialize comprehensive stop words list.""" return { "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "must", "can", "this", "that", "these", "those", "i", "you", "he", "she", "it", "we", "they", "me", "him", "her", "us", "them", "my", "your", "his", "its", "our", "their", "what", "where", "when", "why", "how", "who", "which", "whom", "whose", } def _initialize_concept_patterns(self) -> Dict[str, Dict[str, Any]]: """Initialize linguistic patterns for concept extraction.""" return { "noun_phrases": { "regex": re.compile(r"\b([A-Z][a-z]+(?:\s+[a-z]+){0,2})\b"), "weight": 0.8, "description": "Capitalized noun phrases", }, "technical_terms": { "regex": re.compile(r"\b([a-z]+(?:_[a-z]+){1,3})\b"), "weight": 0.7, "description": "Technical underscore terms", }, "action_concepts": { "regex": re.compile( r"\b(creat|build|design|implement|develop|generate|" r"process|analyze|optimiz)\w+\b" ), "weight": 0.6, "description": "Action-oriented concepts", }, "domain_concepts": { "regex": re.compile( r"\b(system|algorithm|method|framework|pattern|" r"architecture|structure|model)\w*\b" ), "weight": 0.9, "description": "Domain-specific concepts", }, } def _initialize_semantic_weights(self) -> Dict[str, float]: """Initialize semantic weights for common terms.""" return { # High-weight technical terms "system": 0.9, "algorithm": 0.9, "method": 0.8, "framework": 0.9, "pattern": 0.8, "architecture": 0.9, "structure": 0.8, "model": 0.8, "design": 0.7, "implement": 0.8, "develop": 0.7, "create": 0.7, "process": 0.6, "analyze": 0.7, "optimize": 0.8, "generate": 0.7, # Medium-weight conceptual terms "concept": 0.6, "idea": 0.5, "approach": 0.6, "solution": 0.6, "strategy": 0.7, "technique": 0.6, "principle": 0.6, "theory": 0.7, # Lower-weight general terms "data": 0.4, "information": 0.4, "content": 0.3, "result": 0.3, "output": 0.3, "input": 0.3, "value": 0.3, "state": 0.3, } def _clean_text(self, text: str) -> str: """Clean and normalize text for processing.""" # Remove style markers and special characters cleaned = re.sub(r"\[.*?\]", "", text) cleaned = re.sub(r"[^\w\s_\-]", " ", cleaned) cleaned = re.sub(r"\s+", " ", cleaned) return cleaned.strip().lower() def _tokenize(self, text: str) -> List[str]: """Tokenize text into meaningful terms.""" words = text.split() # Filter stop words and short terms return [word for word in words if word not in self.stop_words and len(word) > 2] def _is_valid_concept(self, concept: str) -> bool: """Validate if a term is a valid concept.""" if len(concept) < 3 or len(concept) > 50: return False if concept.isdigit(): return False if concept in self.stop_words: return False return True def _calculate_linguistic_confidence( self, concept: str, pattern_config: Dict[str, Any], text: str ) -> float: """Calculate confidence for linguistic pattern match.""" base_confidence = cast(float, pattern_config["weight"]) # Position weighting (earlier mentions often more important) first_occurrence = text.lower().find(concept.lower()) position_weight = 1.0 - (first_occurrence / len(text)) if first_occurrence >= 0 else 0.5 # Length weighting (medium length often optimal) length = len(concept) if 4 <= length <= 8: length_weight = 1.2 elif length < 4: length_weight = 0.6 else: length_weight = 0.8 # Capitalization bonus (if originally capitalized) capitalization_bonus = 1.1 if concept[0].isupper() else 1.0 return min(base_confidence * position_weight * length_weight * capitalization_bonus, 1.0) def _extract_supporting_terms(self, concept: str, text: str) -> List[str]: """Extract supporting terms around a concept.""" words = text.split() supporting = [] for i, word in enumerate(words): if concept.lower() in word.lower(): # Extract context window start = max(0, i - 3) end = min(len(words), i + 4) context_words = words[start:end] # Add related terms (excluding the concept itself) for context_word in context_words: if ( context_word.lower() != concept.lower() and context_word not in self.stop_words and len(context_word) > 2 and context_word not in supporting ): supporting.append(context_word) return supporting[:5] # Limit to top 5 supporting terms def _calculate_context_weight(self, term: str, mist: Dict[str, Any]) -> float: """Calculate context-based weighting for a term.""" weight = 1.0 # Style-based weighting style = mist.get("style", "") if style == "technical" and term in self.semantic_weights: weight *= 1.2 elif style == "poetic" and len(term) > 6: weight *= 1.1 # Affect-based weighting affect = mist.get("affect_signature", {}) if affect.get("curiosity", 0) > 0.5 and term in ["explore", "discover", "learn"]: weight *= 1.3 elif affect.get("awe", 0) > 0.5 and term in ["amazing", "incredible", "beautiful"]: weight *= 1.2 return weight def _calculate_semantic_coherence(self, concept: str, text: str) -> float: """Calculate semantic coherence of a concept within text.""" # Simple coherence based on concept repetition and context concept_lower = concept.lower() text_lower = text.lower() # Count concept occurrences occurrences = text_lower.count(concept_lower) if occurrences == 0: return 0.0 # Calculate context density words = text_lower.split() concept_indices = [i for i, word in enumerate(words) if concept_lower in word] if len(concept_indices) == 1: return 0.5 # Single occurrence, moderate coherence # Calculate average distance between occurrences distances = [ concept_indices[i + 1] - concept_indices[i] for i in range(len(concept_indices) - 1) ] avg_distance = sum(distances) / len(distances) if distances else len(words) # Closer occurrences = higher coherence distance_score = max(0.1, 1.0 - (avg_distance / len(words))) # Frequency bonus frequency_bonus = min(occurrences / 3.0, 1.0) return min(distance_score * 0.7 + frequency_bonus * 0.3, 1.0) def _extract_semantic_supporting_terms( self, concept: str, semantic_scores: Dict[str, float] ) -> List[str]: """Extract supporting terms based on semantic scores.""" # Get terms with high semantic scores scored_terms = [ (term, score) for term, score in semantic_scores.items() if term != concept.lower() and score > 0.3 ] # Sort by semantic score scored_terms.sort(key=lambda x: x[1], reverse=True) return [term for term, _ in scored_terms[:5]] def _normal_cdf(self, x: float) -> float: """Approximate normal CDF for statistical calculations.""" return 0.5 * (1 + math.erf(x / math.sqrt(2))) def _extract_statistical_supporting_terms( self, concept: str, term_freq: Counter ) -> List[str]: """Extract supporting terms based on statistical frequency.""" # Get terms with frequency above average avg_freq = sum(term_freq.values()) / len(term_freq) frequent_terms = [ (term, freq) for term, freq in term_freq.items() if term != concept.lower() and freq > avg_freq ] # Sort by frequency frequent_terms.sort(key=lambda x: x[1], reverse=True) return [term for term, _ in frequent_terms[:5]] def _select_best_extraction( self, method_results: Dict[str, Dict[str, Any]] ) -> Dict[str, Any]: """Select best extraction result from multiple methods.""" if len(method_results) == 1: return list(method_results.values())[0] # Score each result scored_results = [] for method, result in method_results.items(): score = result["confidence"] # Method preference weighting method_weights = {"hybrid": 1.2, "semantic": 1.1, "linguistic": 1.0, "statistical": 0.9} score *= method_weights.get(method, 1.0) # Supporting terms richness bonus supporting_bonus = min(len(result.get("supporting_terms", [])) / 3.0, 0.2) score += supporting_bonus scored_results.append((result, score)) # Return highest scored result scored_results.sort(key=lambda x: x[1], reverse=True) return scored_results[0][0] def _calculate_extraction_validation( self, best_result: Dict[str, Any], proto_thought: str ) -> Dict[str, Any]: """Calculate comprehensive validation metrics for extraction.""" return { "semantic_density": self._calculate_semantic_density_of_text(proto_thought), "novelty_score": self._calculate_concept_novelty(best_result.get("raw_concept", "")), "linguistic_features": self._extract_linguistic_features(proto_thought), "statistical_significance": best_result.get("statistical_significance", 0.5), } def _calculate_semantic_density_of_text(self, text: str) -> float: """Calculate semantic density of text.""" words = text.split() meaningful_words = [w for w in words if w not in self.stop_words and len(w) > 2] if not meaningful_words: return 0.0 # Density = meaningful words / total words return len(meaningful_words) / len(words) def _calculate_concept_novelty(self, concept: str) -> float: """Calculate novelty score for a concept. Uses the same key used in tracking (concept_id). """ # Ensure we look up using the same key used in _track_concept_statistics (concept_id) concept_key = ( concept if concept.startswith("concept_") else f"concept_{concept.replace(' ', '_')}" ) stats = self.concept_statistics.get(concept_key) concept_frequency = stats["frequency"] if stats else 0 if concept_frequency == 0: return 1.0 # Completely novel elif concept_frequency == 1: return 0.7 # Rare elif concept_frequency <= 5: return 0.4 # Uncommon else: return 0.1 # Common def _extract_linguistic_features(self, text: str) -> Dict[str, Any]: """Extract linguistic features from text.""" words = text.split() sentences = re.split(r"[.!?]+", text) return { "word_count": len(words), "sentence_count": len([s for s in sentences if s.strip()]), "avg_word_length": sum(len(w) for w in words) / len(words) if words else 0, "punctuation_ratio": len(re.findall(r"[^\w\s]", text)) / len(text) if text else 0, "capitalization_ratio": sum(1 for c in text if c.isupper()) / len(text) if text else 0, } def _create_validation_hash( self, result: Dict[str, Any], proto_thought: str, mist: Dict[str, Any] ) -> str: """Create reproducible validation hash.""" hash_data = { "concept": result.get("concept_id", ""), "confidence": result.get("confidence", 0), "method": result.get("method", ""), "proto_hash": hashlib.md5(proto_thought.encode()).hexdigest()[:8], "mist_id": mist.get("id", ""), "timestamp": int(time.time()), } return hashlib.sha256(json.dumps(hash_data, sort_keys=True).encode()).hexdigest()[:16] def _track_concept_statistics( self, extraction_result: ConceptExtractionResult, mist: Dict[str, Any] ): """Track longitudinal statistics for concepts.""" concept = extraction_result.concept_id self.concept_statistics[concept]["frequency"] += 1 self.concept_statistics[concept]["contexts"].append(mist.get("proto_thought", "")) self.concept_statistics[concept]["confidence_sum"] += extraction_result.confidence self.concept_statistics[concept]["last_seen"] = int(time.time()) def _categorize_novelty(self, novelty_score: float) -> str: """Categorize novelty score for analysis.""" if novelty_score > 0.8: return "highly_novel" elif novelty_score > 0.5: return "moderately_novel" elif novelty_score > 0.2: return "slightly_novel" else: return "well_known" def _perform_validation_analysis( self, extraction_results: List[ConceptExtractionResult], mist_lines: List[Dict[str, Any]] ) -> ConceptValidationMetrics: """Perform comprehensive validation analysis.""" if not extraction_results: return ConceptValidationMetrics(0, 0, 0, 0, 0, 0, 0, 0) # Calculate precision, recall, F1 (simplified for demonstration) precision = sum(r.confidence for r in extraction_results) / len(extraction_results) recall = len(set(r.concept_id for r in extraction_results)) / max(len(mist_lines), 1) f1_score = ( 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 ) # Semantic coherence semantic_coherence = sum(r.semantic_density for r in extraction_results) / len( extraction_results ) # Concept uniqueness unique_concepts = len(set(r.concept_id for r in extraction_results)) concept_uniqueness = unique_concepts / len(extraction_results) # Extraction consistency method_consistency = len(set(r.extraction_method for r in extraction_results)) / len( extraction_results ) extraction_consistency = ( 1.0 - method_consistency ) # Lower method diversity = higher consistency # Statistical significance statistical_significance = sum( r.statistical_significance for r in extraction_results ) / len(extraction_results) # Effect size (simplified) effect_size = statistical_significance * semantic_coherence return ConceptValidationMetrics( precision=precision, recall=recall, f1_score=f1_score, semantic_coherence=semantic_coherence, concept_uniqueness=concept_uniqueness, extraction_consistency=extraction_consistency, statistical_significance=statistical_significance, effect_size=effect_size, ) def _calculate_semantic_diversity(self, concept_id: str) -> float: """Calculate semantic diversity of a concept.""" if concept_id not in self.nodes: return 0.0 profile = self.nodes[concept_id].get("semantic_profile", {}) method_distribution = profile.get("method_distribution", Counter()) if not method_distribution: return 0.0 # Diversity based on method distribution entropy total_methods = sum(method_distribution.values()) if total_methods == 0: return 0.0 entropy = 0.0 for count in method_distribution.values(): if count > 0: probability = count / total_methods entropy -= probability * math.log(probability) # Normalize entropy (max entropy for 4 methods = log(4)) max_entropy = math.log(len(method_distribution)) return entropy / max_entropy if max_entropy > 0 else 0.0 def _log_extraction_error(self, mist: Dict[str, Any], error: str): """Log extraction errors for analysis with PII redaction.""" # Redact sensitive information - only log non-sensitive metadata safe_metadata = { "timestamp": int(time.time()), "mist_id": mist.get("id"), "error_type": type(error).__name__ if isinstance(error, Exception) else "string", "error_message": str(error)[:50], # Truncate error message "has_proto_thought": bool(mist.get("proto_thought")), "proto_length": len(mist.get("proto_thought", "")), } # Use structured logging instead of print logger.error( "Concept extraction failed", extra={ "event": "extraction_error", "mist_id": safe_metadata["mist_id"], "error_type": safe_metadata["error_type"], }, ) def _log_method_error(self, method: str, proto_thought: str, error: str): """Log method-specific errors with secure redaction.""" # Do not log any portion of proto_thought to prevent PII exposure safe_log = { "timestamp": int(time.time()), "method": method, "error_type": type(error).__name__ if isinstance(error, Exception) else "string", "proto_length": len(proto_thought), "error_message": str(error)[:50], # Truncate error } # Use structured logging instead of print logger.error( "extraction method failed: %s", method, extra={ "event": "method_error", "method": safe_log["method"], "error_type": safe_log["error_type"], }, ) # Scientific analysis and reporting methods def get_extraction_statistics(self) -> Dict[str, Any]: """Get comprehensive extraction statistics for analysis.""" if not self.extraction_history: return {"status": "no_extractions"} total_extractions = len(self.extraction_history) method_counts = Counter(e.extraction_method for e in self.extraction_history) avg_confidence = sum(e.confidence for e in self.extraction_history) / total_extractions avg_extraction_time = ( sum(e.extraction_time_ms for e in self.extraction_history) / total_extractions ) return { "total_extractions": total_extractions, "method_distribution": dict(method_counts), "average_confidence": avg_confidence, "average_extraction_time_ms": avg_extraction_time, "unique_concepts": len(set(e.concept_id for e in self.extraction_history)), "concept_statistics": dict(self.concept_statistics), "validation_metrics": ( self.validation_metrics[-1].__dict__ if self.validation_metrics else None ), } def export_scientific_data(self) -> Dict[str, Any]: """Export all data for scientific analysis and reproducibility.""" return { "extraction_history": [ { "concept_id": e.concept_id, "confidence": e.confidence, "extraction_method": e.extraction_method, "supporting_terms": e.supporting_terms, "semantic_density": e.semantic_density, "novelty_score": e.novelty_score, "validation_hash": e.validation_hash, "extraction_time_ms": e.extraction_time_ms, "linguistic_features": e.linguistic_features, "statistical_significance": e.statistical_significance, } for e in self.extraction_history ], "concept_statistics": dict(self.concept_statistics), "validation_metrics": [vm.__dict__ for vm in self.validation_metrics], "node_data": self.nodes, "configuration": self.config, "extraction_timestamp": int(time.time()), }