# warbler-cda / tests /test_conflict_detector.py
# Bellok's picture
# Upload folder using huggingface_hub
# 0ccf2f0 verified
# raw
# history blame
# 19 kB
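"""Test suite for the conflict detector.

Exercises the ConflictDetector class, which detects semantic conflicts and
contradictions between statements, together with the StatementFingerprint and
ConflictEvidence dataclasses it works with.
"""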
import time
from typing import List
import pytest
from warbler_cda.conflict_detector import (
ConflictDetector,
ConflictType,
ConflictEvidence,
StatementFingerprint
)
# Mock embedding provider for testing
class MockEmbeddingProvider:
    """Deterministic stand-in for a real embedding provider.

    An embedding encodes nothing but the length of the text, and similarity is
    a two-level step function of the difference between the first components
    of the two embeddings.  Every call is recorded so tests can assert on how
    the provider was used.
    """

    EMBEDDING_DIM = 384       # number of components in each mock embedding
    LENGTH_SCALE = 100.0      # divisor turning text length into a component
    CLOSE_DELTA = 0.1         # max first-component gap still counted as similar
    SIMILAR_SCORE = 0.95      # similarity reported for "similar" embeddings
    DISSIMILAR_SCORE = 0.3    # similarity reported for everything else

    def __init__(self):
        """Start with empty call logs."""
        self.embed_text_calls = []
        self.calculate_similarity_calls = []

    def embed_text(self, text: str, *args, **kwargs) -> List[float]:
        """Return a constant vector whose value is ``len(text) / 100``.

        Extra positional and keyword arguments are accepted and ignored.
        """
        self.embed_text_calls.append(text)
        return [len(text) / self.LENGTH_SCALE] * self.EMBEDDING_DIM

    def calculate_similarity(self, emb1: List[float], emb2: List[float]) -> float:
        """Return 0.95 for embeddings of similar-length texts, else 0.3.

        An empty embedding carries no information, so it yields 0.0 rather
        than raising ``IndexError`` on the first-component lookup.
        """
        self.calculate_similarity_calls.append((emb1, emb2))
        if not emb1 or not emb2:
            return 0.0
        # Only the first component matters: all components are identical.
        if abs(emb1[0] - emb2[0]) < self.CLOSE_DELTA:
            return self.SIMILAR_SCORE
        return self.DISSIMILAR_SCORE
class TestConflictDetector:
    """Test the ConflictDetector class against a deterministic mock provider."""

    def setup_method(self):
        """Create a fresh mock provider and detector before each test."""
        self.mock_provider = MockEmbeddingProvider()  # pylint: disable=W0201
        self.detector = ConflictDetector(embedding_provider=self.mock_provider)  # pylint: disable=W0201

    def test_initialization_default_config(self):
        """A detector built without arguments exposes the default thresholds."""
        detector = ConflictDetector()

        assert detector.config == {}
        assert detector.opposition_threshold == 0.7
        assert detector.semantic_similarity_threshold == 0.8
        assert detector.min_confidence_score == 0.6
        assert detector.max_statement_age_hours == 24
        assert len(detector.statement_fingerprints) == 0
        assert len(detector.detected_conflicts) == 0
        assert len(detector.conflict_history) == 0

    def test_initialization_custom_config(self):
        """Values supplied through ``config`` override the defaults."""
        config = {
            "opposition_threshold": 0.8,
            "semantic_similarity_threshold": 0.9,
            "min_confidence_score": 0.7,
            "max_statement_age_hours": 48,
        }
        detector = ConflictDetector(config=config)

        assert detector.opposition_threshold == 0.8
        assert detector.semantic_similarity_threshold == 0.9
        assert detector.min_confidence_score == 0.7
        assert detector.max_statement_age_hours == 48

    def test_process_statements_empty_list(self):
        """Processing an empty batch reports zeros everywhere."""
        result = self.detector.process_statements([])

        assert result["statements_processed"] == 0
        assert result["fingerprints_created"] == 0
        assert result["new_conflicts"] == []  # pylint: disable=C1803
        assert result["total_active_statements"] == 0
        assert result["total_conflicts_detected"] == 0

    def test_process_statements_single_statement(self):
        """A single statement is fingerprinted and stored under its ID."""
        statements = [{"id": "stmt_1", "text": "This is a test statement about memory storage"}]
        result = self.detector.process_statements(statements)

        assert result["statements_processed"] == 1
        assert result["fingerprints_created"] == 1
        assert len(self.detector.statement_fingerprints) == 1
        assert "stmt_1" in self.detector.statement_fingerprints

        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert fingerprint.statement_id == "stmt_1"
        assert fingerprint.content == "This is a test statement about memory storage"
        # Equality already implies membership, so one check is enough.
        assert fingerprint.domain_tags == {"memory"}

    def test_process_statements_without_ids(self):
        """A statement without an ID gets a generated one prefixed ``stmt_``."""
        statements = [{"text": "First statement"}]
        result = self.detector.process_statements(statements)

        assert result["statements_processed"] == 1
        assert result["fingerprints_created"] == 1
        assert len(self.detector.statement_fingerprints) == 1

        # Exactly one fingerprint exists, so the first key is the generated ID.
        stmt_id = next(iter(self.detector.statement_fingerprints.keys()))
        assert stmt_id.startswith("stmt_")

    def test_process_statements_empty_content(self):
        """Empty and whitespace-only statements produce no fingerprint."""
        statements = [
            {"id": "stmt_1", "text": ""},
            {"id": "stmt_2", "text": " "},
            {"id": "stmt_3", "text": "Valid content"},
        ]
        result = self.detector.process_statements(statements)

        # All three statements count as processed, but only the one with
        # real content is fingerprinted.
        assert result["statements_processed"] == 3
        assert result["fingerprints_created"] == 1
        assert len(self.detector.statement_fingerprints) == 1
        assert "stmt_3" in self.detector.statement_fingerprints

    def test_semantic_opposition_detection(self):
        """Test detection of semantic opposition with negation."""
        # Use custom config with lower thresholds to enable conflict detection with mock
        config = {
            "opposition_threshold": 0.3,  # Lower than default 0.7 to trigger with negation diff
            "semantic_similarity_threshold": 0.8,
            "min_confidence_score": 0.6,
        }
        detector = ConflictDetector(config=config, embedding_provider=self.mock_provider)

        # Add first statement without negation
        statements1 = [{"id": "stmt_1", "text": "This algorithm is correct and efficient"}]
        detector.process_statements(statements1)

        # Add opposing statement with negation
        statements2 = [{"id": "stmt_2", "text": "This algorithm is not correct and efficient"}]
        result = detector.process_statements(statements2)

        # Verify conflict detection works
        assert len(result["new_conflicts"]) == 1
        conflict = result["new_conflicts"][0]
        assert conflict["statement_a"] == "stmt_2"
        assert conflict["statement_b"] == "stmt_1"
        assert conflict["conflict_type"] == "semantic_opposition"
        assert conflict["confidence_score"] >= 0.6
        assert "not" in conflict["opposition_indicators"]

        # Verify that fingerprint creation worked and has proper negation detection
        assert "stmt_1" in detector.statement_fingerprints
        assert "stmt_2" in detector.statement_fingerprints
        fp1 = detector.statement_fingerprints["stmt_1"]
        fp2 = detector.statement_fingerprints["stmt_2"]
        assert len(fp1.negation_indicators) == 0  # No negation in first statement
        assert "not" in fp2.negation_indicators  # "not" found in second statement

        # Verify mock was called
        assert len(self.mock_provider.calculate_similarity_calls) > 0

        # Verify conflict is stored in detector
        assert len(detector.detected_conflicts) == 1
        stored_conflict = detector.detected_conflicts[0]
        assert stored_conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION
        assert stored_conflict.confidence_score >= 0.6

    def test_conflict_evidence_creation(self):
        """Test that conflict evidence is properly created."""
        conflict = ConflictEvidence(
            statement_a_id="stmt_a",
            statement_b_id="stmt_b",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.85,
            semantic_distance=0.15,
            opposition_indicators=["not"],
            context_overlap=0.7,
            detection_timestamp=time.time(),
        )

        assert conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION
        assert conflict.confidence_score == 0.85
        assert conflict.opposition_indicators == ["not"]
        assert conflict.get_age_seconds() >= 0

    def test_temporal_conflict_detection(self):
        """Process two statements whose temporal wording disagrees.

        Whether a conflict is actually reported depends on the detector's
        similarity requirements, so it is not asserted here.  Instead of a
        vacuous ``len(...) >= 0`` check, this verifies the invariants that
        must hold either way: both statements are fingerprinted, and every
        reported conflict is well formed and refers to these two statements.
        """
        statements1 = [{"id": "stmt_1", "text": "The algorithm will finish before tomorrow"}]
        self.detector.process_statements(statements1)
        statements2 = [{"id": "stmt_2", "text": "The algorithm will finish after later today"}]
        result = self.detector.process_statements(statements2)

        assert result["statements_processed"] == 1
        assert result["fingerprints_created"] == 1
        assert "stmt_1" in self.detector.statement_fingerprints
        assert "stmt_2" in self.detector.statement_fingerprints

        valid_types = {conflict_type.value for conflict_type in ConflictType}
        new_conflicts = result["new_conflicts"]
        assert isinstance(new_conflicts, list)
        for conflict in new_conflicts:
            assert conflict["conflict_type"] in valid_types
            # Only two statements exist, so a conflict can only pair them.
            assert {conflict["statement_a"], conflict["statement_b"]} == {"stmt_1", "stmt_2"}

    def test_get_conflict_analysis_no_conflicts(self):
        """Test conflict analysis for statement with no conflicts."""
        statements = [{"id": "stmt_1", "text": "This is a simple statement"}]
        self.detector.process_statements(statements)

        analysis = self.detector.get_conflict_analysis("stmt_1")

        assert analysis["conflicts_found"] == 0
        assert analysis["status"] == "no_conflicts"
        assert "consistent" in analysis["recommendation"]

    def test_get_global_conflict_summary(self):
        """Test global conflict summary generation."""
        # Start with no conflicts
        summary = self.detector.get_global_conflict_summary()
        assert summary["total_conflicts"] == 0
        assert summary["status"] == "healthy"
        assert summary["system_health_score"] == 1.0

        # Add a statement, then inject a conflict directly for testing
        statements1 = [{"id": "stmt_1", "text": "This is definitely correct"}]
        self.detector.process_statements(statements1)
        conflict = ConflictEvidence(
            statement_a_id="stmt_1",
            statement_b_id="stmt_2",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.95,
            semantic_distance=0.05,
            opposition_indicators=["not"],
            context_overlap=0.5,
            detection_timestamp=time.time(),
        )
        self.detector.detected_conflicts.append(conflict)

        summary = self.detector.get_global_conflict_summary()
        assert summary["total_conflicts"] == 1
        assert summary["confidence_distribution"]["high"] == 1
        assert summary["status"] == "healthy"  # One conflict doesn't trigger warning (>2 needed)

    def test_resolve_conflict_success(self):
        """Test successful conflict resolution."""
        conflict = ConflictEvidence(
            statement_a_id="stmt_1",
            statement_b_id="stmt_2",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.8,
            semantic_distance=0.2,
            opposition_indicators=["not"],
            context_overlap=0.5,
            detection_timestamp=time.time(),
        )
        self.detector.detected_conflicts.append(conflict)

        # Generate conflict ID and resolve
        conflict_id = self.detector._generate_conflict_id(conflict)  # pylint: disable=W0212
        resolved = self.detector.resolve_conflict(conflict_id, "User confirmed resolution")

        assert resolved is True
        assert len(self.detector.detected_conflicts) == 0
        assert len(self.detector.conflict_history) == 1
        assert self.detector.metrics["false_positives_resolved"] == 1

    def test_resolve_conflict_not_found(self):
        """Test conflict resolution when conflict ID doesn't exist."""
        resolved = self.detector.resolve_conflict("nonexistent_id", "Test resolution")
        assert resolved is False

    def test_domain_tag_extraction(self):
        """Test that domain tags are correctly extracted from statements."""
        # Test memory domain
        statements = [{"id": "stmt_1", "text": "The storage memory needs optimization"}]
        self.detector.process_statements(statements)
        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert "memory" in fingerprint.domain_tags

        # Test development domain
        statements2 = [{"id": "stmt_2", "text": "Debug the development process"}]
        self.detector.process_statements(statements2)
        fingerprint2 = self.detector.statement_fingerprints["stmt_2"]
        assert "development" in fingerprint2.domain_tags

    def test_assertion_strength_calculation(self):
        """Test that assertion strength is correctly calculated."""
        # Statement with multiple assertion words - should reach max strength
        statements = [{
            "id": "stmt_1",
            "text": "This is definitely always absolutely certainly and must be guaranteed",
        }]
        self.detector.process_statements(statements)
        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        # Should have max assertion strength due to multiple indicators
        assert fingerprint.assertion_strength == 1.0

        # Statement with weak assertions
        statements2 = [{"id": "stmt_2", "text": "This might be okay"}]
        self.detector.process_statements(statements2)
        fingerprint2 = self.detector.statement_fingerprints["stmt_2"]
        # Should have lower assertion strength
        assert fingerprint2.assertion_strength < 1.0

    def test_negation_indicator_detection(self):
        """Test that negation indicators are correctly detected."""
        statements = [{"id": "stmt_1", "text": "This is not wrong or incorrect"}]
        self.detector.process_statements(statements)

        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert "not" in fingerprint.negation_indicators
        assert "incorrect" in fingerprint.negation_indicators

    def test_fingerprint_creation_without_embedding_provider(self):
        """Test fingerprint creation when no embedding provider is available."""
        detector = ConflictDetector()  # No embedding provider
        statements = [{"id": "stmt_1", "text": "Test statement without embeddings"}]
        result = detector.process_statements(statements)

        assert result["fingerprints_created"] == 1
        fingerprint = detector.statement_fingerprints["stmt_1"]
        assert fingerprint.embedding == []  # Should be empty when no provider

    def test_metrics_update(self):
        """Test that metrics are correctly updated during processing."""
        initial_statements = self.detector.metrics["statements_processed"]
        statements = [{"id": "stmt_1", "text": "First statement"}]
        self.detector.process_statements(statements)

        assert self.detector.metrics["statements_processed"] == initial_statements + 1

    def test_conflict_type_enum_values(self):
        """Test that conflict type enum has correct values."""
        assert ConflictType.SEMANTIC_OPPOSITION.value == "semantic_opposition"
        assert ConflictType.LOGICAL_CONTRADICTION.value == "logical_contradiction"
        assert ConflictType.FACTUAL_INCONSISTENCY.value == "factual_inconsistency"
        assert ConflictType.TEMPORAL_CONFLICT.value == "temporal_conflict"
        assert ConflictType.SCOPE_MISMATCH.value == "scope_mismatch"

    def test_conflict_id_generation(self):
        """Test that conflict IDs are consistently generated."""
        conflict = ConflictEvidence(
            statement_a_id="stmt_a",
            statement_b_id="stmt_b",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.8,
            semantic_distance=0.2,
            opposition_indicators=[],
            context_overlap=0.5,
            detection_timestamp=1000000.0,
        )

        id1 = self.detector._generate_conflict_id(conflict)  # pylint: disable=W0212
        id2 = self.detector._generate_conflict_id(conflict)  # pylint: disable=W0212

        # Same conflict should generate same ID
        assert id1 == id2
        assert len(id1) == 12  # MD5 hash truncated to 12 chars
class TestStatementFingerprint:
    """Test the StatementFingerprint dataclass."""

    def test_fingerprint_creation(self):
        """Every constructor argument is stored on the matching attribute."""
        fingerprint = StatementFingerprint(
            statement_id="test_123",
            content="This is test content about semantic processing",
            embedding=[0.1, 0.2, 0.3],
            negation_indicators=["not"],
            assertion_strength=0.8,
            temporal_markers=["before"],
            domain_tags={"semantics", "processing"},
            creation_timestamp=1234567890.0,
        )

        assert fingerprint.statement_id == "test_123"
        assert fingerprint.content == "This is test content about semantic processing"
        assert fingerprint.embedding == [0.1, 0.2, 0.3]
        assert fingerprint.negation_indicators == ["not"]
        assert fingerprint.assertion_strength == 0.8
        assert fingerprint.temporal_markers == ["before"]
        assert "semantics" in fingerprint.domain_tags
        assert "processing" in fingerprint.domain_tags

    def test_fingerprint_equality(self):
        """Two fingerprints built from identical field values compare equal."""
        fp1 = StatementFingerprint(
            statement_id="id1", content="content", embedding=[],
            negation_indicators=[], assertion_strength=0.5,
            temporal_markers=[], domain_tags=set(), creation_timestamp=1000.0,
        )
        fp2 = StatementFingerprint(
            statement_id="id1", content="content", embedding=[],
            negation_indicators=[], assertion_strength=0.5,
            temporal_markers=[], domain_tags=set(), creation_timestamp=1000.0,
        )

        assert fp1 == fp2
class TestConflictEvidence:
    """Checks for the ConflictEvidence dataclass."""

    def test_conflict_evidence_creation(self):
        """Constructor arguments are stored unchanged and the age is positive."""
        evidence = ConflictEvidence(
            statement_a_id="stmt_1",
            statement_b_id="stmt_2",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.75,
            semantic_distance=0.25,
            opposition_indicators=["not", "incorrect"],
            context_overlap=0.8,
            detection_timestamp=1234567890.0,
        )

        stored = (
            evidence.statement_a_id,
            evidence.statement_b_id,
            evidence.conflict_type,
            evidence.confidence_score,
            evidence.semantic_distance,
            evidence.opposition_indicators,
            evidence.context_overlap,
        )
        assert stored == (
            "stmt_1",
            "stmt_2",
            ConflictType.SEMANTIC_OPPOSITION,
            0.75,
            0.25,
            ["not", "incorrect"],
            0.8,
        )

        # detection_timestamp is far in the past (February 2009), so the
        # reported age is expected to be positive.
        elapsed = evidence.get_age_seconds()
        assert elapsed > 0
if __name__ == "__main__":
    # pytest.main returns the run's exit code; propagate it so that running
    # this file directly exits non-zero when a test fails.
    raise SystemExit(pytest.main([__file__, "-v"]))