| """ | |
| Main Surf Spot Finder Tool - Complete Workflow Orchestration. | |
| This module provides the primary MCP tool for finding and ranking surf spots. | |
| It orchestrates the complete workflow from user input to AI-powered recommendations: | |
| 1. Location resolution (address/coordinates) | |
| 2. Nearby spot discovery (distance filtering) | |
| 3. Real-time condition analysis (wave/wind data) | |
| 4. Multi-factor evaluation and scoring | |
| 5. AI-powered reasoning and explanations | |
| The tool integrates multiple data sources: | |
| - MCP resources for surf spot database | |
| - Stormglass API for marine conditions | |
| - Nominatim for geocoding | |
| - LLM providers for natural language reasoning | |
| Example: | |
| >>> finder = SurfSpotFinder() | |
| >>> input_data = SpotFinderInput( | |
| ... user_location="Málaga, Spain", | |
| ... max_distance_km=50, | |
| ... top_n=3, | |
| ... user_preferences={"skill_level": "intermediate"} | |
| ... ) | |
| >>> result = await finder.run(input_data) | |
| >>> print(f"Found {len(result.spots)} spots") | |
| Author: Surf Spot Finder Team | |
| License: MIT | |
| """ | |

import asyncio
import json
import logging
import os
from typing import Dict, Any, List, Optional

from pydantic import BaseModel, Field
from geopy.distance import geodesic

from .location_tool import LocationTool, LocationInput
from .stormglass_tool import create_stormglass_tool
from .surf_eval_tool import SurfEvaluatorTool
from .llm_agent_tool import SurfLLMAgent, LLMAgentInput

logger = logging.getLogger(__name__)


class SpotFinderInput(BaseModel):
    """Input schema for the surf spot finder tool.

    Attributes:
        user_location: User's location as address, city name, or coordinates.
        max_distance_km: Maximum search radius in kilometers (default: 50).
        top_n: Number of top-ranked spots to return (default: 3).
        user_preferences: Dict containing skill_level, board_type, etc.
    """

    user_location: str = Field(description="User's location (name, address, or coordinates)")
    max_distance_km: float = Field(default=50, description="Maximum distance to search for spots (km)")
    top_n: int = Field(default=3, description="Number of top spots to return")
    user_preferences: Dict[str, Any] = Field(default_factory=dict, description="User surfing preferences")
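

# Illustrative sketch (not part of the original module): an MCP client calling the
# "find_surf_spots" tool would pass arguments matching SpotFinderInput, for example:
#
#     {
#         "user_location": "Málaga, Spain",
#         "max_distance_km": 50,
#         "top_n": 3,
#         "user_preferences": {"skill_level": "intermediate", "board_type": "shortboard"}
#     }
#
# The field names come from the schema above; the values are only examples.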


class SpotFinderOutput(BaseModel):
    """Output schema for surf spot finder results.

    Attributes:
        success: Whether the operation completed successfully.
        user_location: Resolved coordinates as {"lat": float, "lon": float}.
        spots: List of ranked surf spots with scores and conditions.
        ai_summary: Brief AI-generated summary of recommendations.
        ai_reasoning: Detailed AI analysis and explanations.
        error: Error message if success is False.
    """

    success: bool
    user_location: Optional[Dict[str, float]] = None
    spots: List[Dict[str, Any]] = []
    ai_summary: str = ""
    ai_reasoning: str = ""
    error: str = ""


class SurfSpotFinder:
    """Main orchestration tool for finding optimal surf spots.

    This class coordinates the complete surf recommendation workflow by
    integrating location services, marine data APIs, evaluation algorithms,
    and AI reasoning to provide ranked surf spot recommendations.

    The workflow:
        1. Resolves user location to coordinates
        2. Filters surf spots by distance
        3. Fetches real-time wave conditions
        4. Evaluates each spot using multi-factor scoring
        5. Generates AI-powered analysis and explanations

    Attributes:
        name: Tool identifier for MCP registration.
        description: Human-readable tool description.
        location_tool: Service for address/coordinate resolution.
        stormglass_tool: API client for marine condition data.
        evaluator: Surf condition evaluation algorithm.
        llm_agent: AI reasoning and natural language generation.
        spots_db: Cached surf spot database from MCP resources.

    Example:
        >>> finder = SurfSpotFinder()
        >>> input_data = SpotFinderInput(user_location="Lisbon")
        >>> result = await finder.run(input_data)
        >>> print(f"Best spot: {result.spots[0]['name']}")
    """

    name = "find_surf_spots"
    description = "Find and rank the best surf spots near a given location based on current conditions"

    def __init__(self):
        self.location_tool = LocationTool()
        self.stormglass_tool = create_stormglass_tool()["function"]
        self.evaluator = SurfEvaluatorTool()
        self.llm_agent = SurfLLMAgent()
        self.spots_db = self._load_surf_spots()

    def _load_surf_spots(self) -> List[Dict[str, Any]]:
        """Load the surf spots database via MCP resources with a file I/O fallback.

        Attempts to load surf spots using the MCP resource primitive first,
        then falls back to direct file I/O if MCP resources are unavailable.
        This ensures reliability across different execution contexts.

        MCP Resource Path: surf://spots/database
        Fallback Path: ../data/surf_spots.json

        Returns:
            List[Dict[str, Any]]: List of surf spot dictionaries.

        Note:
            The MCP server import is done locally to avoid circular dependency
            issues. Falls back to file I/O when an event loop is already running
            (we cannot block on a second loop) or when MCP resources fail.
        """
        try:
            # Try MCP resources first (preferred method)
            # Local import to avoid circular dependency issues
            try:
                from mcp_server.mcp_server import read_resource
            except ImportError:
                import sys
                from pathlib import Path
                sys.path.insert(0, str(Path(__file__).parent.parent))
                from mcp_server import read_resource

            # Check whether we're already inside a running event loop
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # Not in an async context - safe to create a new loop
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            else:
                # Already in an async context - we cannot block on a second loop
                # in this thread, so defer to the file I/O fallback below.
                raise RuntimeError("Already in async context, using fallback")

            # Read from the MCP resource; always close the temporary loop
            try:
                content = loop.run_until_complete(read_resource("surf://spots/database"))
            finally:
                loop.close()

            data = json.loads(content)
            spots = data.get("spots", [])
            logger.info(f"✅ Loaded {len(spots)} surf spots via MCP resources")
            return spots

        except Exception as e:
            # MCP resources unavailable - use fallback
            logger.warning(f"⚠️ MCP resources not available in this context ({e}), using file I/O fallback")

        # Fallback: direct file I/O for reliability
        try:
            spots_path = os.path.join(os.path.dirname(__file__), "..", "data", "surf_spots.json")
            with open(spots_path, 'r') as f:
                data = json.load(f)
            spots = data["spots"]
            logger.info(f"✅ Loaded {len(spots)} surf spots via fallback file I/O")
            return spots
        except Exception as fallback_error:
            # Total failure - return empty list to prevent crash
            logger.error(f"❌ Both MCP and fallback failed: {fallback_error}")
            return []
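
    # Illustrative sketch of the spot record shape this class expects, inferred from
    # the fields accessed below; the exact keys and any extra fields depend on
    # data/surf_spots.json:
    #
    #     {
    #         "id": ...,
    #         "name": ...,
    #         "location": ...,
    #         "region": ...,
    #         "latitude": <decimal degrees>,
    #         "longitude": <decimal degrees>,
    #         "characteristics": {...}
    #     }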

    def find_nearby_spots(self, user_lat: float, user_lon: float, max_distance_km: float) -> List[Dict[str, Any]]:
        """Find surf spots within a specified distance of the user location.

        Uses geopy's geodesic distance (computed on the WGS-84 ellipsoid, rather
        than the simpler haversine approximation) between the user coordinates
        and each surf spot in the database.

        Args:
            user_lat: User's latitude in decimal degrees.
            user_lon: User's longitude in decimal degrees.
            max_distance_km: Maximum search radius in kilometers.

        Returns:
            List of surf spots within the radius, sorted by distance.
            Each spot includes the original data plus a distance_km field.
        """
        nearby_spots = []
        user_location = (user_lat, user_lon)

        for spot in self.spots_db:
            spot_location = (spot["latitude"], spot["longitude"])
            distance = geodesic(user_location, spot_location).kilometers

            if distance <= max_distance_km:
                spot_with_distance = spot.copy()
                spot_with_distance["distance_km"] = round(distance, 1)
                nearby_spots.append(spot_with_distance)

        # Sort by distance
        nearby_spots.sort(key=lambda x: x["distance_km"])
        return nearby_spots

    async def get_spot_conditions(self, spot: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Get current wave conditions for a specific surf spot.

        Fetches real-time marine data including wave height, direction,
        period, wind speed/direction, and tide information.

        Args:
            spot: Surf spot dictionary with latitude/longitude.

        Returns:
            Dict containing current conditions, or None if the fetch fails.
            Includes wave_height, wave_direction, wind_speed, etc.
        """
        try:
            # Use the Stormglass tool to get wave data
            from .stormglass_tool import WaveDataInput

            wave_input = WaveDataInput(lat=spot["latitude"], lon=spot["longitude"])
            conditions = self.stormglass_tool(wave_input)

            # Convert to dict for compatibility
            if hasattr(conditions, 'dict'):
                return conditions.dict()
            elif hasattr(conditions, '__dict__'):
                return conditions.__dict__
            return conditions
        except Exception as e:
            logger.error(f"Failed to get conditions for {spot['name']}: {e}")
            return None

    async def evaluate_spot(self, spot: Dict[str, Any], conditions: Dict[str, Any], user_prefs: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate a single surf spot using current conditions and user preferences.

        Applies a multi-factor scoring algorithm considering wave conditions,
        wind analysis, swell direction, and skill compatibility.

        Args:
            spot: Surf spot data including location and characteristics.
            conditions: Current wave/wind conditions from marine APIs.
            user_prefs: User preferences including skill level, board type.

        Returns:
            Dict containing the evaluated spot with score, explanation, and
            breakdown of individual scoring factors.
        """
        evaluation = await self.evaluator.run({
            "spot": spot,
            "conditions": conditions,
            "prefs": user_prefs
        })

        return {
            "id": spot["id"],
            "name": spot["name"],
            "location": f"{spot['location']}, {spot['region']}",
            "latitude": spot["latitude"],
            "longitude": spot["longitude"],
            "distance_km": spot["distance_km"],
            "score": evaluation["score"],
            "explanation": evaluation["explanation"],
            "breakdown": evaluation["breakdown"],
            "conditions": conditions,
            "characteristics": spot["characteristics"]
        }

    async def run(self, input_data: SpotFinderInput) -> SpotFinderOutput:
        """Execute the complete surf spot finding workflow.

        This is the main entry point that orchestrates all steps of the
        surf recommendation process from user input to final rankings.

        Args:
            input_data: User request containing location, preferences, and filters.

        Returns:
            Complete results including ranked spots, AI analysis, and metadata.

        Raises:
            No exceptions - all errors are captured in SpotFinderOutput.error
            for graceful degradation and user-friendly error messages.
        """
        try:
            # Step 1: Resolve user location
            logger.info(f"Resolving location: {input_data.user_location}")
            location_result = self.location_tool.run(
                LocationInput(location_query=input_data.user_location)
            )

            if not location_result.success:
                return SpotFinderOutput(
                    success=False,
                    error=f"Could not resolve location: {location_result.error}"
                )

            user_coords = location_result.coordinates
            user_lat, user_lon = user_coords["lat"], user_coords["lon"]
            logger.info(f"User location resolved to: {user_lat}, {user_lon}")

            # Step 2: Find nearby surf spots
            nearby_spots = self.find_nearby_spots(user_lat, user_lon, input_data.max_distance_km)

            if not nearby_spots:
                return SpotFinderOutput(
                    success=False,
                    error=f"No surf spots found within {input_data.max_distance_km}km of {input_data.user_location}"
                )

            logger.info(f"Found {len(nearby_spots)} nearby spots")

            # Step 3: Get conditions and evaluate each spot
            evaluated_spots = []

            # Process spots concurrently for speed
            semaphore = asyncio.Semaphore(5)  # Limit concurrent API calls

            async def process_spot(spot):
                async with semaphore:
                    conditions = await self.get_spot_conditions(spot)
                    if conditions:
                        return await self.evaluate_spot(spot, conditions, input_data.user_preferences)
                    return None

            tasks = [process_spot(spot) for spot in nearby_spots]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for result in results:
                if isinstance(result, Exception):
                    logger.error(f"Spot evaluation failed: {result}")
                elif result:
                    evaluated_spots.append(result)

            # Guard: if every condition fetch failed, report an error instead of
            # returning an empty result (the fallback summary below indexes top_spots[0])
            if not evaluated_spots:
                return SpotFinderOutput(
                    success=False,
                    error="Could not retrieve current conditions for any nearby spots"
                )

            # Step 4: Sort by score and return top N
            evaluated_spots.sort(key=lambda x: x["score"], reverse=True)
            top_spots = evaluated_spots[:input_data.top_n]

            # Step 5: Generate AI reasoning and summary
            ai_summary = ""
            ai_reasoning = ""

            try:
                logger.info("Generating AI analysis of surf conditions...")
                llm_result = await self.llm_agent.run(LLMAgentInput(
                    user_location=input_data.user_location,
                    user_preferences=input_data.user_preferences,
                    surf_spots=top_spots
                ))

                if llm_result.success:
                    ai_summary = llm_result.summary
                    ai_reasoning = llm_result.reasoning
                    logger.info("AI analysis completed successfully")
                else:
                    logger.warning(f"LLM agent failed: {llm_result.error}")
                    ai_summary = f"Found {len(top_spots)} surf spots. Top recommendation: {top_spots[0]['name']} with {top_spots[0]['score']}/100 score."
            except Exception as e:
                logger.error(f"AI analysis failed: {e}")
                ai_summary = f"Found {len(top_spots)} surf spots near {input_data.user_location}. Analysis based on wave conditions and user preferences."

            return SpotFinderOutput(
                success=True,
                user_location=user_coords,
                spots=top_spots,
                ai_summary=ai_summary,
                ai_reasoning=ai_reasoning
            )

        except Exception as e:
            logger.error(f"Surf spot finder error: {e}")
            return SpotFinderOutput(
                success=False,
                error=f"Internal error: {str(e)}"
            )


def create_spot_finder_tool() -> Dict[str, Any]:
    """Factory function to create the surf spot finder tool.

    Creates and configures the main MCP tool for surf spot recommendations.
    Returns a tool specification compatible with the MCP protocol.

    Returns:
        Dict containing tool name, description, schema, and function reference.

    Example:
        >>> tool = create_spot_finder_tool()
        >>> result = await tool["function"](input_data)
    """
    tool = SurfSpotFinder()
    return {
        "name": tool.name,
        "description": tool.description,
        "input_schema": SpotFinderInput.schema(),
        "function": tool.run,
    }
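

if __name__ == "__main__":
    # Minimal local usage sketch (not part of the MCP server wiring): runs the finder
    # end to end from the command line. Assumes the spot database and any required
    # API keys (e.g. for Stormglass and the LLM provider) are configured in the
    # environment; the location string below is only an example.
    async def _demo() -> None:
        finder = SurfSpotFinder()
        result = await finder.run(
            SpotFinderInput(
                user_location="Málaga, Spain",
                max_distance_km=50,
                top_n=3,
                user_preferences={"skill_level": "intermediate"},
            )
        )
        if result.success:
            for spot in result.spots:
                print(f"{spot['name']} ({spot['distance_km']} km): {spot['score']}/100")
            print(result.ai_summary)
        else:
            print(f"Error: {result.error}")

    asyncio.run(_demo())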