"""Unit tests for Pinecone client.""" from __future__ import annotations import tempfile from datetime import datetime, timezone from pathlib import Path from unittest.mock import MagicMock, patch import pytest from pinecone.exceptions import PineconeException from tools.pinecone_client import PineconeClient from tools.pinecone_models import PineconeRecord class TestUploadTracking: """Tests for upload tracking marker file operations.""" def test_is_uploaded_returns_false_when_marker_missing(self): """is_uploaded() returns False when marker file doesn't exist.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) assert PineconeClient.is_uploaded(set_dir) is False def test_is_uploaded_returns_true_when_marker_exists(self): """is_uploaded() returns True when marker file exists.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) marker_file = set_dir / ".pinecone_uploaded" marker_file.write_text("2025-01-15T14:30:00Z") assert PineconeClient.is_uploaded(set_dir) is True def test_mark_uploaded_creates_marker_file(self): """mark_uploaded() creates marker file with ISO 8601 timestamp.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) marker_file = set_dir / ".pinecone_uploaded" assert not marker_file.exists() PineconeClient.mark_uploaded(set_dir) assert marker_file.exists() # Verify timestamp format timestamp = marker_file.read_text(encoding="utf-8").strip() # Should be valid ISO 8601 format datetime.fromisoformat(timestamp.replace("Z", "+00:00")) def test_mark_uploaded_writes_utc_timestamp(self): """mark_uploaded() writes UTC timestamp in ISO 8601 format.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) PineconeClient.mark_uploaded(set_dir) marker_file = set_dir / ".pinecone_uploaded" timestamp_str = marker_file.read_text(encoding="utf-8").strip() # Parse and verify it's UTC if timestamp_str.endswith("Z"): timestamp_str = timestamp_str[:-1] + "+00:00" parsed = datetime.fromisoformat(timestamp_str) assert parsed.tzinfo == timezone.utc def test_get_upload_timestamp_returns_none_when_marker_missing(self): """get_upload_timestamp() returns None when marker file doesn't exist.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) assert PineconeClient.get_upload_timestamp(set_dir) is None def test_get_upload_timestamp_returns_timestamp_when_marker_exists(self): """get_upload_timestamp() returns timestamp string when marker exists.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) expected_timestamp = "2025-01-15T14:30:00Z" marker_file = set_dir / ".pinecone_uploaded" marker_file.write_text(expected_timestamp) result = PineconeClient.get_upload_timestamp(set_dir) assert result == expected_timestamp def test_get_upload_timestamp_handles_read_error(self): """get_upload_timestamp() returns None if marker file can't be read.""" with tempfile.TemporaryDirectory() as tmpdir: set_dir = Path(tmpdir) marker_file = set_dir / ".pinecone_uploaded" marker_file.write_text("test") # Make file unreadable (on Unix systems) marker_file.chmod(0o000) try: result = PineconeClient.get_upload_timestamp(set_dir) # Should return None or handle gracefully assert result is None or isinstance(result, str) finally: # Restore permissions for cleanup marker_file.chmod(0o644) class TestPineconeClientCore: """Tests for core Pinecone client functionality.""" @patch("tools.pinecone_client.Pinecone") @patch("tools.pinecone_client.get_settings") def test_init_raises_error_when_api_key_missing(self, mock_get_settings): """__init__() raises ValueError when API key is not set.""" mock_settings = MagicMock() mock_settings.pinecone_api_key = "" mock_get_settings.return_value = mock_settings with pytest.raises(ValueError, match="PINECONE_API_KEY"): PineconeClient() @patch("tools.pinecone_client.Pinecone") @patch("tools.pinecone_client.get_settings") def test_init_initializes_pinecone_client(self, mock_get_settings): """__init__() initializes Pinecone SDK with API key.""" mock_settings = MagicMock() mock_settings.pinecone_api_key = "test-api-key" mock_settings.pinecone_index_name = "test-index" mock_settings.pinecone_namespace = "test-namespace" mock_get_settings.return_value = mock_settings mock_pc = MagicMock() mock_pc.Index.return_value = MagicMock() mock_pc.has_index.return_value = True mock_pinecone_class = MagicMock(return_value=mock_pc) with patch("tools.pinecone_client.Pinecone", mock_pinecone_class): client = PineconeClient() assert client.pc == mock_pc assert client.index_name == "test-index" assert client.namespace == "test-namespace" @patch("tools.pinecone_client.Pinecone") @patch("tools.pinecone_client.get_settings") def test_validate_index_raises_error_when_index_missing(self, mock_get_settings): """validate_index() raises ValueError when index doesn't exist.""" mock_settings = MagicMock() mock_settings.pinecone_api_key = "test-api-key" mock_settings.pinecone_index_name = "missing-index" mock_get_settings.return_value = mock_settings mock_pc = MagicMock() mock_pc.has_index.return_value = False mock_pinecone_class = MagicMock(return_value=mock_pc) with patch("tools.pinecone_client.Pinecone", mock_pinecone_class): client = PineconeClient() with pytest.raises(ValueError, match="Index 'missing-index' not found"): client.validate_index() @patch("tools.pinecone_client.Pinecone") @patch("tools.pinecone_client.get_settings") def test_validate_index_succeeds_when_index_exists(self, mock_get_settings): """validate_index() succeeds when index exists.""" mock_settings = MagicMock() mock_settings.pinecone_api_key = "test-api-key" mock_settings.pinecone_index_name = "existing-index" mock_get_settings.return_value = mock_settings mock_pc = MagicMock() mock_pc.has_index.return_value = True mock_pinecone_class = MagicMock(return_value=mock_pc) with patch("tools.pinecone_client.Pinecone", mock_pinecone_class): client = PineconeClient() # Should not raise client.validate_index() def test_exponential_backoff_retry_succeeds_on_first_attempt(self): """exponential_backoff_retry() succeeds when function succeeds immediately.""" func = MagicMock(return_value="success") result = PineconeClient.exponential_backoff_retry(func) assert result == "success" func.assert_called_once() @patch("tools.pinecone_client.time.sleep") def test_exponential_backoff_retry_retries_on_429(self, mock_sleep): """exponential_backoff_retry() retries on 429 rate limit errors.""" error_429 = PineconeException("Rate limited") error_429.status = 429 func = MagicMock(side_effect=[error_429, "success"]) result = PineconeClient.exponential_backoff_retry(func, max_retries=2) assert result == "success" assert func.call_count == 2 mock_sleep.assert_called_once_with(1) # 2^0 = 1 @patch("tools.pinecone_client.time.sleep") def test_exponential_backoff_retry_retries_on_5xx(self, mock_sleep): """exponential_backoff_retry() retries on 5xx server errors.""" error_500 = PineconeException("Server error") error_500.status = 500 func = MagicMock(side_effect=[error_500, "success"]) result = PineconeClient.exponential_backoff_retry(func, max_retries=2) assert result == "success" assert func.call_count == 2 mock_sleep.assert_called_once_with(1) def test_exponential_backoff_retry_fails_on_4xx(self): """exponential_backoff_retry() fails immediately on 4xx client errors.""" error_400 = PineconeException("Bad request") error_400.status = 400 func = MagicMock(side_effect=error_400) with pytest.raises(PineconeException): PineconeClient.exponential_backoff_retry(func, max_retries=3) # Should only try once (no retries for 4xx) assert func.call_count == 1 @patch("tools.pinecone_client.time.sleep") def test_exponential_backoff_retry_caps_delay_at_60s(self, mock_sleep): """exponential_backoff_retry() caps delay at 60 seconds.""" error_500 = PineconeException("Server error") error_500.status = 500 func = MagicMock(side_effect=[error_500, error_500, "success"]) result = PineconeClient.exponential_backoff_retry(func, max_retries=3) assert result == "success" # First retry: 2^0 = 1s, second retry: min(2^1, 60) = 2s assert mock_sleep.call_count == 2 mock_sleep.assert_any_call(1) mock_sleep.assert_any_call(2) def test_record_to_dict_omits_none_optional_fields(self): """_record_to_dict() omits None values for optional fields.""" record = PineconeRecord( _id="test-id", content="test content", standard_set_id="set-id", standard_set_title="Test Set", subject="Math", education_levels=["01"], document_id="doc-id", document_valid="2021", jurisdiction_id="jur-id", jurisdiction_title="Test Jurisdiction", depth=0, is_leaf=True, is_root=True, root_id="test-id", ancestor_ids=[], child_ids=[], sibling_count=0, # Optional fields set to None normalized_subject=None, publication_status=None, asn_identifier=None, statement_notation=None, statement_label=None, parent_id=None, ) record_dict = PineconeClient._record_to_dict(record) # Verify _id is serialized (not id) assert "_id" in record_dict assert record_dict["_id"] == "test-id" assert "id" not in record_dict # Optional fields should be omitted assert "asn_identifier" not in record_dict assert "statement_notation" not in record_dict assert "statement_label" not in record_dict assert "normalized_subject" not in record_dict assert "publication_status" not in record_dict # parent_id should be present as null assert "parent_id" in record_dict assert record_dict["parent_id"] is None def test_record_to_dict_includes_present_optional_fields(self): """_record_to_dict() includes optional fields when they have values.""" record = PineconeRecord( _id="test-id", content="test content", standard_set_id="set-id", standard_set_title="Test Set", subject="Math", normalized_subject="Math", education_levels=["01"], document_id="doc-id", document_valid="2021", publication_status="Published", jurisdiction_id="jur-id", jurisdiction_title="Test Jurisdiction", asn_identifier="ASN123", statement_notation="1.2.3", statement_label="Standard", depth=1, is_leaf=True, is_root=False, parent_id="parent-id", root_id="root-id", ancestor_ids=["root-id"], child_ids=[], sibling_count=0, ) record_dict = PineconeClient._record_to_dict(record) # Verify _id is serialized (not id) assert "_id" in record_dict assert record_dict["_id"] == "test-id" assert "id" not in record_dict # Optional fields should be included when present assert record_dict["asn_identifier"] == "ASN123" assert record_dict["statement_notation"] == "1.2.3" assert record_dict["statement_label"] == "Standard" assert record_dict["normalized_subject"] == "Math" assert record_dict["publication_status"] == "Published" assert record_dict["parent_id"] == "parent-id"