Spaces:
Sleeping
Sleeping
| from dotenv import load_dotenv | |
| import os | |
| import googlemaps | |
| import re | |
| import gradio as gr | |
| from datetime import datetime | |
| import requests | |
| from geopy.geocoders import Nominatim | |
| from datetime import datetime, date, timedelta | |
| import pandas as pd | |
| import json | |
| from pathlib import Path | |
| dotenv_path = os.getenv('DOTENV_PATH', None) | |
| if dotenv_path: | |
| load_dotenv(dotenv_path) | |
| else: | |
| load_dotenv() | |
| api_key = os.getenv("GOOGLE_API_KEY") | |
| if not api_key: | |
| raise EnvironmentError("GOOGLE_API_KEY not found in environment variables.") | |
| gmaps = googlemaps.Client(key=api_key) | |
| def get_directions( | |
| source: str, | |
| destination: str, | |
| mode: str, | |
| departure_time: str): | |
| """ | |
| Returns a Markdown-formatted string with step-by-step directions and a static map URL. | |
| Args: | |
| source: Starting address or place name. | |
| destination: Ending address or place name. | |
| mode: Transport mode: driving, walking, bicycling, or transit. | |
| departure_time: Trip start time in "YYYY-MM-DD HH:MM" format. | |
| waypoints: Optional comma-separated list of waypoints. | |
| Returns: | |
| A single Markdown string summarizing the route or an error message. | |
| """ | |
| # Parse departure time | |
| try: | |
| dt = datetime.strptime(departure_time.strip(), "%Y-%m-%d %H:%M") | |
| except ValueError: | |
| return ( | |
| "**⚠️ Invalid time format.** Please enter departure time as YYYY-MM-DD HH:MM." | |
| ) | |
| # Build waypoints list | |
| wp_list = None | |
| # Call Directions API | |
| directions = gmaps.directions( | |
| origin=source, | |
| destination=destination, | |
| mode=mode.lower(), | |
| departure_time=dt, | |
| waypoints=wp_list, | |
| optimize_waypoints=True, | |
| ) | |
| if not directions: | |
| return "**No route found.**" | |
| # Use first route & leg | |
| route = directions[0] | |
| leg = route["legs"][0] | |
| # Build summary | |
| summary = f"**Trip from '{leg['start_address']}' to '{leg['end_address']}'**\n" | |
| summary += f"- Total distance: {leg['distance']['text']}\n" | |
| summary += f"- Estimated duration: {leg['duration']['text']}\n\n" | |
| summary += "### Step-by-Step Directions:\n" | |
| for i, step in enumerate(leg["steps"], start=1): | |
| travel_mode = step.get("travel_mode", "").upper() | |
| if travel_mode == "TRANSIT": | |
| details = step.get("transit_details", {}) | |
| line = details.get("line", {}) | |
| vehicle = line.get("vehicle", {}).get("type", "Transit") | |
| line_name = line.get("short_name") or line.get("name", "") | |
| dep = details.get("departure_stop", {}).get("name", "") | |
| arr = details.get("arrival_stop", {}).get("name", "") | |
| stops = details.get("num_stops", "") | |
| instr = f"Take {vehicle} {line_name} from {dep} to {arr} ({stops} stops)" | |
| dist = step.get("distance", {}).get("text", "") | |
| dur = details.get("departure_time",{}).get("text","") | |
| else: | |
| raw = step.get("html_instructions", "") | |
| instr = re.sub(r"<[^>]+>", "", raw) | |
| dist = step.get("distance", {}).get("text", "") | |
| dur = step.get("departure_time",{}).get("text","") | |
| summary += f"{i}. {instr} ({dist},{dur})\n" | |
| # Static map snapshot | |
| poly = route.get("overview_polyline", {}).get("points") | |
| if poly: | |
| static_map_url = ( | |
| "https://maps.googleapis.com/maps/api/staticmap" | |
| f"?size=600x400&path=enc:{poly}&key={api_key}" | |
| ) | |
| summary += "\n---\n" | |
| summary += ("Here’s a **static map snapshot** of this route:\n" + static_map_url) | |
| return summary | |
| WEATHER_CODES = { | |
| 0: "Clear sky", 1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast", | |
| 45: "Fog", 48: "Depositing rime fog", 51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle", | |
| 61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain", 71: "Slight snowfall", 73: "Moderate snowfall", | |
| 75: "Heavy snowfall", 80: "Slight rain showers", 81: "Moderate rain showers", 82: "Violent rain showers", | |
| 95: "Thunderstorm", 96: "Thunderstorm with hail" | |
| } | |
| def get_weather_forecast_range(location_name, start_date, end_date): | |
| try: | |
| if isinstance(start_date, str): | |
| start_date = datetime.strptime(start_date, "%Y-%m-%d").date() | |
| if isinstance(end_date, str): | |
| end_date = datetime.strptime(end_date, "%Y-%m-%d").date() | |
| today = date.today() | |
| days_ahead = (end_date - today).days | |
| if days_ahead > 15: | |
| return {"error": "Weather data only available up to 15 days from today."} | |
| geolocator = Nominatim(user_agent="weather_api") | |
| location = geolocator.geocode(location_name) | |
| if not location: | |
| return {"error": f"Could not find coordinates for '{location_name}'."} | |
| lat, lon = location.latitude, location.longitude | |
| include_hourly = days_ahead <= 6 | |
| url = "https://api.open-meteo.com/v1/forecast" | |
| params = { | |
| "latitude": lat, | |
| "longitude": lon, | |
| "daily": "sunrise,sunset,uv_index_max,temperature_2m_max,temperature_2m_min,weather_code", | |
| "temperature_unit": "celsius", | |
| "windspeed_unit": "kmh", | |
| "timeformat": "iso8601", | |
| "timezone": "auto", | |
| "start_date": start_date.isoformat(), | |
| "end_date": end_date.isoformat() | |
| } | |
| if include_hourly: | |
| params["hourly"] = "temperature_2m,weather_code,uv_index,visibility" | |
| response = requests.get(url, params=params) | |
| if response.status_code != 200: | |
| return {"error": f"API error {response.status_code}: {response.text}"} | |
| raw = response.json() | |
| forecasts = [] | |
| for idx, d in enumerate(raw["daily"]["time"]): | |
| day_result = { | |
| "date": d, | |
| "sunrise": raw["daily"]["sunrise"][idx].split("T")[1], | |
| "sunset": raw["daily"]["sunset"][idx].split("T")[1], | |
| "uv_max": round(raw["daily"]["uv_index_max"][idx], 1), | |
| "temp_min": round(raw["daily"]["temperature_2m_min"][idx]), | |
| "temp_max": round(raw["daily"]["temperature_2m_max"][idx]), | |
| "weather": WEATHER_CODES.get(int(raw["daily"]["weather_code"][idx]), "Unknown") | |
| } | |
| if include_hourly and "hourly" in raw: | |
| hourly_df = pd.DataFrame({ | |
| "time": raw["hourly"]["time"], | |
| "temp": raw["hourly"]["temperature_2m"], | |
| "code": raw["hourly"]["weather_code"], | |
| "uv": raw["hourly"]["uv_index"], | |
| "visibility": [v / 1000 for v in raw["hourly"]["visibility"]] | |
| }) | |
| hourly_df["time"] = pd.to_datetime(hourly_df["time"]) | |
| target_date = datetime.strptime(d, "%Y-%m-%d").date() | |
| df_day = hourly_df[hourly_df["time"].dt.date == target_date] | |
| df_day["weather"] = df_day["code"].apply(lambda c: WEATHER_CODES.get(int(c), "Unknown")) | |
| day_result["hourly"] = [ | |
| { | |
| "time": t.strftime("%Y-%m-%d %H:%M"), | |
| "temp": f"{round(temp)}°C", | |
| "weather": w, | |
| "uv": round(uv, 1), | |
| "visibility": f"{round(vis, 1)} km" | |
| } | |
| for t, temp, w, uv, vis in zip( | |
| df_day["time"], df_day["temp"], df_day["weather"], | |
| df_day["uv"], df_day["visibility"] | |
| ) | |
| ] | |
| else: | |
| day_result["note"] = "Hourly weather data is only available for the next 7 days." | |
| forecasts.append(day_result) | |
| return forecasts | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def get_weather_forecast_range_ui(location_name, start_date, end_date): | |
| result = get_weather_forecast_range(location_name, start_date, end_date) | |
| if "error" in result: | |
| return f"**⚠️ Error:** {result['error']}" | |
| # Save JSON silently | |
| save_dir = Path("logs") | |
| save_dir.mkdir(exist_ok=True) | |
| save_path = save_dir / f"weather_{location_name}_{start_date}_to_{end_date}.json" | |
| with open(save_path, "w", encoding="utf-8") as f: | |
| json.dump(result, f, indent=2) | |
| # Generate Markdown summary | |
| md = f"## Weather Forecast for {location_name}\n\n" | |
| for day in result: | |
| md += f"### 📅 {day['date']}\n" | |
| md += f"- 🌅 Sunrise: {day['sunrise']}\n" | |
| md += f"- 🌇 Sunset: {day['sunset']}\n" | |
| md += f"- 🌡️ Temp: {day['temp_min']}°C – {day['temp_max']}°C\n" | |
| md += f"- 🌤️ Weather: {day['weather']}\n" | |
| md += f"- ☀️ UV Index Max: {day['uv_max']}\n" | |
| if "hourly" in day: | |
| md += f"**Hourly Forecast:**\n" | |
| for h in day["hourly"]: | |
| md += f" - {h['time']}: {h['temp']}, {h['weather']}, UV {h['uv']}, Visibility {h['visibility']}\n" | |
| elif "note" in day: | |
| md += f"_{day['note']}_\n" | |
| md += "\n" | |
| return md | |
| demo = gr.TabbedInterface( | |
| [ | |
| gr.Interface( | |
| fn=get_directions, | |
| inputs=[ | |
| gr.Textbox(label="Source Location", placeholder="e.g. New York City"), | |
| gr.Textbox(label="Destination Location", placeholder="e.g. Los Angeles"), | |
| gr.Radio(["driving", "walking", "bicycling", "transit"], label="Travel Mode", value="driving"), | |
| gr.Textbox(label="Departure Time (YYYY-MM-DD HH:MM)", placeholder="e.g. 2025-06-08 14:30"), | |
| ], | |
| outputs=gr.Markdown(label="Directions Summary"), | |
| api_name="get direction" | |
| ), | |
| gr.Interface( | |
| fn=get_weather_forecast_range_ui, | |
| inputs=[ | |
| gr.Textbox(label="Location Name", placeholder="Enter a city or place name"), | |
| gr.Textbox(label="Start Date (YYYY-MM-DD)", value=date.today()), | |
| gr.Textbox(label="End Date (YYYY-MM-DD)", value=date.today() + timedelta(days=6)), | |
| ], | |
| outputs=gr.Markdown(label="Readable Forecast"), | |
| api_name="get weather forecast" | |
| ) | |
| ], | |
| tab_names=["Google Maps Directions", "Weather Forecast"] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(mcp_server=True) |