Spaces:
Running
on
Zero
Running
on
Zero
| """Test suite for conflict detector. | |
| Tests the ConflictDetector class for detecting semantic conflicts and contradictions. | |
| """ | |
| import time | |
| from typing import List | |
| import pytest | |
| from warbler_cda.conflict_detector import ( | |
| ConflictDetector, | |
| ConflictType, | |
| ConflictEvidence, | |
| StatementFingerprint | |
| ) | |
| # Mock embedding provider for testing | |
| class MockEmbeddingProvider: | |
| """Mock embedding provider for testing.""" | |
| def __init__(self): | |
| self.embed_text_calls = [] | |
| self.calculate_similarity_calls = [] | |
| def embed_text(self, text: str, *args, **kwargs) -> List[float]: | |
| """Mock text embedding.""" | |
| self.embed_text_calls.append(text) | |
| # Return a simple mock embedding based on text length | |
| return [len(text) / 100.0] * 384 | |
| def calculate_similarity(self, emb1: List[float], emb2: List[float]) -> float: | |
| """Mock similarity calculation.""" | |
| self.calculate_similarity_calls.append((emb1, emb2)) | |
| # Return similarity based on embedding values (0.0 to 1.0) | |
| # For testing semantic conflicts, return high similarity for texts of same length | |
| if abs(emb1[0] - emb2[0]) < 0.1: # Similar length texts | |
| return 0.95 # Very similar | |
| return 0.3 # Different | |
| class TestConflictDetector: | |
| """Test the ConflictDetector class.""" | |
| def setup_method(self): | |
| """Setup before each test.""" | |
| self.mock_provider = MockEmbeddingProvider() # pylint: disable=W0201 | |
| self.detector = ConflictDetector(embedding_provider=self.mock_provider) # pylint: disable=W0201 | |
| def test_initialization_default_config(self): | |
| """Test ConflictDetector initialization with default config.""" | |
| detector = ConflictDetector() | |
| assert detector.config == {} | |
| assert detector.opposition_threshold == 0.7 | |
| assert detector.semantic_similarity_threshold == 0.8 | |
| assert detector.min_confidence_score == 0.6 | |
| assert detector.max_statement_age_hours == 24 | |
| assert len(detector.statement_fingerprints) == 0 | |
| assert len(detector.detected_conflicts) == 0 | |
| assert len(detector.conflict_history) == 0 | |
| def test_initialization_custom_config(self): | |
| """Test ConflictDetector initialization with custom config.""" | |
| config = { | |
| "opposition_threshold": 0.8, | |
| "semantic_similarity_threshold": 0.9, | |
| "min_confidence_score": 0.7, | |
| "max_statement_age_hours": 48 | |
| } | |
| detector = ConflictDetector(config=config) | |
| assert detector.opposition_threshold == 0.8 | |
| assert detector.semantic_similarity_threshold == 0.9 | |
| assert detector.min_confidence_score == 0.7 | |
| assert detector.max_statement_age_hours == 48 | |
| def test_process_statements_empty_list(self): | |
| """Test processing empty statement list.""" | |
| result = self.detector.process_statements([]) | |
| assert result["statements_processed"] == 0 | |
| assert result["fingerprints_created"] == 0 | |
| assert result["new_conflicts"] == [] # pylint: disable=C1803 | |
| assert result["total_active_statements"] == 0 | |
| assert result["total_conflicts_detected"] == 0 | |
| def test_process_statements_single_statement(self): | |
| """Test processing a single statement.""" | |
| statements = [{"id": "stmt_1", "text": "This is a test statement about memory storage"}] | |
| result = self.detector.process_statements(statements) | |
| assert result["statements_processed"] == 1 | |
| assert result["fingerprints_created"] == 1 | |
| assert len(self.detector.statement_fingerprints) == 1 | |
| assert "stmt_1" in self.detector.statement_fingerprints | |
| fingerprint = self.detector.statement_fingerprints["stmt_1"] | |
| assert fingerprint.statement_id == "stmt_1" | |
| assert fingerprint.content == "This is a test statement about memory storage" | |
| assert fingerprint.domain_tags == {"memory"} | |
| assert "memory" in fingerprint.domain_tags | |
| def test_process_statements_without_ids(self): | |
| """Test processing statements without IDs.""" | |
| statements = [{"text": "First statement"}] | |
| result = self.detector.process_statements(statements) | |
| assert result["statements_processed"] == 1 | |
| assert result["fingerprints_created"] == 1 | |
| assert len(self.detector.statement_fingerprints) == 1 | |
| # Check that ID was generated | |
| stmt_id = list(self.detector.statement_fingerprints.keys())[0] | |
| assert stmt_id.startswith("stmt_") | |
| def test_process_statements_empty_content(self): | |
| """Test processing statements with empty content.""" | |
| statements = [ | |
| {"id": "stmt_1", "text": ""}, | |
| {"id": "stmt_2", "text": " "}, | |
| {"id": "stmt_3", "text": "Valid content"} | |
| ] | |
| result = self.detector.process_statements(statements) | |
| # Only the valid statement should be processed | |
| assert result["statements_processed"] == 3 | |
| assert result["fingerprints_created"] == 1 | |
| assert len(self.detector.statement_fingerprints) == 1 | |
| assert "stmt_3" in self.detector.statement_fingerprints | |
| def test_semantic_opposition_detection(self): | |
| """Test detection of semantic opposition with negation.""" | |
| # Use custom config with lower thresholds to enable conflict detection with mock | |
| config = { | |
| "opposition_threshold": 0.3, # Lower than default 0.7 to trigger with negation diff | |
| "semantic_similarity_threshold": 0.8, | |
| "min_confidence_score": 0.6 | |
| } | |
| detector = ConflictDetector(config=config, embedding_provider=self.mock_provider) | |
| # Add first statement without negation | |
| statements1 = [{"id": "stmt_1", "text": "This algorithm is correct and efficient"}] | |
| detector.process_statements(statements1) | |
| # Add opposing statement with negation | |
| statements2 = [{"id": "stmt_2", "text": "This algorithm is not correct and efficient"}] | |
| result = detector.process_statements(statements2) | |
| # Verify conflict detection works | |
| assert len(result["new_conflicts"]) == 1 | |
| conflict = result["new_conflicts"][0] | |
| assert conflict["statement_a"] == "stmt_2" | |
| assert conflict["statement_b"] == "stmt_1" | |
| assert conflict["conflict_type"] == "semantic_opposition" | |
| assert conflict["confidence_score"] >= 0.6 | |
| assert "not" in conflict["opposition_indicators"] | |
| # Verify that fingerprint creation worked and has proper negation detection | |
| assert "stmt_1" in detector.statement_fingerprints | |
| assert "stmt_2" in detector.statement_fingerprints | |
| fp1 = detector.statement_fingerprints["stmt_1"] | |
| fp2 = detector.statement_fingerprints["stmt_2"] | |
| assert len(fp1.negation_indicators) == 0 # No negation in first statement | |
| assert "not" in fp2.negation_indicators # "not" found in second statement | |
| # Verify mock was called | |
| assert len(self.mock_provider.calculate_similarity_calls) > 0 | |
| # Verify conflict is stored in detector | |
| assert len(detector.detected_conflicts) == 1 | |
| stored_conflict = detector.detected_conflicts[0] | |
| assert stored_conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION | |
| assert stored_conflict.confidence_score >= 0.6 | |
| def test_conflict_evidence_creation(self): | |
| """Test that conflict evidence is properly created.""" | |
| # Setup test data | |
| conflict = ConflictEvidence( | |
| statement_a_id="stmt_a", | |
| statement_b_id="stmt_b", | |
| conflict_type=ConflictType.SEMANTIC_OPPOSITION, | |
| confidence_score=0.85, | |
| semantic_distance=0.15, | |
| opposition_indicators=["not"], | |
| context_overlap=0.7, | |
| detection_timestamp=time.time() | |
| ) | |
| assert conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION | |
| assert conflict.confidence_score == 0.85 | |
| assert conflict.opposition_indicators == ["not"] | |
| assert conflict.get_age_seconds() >= 0 | |
| def test_temporal_conflict_detection(self): | |
| """Test detection of temporal conflicts.""" | |
| # Add statements with temporal markers | |
| statements1 = [{"id": "stmt_1", "text": "The algorithm will finish before tomorrow"}] | |
| self.detector.process_statements(statements1) | |
| statements2 = [{"id": "stmt_2", "text": "The algorithm will finish after later today"}] | |
| result = self.detector.process_statements(statements2) | |
| # Should detect temporal conflict | |
| assert len(result["new_conflicts"]) >= 0 # May not always trigger due | |
| # to similarity requirements | |
| def test_get_conflict_analysis_no_conflicts(self): | |
| """Test conflict analysis for statement with no conflicts.""" | |
| statements = [{"id": "stmt_1", "text": "This is a simple statement"}] | |
| self.detector.process_statements(statements) | |
| analysis = self.detector.get_conflict_analysis("stmt_1") | |
| assert analysis["conflicts_found"] == 0 | |
| assert analysis["status"] == "no_conflicts" | |
| assert "consistent" in analysis["recommendation"] | |
| def test_get_global_conflict_summary(self): | |
| """Test global conflict summary generation.""" | |
| # Start with no conflicts | |
| summary = self.detector.get_global_conflict_summary() | |
| assert summary["total_conflicts"] == 0 | |
| assert summary["status"] == "healthy" | |
| assert summary["system_health_score"] == 1.0 | |
| # Add some statements and conflicts | |
| statements1 = [{"id": "stmt_1", "text": "This is definitely correct"}] | |
| self.detector.process_statements(statements1) | |
| # Create conflict manually for testing | |
| conflict = ConflictEvidence( | |
| statement_a_id="stmt_1", | |
| statement_b_id="stmt_2", | |
| conflict_type=ConflictType.SEMANTIC_OPPOSITION, | |
| confidence_score=0.95, | |
| semantic_distance=0.05, | |
| opposition_indicators=["not"], | |
| context_overlap=0.5, | |
| detection_timestamp=time.time() | |
| ) | |
| self.detector.detected_conflicts.append(conflict) | |
| summary = self.detector.get_global_conflict_summary() | |
| assert summary["total_conflicts"] == 1 | |
| assert summary["confidence_distribution"]["high"] == 1 | |
| assert summary["status"] == "healthy" # One conflict doesn't trigger warning (>2 needed) | |
| # Actually assert healthy as per the comment above | |
| def test_resolve_conflict_success(self): | |
| """Test successful conflict resolution.""" | |
| # Create a conflict | |
| conflict = ConflictEvidence( | |
| statement_a_id="stmt_1", | |
| statement_b_id="stmt_2", | |
| conflict_type=ConflictType.SEMANTIC_OPPOSITION, | |
| confidence_score=0.8, | |
| semantic_distance=0.2, | |
| opposition_indicators=["not"], | |
| context_overlap=0.5, | |
| detection_timestamp=time.time() | |
| ) | |
| self.detector.detected_conflicts.append(conflict) | |
| # Generate conflict ID and resolve | |
| conflict_id = self.detector._generate_conflict_id(conflict) # pylint: disable=W0212 | |
| resolved = self.detector.resolve_conflict(conflict_id, "User confirmed resolution") | |
| assert resolved is True | |
| assert len(self.detector.detected_conflicts) == 0 | |
| assert len(self.detector.conflict_history) == 1 | |
| assert self.detector.metrics["false_positives_resolved"] == 1 | |
| def test_resolve_conflict_not_found(self): | |
| """Test conflict resolution when conflict ID doesn't exist.""" | |
| resolved = self.detector.resolve_conflict("nonexistent_id", "Test resolution") | |
| assert resolved is False | |
| def test_domain_tag_extraction(self): | |
| """Test that domain tags are correctly extracted from statements.""" | |
| # Test memory domain | |
| statements = [{"id": "stmt_1", "text": "The storage memory needs optimization"}] | |
| self.detector.process_statements(statements) | |
| fingerprint = self.detector.statement_fingerprints["stmt_1"] | |
| assert "memory" in fingerprint.domain_tags | |
| # Test development domain | |
| statements2 = [{"id": "stmt_2", "text": "Debug the development process"}] | |
| self.detector.process_statements(statements2) | |
| fingerprint2 = self.detector.statement_fingerprints["stmt_2"] | |
| assert "development" in fingerprint2.domain_tags | |
| def test_assertion_strength_calculation(self): | |
| """Test that assertion strength is correctly calculated.""" | |
| # Statement with multiple assertion words - should reach max strength | |
| statements = [{"id": "stmt_1", | |
| "text": "This is definitely always absolutely certainly and must be guaranteed"}] | |
| self.detector.process_statements(statements) | |
| fingerprint = self.detector.statement_fingerprints["stmt_1"] | |
| # Should have max assertion strength due to multiple indicators | |
| assert fingerprint.assertion_strength == 1.0 | |
| # Statement with weak assertions | |
| statements2 = [{"id": "stmt_2", "text": "This might be okay"}] | |
| self.detector.process_statements(statements2) | |
| fingerprint2 = self.detector.statement_fingerprints["stmt_2"] | |
| # Should have lower assertion strength | |
| assert fingerprint2.assertion_strength < 1.0 | |
| def test_negation_indicator_detection(self): | |
| """Test that negation indicators are correctly detected.""" | |
| statements = [{"id": "stmt_1", "text": "This is not wrong or incorrect"}] | |
| self.detector.process_statements(statements) | |
| fingerprint = self.detector.statement_fingerprints["stmt_1"] | |
| assert "not" in fingerprint.negation_indicators | |
| assert "incorrect" in fingerprint.negation_indicators | |
| def test_fingerprint_creation_without_embedding_provider(self): | |
| """Test fingerprint creation when no embedding provider is available.""" | |
| detector = ConflictDetector() # No embedding provider | |
| statements = [{"id": "stmt_1", "text": "Test statement without embeddings"}] | |
| result = detector.process_statements(statements) | |
| assert result["fingerprints_created"] == 1 | |
| fingerprint = detector.statement_fingerprints["stmt_1"] | |
| assert fingerprint.embedding == [] # Should be empty when no provider | |
| def test_metrics_update(self): | |
| """Test that metrics are correctly updated during processing.""" | |
| initial_statements = self.detector.metrics["statements_processed"] | |
| statements = [{"id": "stmt_1", "text": "First statement"}] | |
| self.detector.process_statements(statements) | |
| assert self.detector.metrics["statements_processed"] == initial_statements + 1 | |
| def test_conflict_type_enum_values(self): | |
| """Test that conflict type enum has correct values.""" | |
| assert ConflictType.SEMANTIC_OPPOSITION.value == "semantic_opposition" | |
| assert ConflictType.LOGICAL_CONTRADICTION.value == "logical_contradiction" | |
| assert ConflictType.FACTUAL_INCONSISTENCY.value == "factual_inconsistency" | |
| assert ConflictType.TEMPORAL_CONFLICT.value == "temporal_conflict" | |
| assert ConflictType.SCOPE_MISMATCH.value == "scope_mismatch" | |
| def test_conflict_id_generation(self): | |
| """Test that conflict IDs are consistently generated.""" | |
| conflict = ConflictEvidence( | |
| statement_a_id="stmt_a", | |
| statement_b_id="stmt_b", | |
| conflict_type=ConflictType.SEMANTIC_OPPOSITION, | |
| confidence_score=0.8, | |
| semantic_distance=0.2, | |
| opposition_indicators=[], | |
| context_overlap=0.5, | |
| detection_timestamp=1000000.0 | |
| ) | |
| id1 = self.detector._generate_conflict_id(conflict) # pylint: disable=W0212 | |
| id2 = self.detector._generate_conflict_id(conflict) # pylint: disable=W0212 | |
| # Same conflict should generate same ID | |
| assert id1 == id2 | |
| assert len(id1) == 12 # MD5 hash truncated to 12 chars | |
| class TestStatementFingerprint: | |
| """Test the StatementFingerprint dataclass.""" | |
| def test_fingerprint_creation(self): | |
| """Test basic fingerprint creation.""" | |
| from typing import Set # pylint: disable=W0201 w0611 C0415 # noqa: F401 | |
| fingerprint = StatementFingerprint( | |
| statement_id="test_123", | |
| content="This is test content about semantic processing", | |
| embedding=[0.1, 0.2, 0.3], | |
| negation_indicators=["not"], | |
| assertion_strength=0.8, | |
| temporal_markers=["before"], | |
| domain_tags={"semantics", "processing"}, | |
| creation_timestamp=1234567890.0 | |
| ) | |
| assert fingerprint.statement_id == "test_123" | |
| assert fingerprint.content == "This is test content about semantic processing" | |
| assert fingerprint.embedding == [0.1, 0.2, 0.3] | |
| assert fingerprint.negation_indicators == ["not"] | |
| assert fingerprint.assertion_strength == 0.8 | |
| assert fingerprint.temporal_markers == ["before"] | |
| assert "semantics" in fingerprint.domain_tags | |
| assert "processing" in fingerprint.domain_tags | |
| def test_fingerprint_equality(self): | |
| """Test fingerprint equality comparison.""" | |
| fp1 = StatementFingerprint( | |
| statement_id="id1", content="content", embedding=[], | |
| negation_indicators=[], assertion_strength=0.5, | |
| temporal_markers=[], domain_tags=set(), creation_timestamp=1000.0 | |
| ) | |
| fp2 = StatementFingerprint( | |
| statement_id="id1", content="content", embedding=[], | |
| negation_indicators=[], assertion_strength=0.5, | |
| temporal_markers=[], domain_tags=set(), creation_timestamp=1000.0 | |
| ) | |
| assert fp1 == fp2 | |
| class TestConflictEvidence: | |
| """Test the ConflictEvidence dataclass.""" | |
| def test_conflict_evidence_creation(self): | |
| """Test basic conflict evidence creation.""" | |
| conflict = ConflictEvidence( | |
| statement_a_id="stmt_1", | |
| statement_b_id="stmt_2", | |
| conflict_type=ConflictType.SEMANTIC_OPPOSITION, | |
| confidence_score=0.75, | |
| semantic_distance=0.25, | |
| opposition_indicators=["not", "incorrect"], | |
| context_overlap=0.8, | |
| detection_timestamp=1234567890.0 | |
| ) | |
| assert conflict.statement_a_id == "stmt_1" | |
| assert conflict.statement_b_id == "stmt_2" | |
| assert conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION | |
| assert conflict.confidence_score == 0.75 | |
| assert conflict.semantic_distance == 0.25 | |
| assert conflict.opposition_indicators == ["not", "incorrect"] | |
| assert conflict.context_overlap == 0.8 | |
| # Test age calculation (will be small since timestamp is old) | |
| age = conflict.get_age_seconds() | |
| assert age > 0 | |
| if __name__ == "__main__": | |
| pytest.main([__file__, "-v"]) | |