Joseph Pollack

Phase 17: Rate Limiting with limits Library

Priority: P0 CRITICAL - Prevents API blocks
Effort: ~1 hour
Dependencies: None


CRITICAL: Async Safety Requirements

WARNING: The rate limiter MUST be async-safe. Blocking the event loop will freeze:

  • The Gradio UI
  • All parallel searches
  • The orchestrator

Rules:

  1. NEVER use time.sleep() - Always use await asyncio.sleep()
  2. NEVER use blocking while loops - Use async-aware polling
  3. The limits library check is synchronous - Wrap it carefully

The implementation below uses a polling pattern that:

  • Checks the limit (synchronous, fast)
  • If exceeded, await asyncio.sleep() (non-blocking)
  • Retry the check

Alternative: If limits proves problematic, use aiolimiter which is pure-async.
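
The polling pattern described above can be illustrated with the standard library alone. This is a minimal sketch, not the project's implementation: `heartbeat`, `poll_until`, and `count_ticks_while_waiting` are hypothetical names, and the heartbeat task merely stands in for the Gradio UI or a parallel search. It shows that other coroutines keep running while the poller waits with `asyncio.sleep()`.

```python
import asyncio
import time


async def heartbeat(ticks: list[float], interval: float = 0.02) -> None:
    """Stand-in for other work on the event loop (UI, parallel searches)."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def poll_until(ready_at: float, poll_interval: float = 0.01) -> None:
    """Run a fast synchronous check, yielding to the loop between checks."""
    while time.monotonic() < ready_at:  # synchronous check, returns at once
        await asyncio.sleep(poll_interval)  # non-blocking wait


async def count_ticks_while_waiting(wait_s: float = 0.2) -> int:
    """Return how many heartbeat ticks ran while the poller was waiting."""
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await poll_until(time.monotonic() + wait_s)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks)
```

Running `asyncio.run(count_ticks_while_waiting())` should report several ticks. If `poll_until` used `time.sleep()` instead, the heartbeat would not be scheduled at all during the wait, which is the freeze the rules above are meant to prevent.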


Overview

Replace the naive asyncio.sleep rate limiting with a proper rate limiter built on the limits library, which provides:

  • Moving window rate limiting
  • Per-API configurable limits
  • Thread-safe storage
  • Already used in reference repo
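
To make "moving window" concrete, here is a dependency-free sketch of the idea. It is not how the limits library is implemented internally; `MovingWindow` and `hit` are hypothetical names, and time is passed in explicitly so the behavior is easy to follow.

```python
from collections import deque


class MovingWindow:
    """Allow at most `amount` hits within any `window_s`-second span."""

    def __init__(self, amount: int, window_s: float) -> None:
        self.amount = amount
        self.window_s = window_s
        self._hits: deque[float] = deque()  # timestamps of accepted hits

    def hit(self, now: float) -> bool:
        """Record a hit at time `now` if allowed; return whether it was."""
        # Drop timestamps that have moved out of the window.
        while self._hits and now - self._hits[0] >= self.window_s:
            self._hits.popleft()
        if len(self._hits) < self.amount:
            self._hits.append(now)
            return True
        return False


window = MovingWindow(amount=3, window_s=1.0)
results = [window.hit(t) for t in (0.0, 0.1, 0.2, 0.9, 1.0, 1.05)]
# The first three hits fit; 0.9 is rejected; 1.0 is accepted because the
# hit at 0.0 has expired; 1.05 is rejected because 0.1 is still inside.
```

Unlike a fixed window, capacity is released one hit at a time as old timestamps expire, so a burst at the end of one second cannot be followed immediately by a second full burst.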

Why This Matters

  • NCBI will block us without proper rate limiting (3/sec without key, 10/sec with)
  • The current implementation only has a simple sleep delay
  • We need coordinated limits across all PubMed calls
  • Professional-grade rate limiting prevents production issues

Current State

What We Have (src/tools/pubmed.py:20-21, 34-41)

RATE_LIMIT_DELAY = 0.34  # ~3 requests/sec without API key

async def _rate_limit(self) -> None:
    """Enforce NCBI rate limiting."""
    loop = asyncio.get_running_loop()
    now = loop.time()
    elapsed = now - self._last_request_time
    if elapsed < self.RATE_LIMIT_DELAY:
        await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed)
    self._last_request_time = loop.time()

Problems

  1. Not shared across instances: Each PubMedTool() tracks its own last-request time
  2. Simple delay vs moving window: Doesn't handle bursts properly
  3. Hardcoded rate: Doesn't adapt to API key presence
  4. No backoff on 429: Just retries blindly
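
Problem 1 is easy to reproduce in isolation. The sketch below is a simplified, synchronous stand-in for the delay logic shown above (`DelayLimiter` and `seconds_to_wait` are hypothetical names, and time is passed in explicitly). It only illustrates why per-instance state does not bound the combined request rate.

```python
class DelayLimiter:
    """Simplified stand-in for the current per-instance delay logic."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self._last_request_time = float("-inf")

    def seconds_to_wait(self, now: float) -> float:
        """Return the wait needed at `now` and record the request time."""
        elapsed = now - self._last_request_time
        wait = max(0.0, self.delay - elapsed)
        self._last_request_time = now + wait
        return wait


# Two independent instances: both requests go out at t=0 with no delay.
a, b = DelayLimiter(0.34), DelayLimiter(0.34)
independent_waits = [a.seconds_to_wait(0.0), b.seconds_to_wait(0.0)]

# One shared instance: the second request is held back by the delay.
shared = DelayLimiter(0.34)
shared_waits = [shared.seconds_to_wait(0.0), shared.seconds_to_wait(0.0)]
```

With N tool instances the effective rate can reach N times the intended limit, which is why the plan moves to a single shared limiter.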

TDD Implementation Plan

Step 1: Add Dependency

File: pyproject.toml

dependencies = [
    # ... existing deps ...
    "limits>=3.0",
]

Then run:

uv sync

Step 2: Write the Tests First

File: tests/unit/tools/test_rate_limiting.py

"""Tests for rate limiting functionality."""

import asyncio
import time

import pytest

from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter


class TestRateLimiter:
    """Test suite for rate limiter."""

    def test_create_limiter_without_api_key(self) -> None:
        """Should create 3/sec limiter without API key."""
        limiter = RateLimiter(rate="3/second")
        assert limiter.rate == "3/second"

    def test_create_limiter_with_api_key(self) -> None:
        """Should create 10/sec limiter with API key."""
        limiter = RateLimiter(rate="10/second")
        assert limiter.rate == "10/second"

    @pytest.mark.asyncio
    async def test_limiter_allows_requests_under_limit(self) -> None:
        """Should allow requests under the rate limit."""
        limiter = RateLimiter(rate="10/second")

        # 3 requests should all succeed immediately
        for _ in range(3):
            allowed = await limiter.acquire()
            assert allowed is True

    @pytest.mark.asyncio
    async def test_limiter_blocks_when_exceeded(self) -> None:
        """Should wait when rate limit exceeded."""
        limiter = RateLimiter(rate="2/second")

        # First 2 should be instant
        await limiter.acquire()
        await limiter.acquire()

        # Third should block briefly
        start = time.monotonic()
        await limiter.acquire()
        elapsed = time.monotonic() - start

        # With a moving window, the third request waits until the first hit
        # leaves the 1-second window, so the wait is close to 1 second
        assert elapsed >= 0.3

    @pytest.mark.asyncio
    async def test_limiter_resets_after_window(self) -> None:
        """Rate limit should reset after time window."""
        limiter = RateLimiter(rate="5/second")

        # Use up the limit
        for _ in range(5):
            await limiter.acquire()

        # Wait for window to pass
        await asyncio.sleep(1.1)

        # Should be allowed again
        start = time.monotonic()
        await limiter.acquire()
        elapsed = time.monotonic() - start

        assert elapsed < 0.1  # Should be nearly instant


class TestGetPubmedLimiter:
    """Test PubMed-specific limiter factory."""

    @pytest.fixture(autouse=True)
    def _fresh_limiter(self) -> None:
        """Reset the module-level singleton so tests do not leak state."""
        from src.tools.rate_limiter import reset_pubmed_limiter

        reset_pubmed_limiter()

    def test_limiter_without_api_key(self) -> None:
        """Should return 3/sec limiter without key."""
        limiter = get_pubmed_limiter(api_key=None)
        assert "3" in limiter.rate

    def test_limiter_with_api_key(self) -> None:
        """Should return 10/sec limiter with key."""
        limiter = get_pubmed_limiter(api_key="my-api-key")
        assert "10" in limiter.rate

    def test_limiter_is_singleton(self) -> None:
        """Same API key should return same limiter instance."""
        limiter1 = get_pubmed_limiter(api_key="key1")
        limiter2 = get_pubmed_limiter(api_key="key1")
        assert limiter1 is limiter2

    def test_different_keys_share_limiter(self) -> None:
        """Different API keys share one limiter: both hit the same NCBI API."""
        limiter1 = get_pubmed_limiter(api_key="key1")
        limiter2 = get_pubmed_limiter(api_key="key2")
        assert limiter1 is limiter2  # Shared NCBI rate limit

Step 3: Create Rate Limiter Module

File: src/tools/rate_limiter.py

"""Rate limiting utilities using the limits library."""

import asyncio
from typing import ClassVar

from limits import RateLimitItem, parse
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter


class RateLimiter:
    """
    Async-compatible rate limiter using limits library.

    Uses moving window algorithm for smooth rate limiting.
    """

    def __init__(self, rate: str) -> None:
        """
        Initialize rate limiter.

        Args:
            rate: Rate string like "3/second" or "10/second"
        """
        self.rate = rate
        self._storage = MemoryStorage()
        self._limiter = MovingWindowRateLimiter(self._storage)
        self._rate_limit: RateLimitItem = parse(rate)
        self._identity = "default"  # Single identity for shared limiting

    async def acquire(self, wait: bool = True) -> bool:
        """
        Acquire permission to make a request.

        ASYNC-SAFE: Uses asyncio.sleep(), never time.sleep().
        The polling pattern allows other coroutines to run while waiting.

        Args:
            wait: If True, wait until allowed. If False, return immediately.

        Returns:
            True if allowed, False if not (only when wait=False)
        """
        while True:
            # Check if we can proceed (synchronous, fast - ~microseconds)
            if self._limiter.hit(self._rate_limit, self._identity):
                return True

            if not wait:
                return False

            # CRITICAL: Use asyncio.sleep(), NOT time.sleep()
            # This yields control to the event loop, allowing other
            # coroutines (UI, parallel searches) to run
            await asyncio.sleep(0.1)

    def reset(self) -> None:
        """Reset the rate limiter (for testing)."""
        self._storage.reset()


# Singleton limiter for PubMed/NCBI
_pubmed_limiter: RateLimiter | None = None


def get_pubmed_limiter(api_key: str | None = None) -> RateLimiter:
    """
    Get the shared PubMed rate limiter.

    Rate depends on whether API key is provided:
    - Without key: 3 requests/second
    - With key: 10 requests/second

    Args:
        api_key: NCBI API key (optional)

    Returns:
        Shared RateLimiter instance
    """
    global _pubmed_limiter

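    if _pubmed_limiter is None:
        # NOTE: the rate is fixed by whichever call creates the limiter.
        # Later calls return the same instance even if api_key differs.
        rate = "10/second" if api_key else "3/second"
        _pubmed_limiter = RateLimiter(rate)

    return _pubmed_limiter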


def reset_pubmed_limiter() -> None:
    """Reset the PubMed limiter (for testing)."""
    global _pubmed_limiter
    _pubmed_limiter = None


# Factory for other APIs
class RateLimiterFactory:
    """Factory for creating/getting rate limiters for different APIs."""

    _limiters: ClassVar[dict[str, RateLimiter]] = {}

    @classmethod
    def get(cls, api_name: str, rate: str) -> RateLimiter:
        """
        Get or create a rate limiter for an API.

        Args:
            api_name: Unique identifier for the API
            rate: Rate limit string (e.g., "10/second")

        Returns:
            RateLimiter instance (shared for same api_name)
        """
        if api_name not in cls._limiters:
            cls._limiters[api_name] = RateLimiter(rate)
        return cls._limiters[api_name]

    @classmethod
    def reset_all(cls) -> None:
        """Reset all limiters (for testing)."""
        cls._limiters.clear()
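
Because acquire(wait=True) loops until a slot is free, a caller may want to bound how long it waits. One possible approach, sketched here under assumptions, wraps the call in `asyncio.wait_for`. `acquire_with_timeout` is a hypothetical helper, and `BusyLimiter` is a stand-in that never grants a slot, used only so the example runs without the limits library.

```python
import asyncio


class BusyLimiter:
    """Hypothetical stand-in whose acquire() never succeeds."""

    async def acquire(self, wait: bool = True) -> bool:
        while True:
            if not wait:
                return False
            await asyncio.sleep(0.01)  # same polling shape as RateLimiter


async def acquire_with_timeout(limiter, timeout: float) -> bool:
    """Return True if a slot was acquired within `timeout` seconds."""
    try:
        return await asyncio.wait_for(limiter.acquire(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the pending acquire() before raising.
        return False
```

This works because the polling loop awaits `asyncio.sleep()`, which is a cancellation point. A loop that blocked the thread could not be interrupted this way.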

Step 4: Update PubMed Tool

File: src/tools/pubmed.py (replace rate limiting code)

# Replace imports and rate limiting

from src.tools.rate_limiter import get_pubmed_limiter


class PubMedTool:
    """Search tool for PubMed/NCBI."""

    BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    HTTP_TOO_MANY_REQUESTS = 429

    def __init__(self, api_key: str | None = None) -> None:
        self.api_key = api_key or settings.ncbi_api_key
        if self.api_key == "your-ncbi-key-here":
            self.api_key = None
        # Use shared rate limiter
        self._limiter = get_pubmed_limiter(self.api_key)

    async def _rate_limit(self) -> None:
        """Enforce NCBI rate limiting using shared limiter."""
        await self._limiter.acquire()

    # ... rest of class unchanged ...

Step 5: Add Rate Limiters for Other APIs

File: src/tools/clinicaltrials.py (optional)

from src.tools.rate_limiter import RateLimiterFactory


class ClinicalTrialsTool:
    def __init__(self) -> None:
        # ClinicalTrials.gov doesn't document limits, but be conservative
        self._limiter = RateLimiterFactory.get("clinicaltrials", "5/second")

    async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
        await self._limiter.acquire()
        # ... rest of method ...

File: src/tools/europepmc.py (optional)

from src.tools.rate_limiter import RateLimiterFactory


class EuropePMCTool:
    def __init__(self) -> None:
        # Europe PMC is generous, but still be respectful
        self._limiter = RateLimiterFactory.get("europepmc", "10/second")

    async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
        await self._limiter.acquire()
        # ... rest of method ...

Demo Script

File: examples/rate_limiting_demo.py

#!/usr/bin/env python3
"""Demo script to verify rate limiting works correctly."""

import asyncio
import time

from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter, reset_pubmed_limiter
from src.tools.pubmed import PubMedTool


async def test_basic_limiter():
    """Test basic rate limiter behavior."""
    print("=" * 60)
    print("Rate Limiting Demo")
    print("=" * 60)

    # Test 1: Basic limiter
    print("\n[Test 1] Testing 3/second limiter...")
    limiter = RateLimiter("3/second")

    start = time.monotonic()
    for i in range(6):
        await limiter.acquire()
        elapsed = time.monotonic() - start
        print(f"  Request {i+1} at {elapsed:.2f}s")

    total = time.monotonic() - start
    print(f"  Total time for 6 requests: {total:.2f}s (expected ~1s)")


async def test_pubmed_limiter():
    """Test PubMed-specific limiter."""
    print("\n[Test 2] Testing PubMed limiter (shared)...")

    reset_pubmed_limiter()  # Clean state

    # Without API key: 3/sec
    limiter = get_pubmed_limiter(api_key=None)
    print(f"  Rate without key: {limiter.rate}")

    # Multiple tools should share the same limiter
    tool1 = PubMedTool()
    tool2 = PubMedTool()

    # Verify they share the limiter
    print(f"  Tools share limiter: {tool1._limiter is tool2._limiter}")


async def test_concurrent_requests():
    """Test rate limiting under concurrent load."""
    print("\n[Test 3] Testing concurrent request limiting...")

    limiter = RateLimiter("5/second")

    async def make_request(i: int):
        await limiter.acquire()
        return time.monotonic()

    start = time.monotonic()
    # Launch 10 concurrent requests
    tasks = [make_request(i) for i in range(10)]
    times = await asyncio.gather(*tasks)

    # Calculate distribution
    relative_times = [t - start for t in times]
    print(f"  Request times: {[f'{t:.2f}s' for t in sorted(relative_times)]}")

    total = max(relative_times)
    print(f"  All 10 requests completed in {total:.2f}s (expected ~1s)")


async def main():
    await test_basic_limiter()
    await test_pubmed_limiter()
    await test_concurrent_requests()

    print("\n" + "=" * 60)
    print("Demo complete!")


if __name__ == "__main__":
    asyncio.run(main())

Verification Checklist

Unit Tests

# Run rate limiting tests
uv run pytest tests/unit/tools/test_rate_limiting.py -v

# Expected: All tests pass

Integration Test (Manual)

# Run demo
uv run python examples/rate_limiting_demo.py

# Expected: Requests properly spaced

Full Test Suite

make check
# Expected: All tests pass, mypy clean

Success Criteria

  1. limits library installed: Dependency added to pyproject.toml
  2. RateLimiter class works: Can create and use limiters
  3. PubMed uses new limiter: Shared limiter across instances
  4. Rate adapts to API key: 3/sec without, 10/sec with
  5. Concurrent requests handled: Multiple async requests properly queued
  6. No regressions: All existing tests pass

API Rate Limit Reference

| API | Without Key | With Key |
|---|---|---|
| PubMed/NCBI | 3/sec | 10/sec |
| ClinicalTrials.gov | Undocumented (~5/sec safe) | N/A |
| Europe PMC | ~10-20/sec (generous) | N/A |
| OpenAlex | ~100k/day (no per-sec limit) | Faster with mailto |
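
One way to keep these numbers in a single place is a small lookup table of rate strings. This is only a sketch: `API_RATES` and `rate_for` are hypothetical names, the ClinicalTrials.gov and Europe PMC values are the conservative guesses from the reference above rather than documented limits, and OpenAlex is left out because its limit is per day rather than per second.

```python
# Rate strings in the "<count>/<period>" form used elsewhere in this plan.
# Keys and structure are illustrative, not part of the codebase.
API_RATES: dict[str, dict[str, str]] = {
    "pubmed": {"without_key": "3/second", "with_key": "10/second"},
    "clinicaltrials": {"without_key": "5/second"},  # undocumented; conservative
    "europepmc": {"without_key": "10/second"},  # low end of ~10-20/sec
}


def rate_for(api_name: str, has_key: bool = False) -> str:
    """Return the rate string for an API, preferring the keyed rate."""
    rates = API_RATES[api_name]
    if has_key and "with_key" in rates:
        return rates["with_key"]
    return rates["without_key"]
```

The returned string could then be passed to something like `RateLimiterFactory.get(api_name, rate_for(api_name))`.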

Notes

  • This plan uses the limits library's moving window strategy (fairer than a fixed window)
  • Singleton pattern ensures all PubMed calls share the limit
  • The factory pattern allows easy extension to other APIs
  • Consider adding 429 response detection + exponential backoff
  • In production, consider Redis storage for distributed rate limiting
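
The 429 handling mentioned in the notes could look roughly like the following. This is a sketch under assumptions, not part of the plan's code: `call_with_backoff` is a hypothetical helper, and `send` stands for any coroutine function that performs the request and returns the HTTP status code. The delay doubles after each 429, is capped at `max_delay`, and gets random jitter so concurrent callers do not retry in lockstep.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable

HTTP_TOO_MANY_REQUESTS = 429


async def call_with_backoff(
    send: Callable[[], Awaitable[int]],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> int:
    """Call `send` until it returns a non-429 status or attempts run out."""
    status = HTTP_TOO_MANY_REQUESTS
    for attempt in range(max_attempts):
        status = await send()
        if status != HTTP_TOO_MANY_REQUESTS:
            return status
        if attempt < max_attempts - 1:
            # Exponential delay, capped, plus up to 50% random jitter.
            delay = min(max_delay, base_delay * 2**attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    return status  # still 429 after the final attempt
```

Backoff complements the limiter rather than replacing it: the limiter keeps the request rate under the documented limit, while backoff handles the case where the server rejects a request anyway.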