Phase 2 Implementation Spec: Search Vertical Slice
Goal: Implement the "Eyes and Ears" of the agent: retrieving real biomedical data.
Philosophy: "Real data, mocked connections."
Prerequisite: Phase 1 complete (all tests passing).
⚠️ Implementation Note (2025-01-27): The DuckDuckGo WebTool specified in this phase was removed in favor of the Europe PMC tool (see Phase 11). Europe PMC provides better coverage for biomedical research by including preprints, peer-reviewed articles, and patents. The current implementation uses PubMed, ClinicalTrials.gov, and Europe PMC as search sources.
1. The Slice Definition
This slice covers:
- Input: A string query (e.g., "metformin Alzheimer's disease").
- Process:
  - Fetch from PubMed (E-utilities API).
  - Fetch from Web (DuckDuckGo). REMOVED - Replaced by Europe PMC in Phase 11
  - Normalize results into Evidence models.
- Output: A list of Evidence objects.
Files to Create:
- src/utils/models.py - Pydantic models (Evidence, Citation, SearchResult)
- src/tools/pubmed.py - PubMed E-utilities tool
- src/tools/websearch.py - DuckDuckGo search tool (REMOVED - See Phase 11 for Europe PMC replacement)
- src/tools/search_handler.py - Orchestrates multiple tools
- src/tools/__init__.py - Exports
Additional Files (Post-Phase 2 Enhancements):
- src/tools/query_utils.py - Query preprocessing (removes question words, expands medical synonyms)
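The preprocessing step can be pictured with a small sketch. It is not the contents of query_utils.py: the function name `preprocess_query`, the question-word list, and the synonym table are all hypothetical and chosen only to illustrate "remove question words, expand medical synonyms".

```python
import re

# Hypothetical word lists for illustration; the real module may use others.
QUESTION_WORDS = {"what", "which", "how", "why", "does", "do", "is", "are", "can", "could"}

SYNONYMS = {
    "heart attack": "myocardial infarction",
    "high blood pressure": "hypertension",
}


def preprocess_query(query: str) -> str:
    """Drop question words and punctuation, then OR in known synonyms."""
    text = re.sub(r"[?!.]", " ", query.lower())
    words = [w for w in text.split() if w not in QUESTION_WORDS]
    cleaned = " ".join(words)
    for phrase, synonym in SYNONYMS.items():
        if phrase in cleaned:
            # Quoted phrases joined with OR so either wording can match
            cleaned = cleaned.replace(phrase, f'("{phrase}" OR "{synonym}")')
    return cleaned


print(preprocess_query("Does aspirin prevent heart attack?"))
```

A query that contains no question words or known phrases, such as "metformin alzheimer", passes through unchanged apart from lowercasing.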
2. PubMed E-utilities API Reference
Base URL: https://eutils.ncbi.nlm.nih.gov/entrez/eutils/
Key Endpoints
| Endpoint | Purpose | Example |
|---|---|---|
| esearch.fcgi | Search for article IDs | ?db=pubmed&term=metformin+alzheimer&retmax=10 |
| efetch.fcgi | Fetch article details | ?db=pubmed&id=12345,67890&rettype=abstract&retmode=xml |
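As a quick illustration of how the example query strings in the table map onto full URLs, the sketch below builds them with the standard library. The helper names `esearch_url` and `efetch_url` are hypothetical and do not appear in the spec.

```python
from urllib.parse import urlencode

BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"


def esearch_url(term: str, retmax: int = 10) -> str:
    """URL that searches PubMed for article IDs matching `term`."""
    params = {"db": "pubmed", "term": term, "retmax": retmax}
    return f"{BASE_URL}/esearch.fcgi?{urlencode(params)}"


def efetch_url(pmids: list[str]) -> str:
    """URL that fetches abstracts (as XML) for the given PMIDs."""
    params = {
        "db": "pubmed",
        "id": ",".join(pmids),
        "rettype": "abstract",
        "retmode": "xml",
    }
    # safe="," keeps the comma-separated ID list readable in the URL
    return f"{BASE_URL}/efetch.fcgi?{urlencode(params, safe=',')}"


print(esearch_url("metformin alzheimer"))
print(efetch_url(["12345", "67890"]))
```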
Rate Limiting (CRITICAL!)
NCBI requires rate limiting:
- Without API key: 3 requests/second
- With API key: 10 requests/second
Get a free API key: https://www.ncbi.nlm.nih.gov/account/settings/
# Add to .env
NCBI_API_KEY=your-key-here # Optional but recommended
Example Search Flow
1. esearch: "metformin alzheimer" → [PMID: 12345, 67890, ...]
2. efetch: PMIDs → Full abstracts/metadata
3. Parse XML → Evidence objects
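The three steps can be walked through offline with canned responses. This is a sketch only: the sample JSON and XML are invented placeholders, and it parses XML with the standard library's ElementTree rather than the xmltodict approach used by the tool in this spec.

```python
import json
import xml.etree.ElementTree as ET

# Invented placeholder responses shaped like esearch (JSON) and efetch (XML)
ESEARCH_JSON = '{"esearchresult": {"idlist": ["12345", "67890"]}}'
EFETCH_XML = """<PubmedArticleSet>
  <PubmedArticle><MedlineCitation>
    <PMID>12345</PMID>
    <Article>
      <ArticleTitle>Example title</ArticleTitle>
      <Abstract><AbstractText>Example abstract.</AbstractText></Abstract>
    </Article>
  </MedlineCitation></PubmedArticle>
</PubmedArticleSet>"""


def parse_pmids(body: str) -> list[str]:
    """Step 1: pull the PMID list out of an esearch JSON body."""
    return json.loads(body).get("esearchresult", {}).get("idlist", [])


def parse_articles(xml_text: str) -> list[dict]:
    """Step 3: turn efetch XML into plain dicts (stand-ins for Evidence)."""
    root = ET.fromstring(xml_text)
    records = []
    for article in root.iter("PubmedArticle"):
        records.append({
            "pmid": article.findtext("MedlineCitation/PMID", default=""),
            "title": article.findtext("MedlineCitation/Article/ArticleTitle", default=""),
            "abstract": article.findtext(
                "MedlineCitation/Article/Abstract/AbstractText", default=""
            ),
        })
    return records


print(parse_pmids(ESEARCH_JSON))
print(parse_articles(EFETCH_XML))
```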
3. Models (src/utils/models.py)
"""Data models for the Search feature."""
from pydantic import BaseModel, Field
from typing import Literal


class Citation(BaseModel):
    """A citation to a source document."""

    source: Literal["pubmed", "web"] = Field(description="Where this came from")
    title: str = Field(min_length=1, max_length=500)
    url: str = Field(description="URL to the source")
    date: str = Field(description="Publication date (YYYY-MM-DD or 'Unknown')")
    authors: list[str] = Field(default_factory=list)

    @property
    def formatted(self) -> str:
        """Format as a citation string."""
        author_str = ", ".join(self.authors[:3])
        if len(self.authors) > 3:
            author_str += " et al."
        return f"{author_str} ({self.date}). {self.title}. {self.source.upper()}"


class Evidence(BaseModel):
    """A piece of evidence retrieved from search."""

    content: str = Field(min_length=1, description="The actual text content")
    citation: Citation
    relevance: float = Field(default=0.0, ge=0.0, le=1.0, description="Relevance score 0-1")

    class Config:
        frozen = True  # Immutable after creation


class SearchResult(BaseModel):
    """Result of a search operation."""

    query: str
    evidence: list[Evidence]
    sources_searched: list[Literal["pubmed", "web"]]
    total_found: int
    errors: list[str] = Field(default_factory=list)
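To see what the `formatted` property produces, the sketch below mirrors its logic as a plain function so it runs without Pydantic. The function name `format_citation` and the sample authors are hypothetical.

```python
def format_citation(authors: list[str], date: str, title: str, source: str) -> str:
    """Mirror of Citation.formatted: first three authors, then 'et al.'."""
    author_str = ", ".join(authors[:3])
    if len(authors) > 3:
        author_str += " et al."
    return f"{author_str} ({date}). {title}. {source.upper()}"


# Four authors: only the first three are listed
print(format_citation(
    ["Smith J", "Lee K", "Patel R", "Gomez A"],
    "2024-01-01",
    "Metformin in Alzheimer's Disease",
    "pubmed",
))

# No authors: the result starts with a space before the date
print(repr(format_citation([], "Unknown", "Some page", "web")))
```

The second call shows an edge case worth knowing about: with an empty author list the string begins with a leading space, which matters for web results that carry no author metadata.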
4. Tool Protocol (src/tools/pubmed.py and src/tools/websearch.py)
The Interface (Protocol) - Add to src/tools/__init__.py
"""Search tools package."""
from typing import Protocol, List

# Import implementations
from src.tools.pubmed import PubMedTool
from src.tools.websearch import WebTool
from src.tools.search_handler import SearchHandler
from src.utils.models import Evidence  # Needed for the Protocol's return type

# Re-export
__all__ = ["SearchTool", "PubMedTool", "WebTool", "SearchHandler"]


class SearchTool(Protocol):
    """Protocol defining the interface for all search tools."""

    @property
    def name(self) -> str:
        """Human-readable name of this tool."""
        ...

    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        """
        Execute a search and return evidence.

        Args:
            query: The search query string
            max_results: Maximum number of results to return

        Returns:
            List of Evidence objects

        Raises:
            SearchError: If the search fails
            RateLimitError: If we hit rate limits
        """
        ...
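Because the interface is a Protocol, a tool satisfies it structurally: no inheritance is required, only a matching `name` property and `search` coroutine. The sketch below demonstrates this with a hypothetical `EchoTool`. The `SearchToolSketch` class and the `@runtime_checkable` decorator are additions for the demonstration and are not part of the spec's Protocol.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable  # Added here only so isinstance() can be used in the demo
class SearchToolSketch(Protocol):
    @property
    def name(self) -> str: ...

    async def search(self, query: str, max_results: int = 10) -> list: ...


class EchoTool:
    """Hypothetical tool: no base class, it just has the right shape."""

    @property
    def name(self) -> str:
        return "echo"

    async def search(self, query: str, max_results: int = 10) -> list:
        return [f"{query} #{i}" for i in range(max_results)]


tool = EchoTool()
print(isinstance(tool, SearchToolSketch))
print(asyncio.run(tool.search("metformin", max_results=2)))
```

Note that the runtime check only confirms the attributes exist; it does not verify signatures or return types, which is what a static type checker is for.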
PubMed Tool Implementation (src/tools/pubmed.py)
"""PubMed search tool using NCBI E-utilities."""
import asyncio
from typing import List

import httpx
import xmltodict
from tenacity import retry, stop_after_attempt, wait_exponential

from src.utils.config import settings
from src.utils.exceptions import SearchError, RateLimitError
from src.utils.models import Evidence, Citation


class PubMedTool:
    """Search tool for PubMed/NCBI."""

    BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    RATE_LIMIT_DELAY = 0.34  # ~3 requests/sec without API key

    def __init__(self, api_key: str | None = None):
        self.api_key = api_key or getattr(settings, "ncbi_api_key", None)
        self._last_request_time = 0.0

    @property
    def name(self) -> str:
        return "pubmed"

    async def _rate_limit(self) -> None:
        """Enforce NCBI rate limiting."""
        loop = asyncio.get_running_loop()
        elapsed = loop.time() - self._last_request_time
        if elapsed < self.RATE_LIMIT_DELAY:
            await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed)
        self._last_request_time = loop.time()

    def _build_params(self, **kwargs) -> dict:
        """Build request params with optional API key."""
        params = {**kwargs, "retmode": "json"}
        if self.api_key:
            params["api_key"] = self.api_key
        return params

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        reraise=True,
    )
    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        """
        Search PubMed and return evidence.

        1. ESearch: Get PMIDs matching query
        2. EFetch: Get abstracts for those PMIDs
        3. Parse and return Evidence objects
        """
        await self._rate_limit()
        async with httpx.AsyncClient(timeout=30.0) as client:
            # Step 1: Search for PMIDs
            search_params = self._build_params(
                db="pubmed",
                term=query,
                retmax=max_results,
                sort="relevance",
            )
            try:
                search_resp = await client.get(
                    f"{self.BASE_URL}/esearch.fcgi",
                    params=search_params,
                )
                search_resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    raise RateLimitError("PubMed rate limit exceeded") from e
                raise SearchError(f"PubMed search failed: {e}") from e

            search_data = search_resp.json()
            pmids = search_data.get("esearchresult", {}).get("idlist", [])
            if not pmids:
                return []

            # Step 2: Fetch abstracts
            await self._rate_limit()
            fetch_params = self._build_params(
                db="pubmed",
                id=",".join(pmids),
                rettype="abstract",
            )
            # Use XML for fetch (more reliable parsing)
            fetch_params["retmode"] = "xml"
            try:
                fetch_resp = await client.get(
                    f"{self.BASE_URL}/efetch.fcgi",
                    params=fetch_params,
                )
                fetch_resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                # Map fetch failures to the same errors as the search step
                if e.response.status_code == 429:
                    raise RateLimitError("PubMed rate limit exceeded") from e
                raise SearchError(f"PubMed fetch failed: {e}") from e

            # Step 3: Parse XML to Evidence
            return self._parse_pubmed_xml(fetch_resp.text)

    def _parse_pubmed_xml(self, xml_text: str) -> List[Evidence]:
        """Parse PubMed XML into Evidence objects."""
        try:
            data = xmltodict.parse(xml_text)
        except Exception as e:
            raise SearchError(f"Failed to parse PubMed XML: {e}") from e

        articles = data.get("PubmedArticleSet", {}).get("PubmedArticle", [])
        # Handle single article (xmltodict returns dict instead of list)
        if isinstance(articles, dict):
            articles = [articles]

        evidence_list = []
        for article in articles:
            try:
                evidence = self._article_to_evidence(article)
                if evidence:
                    evidence_list.append(evidence)
            except Exception:
                continue  # Skip malformed articles
        return evidence_list

    def _article_to_evidence(self, article: dict) -> Evidence | None:
        """Convert a single PubMed article to Evidence."""
        medline = article.get("MedlineCitation", {})
        article_data = medline.get("Article", {})

        # Extract PMID
        pmid = medline.get("PMID", {})
        if isinstance(pmid, dict):
            pmid = pmid.get("#text", "")

        # Extract title
        title = article_data.get("ArticleTitle", "")
        if isinstance(title, dict):
            title = title.get("#text", str(title))

        # Extract abstract
        abstract_data = article_data.get("Abstract", {}).get("AbstractText", "")
        if isinstance(abstract_data, list):
            abstract = " ".join(
                item.get("#text", str(item)) if isinstance(item, dict) else str(item)
                for item in abstract_data
            )
        elif isinstance(abstract_data, dict):
            abstract = abstract_data.get("#text", str(abstract_data))
        else:
            abstract = str(abstract_data)

        if not abstract or not title:
            return None

        # Extract date
        pub_date = article_data.get("Journal", {}).get("JournalIssue", {}).get("PubDate", {})
        year = pub_date.get("Year", "Unknown")
        month = pub_date.get("Month", "01")
        day = pub_date.get("Day", "01")
        date_str = f"{year}-{month}-{day}" if year != "Unknown" else "Unknown"

        # Extract authors
        author_list = article_data.get("AuthorList", {}).get("Author", [])
        if isinstance(author_list, dict):
            author_list = [author_list]
        authors = []
        for author in author_list[:5]:  # Limit to 5 authors
            last = author.get("LastName", "")
            first = author.get("ForeName", "")
            if last:
                authors.append(f"{last} {first}".strip())

        return Evidence(
            content=abstract[:2000],  # Truncate long abstracts
            citation=Citation(
                source="pubmed",
                title=title[:500],
                url=f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
                date=date_str,
                authors=authors,
            ),
        )
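The date extraction above passes the Month value through unchanged, while the Citation model documents dates as YYYY-MM-DD. PubMed records may carry the month as a three-letter abbreviation such as "Jan" rather than a number, so a normalization step could be useful. The helper below is a sketch under that assumption; `normalize_pub_date` is a hypothetical name and is not part of the tool.

```python
MONTHS = {
    "jan": "01", "feb": "02", "mar": "03", "apr": "04",
    "may": "05", "jun": "06", "jul": "07", "aug": "08",
    "sep": "09", "oct": "10", "nov": "11", "dec": "12",
}


def normalize_pub_date(pub_date: dict) -> str:
    """Return YYYY-MM-DD from a PubDate dict, or 'Unknown' without a Year."""
    year = pub_date.get("Year")
    if not year:
        return "Unknown"
    month = str(pub_date.get("Month", "01")).strip()
    # Map "Jan"/"January" to "01"; zero-pad numeric months like "7"
    month = MONTHS.get(month[:3].lower(), month.zfill(2))
    day = str(pub_date.get("Day", "01")).strip().zfill(2)
    return f"{year}-{month}-{day}"


print(normalize_pub_date({"Year": "2024", "Month": "Jan"}))
print(normalize_pub_date({"Year": "2023", "Month": "7", "Day": "4"}))
print(normalize_pub_date({}))
```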
DuckDuckGo Tool Implementation (src/tools/websearch.py)
"""Web search tool using DuckDuckGo."""
import asyncio
from typing import List

from duckduckgo_search import DDGS

from src.utils.exceptions import SearchError
from src.utils.models import Evidence, Citation


class WebTool:
    """Search tool for general web search via DuckDuckGo."""

    def __init__(self):
        pass

    @property
    def name(self) -> str:
        return "web"

    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        """
        Search DuckDuckGo and return evidence.

        Note: duckduckgo-search is synchronous, so we run it in executor.
        """
        loop = asyncio.get_event_loop()
        try:
            results = await loop.run_in_executor(
                None,
                lambda: self._sync_search(query, max_results),
            )
            return results
        except Exception as e:
            raise SearchError(f"Web search failed: {e}")

    def _sync_search(self, query: str, max_results: int) -> List[Evidence]:
        """Synchronous search implementation."""
        evidence_list = []
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
            for result in results:
                evidence_list.append(
                    Evidence(
                        content=result.get("body", "")[:1000],
                        citation=Citation(
                            source="web",
                            title=result.get("title", "Unknown")[:500],
                            url=result.get("href", ""),
                            date="Unknown",
                            authors=[],
                        ),
                    )
                )
        return evidence_list
5. Search Handler (src/tools/search_handler.py)
The handler orchestrates multiple tools using the Scatter-Gather pattern.
"""Search handler - orchestrates multiple search tools."""
import asyncio
from typing import List, Protocol

import structlog

from src.utils.exceptions import SearchError
from src.utils.models import Evidence, SearchResult

logger = structlog.get_logger()


class SearchTool(Protocol):
    """Protocol defining the interface for all search tools."""

    @property
    def name(self) -> str:
        ...

    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        ...


def flatten(nested: List[List[Evidence]]) -> List[Evidence]:
    """Flatten a list of lists into a single list."""
    return [item for sublist in nested for item in sublist]


class SearchHandler:
    """Orchestrates parallel searches across multiple tools."""

    def __init__(self, tools: List[SearchTool], timeout: float = 30.0):
        """
        Initialize the search handler.

        Args:
            tools: List of search tools to use
            timeout: Timeout for each search in seconds
        """
        self.tools = tools
        self.timeout = timeout

    async def execute(self, query: str, max_results_per_tool: int = 10) -> SearchResult:
        """
        Execute search across all tools in parallel.

        Args:
            query: The search query
            max_results_per_tool: Max results from each tool

        Returns:
            SearchResult containing all evidence and metadata
        """
        logger.info("Starting search", query=query, tools=[t.name for t in self.tools])

        # Create tasks for parallel execution
        tasks = [
            self._search_with_timeout(tool, query, max_results_per_tool)
            for tool in self.tools
        ]

        # Gather results (don't fail if one tool fails)
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        all_evidence: List[Evidence] = []
        sources_searched: List[str] = []
        errors: List[str] = []
        for tool, result in zip(self.tools, results):
            if isinstance(result, Exception):
                errors.append(f"{tool.name}: {str(result)}")
                logger.warning("Search tool failed", tool=tool.name, error=str(result))
            else:
                all_evidence.extend(result)
                sources_searched.append(tool.name)
                logger.info("Search tool succeeded", tool=tool.name, count=len(result))

        return SearchResult(
            query=query,
            evidence=all_evidence,
            sources_searched=sources_searched,
            total_found=len(all_evidence),
            errors=errors,
        )

    async def _search_with_timeout(
        self,
        tool: SearchTool,
        query: str,
        max_results: int,
    ) -> List[Evidence]:
        """Execute a single tool search with timeout."""
        try:
            return await asyncio.wait_for(
                tool.search(query, max_results),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            raise SearchError(f"{tool.name} search timed out after {self.timeout}s")
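The scatter-gather behavior can be observed in isolation with three toy coroutines: one that succeeds, one that raises, and one that exceeds the timeout. This is a standard-library-only sketch; the source functions and `scatter_gather` are hypothetical stand-ins for real tools and for SearchHandler.execute.

```python
import asyncio


async def fast_source(query: str) -> list[str]:
    await asyncio.sleep(0.01)
    return [f"fast:{query}"]


async def broken_source(query: str) -> list[str]:
    raise RuntimeError("API down")


async def slow_source(query: str) -> list[str]:
    await asyncio.sleep(1.0)  # Longer than the timeout used below
    return [f"slow:{query}"]


async def scatter_gather(query, sources, timeout):
    """Run all sources concurrently; collect results and per-source errors."""
    tasks = [asyncio.wait_for(src(query), timeout=timeout) for src in sources]
    # return_exceptions=True turns failures into values instead of raising
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    results, errors = [], []
    for src, outcome in zip(sources, outcomes):
        if isinstance(outcome, Exception):
            errors.append(f"{src.__name__}: {type(outcome).__name__}")
        else:
            results.extend(outcome)
    return results, errors


results, errors = asyncio.run(
    scatter_gather("metformin", [fast_source, broken_source, slow_source], timeout=0.1)
)
print(results)
print(errors)
```

Only the fast source contributes results; the other two appear in the error list, which is the graceful-degradation behavior the handler aims for.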
6. TDD Workflow
Test File: tests/unit/tools/test_pubmed.py
"""Unit tests for PubMed tool."""
import pytest
from unittest.mock import AsyncMock, MagicMock

# Sample PubMed XML response for mocking
SAMPLE_PUBMED_XML = """<?xml version="1.0" ?>
<PubmedArticleSet>
  <PubmedArticle>
    <MedlineCitation>
      <PMID>12345678</PMID>
      <Article>
        <ArticleTitle>Metformin in Alzheimer's Disease: A Systematic Review</ArticleTitle>
        <Abstract>
          <AbstractText>Metformin shows neuroprotective properties...</AbstractText>
        </Abstract>
        <AuthorList>
          <Author>
            <LastName>Smith</LastName>
            <ForeName>John</ForeName>
          </Author>
        </AuthorList>
        <Journal>
          <JournalIssue>
            <PubDate>
              <Year>2024</Year>
              <Month>01</Month>
            </PubDate>
          </JournalIssue>
        </Journal>
      </Article>
    </MedlineCitation>
  </PubmedArticle>
</PubmedArticleSet>
"""


class TestPubMedTool:
    """Tests for PubMedTool."""

    @pytest.mark.asyncio
    async def test_search_returns_evidence(self, mocker):
        """PubMedTool should return Evidence objects from search."""
        from src.tools.pubmed import PubMedTool

        # Mock the HTTP responses
        mock_search_response = MagicMock()
        mock_search_response.json.return_value = {
            "esearchresult": {"idlist": ["12345678"]}
        }
        mock_search_response.raise_for_status = MagicMock()

        mock_fetch_response = MagicMock()
        mock_fetch_response.text = SAMPLE_PUBMED_XML
        mock_fetch_response.raise_for_status = MagicMock()

        mock_client = AsyncMock()
        mock_client.get = AsyncMock(side_effect=[mock_search_response, mock_fetch_response])
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=None)
        mocker.patch("httpx.AsyncClient", return_value=mock_client)

        # Act
        tool = PubMedTool()
        results = await tool.search("metformin alzheimer")

        # Assert
        assert len(results) == 1
        assert results[0].citation.source == "pubmed"
        assert "Metformin" in results[0].citation.title
        assert "12345678" in results[0].citation.url

    @pytest.mark.asyncio
    async def test_search_empty_results(self, mocker):
        """PubMedTool should return empty list when no results."""
        from src.tools.pubmed import PubMedTool

        mock_response = MagicMock()
        mock_response.json.return_value = {"esearchresult": {"idlist": []}}
        mock_response.raise_for_status = MagicMock()

        mock_client = AsyncMock()
        mock_client.get = AsyncMock(return_value=mock_response)
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=None)
        mocker.patch("httpx.AsyncClient", return_value=mock_client)

        tool = PubMedTool()
        results = await tool.search("xyznonexistentquery123")
        assert results == []

    def test_parse_pubmed_xml(self):
        """PubMedTool should correctly parse XML."""
        from src.tools.pubmed import PubMedTool

        tool = PubMedTool()
        results = tool._parse_pubmed_xml(SAMPLE_PUBMED_XML)
        assert len(results) == 1
        assert results[0].citation.source == "pubmed"
        assert "Smith John" in results[0].citation.authors
Test File: tests/unit/tools/test_websearch.py
"""Unit tests for WebTool."""
import pytest
from unittest.mock import MagicMock


class TestWebTool:
    """Tests for WebTool."""

    @pytest.mark.asyncio
    async def test_search_returns_evidence(self, mocker):
        """WebTool should return Evidence objects from search."""
        from src.tools.websearch import WebTool

        mock_results = [
            {
                "title": "Drug Repurposing Article",
                "href": "https://example.com/article",
                "body": "Some content about drug repurposing...",
            }
        ]
        mock_ddgs = MagicMock()
        mock_ddgs.__enter__ = MagicMock(return_value=mock_ddgs)
        mock_ddgs.__exit__ = MagicMock(return_value=None)
        mock_ddgs.text = MagicMock(return_value=mock_results)
        mocker.patch("src.tools.websearch.DDGS", return_value=mock_ddgs)

        tool = WebTool()
        results = await tool.search("drug repurposing")
        assert len(results) == 1
        assert results[0].citation.source == "web"
        assert "Drug Repurposing" in results[0].citation.title
Test File: tests/unit/tools/test_search_handler.py
"""Unit tests for SearchHandler."""
import pytest
from unittest.mock import AsyncMock

from src.utils.models import Evidence, Citation
from src.utils.exceptions import SearchError


class TestSearchHandler:
    """Tests for SearchHandler."""

    @pytest.mark.asyncio
    async def test_execute_aggregates_results(self):
        """SearchHandler should aggregate results from all tools."""
        from src.tools.search_handler import SearchHandler

        # Create mock tools. Names must be values that
        # SearchResult.sources_searched accepts ("pubmed" or "web");
        # any other name fails Pydantic validation.
        mock_tool_1 = AsyncMock()
        mock_tool_1.name = "pubmed"
        mock_tool_1.search = AsyncMock(return_value=[
            Evidence(
                content="Result 1",
                citation=Citation(source="pubmed", title="T1", url="u1", date="2024"),
            )
        ])
        mock_tool_2 = AsyncMock()
        mock_tool_2.name = "web"
        mock_tool_2.search = AsyncMock(return_value=[
            Evidence(
                content="Result 2",
                citation=Citation(source="web", title="T2", url="u2", date="2024"),
            )
        ])

        handler = SearchHandler(tools=[mock_tool_1, mock_tool_2])
        result = await handler.execute("test query")

        assert result.total_found == 2
        assert "pubmed" in result.sources_searched
        assert "web" in result.sources_searched
        assert len(result.errors) == 0

    @pytest.mark.asyncio
    async def test_execute_handles_tool_failure(self):
        """SearchHandler should continue if one tool fails."""
        from src.tools.search_handler import SearchHandler

        mock_tool_ok = AsyncMock()
        mock_tool_ok.name = "pubmed"
        mock_tool_ok.search = AsyncMock(return_value=[
            Evidence(
                content="Good result",
                citation=Citation(source="pubmed", title="T", url="u", date="2024"),
            )
        ])
        mock_tool_fail = AsyncMock()
        mock_tool_fail.name = "web"
        mock_tool_fail.search = AsyncMock(side_effect=SearchError("API down"))

        handler = SearchHandler(tools=[mock_tool_ok, mock_tool_fail])
        result = await handler.execute("test")

        assert result.total_found == 1
        assert "pubmed" in result.sources_searched
        assert len(result.errors) == 1
        assert "web" in result.errors[0]
7. Integration Test (Optional, Real API)
# tests/integration/test_pubmed_live.py
"""Integration tests that hit real APIs (run manually)."""
import pytest


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.asyncio
async def test_pubmed_live_search():
    """Test real PubMed search (requires network)."""
    from src.tools.pubmed import PubMedTool

    tool = PubMedTool()
    results = await tool.search("metformin diabetes", max_results=3)
    assert len(results) > 0
    assert results[0].citation.source == "pubmed"
    assert "pubmed.ncbi.nlm.nih.gov" in results[0].citation.url

# Run with: uv run pytest tests/integration -m integration
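The `integration` and `slow` markers used above are custom, and pytest warns about unregistered markers. One way to register them is a `pytest_configure` hook, sketched below. Placing it in tests/conftest.py is an assumption, and the project may already declare its markers in its pytest configuration instead.

```python
# tests/conftest.py (assumed location)


def pytest_configure(config):
    """Register the custom markers used by the integration tests."""
    config.addinivalue_line(
        "markers", "integration: tests that call real external APIs"
    )
    config.addinivalue_line(
        "markers", "slow: tests that take more than a few seconds"
    )
```

With the markers registered, `-m integration` selects only the live tests and `-m "not integration"` excludes them from the default run.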
8. Implementation Checklist
- Create src/utils/models.py with all Pydantic models (Evidence, Citation, SearchResult) - COMPLETE
- Create src/tools/__init__.py with SearchTool Protocol and exports - COMPLETE
- Implement src/tools/pubmed.py with PubMedTool class - COMPLETE
- Implement src/tools/websearch.py with WebTool class - REMOVED (replaced by Europe PMC in Phase 11)
- Create src/tools/search_handler.py with SearchHandler class - COMPLETE
- Write tests in tests/unit/tools/test_pubmed.py - COMPLETE (basic tests)
- Write tests in tests/unit/tools/test_websearch.py - N/A (WebTool removed)
- Write tests in tests/unit/tools/test_search_handler.py - COMPLETE (basic tests)
- Run uv run pytest tests/unit/tools/ -v (ALL TESTS MUST PASS) - PASSING
- (Optional) Run integration test: uv run pytest -m integration
- Add edge case tests (rate limiting, error handling, timeouts) - PENDING
- Commit: git commit -m "feat: phase 2 search slice complete" - DONE
Post-Phase 2 Enhancements:
- Query preprocessing (src/tools/query_utils.py) - ADDED
- Europe PMC tool (Phase 11) - ADDED
- ClinicalTrials tool (Phase 10) - ADDED
9. Definition of Done
Phase 2 is COMPLETE when:
- ✅ All unit tests pass: uv run pytest tests/unit/tools/ -v - PASSING
- ✅ SearchHandler can execute with search tools - WORKING
- ✅ Graceful degradation: if one tool fails, other tools still return results - IMPLEMENTED
- ✅ Rate limiting is enforced (verify no 429 errors) - IMPLEMENTED
- ✅ Can run this in Python REPL:
import asyncio
from src.tools.pubmed import PubMedTool
from src.tools.search_handler import SearchHandler


async def test():
    handler = SearchHandler([PubMedTool()])
    result = await handler.execute("metformin alzheimer")
    print(f"Found {result.total_found} results")
    for e in result.evidence[:3]:
        print(f"- {e.citation.title}")


asyncio.run(test())
Note: WebTool was removed in favor of Europe PMC (Phase 11). The current implementation uses PubMed as the primary Phase 2 tool, with Europe PMC and ClinicalTrials added in later phases.
Proceed to Phase 3 ONLY after all checkboxes are complete.