warbler-cda / warbler_cda / conflict_detector.py
"""
Conflict Detector - Semantic Statement Clash Detection.
Detects conflicting or contradictory statements using semantic similarity and
logical opposition analysis for the Cognitive Geo-Thermal Lore Engine v0.3.
"""
from typing import List, Dict, Any, Optional, Set
import time
import hashlib
import re
from dataclasses import dataclass
from enum import Enum
class ConflictType(Enum):
"""Types of conflicts that can be detected."""
SEMANTIC_OPPOSITION = "semantic_opposition" # Directly opposing statements
LOGICAL_CONTRADICTION = "logical_contradiction" # Logically incompatible
FACTUAL_INCONSISTENCY = "factual_inconsistency" # Inconsistent facts
TEMPORAL_CONFLICT = "temporal_conflict" # Time-based conflicts
SCOPE_MISMATCH = "scope_mismatch" # Different scope/context but conflicting
@dataclass
class ConflictEvidence:
"""Evidence for a detected conflict."""
statement_a_id: str
statement_b_id: str
conflict_type: ConflictType
confidence_score: float # 0.0 to 1.0
semantic_distance: float
opposition_indicators: List[str]
context_overlap: float
detection_timestamp: float
def get_age_seconds(self) -> float:
"""Get conflict age in seconds."""
return time.time() - self.detection_timestamp
@dataclass
class StatementFingerprint:
"""Semantic and structural fingerprint of a statement."""
statement_id: str
content: str
embedding: List[float]
negation_indicators: List[str]
assertion_strength: float # How definitive the statement is
temporal_markers: List[str]
domain_tags: Set[str]
creation_timestamp: float
class ConflictDetector:
"""
Semantic conflict detection system for identifying clashing statements.
Features:
- Semantic opposition detection using embeddings
- Negation and assertion analysis
- Temporal conflict identification
- Confidence scoring and evidence collection
"""
    def __init__(
        self, config: Optional[Dict[str, Any]] = None, embedding_provider: Optional[Any] = None
    ):
"""Initialize the conflict detector."""
self.config = config or {}
self.embedding_provider = embedding_provider
# Configuration parameters
self.opposition_threshold = self.config.get("opposition_threshold", 0.7)
self.semantic_similarity_threshold = self.config.get("semantic_similarity_threshold", 0.8)
self.min_confidence_score = self.config.get("min_confidence_score", 0.6)
self.max_statement_age_hours = self.config.get("max_statement_age_hours", 24)
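        # How the thresholds interact: statements are compared for opposition
        # only when their embedding similarity exceeds
        # semantic_similarity_threshold, and a conflict is kept only when its
        # confidence score reaches min_confidence_score
        # (see _detect_conflicts_for_statement below).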
# Storage
self.statement_fingerprints: Dict[str, StatementFingerprint] = {}
self.detected_conflicts: List[ConflictEvidence] = []
self.conflict_history: List[ConflictEvidence] = []
# Conflict detection patterns
self.negation_patterns = [
"not",
"no",
"never",
"none",
"nothing",
"nowhere",
"isn't",
"aren't",
"won't",
"can't",
"don't",
"doesn't",
"unable",
"impossible",
"incorrect",
"false",
"wrong",
]
self.assertion_patterns = [
"always",
"definitely",
"certainly",
"absolutely",
"must",
"will",
"shall",
"guaranteed",
"proven",
"fact",
"truth",
]
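        # Coarse temporal cues; generic prepositions ("by", "at", "on", "in")
        # over-match in ordinary prose, so temporal markers are treated as weak
        # evidence (+0.3 at most in the opposition score)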
self.temporal_patterns = [
"before",
"after",
"during",
"when",
"while",
"since",
"until",
"by",
"at",
"on",
"in",
"yesterday",
"today",
"tomorrow",
"now",
"then",
"later",
"earlier",
]
# Metrics
self.metrics = {
"statements_processed": 0,
"conflicts_detected": 0,
"false_positives_resolved": 0,
"processing_time_ms": 0.0,
"average_confidence": 0.0,
}
def process_statements(self, statements: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Process new statements and detect conflicts with existing statements.
Args:
statements: List of statement dicts with 'id', 'text', and optional metadata
Returns:
Processing report with new conflicts detected
"""
start_time = time.time()
processing_report = {
"statements_processed": len(statements),
"new_conflicts": [],
"fingerprints_created": 0,
"total_active_statements": 0,
"conflict_summary": {"high_confidence": 0, "medium_confidence": 0, "low_confidence": 0},
}
# Process each statement
        for idx, statement in enumerate(statements):
            # Enumerate so auto-generated fallback IDs stay unique within a batch
            statement_id = statement.get("id", f"stmt_{int(time.time())}_{idx}")
content = statement.get("text", "")
if not content.strip():
continue
# Create fingerprint for new statement
fingerprint = self._create_statement_fingerprint(statement_id, content, statement)
self.statement_fingerprints[statement_id] = fingerprint
processing_report["fingerprints_created"] += 1
# Detect conflicts with existing statements
conflicts = self._detect_conflicts_for_statement(fingerprint)
for conflict in conflicts:
if conflict.confidence_score >= self.min_confidence_score:
self.detected_conflicts.append(conflict)
processing_report["new_conflicts"].append(
{
"conflict_id": self._generate_conflict_id(conflict),
"statement_a": conflict.statement_a_id,
"statement_b": conflict.statement_b_id,
"conflict_type": conflict.conflict_type.value,
"confidence_score": conflict.confidence_score,
"opposition_indicators": conflict.opposition_indicators,
}
)
# Categorize by confidence
if conflict.confidence_score >= 0.8:
processing_report["conflict_summary"]["high_confidence"] += 1 # type: ignore
elif conflict.confidence_score >= 0.6:
processing_report["conflict_summary"]["medium_confidence"] += 1 # type: ignore
else:
processing_report["conflict_summary"]["low_confidence"] += 1 # type: ignore
# Cleanup old statements
self._cleanup_old_statements()
# Update metrics
elapsed_ms = (time.time() - start_time) * 1000
self.metrics["statements_processed"] += len(statements)
self.metrics["conflicts_detected"] += len(processing_report["new_conflicts"])
self.metrics["processing_time_ms"] += elapsed_ms
if self.detected_conflicts:
self.metrics["average_confidence"] = sum(
c.confidence_score for c in self.detected_conflicts
) / len(self.detected_conflicts)
processing_report["elapsed_ms"] = elapsed_ms
processing_report["total_active_statements"] = len(self.statement_fingerprints)
processing_report["total_conflicts_detected"] = len(self.detected_conflicts)
return processing_report
def get_conflict_analysis(self, statement_id: str) -> Dict[str, Any]:
"""
Get detailed conflict analysis for a specific statement.
Returns conflicts involving the statement and recommendations.
"""
conflicts_involving_statement = [
conflict
for conflict in self.detected_conflicts
if conflict.statement_a_id == statement_id or conflict.statement_b_id == statement_id
]
if not conflicts_involving_statement:
return {
"statement_id": statement_id,
"conflicts_found": 0,
"status": "no_conflicts",
"recommendation": "Statement appears consistent with existing knowledge",
}
# Analyze conflict patterns
conflict_types: Dict[str, int] = {}
max_confidence: float = 0.0
opposing_statements: Set[str] = set()
for conflict in conflicts_involving_statement:
conflict_type = conflict.conflict_type.value
conflict_types[conflict_type] = conflict_types.get(conflict_type, 0) + 1
max_confidence = max(max_confidence, conflict.confidence_score)
# Add opposing statement
if conflict.statement_a_id == statement_id:
opposing_statements.add(conflict.statement_b_id)
else:
opposing_statements.add(conflict.statement_a_id)
# Generate recommendation
recommendation = self._generate_conflict_recommendation(
len(conflicts_involving_statement), max_confidence, conflict_types
)
return {
"statement_id": statement_id,
"conflicts_found": len(conflicts_involving_statement),
"max_confidence": max_confidence,
"conflict_types": conflict_types,
"opposing_statements": list(opposing_statements),
"status": "conflicts_detected" if conflicts_involving_statement else "no_conflicts",
"recommendation": recommendation,
"detailed_conflicts": [
{
"opposing_statement": (
conflict.statement_b_id
if conflict.statement_a_id == statement_id
else conflict.statement_a_id
),
"conflict_type": conflict.conflict_type.value,
"confidence": conflict.confidence_score,
"evidence": conflict.opposition_indicators,
"age_seconds": conflict.get_age_seconds(),
}
for conflict in conflicts_involving_statement
],
}
def get_global_conflict_summary(self) -> Dict[str, Any]:
"""Get summary of all conflicts in the system."""
if not self.detected_conflicts:
return {
"total_conflicts": 0,
"conflict_types": {},
"confidence_distribution": {"high": 0, "medium": 0, "low": 0},
"recent_conflicts_1h": 0,
"status": "healthy",
"system_health_score": 1.0,
"recommendations": ["Continue monitoring for new conflicts"],
"metrics": self.metrics.copy(),
}
# Analyze conflict distribution
conflict_types: Dict[str, int] = {}
confidence_distribution: Dict[str, int] = {"high": 0, "medium": 0, "low": 0}
recent_conflicts: int = 0
for conflict in self.detected_conflicts:
# Count by type
conflict_type = conflict.conflict_type.value
conflict_types[conflict_type] = conflict_types.get(conflict_type, 0) + 1
# Count by confidence
if conflict.confidence_score >= 0.8:
confidence_distribution["high"] += 1
elif conflict.confidence_score >= 0.6:
confidence_distribution["medium"] += 1
else:
confidence_distribution["low"] += 1
# Count recent conflicts (last hour)
if conflict.get_age_seconds() < 3600:
recent_conflicts += 1
# Determine system health
high_confidence_conflicts: int = confidence_distribution["high"]
status: str = "healthy"
if high_confidence_conflicts > 5:
status = "critical"
elif high_confidence_conflicts > 2:
status = "warning"
elif confidence_distribution["medium"] + confidence_distribution["low"] > 10:
status = "monitoring"
else:
status = "healthy"
health_score = self._calculate_health_score()
recommendations = self._generate_system_recommendations(status, conflict_types)
return {
"total_conflicts": len(self.detected_conflicts),
"conflict_types": conflict_types,
"confidence_distribution": confidence_distribution,
"recent_conflicts_1h": recent_conflicts,
"status": status,
"system_health_score": health_score,
"recommendations": recommendations,
"metrics": self.metrics.copy(),
}
def resolve_conflict(self, conflict_id: str, resolution: str) -> bool:
"""
Mark a conflict as resolved with explanation.
Args:
conflict_id: ID of conflict to resolve
resolution: Explanation of how conflict was resolved
Returns:
True if conflict was found and resolved
"""
# Note: resolution parameter is kept for API consistency, may be used in future logging
for i, conflict in enumerate(self.detected_conflicts):
if self._generate_conflict_id(conflict) == conflict_id:
# Move to history
resolved_conflict = conflict
self.conflict_history.append(resolved_conflict)
self.detected_conflicts.pop(i)
self.metrics["false_positives_resolved"] += 1
return True
return False
def _create_statement_fingerprint(
self, statement_id: str, content: str, metadata: Dict[str, Any]
) -> StatementFingerprint:
"""Create semantic and structural fingerprint for a statement."""
# Generate embedding if provider available
embedding = []
if self.embedding_provider:
try:
embedding = self.embedding_provider.embed_text(content, metadata)
except Exception: # pylint: disable=broad-except
# Fallback to empty embedding
pass
        # Match patterns against whole words; bare substring checks would
        # wrongly flag e.g. "no" inside "know" or "in" inside "finished"
        content_lower = content.lower()
        words = {w.strip("'") for w in re.findall(r"[a-z']+", content_lower)}
        # Detect negation indicators
        negation_indicators = [
            pattern for pattern in self.negation_patterns if pattern in words
        ]
        # Calculate assertion strength (0.2 per assertion cue, capped at 1.0)
        assertion_indicators = [
            pattern for pattern in self.assertion_patterns if pattern in words
        ]
        assertion_strength = min(len(assertion_indicators) * 0.2, 1.0)
        # Extract temporal markers
        temporal_markers = [
            pattern for pattern in self.temporal_patterns if pattern in words
        ]
# Extract domain tags (simple keyword-based)
domain_tags = set()
if "debug" in content_lower or "development" in content_lower:
domain_tags.add("development")
if "memory" in content_lower or "storage" in content_lower:
domain_tags.add("memory")
if "process" in content_lower or "algorithm" in content_lower:
domain_tags.add("processing")
if "semantic" in content_lower or "meaning" in content_lower:
domain_tags.add("semantics")
return StatementFingerprint(
statement_id=statement_id,
content=content,
embedding=embedding,
negation_indicators=negation_indicators,
assertion_strength=assertion_strength,
temporal_markers=temporal_markers,
domain_tags=domain_tags,
creation_timestamp=time.time(),
)
def _detect_conflicts_for_statement(
self, new_fingerprint: StatementFingerprint
) -> List[ConflictEvidence]:
"""Detect conflicts between new statement and existing statements."""
conflicts = []
for existing_id, existing_fingerprint in self.statement_fingerprints.items():
if existing_id == new_fingerprint.statement_id:
continue # Don't compare with self
# Check for semantic opposition
if (
self.embedding_provider
and new_fingerprint.embedding
and existing_fingerprint.embedding
):
similarity = self.embedding_provider.calculate_similarity(
new_fingerprint.embedding, existing_fingerprint.embedding
)
# High semantic similarity with negation indicators suggests opposition
if similarity > self.semantic_similarity_threshold:
opposition_score = self._calculate_opposition_score(
new_fingerprint, existing_fingerprint
)
if opposition_score > self.opposition_threshold:
# Calculate context overlap
context_overlap = len(
new_fingerprint.domain_tags & existing_fingerprint.domain_tags
) / max(
len(new_fingerprint.domain_tags | existing_fingerprint.domain_tags), 1
)
# Collect opposition evidence
opposition_indicators: List[str] = []
if (
new_fingerprint.negation_indicators
and not existing_fingerprint.negation_indicators
):
opposition_indicators.extend(new_fingerprint.negation_indicators)
elif (
existing_fingerprint.negation_indicators
and not new_fingerprint.negation_indicators
):
opposition_indicators.extend(existing_fingerprint.negation_indicators)
# Determine conflict type
conflict_type = self._determine_conflict_type(
new_fingerprint, existing_fingerprint
)
# Calculate confidence score
confidence = self._calculate_confidence_score(
similarity, opposition_score, context_overlap, opposition_indicators
)
if confidence >= self.min_confidence_score:
conflict = ConflictEvidence(
statement_a_id=new_fingerprint.statement_id,
statement_b_id=existing_fingerprint.statement_id,
conflict_type=conflict_type,
confidence_score=confidence,
semantic_distance=1.0 - similarity,
opposition_indicators=opposition_indicators,
context_overlap=context_overlap,
detection_timestamp=time.time(),
)
conflicts.append(conflict)
return conflicts
def _calculate_opposition_score(
self, fp1: StatementFingerprint, fp2: StatementFingerprint
) -> float:
"""Calculate how much two statements oppose each other."""
score: float = 0.0
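        # Additive evidence model: one-sided negation (+0.4), a large
        # assertion-strength gap (+0.3), and a directional before/after
        # clash (+0.3), capped at 1.0 below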
# Negation opposition (one has negation, other doesn't)
if (fp1.negation_indicators and not fp2.negation_indicators) or (
fp2.negation_indicators and not fp1.negation_indicators
):
score += 0.4
# Strong assertion differences
assertion_diff = abs(fp1.assertion_strength - fp2.assertion_strength)
if assertion_diff > 0.5:
score += 0.3
# Temporal conflicts
if fp1.temporal_markers and fp2.temporal_markers:
# Simple temporal conflict detection
if any(marker in ["before", "earlier"] for marker in fp1.temporal_markers) and any(
marker in ["after", "later"] for marker in fp2.temporal_markers
):
score += 0.3
return min(score, 1.0)
def _determine_conflict_type(
self, fp1: StatementFingerprint, fp2: StatementFingerprint
) -> ConflictType:
"""Determine the type of conflict between two statements."""
# Check for semantic opposition
if (fp1.negation_indicators and not fp2.negation_indicators) or (
fp2.negation_indicators and not fp1.negation_indicators
):
return ConflictType.SEMANTIC_OPPOSITION
# Check for temporal conflicts
if fp1.temporal_markers and fp2.temporal_markers:
return ConflictType.TEMPORAL_CONFLICT
# Check for logical contradiction (high assertion strength difference)
if abs(fp1.assertion_strength - fp2.assertion_strength) > 0.6:
return ConflictType.LOGICAL_CONTRADICTION
# Default to factual inconsistency
return ConflictType.FACTUAL_INCONSISTENCY
def _calculate_confidence_score(
self,
similarity: float,
opposition_score: float,
context_overlap: float,
indicators: List[str],
) -> float:
"""Calculate confidence score for a conflict detection."""
base_score = (similarity * 0.4) + (opposition_score * 0.4) + (context_overlap * 0.2)
# Boost confidence if we have clear opposition indicators
indicator_boost = min(len(indicators) * 0.1, 0.2)
return min(base_score + indicator_boost, 1.0)
def _cleanup_old_statements(self):
"""Remove old statements that exceed the maximum age."""
current_time = time.time()
max_age_seconds = self.max_statement_age_hours * 3600
old_statement_ids = [
stmt_id
for stmt_id, fingerprint in self.statement_fingerprints.items()
if current_time - fingerprint.creation_timestamp > max_age_seconds
]
for stmt_id in old_statement_ids:
del self.statement_fingerprints[stmt_id]
# Also cleanup old conflicts
self.detected_conflicts = [
conflict
for conflict in self.detected_conflicts
if current_time - conflict.detection_timestamp < max_age_seconds
]
def _generate_conflict_id(self, conflict: ConflictEvidence) -> str:
"""Generate unique ID for a conflict."""
content = (
f"{conflict.statement_a_id}_{conflict.statement_b_id}_{conflict.conflict_type.value}"
)
return hashlib.md5(content.encode()).hexdigest()[:12]
def _generate_conflict_recommendation(
self, conflict_count: int, max_confidence: float, conflict_types: Dict[str, int]
    ) -> str:
        """Generate a human-readable recommendation for a statement's conflicts."""
        if conflict_count == 0:
return "No conflicts detected - statement appears consistent"
# Generate base recommendation based on confidence
        if max_confidence > 0.9:
            recommendation = f"{conflict_count} high-confidence conflicts detected - review required"
        elif max_confidence > 0.7:
            recommendation = f"{conflict_count} moderate conflicts detected - verify statement accuracy"
        else:
            recommendation = f"{conflict_count} low-confidence conflicts - monitor for patterns"
# Add type-specific guidance
semantic_oppositions = conflict_types.get("semantic_opposition", 0)
temporal_conflicts = conflict_types.get("temporal_conflict", 0)
advice_parts = []
if semantic_oppositions > conflict_count // 2:
advice_parts.append("check for negation errors")
if temporal_conflicts > conflict_count // 2:
advice_parts.append("verify timeline consistency")
if advice_parts:
recommendation += f" ({'; '.join(advice_parts)})"
return recommendation
def _generate_system_recommendations(
self, status: str, conflict_types: Dict[str, int]
) -> List[str]:
"""Generate system-level recommendations."""
recommendations = []
if status == "critical":
recommendations.append("Immediate review required - multiple high-confidence conflicts")
recommendations.append("Consider statement validation workflow")
elif status == "warning":
recommendations.append("Monitor conflicts closely - elevated conflict level")
recommendations.append("Review recent statements for accuracy")
# Type-specific recommendations
if conflict_types.get("semantic_opposition", 0) > 3:
recommendations.append(
"Multiple semantic oppositions detected - check for negation errors"
)
if conflict_types.get("temporal_conflict", 0) > 2:
recommendations.append("Temporal conflicts detected - verify timeline consistency")
if not recommendations:
recommendations.append("System operating normally - continue monitoring")
return recommendations
def _calculate_health_score(self) -> float:
"""Calculate overall system health score (0.0 to 1.0)."""
if not self.detected_conflicts:
return 1.0
high_confidence_conflicts: int = sum(
1 for conflict in self.detected_conflicts if conflict.confidence_score > 0.8
)
total_statements: int = len(self.statement_fingerprints)
if total_statements == 0:
return 1.0
# Health score decreases with conflict ratio
conflict_ratio: float = len(self.detected_conflicts) / total_statements
high_confidence_penalty: float = float(high_confidence_conflicts) * 0.1
health_score = 1.0 - min(conflict_ratio + high_confidence_penalty, 0.9)
return max(health_score, 0.1) # Minimum 0.1 health score
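

if __name__ == "__main__":
    # Minimal usage sketch. The stub provider below is hypothetical and exists
    # only to satisfy the two duck-typed methods ConflictDetector calls
    # (embed_text and calculate_similarity); a real deployment would wire in an
    # actual sentence-embedding backend.
    class _StubEmbeddingProvider:
        """Toy provider: constant vectors, so any two statements look similar."""

        def embed_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[float]:
            return [1.0, 0.0, 0.0]

        def calculate_similarity(self, vec_a: List[float], vec_b: List[float]) -> float:
            # Cosine similarity; identical stub vectors always yield 1.0
            dot = sum(a * b for a, b in zip(vec_a, vec_b))
            norm_a = sum(a * a for a in vec_a) ** 0.5
            norm_b = sum(b * b for b in vec_b) ** 0.5
            return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

    detector = ConflictDetector(embedding_provider=_StubEmbeddingProvider())
    # Two deliberately contradictory statements: one-sided negation, a large
    # assertion-strength gap, and a before/after temporal clash
    report = detector.process_statements(
        [
            {"id": "s1", "text": "The migration was not finished; it happened after the deadline."},
            {
                "id": "s2",
                "text": (
                    "The migration definitely finished before the deadline; "
                    "this is absolutely, certainly a proven fact."
                ),
            },
        ]
    )
    print(f"new conflicts: {report['new_conflicts']}")
    print(f"total conflicts tracked: {detector.get_global_conflict_summary()['total_conflicts']}")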