| | """ |
| | Encode structured tool declaration to typescript style string. |
| | """ |
| | import dataclasses |
| | import json |
| | import logging |
| | from collections.abc import Sequence |
| | from typing import Any |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | _TS_INDENT = " " |
| | _TS_FIELD_DELIMITER = ",\n" |
| |
|
| |
|
| | class _SchemaRegistry: |
| | """Registry for schema definitions to handle $ref resolution""" |
| |
|
| | def __init__(self): |
| | self.definitions = {} |
| | self.has_self_ref = False |
| |
|
| | def register_definitions(self, defs: dict[str, Any]): |
| | """Register schema definitions from $defs section""" |
| | if not defs: |
| | return |
| | for def_name, def_schema in defs.items(): |
| | self.definitions[def_name] = def_schema |
| |
|
| | def resolve_ref(self, ref: str) -> dict[str, Any]: |
| | """Resolve a reference to its schema definition""" |
| | if ref == "#": |
| | self.has_self_ref = True |
| | return {"$self_ref": True} |
| | elif ref.startswith("#/$defs/"): |
| | def_name = ref.split("/")[-1] |
| | if def_name not in self.definitions: |
| | raise ValueError(f"Reference not found: {ref}") |
| | return self.definitions[def_name] |
| | else: |
| | raise ValueError(f"Unsupported reference format: {ref}") |
| |
|
| |
|
| | def _format_description(description: str, indent: str = "") -> str: |
| | return "\n".join([ |
| | f"{indent}// {line}" if line else "" |
| | for line in description.split("\n") |
| | ]) |
| |
|
| |
|
| | class _BaseType: |
| | description: str |
| | constraints: dict[str, Any] |
| |
|
| | def __init__( |
| | self, |
| | extra_props: dict[str, Any], |
| | *, |
| | allowed_constraint_keys: Sequence[str] = (), |
| | ): |
| | self.description = extra_props.get("description", "") |
| | self.constraints = { |
| | k: v |
| | for k, v in extra_props.items() if k in allowed_constraint_keys |
| | } |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | raise NotImplementedError |
| |
|
| | def format_docstring(self, indent: str) -> str: |
| | lines = [] |
| | if self.description: |
| | lines.append(_format_description(self.description, indent)) |
| | if self.constraints: |
| | constraints_str = ", ".join(f"{k}: {v}" for k, v in sorted( |
| | self.constraints.items(), key=lambda kv: kv[0])) |
| | lines.append(f"{indent}// {constraints_str}") |
| |
|
| | return "".join(x + "\n" for x in lines) |
| |
|
| |
|
| | class _ParameterTypeScalar(_BaseType): |
| | type: str |
| |
|
| | def __init__(self, type: str, extra_props: dict[str, Any] | None = None): |
| | self.type = type |
| |
|
| | allowed_constraint_keys: list[str] = [] |
| | if self.type == "string": |
| | allowed_constraint_keys = ["maxLength", "minLength", "pattern"] |
| | elif self.type in ("number", "integer"): |
| | allowed_constraint_keys = ["maximum", "minimum"] |
| |
|
| | super().__init__(extra_props or {}, |
| | allowed_constraint_keys=allowed_constraint_keys) |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | |
| | if self.type == "integer": |
| | return "number" |
| | return self.type |
| |
|
| |
|
| | class _ParameterTypeObject(_BaseType): |
| | properties: list["_Parameter"] |
| | additional_properties: Any | None = None |
| |
|
| | def __init__(self, |
| | json_schema_object: dict[str, Any], |
| | registry: _SchemaRegistry | None = None): |
| | super().__init__(json_schema_object) |
| |
|
| | self.properties = [] |
| | self.additional_properties = None |
| |
|
| | if not json_schema_object: |
| | return |
| |
|
| | if "$defs" in json_schema_object and registry: |
| | registry.register_definitions(json_schema_object["$defs"]) |
| |
|
| | self.additional_properties = json_schema_object.get( |
| | "additionalProperties") |
| | if isinstance(self.additional_properties, dict): |
| | self.additional_properties = _parse_parameter_type( |
| | self.additional_properties, registry) |
| |
|
| | if "properties" not in json_schema_object: |
| | return |
| |
|
| | required_parameters = json_schema_object.get("required", []) |
| | optional_parameters = set( |
| | json_schema_object["properties"].keys()) - set(required_parameters) |
| |
|
| | self.properties = [ |
| | _Parameter( |
| | name=name, |
| | type=_parse_parameter_type(prop, registry), |
| | optional=name in optional_parameters, |
| | default=prop.get("default") |
| | if isinstance(prop, dict) else None, |
| | ) for name, prop in json_schema_object["properties"].items() |
| | ] |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | |
| | parameters = [p for p in self.properties if not p.optional] |
| | opt_params = [p for p in self.properties if p.optional] |
| |
|
| | parameters = sorted(parameters, key=lambda p: p.name) |
| | parameters.extend(sorted(opt_params, key=lambda p: p.name)) |
| |
|
| | param_strs = [] |
| | for p in parameters: |
| | one = p.to_typescript_style(indent=indent + _TS_INDENT) |
| | param_strs.append(one) |
| |
|
| | if self.additional_properties is not None: |
| | ap_type_str = "any" |
| | if self.additional_properties is True: |
| | ap_type_str = "any" |
| | elif self.additional_properties is False: |
| | ap_type_str = "never" |
| | elif isinstance(self.additional_properties, _ParameterType): |
| | ap_type_str = self.additional_properties.to_typescript_style( |
| | indent=indent + _TS_INDENT) |
| | else: |
| | raise ValueError( |
| | f"Unknown additionalProperties: {self.additional_properties}" |
| | ) |
| | param_strs.append( |
| | f"{indent + _TS_INDENT}[k: string]: {ap_type_str}") |
| |
|
| | if not param_strs: |
| | return "{}" |
| |
|
| | params_str = _TS_FIELD_DELIMITER.join(param_strs) |
| | if params_str: |
| | |
| | params_str = f"\n{params_str}\n" |
| | |
| | return f"{{{params_str}{indent}}}" |
| |
|
| |
|
| | class _ParameterTypeArray(_BaseType): |
| | item: "_ParameterType" |
| |
|
| | def __init__(self, |
| | json_schema_object: dict[str, Any], |
| | registry: _SchemaRegistry | None = None): |
| | super().__init__(json_schema_object, |
| | allowed_constraint_keys=("minItems", "maxItems")) |
| | if json_schema_object.get("items"): |
| | self.item = _parse_parameter_type(json_schema_object["items"], |
| | registry) |
| | else: |
| | self.item = _ParameterTypeScalar(type="any") |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | item_docstring = self.item.format_docstring(indent + _TS_INDENT) |
| | if item_docstring: |
| | return ("Array<\n" + item_docstring + indent + _TS_INDENT + |
| | self.item.to_typescript_style(indent=indent + _TS_INDENT) + |
| | "\n" + indent + ">") |
| | else: |
| | return f"Array<{self.item.to_typescript_style(indent=indent)}>" |
| |
|
| |
|
| | class _ParameterTypeEnum(_BaseType): |
| | |
| | enum: list[str | int | float | bool | None] |
| |
|
| | def __init__(self, json_schema_object: dict[str, Any]): |
| | super().__init__(json_schema_object) |
| | self.enum = json_schema_object["enum"] |
| |
|
| | |
| | if "type" in json_schema_object: |
| | typ = json_schema_object["type"] |
| | if isinstance(typ, list): |
| | if len(typ) == 1: |
| | typ = typ[0] |
| | elif len(typ) == 2: |
| | if "null" not in typ: |
| | raise ValueError(f"Enum type {typ} is not supported") |
| | else: |
| | typ = typ[0] if typ[0] != "null" else typ[1] |
| | else: |
| | raise ValueError(f"Enum type {typ} is not supported") |
| | for val in self.enum: |
| | if val is None: |
| | continue |
| | if typ == "string" and not isinstance(val, str): |
| | raise ValueError(f"Enum value {val} is not a string") |
| | elif typ == "number" and not isinstance(val, (int, float)): |
| | raise ValueError(f"Enum value {val} is not a number") |
| | elif typ == "integer" and not isinstance(val, int): |
| | raise ValueError(f"Enum value {val} is not an integer") |
| | elif typ == "boolean" and not isinstance(val, bool): |
| | raise ValueError(f"Enum value {val} is not a boolean") |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | return " | ".join( |
| | [f'"{e}"' if isinstance(e, str) else str(e) for e in self.enum]) |
| |
|
| |
|
| | class _ParameterTypeAnyOf(_BaseType): |
| | types: list["_ParameterType"] |
| |
|
| | def __init__( |
| | self, |
| | json_schema_object: dict[str, Any], |
| | registry: _SchemaRegistry | None = None, |
| | ): |
| | super().__init__(json_schema_object) |
| | self.types = [ |
| | _parse_parameter_type(t, registry) |
| | for t in json_schema_object["anyOf"] |
| | ] |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | return " | ".join( |
| | [t.to_typescript_style(indent=indent) for t in self.types]) |
| |
|
| |
|
| | class _ParameterTypeUnion(_BaseType): |
| | types: list[str] |
| |
|
| | def __init__(self, json_schema_object: dict[str, Any]): |
| | super().__init__(json_schema_object) |
| |
|
| | mapping = { |
| | "string": "string", |
| | "number": "number", |
| | "integer": "number", |
| | "boolean": "boolean", |
| | "null": "null", |
| | "object": "{}", |
| | "array": "Array<any>", |
| | } |
| | self.types = [mapping[t] for t in json_schema_object["type"]] |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | return " | ".join(self.types) |
| |
|
| |
|
| | class _ParameterTypeRef(_BaseType): |
| | ref_name: str |
| | is_self_ref: bool = False |
| |
|
| | def __init__(self, json_schema_object: dict[str, Any], |
| | registry: _SchemaRegistry): |
| | super().__init__(json_schema_object) |
| |
|
| | ref = json_schema_object["$ref"] |
| | resolved_schema = registry.resolve_ref(ref) |
| |
|
| | if resolved_schema.get("$self_ref", False): |
| | self.ref_name = "parameters" |
| | self.is_self_ref = True |
| | else: |
| | self.ref_name = ref.split("/")[-1] |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | return self.ref_name |
| |
|
| |
|
| | _ParameterType = (_ParameterTypeScalar |
| | | _ParameterTypeObject |
| | | _ParameterTypeArray |
| | | _ParameterTypeEnum |
| | | _ParameterTypeAnyOf |
| | | _ParameterTypeUnion |
| | | _ParameterTypeRef) |
| |
|
| |
|
| | @dataclasses.dataclass |
| | class _Parameter: |
| | """ |
| | A parameter in a function, or a field in a object. |
| | It consists of the type as well as the name. |
| | """ |
| |
|
| | type: _ParameterType |
| | name: str = "_" |
| | optional: bool = True |
| | default: Any | None = None |
| |
|
| | @classmethod |
| | def parse_extended(cls, attributes: dict[str, Any]) -> "_Parameter": |
| | if not attributes: |
| | raise ValueError("attributes is empty") |
| |
|
| | return cls( |
| | name=attributes.get("name", "_"), |
| | type=_parse_parameter_type(attributes), |
| | optional=attributes.get("optional", False), |
| | default=attributes.get("default"), |
| | ) |
| |
|
| | def to_typescript_style(self, indent: str = "") -> str: |
| | comments = self.type.format_docstring(indent) |
| |
|
| | if self.default is not None: |
| | default_repr = (json.dumps(self.default, ensure_ascii=False) |
| | if not isinstance(self.default, (int, float, bool)) |
| | else repr(self.default)) |
| | comments += f"{indent}// Default: {default_repr}\n" |
| |
|
| | return ( |
| | comments + |
| | f"{indent}{self.name}{'?' if self.optional else ''}: {self.type.to_typescript_style(indent=indent)}" |
| | ) |
| |
|
| |
|
| | def _parse_parameter_type( |
| | json_schema_object: dict[str, Any] | bool, |
| | registry: _SchemaRegistry | None = None) -> _ParameterType: |
| | if isinstance(json_schema_object, bool): |
| | if json_schema_object: |
| | return _ParameterTypeScalar(type="any") |
| | else: |
| | logger.warning( |
| | f"Warning: Boolean value {json_schema_object} is not supported, use null instead." |
| | ) |
| | return _ParameterTypeScalar(type="null") |
| |
|
| | if "$ref" in json_schema_object and registry: |
| | return _ParameterTypeRef(json_schema_object, registry) |
| |
|
| | if "anyOf" in json_schema_object: |
| | return _ParameterTypeAnyOf(json_schema_object, registry) |
| | elif "enum" in json_schema_object: |
| | return _ParameterTypeEnum(json_schema_object) |
| | elif "type" in json_schema_object: |
| | typ = json_schema_object["type"] |
| | if isinstance(typ, list): |
| | return _ParameterTypeUnion(json_schema_object) |
| | elif typ == "object": |
| | return _ParameterTypeObject(json_schema_object, registry) |
| | elif typ == "array": |
| | return _ParameterTypeArray(json_schema_object, registry) |
| | else: |
| | return _ParameterTypeScalar(typ, json_schema_object) |
| | elif json_schema_object == {}: |
| | return _ParameterTypeScalar(type="any") |
| | else: |
| | raise ValueError(f"Invalid JSON Schema object: {json_schema_object}") |
| |
|
| |
|
| | def _openai_function_to_typescript_style(function: dict[str, Any], ) -> str: |
| | """Convert OpenAI function definition (dict) to TypeScript style string.""" |
| | registry = _SchemaRegistry() |
| | parameters = function.get("parameters") or {} |
| | parsed = _ParameterTypeObject(parameters, registry) |
| |
|
| | interfaces = [] |
| | root_interface_name = None |
| | if registry.has_self_ref: |
| | root_interface_name = "parameters" |
| | params_str = _TS_FIELD_DELIMITER.join([ |
| | p.to_typescript_style(indent=_TS_INDENT) for p in parsed.properties |
| | ]) |
| | params_str = f"\n{params_str}\n" if params_str else "" |
| | interface_def = f"interface {root_interface_name} {{{params_str}}}" |
| | interfaces.append(interface_def) |
| |
|
| | definitions_copy = dict(registry.definitions) |
| | for def_name, def_schema in definitions_copy.items(): |
| | obj_type = _parse_parameter_type(def_schema, registry) |
| | params_str = obj_type.to_typescript_style() |
| |
|
| | description_part = "" |
| | if obj_description := def_schema.get("description", ""): |
| | description_part = _format_description(obj_description) + "\n" |
| |
|
| | interface_def = f"{description_part}interface {def_name} {params_str}" |
| | interfaces.append(interface_def) |
| |
|
| | interface_str = "\n".join(interfaces) |
| | function_name = function.get("name", "function") |
| | if root_interface_name: |
| | type_def = f"type {function_name} = (_: {root_interface_name}) => any;" |
| | else: |
| | params_str = parsed.to_typescript_style() |
| | type_def = f"type {function_name} = (_: {params_str}) => any;" |
| |
|
| | description = function.get("description") |
| | return "\n".join( |
| | filter( |
| | bool, |
| | [ |
| | interface_str, |
| | ((description and _format_description(description)) or ""), |
| | type_def, |
| | ], |
| | )) |
| |
|
| |
|
| | def encode_tools_to_typescript_style(tools: list[dict[str, Any]], ) -> str: |
| | """ |
| | Convert tools (list of dict) to TypeScript style string. |
| | |
| | Supports OpenAI format: {"type": "function", "function": {...}} |
| | |
| | Args: |
| | tools: List of tool definitions in dict format |
| | |
| | Returns: |
| | TypeScript style string representation of the tools |
| | """ |
| | if not tools: |
| | return "" |
| |
|
| | functions = [] |
| |
|
| | for tool in tools: |
| | tool_type = tool.get("type") |
| | if tool_type == "function": |
| | func_def = tool.get("function", {}) |
| | if func_def: |
| | functions.append( |
| | _openai_function_to_typescript_style(func_def)) |
| | else: |
| | |
| | continue |
| |
|
| | if not functions: |
| | return "" |
| |
|
| | functions_str = "\n".join(functions) |
| | result = "# Tools\n\n" |
| |
|
| | if functions_str: |
| | result += "## functions\nnamespace functions {\n" |
| | result += functions_str + "\n" |
| | result += "}\n" |
| |
|
| | return result |
| |
|