"""LangChain tools for OBD-II hex decoding, YouTube lookup and VIN decoding."""

import ast
import operator
import re
from typing import List, Optional

import requests
from langchain_core.tools import tool
from youtube_search import YoutubeSearch

# NOTE: for functions decorated with @tool, LangChain uses the docstring as the
# tool description presented to the model, so that text is effectively runtime
# data. Implementation notes therefore live in `#` comments, not in docstrings.

# Whitelist of AST operator node types -> the function implementing them.
SAFE_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}

# Largest absolute exponent accepted by `**` in safe_eval.
MAX_EXPONENT = 100

# Upper bound on the estimated bit length of an integer power. Bounding only
# the exponent is not enough: chained powers such as ((9**100)**100)**100 grow
# the base exponentially and can exhaust CPU and memory.
_MAX_POW_RESULT_BITS = 10_000

# One or more upper-case hex digits; rejects signs, underscores and stray text
# that int(..., 16) would otherwise tolerate.
_HEX_DIGITS_RE = re.compile(r"[0-9A-F]+")

# VINs are 17 characters and never contain I, O or Q.
_VIN_RE = re.compile(r"[A-HJ-NPR-Z0-9]{17}")

# NHTSA "Variable" names reported by decode_vin, in output order.
_VIN_FIELDS = (
    'Make',
    'Model',
    'Model Year',
    'Vehicle Type',
    'Body Class',
    'Manufacturer Name',
    'Plant City',
    'Plant State',
    'Plant Country',
    'Trim',
    'Engine Number of Cylinders',
    'Displacement (L)',
    'Engine Model',
    'Fuel Type - Primary',
    'Fuel Type - Secondary',
    'Electrification Level',
    'Transmission Style',
    'Drive Type',
    'Number of Doors',
    'Number of Seats',
    'Gross Vehicle Weight Rating From',
    'Error Code',
    'Error Text',
)


def _normalize_hex(token: str) -> str:
    """Return *token* as bare upper-case hex digits.

    Surrounding whitespace and a single leading ``0x``/``0X`` prefix are
    removed. The prefix is only stripped at the start, so malformed input
    such as ``"A0XB"`` is rejected instead of being silently read as ``AB``.

    Raises:
        ValueError: If anything other than hex digits remains.
    """
    digits = token.strip().upper()
    if digits.startswith("0X"):
        digits = digits[2:]
    if not _HEX_DIGITS_RE.fullmatch(digits):
        raise ValueError(f"expected hexadecimal digits (0-9, A-F), got {token!r}")
    return digits


def safe_eval(formula: str, variables: dict) -> float:
    """Safely evaluate a math formula with only basic arithmetic operations.

    The formula is parsed with ``ast`` and walked by hand; only numeric
    literals, names present in *variables*, and the operators listed in
    ``SAFE_OPERATORS`` are accepted. Nothing is ever passed to ``eval``.

    Args:
        formula: Arithmetic expression, e.g. ``"(A * 256 + B) / 4"``.
        variables: Mapping of variable name to numeric value.

    Returns:
        The numeric result (an ``int`` when every operation stays integral).

    Raises:
        ValueError: On syntax errors, unknown variables, unsupported
            constructs, division by zero, an exponent above ``MAX_EXPONENT``,
            an oversized or overflowing result, or a non-real result.
    """
    try:
        tree = ast.parse(formula, mode='eval')
    except SyntaxError as e:
        raise ValueError(f"Invalid formula syntax: {e}") from e

    def _apply_binary(op_type, left, right):
        # Pre-checks give the stable, user-facing messages; the except
        # clauses below catch what the pre-checks cannot (e.g. 0 ** -1,
        # float overflow, int too large to convert to float).
        if op_type is ast.Pow:
            if abs(right) > MAX_EXPONENT:
                raise ValueError(f"Exponent too large (max {MAX_EXPONENT})")
            if (
                isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > _MAX_POW_RESULT_BITS
            ):
                raise ValueError("Power result too large")
        elif op_type in (ast.Div, ast.FloorDiv, ast.Mod) and right == 0:
            raise ValueError("Division by zero")

        try:
            value = SAFE_OPERATORS[op_type](left, right)
        except ZeroDivisionError:
            raise ValueError("Division by zero") from None
        except OverflowError:
            raise ValueError("Result too large") from None

        # A negative base with a fractional exponent yields a complex number.
        if isinstance(value, complex):
            raise ValueError("Result is not a real number")
        return value

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)

        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int; True/False are not arithmetic input.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
            raise ValueError(f"Unsupported constant type: {type(value).__name__}")

        # No ast.Num branch: numeric literals are ast.Constant on Python 3.8+,
        # and ast.Num no longer exists on Python 3.14.
        if isinstance(node, ast.Name):
            if node.id in variables:
                return variables[node.id]
            raise ValueError(f"Unknown variable: {node.id}")

        if isinstance(node, ast.BinOp):
            op_type = type(node.op)
            if op_type not in SAFE_OPERATORS:
                raise ValueError(f"Unsupported operator: {op_type.__name__}")
            return _apply_binary(op_type, _eval(node.left), _eval(node.right))

        if isinstance(node, ast.UnaryOp):
            op_type = type(node.op)
            if op_type not in SAFE_OPERATORS:
                raise ValueError(f"Unsupported operator: {op_type.__name__}")
            return SAFE_OPERATORS[op_type](_eval(node.operand))

        raise ValueError(f"Unsupported expression: {type(node).__name__}")

    return _eval(tree)


@tool
def hex_to_decimal(hex_value: str) -> str:
    """Convert a hexadecimal value to decimal.

    Args:
        hex_value: A hexadecimal string (e.g., "1A", "FF", "0x2B", "1A F8")

    Returns:
        The decimal equivalent
    """
    cleaned = hex_value.strip()
    try:
        # Whitespace separates groups ("1A F8" or "0x1A 0xF8"); each group may
        # carry its own 0x prefix, and the groups are concatenated MSB first.
        tokens = cleaned.split()
        if not tokens:
            raise ValueError("no hexadecimal digits given")
        digits = "".join(_normalize_hex(token) for token in tokens)
        decimal_value = int(digits, 16)
    except ValueError as e:
        return f"Error: Invalid hexadecimal value '{cleaned}'. {e}"
    return f"Hex '{digits}' = Decimal {decimal_value}"


@tool
def combine_bytes(byte_a: str, byte_b: str, byte_c: Optional[str] = None, byte_d: Optional[str] = None) -> str:
    """Combine 2-4 hex bytes into a larger value (big-endian, MSB first).

    Args:
        byte_a: First (most significant) byte as hex (e.g., "01")
        byte_b: Second byte as hex (e.g., "F4")
        byte_c: Optional third byte as hex
        byte_d: Optional fourth byte as hex

    Returns:
        The combined decimal value with byte breakdown
    """
    try:
        # Falsy optional bytes (None or "") are skipped; the remaining bytes
        # are packed contiguously and labelled A, B, C... in that order.
        supplied = [byte_a, byte_b] + [b for b in (byte_c, byte_d) if b]

        result = 0
        byte_labels = []
        for index, raw in enumerate(supplied):
            label = chr(ord('A') + index)
            digits = _normalize_hex(raw)
            value = int(digits, 16)
            # A value wider than 8 bits would overlap the neighbouring byte
            # once shifted, producing a meaningless combined number.
            if value > 0xFF:
                raise ValueError(f"{label}=0x{digits} does not fit in one byte (max 0xFF)")
            result = (result << 8) | value
            byte_labels.append(f"{label}=0x{digits}({value})")

        return (f"Bytes: {', '.join(byte_labels)}\n"
                f"Combined: Decimal {result}")
    except ValueError as e:
        return f"Error: Invalid byte value. {e}"


@tool
def calculate_obd_value(formula: str, byte_a: str, byte_b: Optional[str] = None, byte_c: Optional[str] = None, byte_d: Optional[str] = None) -> str:
    """Calculate an OBD-II PID value using a formula and hex byte inputs.

    Common formulas:
    - Engine RPM: "(A * 256 + B) / 4"
    - Coolant Temp: "A - 40"
    - Throttle Position: "(A * 100) / 255"
    - MAF Rate: "(A * 256 + B) / 100"
    - Timing Advance: "(A - 128) / 2"
    - Fuel Trim: "(A - 128) * 100 / 128"

    Args:
        formula: Math formula using A, B, C, D as variables
        byte_a: First byte as hex
        byte_b: Optional second byte as hex
        byte_c: Optional third byte as hex
        byte_d: Optional fourth byte as hex

    Returns:
        The calculated result
    """
    try:
        # Bytes that are not supplied evaluate as 0 in the formula and are
        # left out of the echoed input list.
        variables = {"A": 0, "B": 0, "C": 0, "D": 0}
        inputs = []
        for name, raw in (("A", byte_a), ("B", byte_b), ("C", byte_c), ("D", byte_d)):
            if name != "A" and not raw:
                continue
            digits = _normalize_hex(raw)
            value = int(digits, 16)
            variables[name] = value
            # Echo the normalised digits so "0x1A" prints as 0x1A, not 0x0X1A.
            inputs.append(f"{name}=0x{digits}({value})")

        result = safe_eval(formula, variables)

        # Whole-number floats print as integers; other floats are rounded to
        # four decimals with trailing zeros removed.
        if isinstance(result, float) and result == int(result):
            result_str = str(int(result))
        elif isinstance(result, float):
            result_str = f"{result:.4f}".rstrip('0').rstrip('.')
        else:
            result_str = str(result)

        return (f"Formula: {formula}\n"
                f"Inputs: {', '.join(inputs)}\n"
                f"Result: {result_str}")
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        # Tool boundary: report unexpected failures to the agent as text
        # rather than raising.
        return f"Error calculating formula: {e}"


# YouTube search tool for the agent
@tool
def search_youtube_video(query: str) -> str:
    """Search YouTube for a video tutorial and return the video URL.

    Args:
        query: Search query for YouTube (e.g., "how to fix P0103 MAF sensor")

    Returns:
        The URL of the first matching YouTube video
    """
    try:
        results = YoutubeSearch(query, max_results=1).to_dict()
        if not results:
            return "No video found"
        video_id = results[0]['id']
        return f"https://www.youtube.com/watch?v={video_id}"
    except Exception as e:
        # Tool boundary: any search failure is returned to the agent as text.
        return f"Error searching YouTube: {e}"


@tool
def decode_vin(vin: str) -> str:
    """Decode a Vehicle Identification Number (VIN) using the NHTSA API.

    Args:
        vin: A 17-character VIN to decode (e.g., "5TDKRKEC7PS142916")

    Returns:
        A formatted string with important vehicle details like make, model, year, etc.
    """
    # Validate VIN format: must be exactly 17 alphanumeric characters.
    # VINs do not use I, O, or Q to avoid confusion with 1 and 0.
    vin = vin.strip().upper()
    if not _VIN_RE.fullmatch(vin):
        return "Error: Invalid VIN format. VIN must be exactly 17 characters and contain only letters (excluding I, O, Q) and numbers."

    try:
        # Call NHTSA API (the VIN is already restricted to [A-Z0-9], so it is
        # safe to place in the URL path).
        url = f"https://vpic.nhtsa.dot.gov/api/vehicles/decodevin/{vin}?format=json"
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()

        if data.get('Count') == 0:
            return "Error: No data returned from NHTSA API"

        # Map variable name -> value, skipping rows with an empty value or a
        # missing name so one malformed row cannot abort the whole decode.
        results = data.get('Results', [])
        vin_data = {
            item['Variable']: item['Value']
            for item in results
            if item.get('Variable') and item.get('Value')
        }

        # Build output with only the important fields that carry real data.
        output_lines = [f"VIN Decoded: {vin}\n"]
        for field in _VIN_FIELDS:
            value = vin_data.get(field)
            if value and value != 'null' and value != 'Not Applicable':
                output_lines.append(f"{field}: {value}")

        # Only the header line present means nothing useful was decoded.
        if len(output_lines) == 1:
            return "Error: No meaningful data could be extracted from the VIN"

        return '\n'.join(output_lines)
    except requests.Timeout:
        return "Error: Request to NHTSA API timed out"
    except requests.RequestException as e:
        return f"Error calling NHTSA API: {e}"
    except Exception as e:
        # Tool boundary: report unexpected failures to the agent as text.
        return f"Error decoding VIN: {e}"