Spaces:
Running
Running
File size: 5,429 Bytes
3e435ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# shared/utils.py
"""Shared utilities for MCP servers"""
import asyncio
import time
import logging
from typing import Optional, Callable, Any
from mcp.types import TextContent
import httpx
logger = logging.getLogger(__name__)
class RateLimiter:
    """Rate limiter for API calls

    Call ``await limiter.wait()`` immediately before each request. The first
    call returns at once; each later call sleeps for whatever part of
    ``delay`` has not yet elapsed since the previous call finished waiting.
    """

    def __init__(self, delay: float):
        """
        Initialize rate limiter

        Args:
            delay: Minimum delay between requests in seconds
        """
        self.delay = delay
        # time.monotonic() reading taken at the end of the most recent
        # wait(); None until the first request has gone through. It is not a
        # wall-clock timestamp and is only meaningful for differences.
        self.last_request_time: Optional[float] = None

    async def wait(self) -> None:
        """Wait if necessary to respect rate limit"""
        if self.last_request_time is not None:
            # A monotonic clock never goes backwards, so a wall-clock
            # adjustment (NTP step, manual change) cannot make the elapsed
            # time negative and stretch the sleep far beyond `delay`.
            elapsed = time.monotonic() - self.last_request_time
            if elapsed < self.delay:
                await asyncio.sleep(self.delay - elapsed)
        # Recorded after any sleep, so the next interval starts from the
        # moment this request is actually released.
        self.last_request_time = time.monotonic()
async def safe_api_call(
    func: Callable,
    *args: Any,
    timeout: float = 30.0,
    error_prefix: str = "API",
    **kwargs: Any
) -> list[TextContent]:
    """
    Await ``func(*args, **kwargs)`` under a timeout, reporting failures as text

    On success the awaited value is returned unchanged. Timeouts, httpx
    errors and any other ``Exception`` are logged and converted into a
    one-item list holding a ``TextContent`` error message instead of being
    propagated.

    Args:
        func: Callable whose return value is awaitable
        *args: Positional arguments forwarded to func
        timeout: Overall time limit in seconds, enforced by asyncio.wait_for
        error_prefix: Service label used in log and error messages
        **kwargs: Keyword arguments forwarded to func

    Returns:
        list[TextContent]: Whatever func produced, or a single error message
    """
    def as_error(message: str) -> list[TextContent]:
        # Every failure path answers with the same one-element shape.
        return [TextContent(type="text", text=message)]

    try:
        pending = func(*args, **kwargs)
        return await asyncio.wait_for(pending, timeout=timeout)
    except asyncio.TimeoutError:
        logger.error(f"{error_prefix} request timed out after {timeout}s")
        return as_error(
            f"Error: {error_prefix} request timed out after {timeout} seconds. Please try again."
        )
    except httpx.TimeoutException:
        # Must stay ahead of the RequestError handler: httpx timeouts derive
        # from RequestError and would otherwise be reported as connection
        # failures.
        logger.error(f"{error_prefix} request timed out")
        return as_error(
            f"Error: {error_prefix} request timed out. Please try again."
        )
    except httpx.HTTPStatusError as status_exc:
        logger.error(f"{error_prefix} error: HTTP {status_exc.response.status_code}")
        return as_error(
            f"Error: {error_prefix} returned status {status_exc.response.status_code}"
        )
    except httpx.RequestError as request_exc:
        logger.error(f"{error_prefix} request error: {request_exc}")
        return as_error(
            f"Error: Failed to connect to {error_prefix}. Please check your connection."
        )
    except Exception as unexpected:
        # Last-resort boundary: keep the traceback in the log and hand the
        # caller only the exception's message.
        logger.error(f"Unexpected error in {error_prefix}: {unexpected}", exc_info=True)
        return as_error(f"Error: {str(unexpected)}")
def truncate_text(text: str, max_chars: int = 8000, suffix: str = "...") -> str:
    """
    Cut text down to its first max_chars characters and flag the cut

    Text that already fits is returned untouched. Otherwise the kept prefix
    is followed by a blank line, a "[Content truncated ...]" notice and the
    suffix, so the returned string is longer than max_chars.

    Args:
        text: Text to truncate
        max_chars: Number of leading characters to keep
        suffix: Text appended after the truncation notice

    Returns:
        The original text, or its prefix plus the notice and suffix
    """
    if len(text) > max_chars:
        notice = f"\n\n[Content truncated at {max_chars} characters]{suffix}"
        return text[:max_chars] + notice
    return text
def format_authors(authors: str, max_authors: int = 3) -> str:
    """
    Format author list with et al. if needed

    Blank entries (for example from a trailing or doubled semicolon) are
    ignored, so they neither produce dangling commas nor count toward
    max_authors.

    Args:
        authors: Semicolon-separated author list
        max_authors: Maximum authors to show

    Returns:
        Comma-separated author string, with " et al." appended when names
        were left out, or "Unknown authors" when no names are given
    """
    if not authors or authors == "Unknown":
        return "Unknown authors"

    stripped_parts = (part.strip() for part in authors.split(";"))
    names = [name for name in stripped_parts if name]
    if not names:
        # Input held only separators and/or whitespace, e.g. ";" or " ".
        return "Unknown authors"

    if len(names) <= max_authors:
        return ", ".join(names)
    return ", ".join(names[:max_authors]) + " et al."
def clean_whitespace(text: str) -> str:
    """
    Clean up excessive whitespace in text

    Each line is stripped, then broken into phrases wherever two consecutive
    spaces occur. Every non-empty phrase is placed on its own line and blank
    lines are dropped. Words separated by a single space stay together.

    Args:
        text: Text to clean

    Returns:
        Cleaned text
    """
    # Split on a run of two spaces, not on every single space: splitting on
    # one space would put each word on its own line. Built with `* 2` so the
    # separator survives editors or tools that collapse repeated spaces.
    phrase_gap = " " * 2
    stripped_lines = (line.strip() for line in text.splitlines())
    phrases = (
        phrase.strip()
        for line in stripped_lines
        for phrase in line.split(phrase_gap)
    )
    return "\n".join(phrase for phrase in phrases if phrase)
class ErrorFormatter:
    """Static helpers that build consistently worded error messages"""

    @staticmethod
    def not_found(resource_type: str, identifier: str) -> str:
        """Message for a lookup by identifier that matched nothing"""
        message = f"No {resource_type} found with identifier: {identifier}"
        return message

    @staticmethod
    def no_results(query: str, time_period: str = "") -> str:
        """Message for an empty search, optionally naming the time period"""
        # The period is appended after a single space only when one is given.
        period_suffix = ""
        if time_period:
            period_suffix = f" {time_period}"
        return f"No results found for query: {query}{period_suffix}"

    @staticmethod
    def validation_error(field: str, issue: str) -> str:
        """Message naming the offending field and what is wrong with it"""
        message = f"Validation error: {field} - {issue}"
        return message

    @staticmethod
    def api_error(service: str, status_code: int) -> str:
        """Message reporting the status code a service's API returned"""
        message = f"Error: {service} API returned status {status_code}"
        return message
def create_citation(
    identifier: str,
    identifier_type: str,
    url: Optional[str] = None
) -> str:
    """
    Build a one-line citation of the form "TYPE: ID", optionally with a URL

    Args:
        identifier: Citation identifier (PMID, DOI, NCT ID)
        identifier_type: Label printed in front of the identifier
        url: Optional URL; None or an empty string leaves the URL part out

    Returns:
        "TYPE: ID", or "TYPE: ID | URL: <url>" when a URL is supplied
    """
    segments = [f"{identifier_type}: {identifier}"]
    if url:
        segments.append(f"URL: {url}")
    return " | ".join(segments)
|