# Module: heaven_base.baseheaventool
# A metaprogramming wrapper for Langchain BaseTool and Tool construction
from copy import deepcopy
from dataclasses import dataclass, fields, replace
from typing import Any, Dict, Optional, Type, Callable, ClassVar, Literal, List, Union
from langchain_core.callbacks import CallbackManagerForToolRun, AsyncCallbackManagerForToolRun
from langchain_core.runnables import RunnableConfig
from pydantic import BaseModel, Field, create_model, ConfigDict, VERSION
try:
    from pydantic import Extra  # For Pydantic 2.10.6
except ImportError:
    Extra = None  # For Pydantic 2.11+

# Check if we're using Pydantic 2.11+ (where Extra is deprecated)
PYDANTIC_2_11_PLUS = tuple(map(int, VERSION.split('.')[:2])) >= (2, 11)
from langchain_core.tools import BaseTool, Tool
from langchain_core.utils.function_calling import convert_to_openai_tool
from abc import abstractmethod, ABC
from langchain_core.utils.json_schema import dereference_refs
import importlib
from collections.abc import Mapping, Iterable


tool_log_path = "/tmp/tool_debug.log"


def schema_to_pydantic_model(model_name: str, schema: dict) -> type:
    """
    Convert a cleaned JSON schema back into a Pydantic model.

    NOTE: Supports only basic 'type', 'properties', 'required', 'description'.

    Args:
        model_name: Name given to the generated model class.
        schema: JSON-schema dict; only its top-level ``properties`` and
            ``required`` entries are read.

    Returns:
        A model class created with ``pydantic.create_model``. Required
        properties get ``...`` as their default, all others default to ``None``.
    """
    type_map = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }

    # `or` guards against explicit nulls ("properties": null / "required": null).
    props = schema.get("properties") or {}
    required = set(schema.get("required") or [])

    # Named `model_fields` so the imported dataclasses.fields is not shadowed.
    model_fields = {}
    for field_name, field_schema in props.items():
        if isinstance(field_schema, dict):
            json_type = field_schema.get("type")
            desc = field_schema.get("description", "")
        else:
            # Boolean / non-dict property schemas carry no usable metadata.
            json_type = None
            desc = ""

        # "type" may be a list such as ["string", "null"]; a list is unhashable
        # and cannot be used as a lookup key, so anything non-string maps to Any.
        py_type = type_map.get(json_type, Any) if isinstance(json_type, str) else Any

        default = ... if field_name in required else None
        model_fields[field_name] = (py_type, Field(default, description=desc))

    return create_model(model_name, **model_fields)

# These will need a Message class...

@dataclass(frozen=True)
class UserMessage:
    """Immutable record of a user message."""
    content: str
    user_id: Optional[str] = None
    timestamp: Optional[str] = None
    is_pseudo: Optional[bool] = False


@dataclass(frozen=True)
class AgentMessage:
    """Immutable record of an agent message."""
    content: str
    agent_id: Optional[str] = None
    tool_call: Optional["ToolUse"] = None  # If tool triggered in content
    timestamp: Optional[str] = None


@dataclass(frozen=True)
class ToolUse:
    """Immutable record of a tool call: the tool name and its arguments."""
    tool_name: str
    arguments: dict
    tool_call_id: Optional[str] = None
    agent_id: Optional[str] = None


if PYDANTIC_2_11_PLUS:
    # Pydantic 2.11+ style - use ConfigDict
    ForbidExtraConfig = ConfigDict(extra='forbid')
else:
    # Pydantic 2.10.6 style - use class config
    class ForbidExtraConfig:
        extra = Extra.forbid


@dataclass(kw_only=True, frozen=True)
class ToolResult:
    """Represents the result of a tool execution."""
    output: Optional[str] = None
    error: Optional[str] = None
    base64_image: Optional[str] = None
    system: Optional[str] = None

    def __bool__(self):
        """Return True when at least one field holds a truthy value."""
        return any(getattr(self, field.name) for field in fields(self))

    def __add__(self, other: "ToolResult"):
        """
        Combine two results field by field into a new ToolResult.

        Text fields are concatenated when both sides are set.

        Raises:
            ValueError: if both results carry a ``base64_image``.
        """
        def combine_fields(
            field: Optional[str], other_field: Optional[str], concatenate: bool = True
        ):
            if field and other_field:
                if concatenate:
                    return field + other_field
                raise ValueError("Cannot combine tool results")
            return field or other_field

        return ToolResult(
            output=combine_fields(self.output, other.output),
            error=combine_fields(self.error, other.error),
            base64_image=combine_fields(self.base64_image, other.base64_image, False),
            system=combine_fields(self.system, other.system),
        )

    def replace(self, **kwargs):
        """Return a copy of this result with the given fields replaced."""
        return replace(self, **kwargs)


class CLIResult(ToolResult):
    """A ToolResult that can be rendered as a CLI output."""


class ToolFailure(ToolResult):
    """A ToolResult that represents a failure."""


class ToolError(Exception):
    """Raised when a tool encounters an error."""

    def __init__(self, message):
        # `.message` carries the prefixed text; str(exc) stays the raw message
        # because only `message` is handed to Exception.__init__.
        self.message = f"ERROR!!! {message}"
        super().__init__(message)


def fix_ref_paths(schema: dict) -> dict:
    """
    Return a deep copy of `schema` in which every string ``$ref`` value has
    ``/$defs/`` replaced by ``/defs/``. The input is not modified.
    """
    schema_copy = deepcopy(schema)

    def _fix_refs_recursive(obj):
        if isinstance(obj, dict):
            ref = obj.get("$ref")
            if isinstance(ref, str):
                obj["$ref"] = ref.replace("/$defs/", "/defs/")
            children = obj.values()
        elif isinstance(obj, list):
            children = obj
        else:
            return
        for child in children:
            if isinstance(child, (dict, list)):
                _fix_refs_recursive(child)

    _fix_refs_recursive(schema_copy)
    return schema_copy


def flatten_array_anyof(schema: dict) -> dict:
    """
    If the schema has an 'anyOf' that contains one branch with type "array"
    and another with type "null", flatten it to a single array schema with
    'nullable': true.

    Returns a new dict when flattening happens; otherwise returns `schema`
    itself, unchanged.
    """
    branches = schema.get("anyOf")
    if not isinstance(branches, list):
        return schema

    array_branch = None
    null_branch = False
    for branch in branches:
        if not isinstance(branch, dict):
            # Non-dict branches (e.g. boolean schemas) have no 'type' to inspect.
            continue
        if branch.get("type") == "array":
            array_branch = branch
        elif branch.get("type") == "null":
            null_branch = True

    if not (array_branch and null_branch):
        return schema

    # The shallow copy already carries 'default' and 'description' if present.
    new_schema = dict(schema)
    new_schema.pop("anyOf")
    new_schema["type"] = "array"
    new_schema["items"] = array_branch.get("items", {})
    new_schema["nullable"] = True
    return new_schema


def recursive_flatten(schema: Union[dict, list]) -> Union[dict, list]:
    """
    Apply `flatten_array_anyof` to `schema` and to every nested dict.

    A new structure is built; the caller's dicts and lists are not written to.
    """
    if isinstance(schema, dict):
        flattened = flatten_array_anyof(schema)
        return {
            key: recursive_flatten(value) if isinstance(value, (dict, list)) else value
            for key, value in flattened.items()
        }
    if isinstance(schema, list):
        return [
            recursive_flatten(item) if isinstance(item, (dict, list)) else item
            for item in schema
        ]
    return schema


def fix_empty_object_properties(schema: Union[dict, list]) -> Union[dict, list]:
    """
    Recursively fixes any object-type schema that has an empty 'properties'
    dict by removing 'properties' and adding 'additionalProperties': True.

    Returns a rebuilt structure; the input is left untouched.
    """
    if isinstance(schema, dict):
        drop_properties = (
            schema.get("type") == "object"
            and "properties" in schema
            and not schema["properties"]
        )
        new_schema = {}
        for key, value in schema.items():
            if drop_properties and key == "properties":
                continue
            if isinstance(value, (dict, list)):
                new_schema[key] = fix_empty_object_properties(value)
            else:
                new_schema[key] = value
        if drop_properties:
            # Allow arbitrary keys in place of the empty property list.
            new_schema["additionalProperties"] = True
        return new_schema
    if isinstance(schema, list):
        return [
            fix_empty_object_properties(item) if isinstance(item, (dict, list)) else item
            for item in schema
        ]
    return schema


def generate_dereferenced_schema(schema: Union[dict, Type[BaseModel]]) -> dict:
    """
    Returns a fully dereferenced (flattened) JSON schema.

    If a Pydantic model is passed, generate its JSON schema;
    if a dict is passed, assume it's already a JSON schema.
    Empty object 'properties' are fixed to support Gemini. The "anyOf" array
    flattening step (`recursive_flatten`) is currently not applied.
    """
    if isinstance(schema, dict):
        raw_schema = schema
    else:
        raw_schema = schema.model_json_schema(ref_template="#/defs/{model}")

    # ADDED FOR ADK COMPLIANCE
    # Fix $ref paths before renaming $defs to defs (this also deep-copies, so a
    # dict passed by the caller is not modified by the steps below).
    raw_schema = fix_ref_paths(raw_schema)
    if "$defs" in raw_schema:
        raw_schema["defs"] = raw_schema.pop("$defs")

    inlined = dereference_refs(raw_schema)
    inlined.pop("defs", None)
    return fix_empty_object_properties(inlined)


# Type names accepted in argument definitions (Python-style and JSON-style).
_SUPPORTED_ARG_TYPES = (
    'int', 'str', 'float', 'bool', 'list', 'dict',
    'integer', 'string', 'number', 'boolean', 'array', 'object',
)

# Keys of an argument definition that are metadata rather than nested properties.
_KNOWN_META_KEYS = frozenset(
    {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
)


class ToolArgsSchema(BaseModel):
    """Meta-validator for tool arguments ensuring LangChain compatibility"""
    arguments: Dict[str, Dict[str, Any]] = Field(
        ..., description="Validated tool argument specifications"
    )

    # V6
    @classmethod
    def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
        """
        Build a `list` subclass whose JSON schema always has a typed ``items``.

        Args:
            item_type: Python type of the list items; str/int/float/bool map to
                their JSON type names, anything else falls back to "string".
            item_type_str: Type name as written in the argument definition;
                used only in the generated item description.
        """
        mapping = {str: "string", int: "integer", float: "number", bool: "boolean"}
        json_item_type = mapping.get(item_type, "string")

        class CustomList(list):
            @classmethod
            def __get_pydantic_core_schema__(cls, source: Any, handler: Any) -> Any:
                # Use the default core schema for lists.
                return handler.generate_schema(list)

            @classmethod
            def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> Dict[str, Any]:
                # Get the default JSON schema from the core schema.
                json_schema = handler(core_schema)
                # Ensure that the "items" property exists.
                json_schema.setdefault("items", {})
                # Add the "type" to the items if it isn't already provided.
                if "type" not in json_schema["items"]:
                    json_schema["items"]["type"] = json_item_type
                # Add the custom description.
                json_schema["items"]["description"] = f"Item of type {item_type_str}"
                return json_schema

        return CustomList

    @classmethod
    def to_pydantic_schema(cls, arguments: Dict[str, Dict[str, Any]]) -> Type[BaseModel]:
        """
        Converts argument definitions into a dynamic Pydantic model.

        For list fields with primitive items, we use a custom list type that
        populates the "items" schema field properly. Entries whose definition is
        not a dict are skipped. A non-None 'default' is appended to the field
        description and removed from the definition (the input is mutated).
        """
        schema_fields = {}
        type_mapping = {
            'int': int, 'str': str, 'float': float, 'bool': bool,
            'integer': int, 'string': str, 'number': float, 'boolean': bool,
            'list': list, 'array': list,  # We'll override list fields below.
            'dict': Dict[str, Any], 'object': Dict[str, Any],
        }

        for arg_name, arg_details in arguments.items():
            if not isinstance(arg_details, dict):
                continue

            arg_type_str = arg_details.get('type', 'string')
            description = arg_details.get('description', '')
            is_required = arg_details.get('required', True)
            default_value = arg_details.get('default', None)

            # Determine the field type.
            if arg_type_str in ('dict', 'object'):
                schema_field_type = cls._create_nested_model_recursive(f"Nested_{arg_name}", arg_details)
            elif arg_type_str in ('list', 'array'):
                item_info = arg_details.get('items', {})
                item_type_str = item_info.get('type', 'any')
                if item_type_str == 'any':
                    item_type_str = 'str'
                if item_type_str in ('dict', 'object'):
                    item_model = cls._create_nested_model_recursive(f"ListItem_{arg_name}", item_info)
                    schema_field_type = List[item_model]  # Use standard list of nested model.
                else:
                    primitive_type = type_mapping.get(item_type_str, str)
                    # Instead of List[primitive_type], use a custom list type.
                    schema_field_type = cls.custom_list_type(primitive_type, item_type_str)
            else:
                schema_field_type = type_mapping.get(arg_type_str, str)

            # Append default info into description.
            final_description = description
            if default_value is not None:
                final_description += f" (Defaults to {repr(default_value)})"
                arg_details.pop('default', None)

            field_kwargs = {"description": final_description}
            if not is_required:
                field_kwargs["default"] = None
                schema_field_type = Optional[schema_field_type]

            schema_fields[arg_name] = (schema_field_type, Field(**field_kwargs))

        model_name = f"DynamicArgsSchema_{id(arguments)}"

        # `__config__` is the create_model parameter for model configuration
        # on both supported Pydantic lines; ForbidExtraConfig is a ConfigDict on
        # 2.11+ and a config class on 2.10.x. Any other dunder keyword is taken
        # as a field definition and the configuration is dropped.
        return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)

    @classmethod
    def _create_nested_model_recursive(cls, model_name: str, arg_definition: Dict[str, Any]) -> Type[BaseModel]:
        """
        Recursively creates a nested Pydantic model from the provided definition.

        Treats 'nested' blocks as true sub-models instead of flattening them.
        Inline keys (anything outside the known metadata keys) become fields
        when their value is a dict with a 'type'.
        """
        type_map = {
            'integer': int, 'int': int,
            'string': str, 'str': str,
            'number': float, 'float': float,
            'boolean': bool, 'bool': bool,
            'list': list, 'array': list,
            'dict': dict, 'object': dict,
        }

        # 1) Handle any direct inline fields (outside of nested)
        schema_fields: Dict[str, Any] = {}
        for key, val in dict(arg_definition).items():
            if key in _KNOWN_META_KEYS:
                continue
            if not (isinstance(val, dict) and 'type' in val):
                continue

            # primitive or list/object without a nested sub-block
            field_type = type_map.get(val['type'], str)
            if val['type'] in ('list', 'array'):
                # list of primitives or dicts: use the custom list type so the
                # generated schema keeps a typed "items" entry
                items = val.get('items', {})
                sub_type_str = items.get('type', 'string')
                sub_py = type_map.get(sub_type_str, str)
                field_type = cls.custom_list_type(sub_py, sub_type_str)

            is_required = val.get('required', True)
            schema_fields[key] = (
                field_type if is_required else Optional[field_type],
                Field(
                    default=... if is_required else None,
                    description=val.get('description', '')
                )
            )

        # 2) Now build sub-models for each group in 'nested'
        nested = arg_definition.get('nested', {}) or {}
        for group_name, group_props in nested.items():
            sub_model = cls._create_nested_model_recursive(f"{model_name}_{group_name}", group_props)
            # include it as a required or optional field
            required = group_props.get('required', True)
            schema_fields[group_name] = (
                sub_model if required else Optional[sub_model],
                Field(
                    default=... if required else None,
                    description=group_props.get('description', '')
                )
            )

        # 3) Create & return the Pydantic model for this level
        # (see to_pydantic_schema for why `__config__` is used on every version)
        return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)

    @classmethod
    def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """
        Validates the argument definitions, ensuring they have required metadata
        and supported types.

        The definitions are normalized in place: a missing 'required' is filled
        in (True at top level, False for nested properties), a 'default' is
        moved into the description, and list definitions get an 'items' type.

        Returns:
            The same `arguments` dict, after normalization.

        Raises:
            ValueError: on a malformed definition or an unsupported type.
        """
        for arg_name, arg_details in arguments.items():
            if not isinstance(arg_details, dict):
                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} must be a dictionary")

            # Ensure required keys are present
            missing = [k for k in ['name', 'type', 'description'] if k not in arg_details]
            if missing:
                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} missing required metadata: {missing}")

            # Add default 'required=True' if missing
            if 'required' not in arg_details:
                arg_details['required'] = True

            # Validate that 'required' is a boolean
            if not isinstance(arg_details['required'], bool):
                raise ValueError(f"ToolArgsSchema Error: The 'required' field for {arg_name} must be a boolean")

            # Validate type is supported
            if arg_details['type'] not in _SUPPORTED_ARG_TYPES:
                raise ValueError(f"ToolArgsSchema Error: Unsupported type for {arg_name}: {arg_details['type']}")

            # Move default to description if present
            if 'default' in arg_details:
                default_value = arg_details['default']
                arg_details['description'] += f" (Defaults to {repr(default_value)})"
                del arg_details['default']

            # Process any nested structure the same way
            if 'nested' in arg_details:
                nested = arg_details['nested']
                if not isinstance(nested, dict):
                    raise ValueError(f"ToolArgsSchema Error: 'nested' in {arg_name} must be a dictionary")

                # Validate each nested item's arguments
                for item_key, item_args in nested.items():
                    if not isinstance(item_args, dict):
                        raise ValueError(f"ToolArgsSchema Error: Arguments for '{item_key}' in {arg_name} must be a dictionary")

                    # Validate each argument
                    for sub_arg_name, sub_arg_def in item_args.items():
                        sub_path = f"{arg_name}.{item_key}.{sub_arg_name}"
                        if not isinstance(sub_arg_def, dict):
                            raise ValueError(f"ToolArgsSchema Error: Definition for '{sub_path}' must be a dictionary")

                        # Ensure required metadata is present
                        if 'type' not in sub_arg_def:
                            raise ValueError(f"ToolArgsSchema Error: '{sub_path}' missing required 'type'")

                        if 'description' not in sub_arg_def:
                            raise ValueError(f"ToolArgsSchema Error: '{sub_path}' missing required 'description'")

                        # Validate type is supported
                        if sub_arg_def['type'] not in _SUPPORTED_ARG_TYPES:
                            raise ValueError(f"ToolArgsSchema Error: Unsupported type for '{sub_path}': {sub_arg_def['type']}")

                        # Add default 'required=False' if missing for nested properties
                        if 'required' not in sub_arg_def:
                            sub_arg_def['required'] = False

                        # Validate that 'required' is a boolean
                        if not isinstance(sub_arg_def['required'], bool):
                            raise ValueError(f"ToolArgsSchema Error: The 'required' field for '{sub_path}' must be a boolean")

                        # Move default to description if present
                        if 'default' in sub_arg_def:
                            default_value = sub_arg_def['default']
                            sub_arg_def['description'] += f" (Defaults to {repr(default_value)})"
                            del sub_arg_def['default']

                        # Recursively process any nested structure within this
                        if sub_arg_def['type'] in ['dict', 'object'] and 'nested' in sub_arg_def:
                            cls.validate_arguments({sub_path: sub_arg_def})

                        # Handle nested validations for complex types
                        if sub_arg_def['type'] in ['list', 'array'] and 'items' in sub_arg_def:
                            items = sub_arg_def['items']
                            if not isinstance(items, dict):
                                raise ValueError(f"ToolArgsSchema Error: 'items' for '{sub_path}' must be a dictionary")

                            if 'type' in items and items['type'] not in _SUPPORTED_ARG_TYPES and items['type'] != 'any':
                                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for '{sub_path}': {items['type']}")

            # Recursively validate nested structures (regular pattern)
            if arg_details['type'] in ['dict', 'object']:
                # Check for nested definitions and validate them
                for key, value in arg_details.items():
                    if key not in _KNOWN_META_KEYS and isinstance(value, dict) and 'type' in value:
                        # This is a nested property - validate it as well
                        cls._validate_nested_property(f"{arg_name}.{key}", value)

            # Validate list item type if it's a list
            if arg_details['type'] in ['list', 'array']:
                if 'items' not in arg_details:
                    arg_details['items'] = {'type': 'any'}  # Default to Any if not specified
                elif not isinstance(arg_details['items'], dict):
                    raise ValueError(f"ToolArgsSchema Error: 'items' for {arg_name} must be a dictionary")
                elif 'type' not in arg_details['items']:
                    arg_details['items']['type'] = 'any'  # Default to Any if type not specified

                items = arg_details['items']
                item_type = items.get('type')

                if item_type not in _SUPPORTED_ARG_TYPES and item_type != 'any':
                    raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {arg_name}: {item_type}")

                if item_type in ['dict', 'object']:
                    # Add description to items if missing
                    if 'description' not in items:
                        items['description'] = f"Item of {arg_name} list"

                    # Recursively validate nested item properties if they exist
                    for key, value in items.items():
                        if key not in _KNOWN_META_KEYS and isinstance(value, dict) and 'type' in value:
                            cls._validate_nested_property(f"{arg_name}[].{key}", value)

        return arguments

    @classmethod
    def _validate_nested_property(cls, path: str, prop_details: Dict[str, Any]):
        """
        Validates a nested property definition recursively, normalizing it in place.

        Args:
            path: The path to this property (for error messages)
            prop_details: The property details to validate

        Raises:
            ValueError: on missing metadata, an unsupported type, or a
                non-boolean 'required'.
        """
        # Check required metadata
        if 'type' not in prop_details:
            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'type'")

        # Check for description
        if 'description' not in prop_details:
            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'description'")

        # Validate type is supported
        if prop_details['type'] not in _SUPPORTED_ARG_TYPES:
            raise ValueError(f"ToolArgsSchema Error: Unsupported type for {path}: {prop_details['type']}")

        # Add default 'required=False' if missing for nested properties
        if 'required' not in prop_details:
            prop_details['required'] = False

        # Validate that 'required' is a boolean
        if not isinstance(prop_details['required'], bool):
            raise ValueError(f"ToolArgsSchema Error: The 'required' field for {path} must be a boolean")

        # Move default to description if present
        if 'default' in prop_details:
            default_value = prop_details['default']
            prop_details['description'] += f" (Defaults to {repr(default_value)})"
            del prop_details['default']

        # Handle nested structure within this property
        if 'nested' in prop_details and isinstance(prop_details['nested'], dict):
            for item_args in prop_details['nested'].values():
                if not isinstance(item_args, dict):
                    continue
                for sub_arg_name, sub_arg_def in item_args.items():
                    if isinstance(sub_arg_def, dict) and 'type' in sub_arg_def:
                        # Add directly to the parent object without prefixing
                        prop_details[sub_arg_name] = sub_arg_def
            # Once processed, remove the nested key to avoid double-processing
            del prop_details['nested']

        # If this is a dict/object, recursively validate its properties
        if prop_details['type'] in ['dict', 'object']:
            for key, value in prop_details.items():
                if key not in _KNOWN_META_KEYS and isinstance(value, dict) and 'type' in value:
                    cls._validate_nested_property(f"{path}.{key}", value)

        # If this is a list/array, validate its items
        if prop_details['type'] in ['list', 'array']:
            if 'items' not in prop_details:
                prop_details['items'] = {'type': 'any'}
            elif not isinstance(prop_details['items'], dict):
                raise ValueError(f"ToolArgsSchema Error: 'items' for {path} must be a dictionary")
            elif 'type' not in prop_details['items']:
                prop_details['items']['type'] = 'any'

            items = prop_details['items']
            item_type = items.get('type')

            if item_type not in _SUPPORTED_ARG_TYPES and item_type != 'any':
                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {path}: {item_type}")

            if item_type in ['dict', 'object']:
                # Add description to items if missing
                if 'description' not in items:
                    items['description'] = f"Item of {path}"

                # Recursively validate nested item properties if they exist
                for key, value in items.items():
                    if key not in _KNOWN_META_KEYS and isinstance(value, dict) and 'type' in value:
                        cls._validate_nested_property(f"{path}[].{key}", value)


class BaseHeavenTool(ABC):
    """Provider-agnostic tool base class with standardized results"""
    # Required class attributes that tools must define
    name: str
    description: str
    func: Callable
    args_schema: Type[ToolArgsSchema]
    base_tool: BaseTool
    is_async: bool

    def __init__(
        self,
        base_tool: BaseTool,
        args_schema: Type[ToolArgsSchema],
        is_async: bool = False
    ):
        """
        Store the wrapped tool and validate the argument definitions.

        Raises:
            ValueError: propagated from `ToolArgsSchema.validate_arguments`.
        """
        # Set properties (no super() call since we're not inheriting from BaseTool)
        self.args_schema = args_schema
        self.base_tool = base_tool
        self.is_async = is_async

        # Create instance and validate arguments schema
        schema_instance = args_schema()
        ToolArgsSchema.validate_arguments(schema_instance.arguments)

    def _clean_kwargs(self, obj: Any) -> Any:
        """
        Recursively return a *copy* of `obj` in which every mapping key that looks
        like "'some_key'" is replaced by "some_key".

        - Works for any Mapping subclass; the same mapping type is rebuilt when
          its constructor accepts a dict, otherwise a plain dict is returned.
        - list / tuple / set and other iterables are rebuilt with their own
          type when possible; otherwise (e.g. generators) a list is returned.
        - str, bytes and non-iterable values are returned untouched.
        """
        # ---- Handle any mapping (dict-like) ---------------------------
        if isinstance(obj, Mapping):
            cleaned_items = {}
            for k, v in obj.items():
                new_k = k
                # len >= 2 so that a lone "'" is not mistaken for a quoted key.
                if isinstance(k, str) and len(k) >= 2 and k.startswith("'") and k.endswith("'"):
                    new_k = k[1:-1]
                cleaned_items[new_k] = self._clean_kwargs(v)  # recurse on value

            # Rebuild using the same mapping type when possible
            try:
                return type(obj)(cleaned_items)
            except Exception:
                return cleaned_items

        # ---- Strings are iterable but must be treated as scalars ------
        if isinstance(obj, (str, bytes)):
            return obj

        # ---- Handle iterables that can hold nested mappings -----------
        if isinstance(obj, Iterable):
            # Materialize first: one-shot iterators can only be walked once.
            cleaned_iter = [self._clean_kwargs(item) for item in obj]
            try:
                if isinstance(obj, (list, tuple, set)):
                    return type(obj)(cleaned_iter)
                return type(obj)(iter(cleaned_iter))
            except Exception:
                # Types that cannot be rebuilt from an iterable (generators,
                # range, dict views, multi-field namedtuples, ...).
                return cleaned_iter

        # ---- Base case ------------------------------------------------
        return obj

    def _run(
        self,
        run_manager: Optional[CallbackManagerForToolRun] = None,
        config: Optional[RunnableConfig] = None,
        **kwargs
    ) -> ToolResult:
        """
        Synchronous execution path using wrapped base tool.

        Exceptions are not propagated: they are returned as a ToolResult
        whose `error` field is set.
        """
        cleaned_kwargs = self._clean_kwargs(kwargs)
        if cleaned_kwargs:
            kwargs = cleaned_kwargs
        try:
            if self.is_async:
                raise ValueError("Tool marked as async but _run was called")

            if hasattr(self.base_tool, '_run'):
                result = self.base_tool._run(run_manager=run_manager, config=config, **kwargs)
            else:
                # Fallback to func if _run is not available
                result = self.base_tool.func(**kwargs)

            if isinstance(result, ToolResult):
                return result  # Already a ToolResult (includes CLIResult)
            return ToolResult(output=str(result))
        except ToolError as e:
            return ToolResult(error=str(e))
        except Exception as e:
            return ToolResult(error=f"Error in tool '{self.name}': {e}")

    async def _arun(
        self,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
        config: Optional[RunnableConfig] = None,
        **kwargs
    ) -> ToolResult:
        """
        Asynchronous execution path using wrapped base tool.

        Sync tools run `base_tool.func` in a worker thread. Async tools use
        `_arun`, then `arun`, then `func` (awaited when it is a coroutine
        function, threaded otherwise). Exceptions are returned as a
        ToolResult whose `error` field is set.
        """
        import asyncio
        import inspect

        cleaned_kwargs = self._clean_kwargs(kwargs)
        if cleaned_kwargs:
            kwargs = cleaned_kwargs
        try:
            if not self.is_async:
                # Use asyncio.to_thread for sync functions
                result = await asyncio.to_thread(self.base_tool.func, **kwargs)
            elif hasattr(self.base_tool, '_arun'):
                result = await self.base_tool._arun(run_manager=run_manager, config=config, **kwargs)
            elif hasattr(self.base_tool, 'arun'):
                result = await self.base_tool.arun(**kwargs)
            elif inspect.iscoroutinefunction(self.base_tool.func):
                result = await self.base_tool.func(**kwargs)
            else:
                # Fallback to thread-based execution only for sync functions
                result = await asyncio.to_thread(self.base_tool.func, **kwargs)

            # Handle result types
            if isinstance(result, ToolResult):
                return result  # Already a ToolResult (includes CLIResult)
            return ToolResult(output=str(result))
        except ToolError as e:
            return ToolResult(error=str(e))
        except Exception as e:
            return ToolResult(error=f"Error in tool '{self.name}': {e}")

    def get_spec(self) -> dict:
        """Get this tool's specification (name, description, argument definitions)."""
        return {
            "name": self.name,
            "description": self.description,
            "args": self.args_schema().arguments  # The dictionary is already there.
        }

    @classmethod
    def to_openai_function(cls):
        """Convert the tool's schema to OpenAI function format"""
        # Create the Pydantic schema
        schema_instance = cls.args_schema()
        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
        # Use LangChain's built-in converter
        return convert_to_openai_tool(
            Tool(
                name=cls.name,
                description=cls.description,
                func=cls.func,
                args_schema=pydantic_schema
            )
        )

    def get_openai_function(self):
        """Get the OpenAI function specification"""
        return self.__class__.to_openai_function()

    @classmethod
    def create_adk_tool(cls, func):
        """
        Wrap `func` as a Google ADK tool whose declaration follows this tool's
        argument schema.

        Steps:
          1. Build the dynamic Pydantic model and its dereferenced JSON schema.
          2. Create an async wrapper around `func` whose signature lists the
             schema's top-level properties.
          3. Create an ADK FunctionTool from the wrapper.
          4. Return an ADK BaseTool subclass instance that patches the generated
             declaration (required list, types, descriptions) and converts
             results to ToolResult.
        """
        import inspect
        from google.adk.tools import FunctionTool, BaseTool

        # Step 1: Get original schema and dynamic Pydantic model
        schema_instance = cls.args_schema()
        DynamicModel = schema_instance.to_pydantic_schema(schema_instance.arguments)
        flat_schema = generate_dereferenced_schema(DynamicModel)
        prop_schemas = flat_schema.get("properties", {})
        top_required = flat_schema.get("required", [])
        original_defs = schema_instance.arguments

        # Step 2: Build wrapped function with proper signature (from flattened schema)
        # Very basic type mapping
        type_map = {
            "string": str,
            "integer": int,
            "number": float,
            "boolean": bool,
            "array": list,
            "object": dict
        }

        param_defs = []
        for name, definition in prop_schemas.items():
            typ = definition.get("type", "string")
            is_required = name in top_required
            default = inspect.Parameter.empty if is_required else None

            resolved_type = type_map.get(typ, str)
            if not is_required:
                resolved_type = Optional[resolved_type]

            param_defs.append((name, resolved_type, default))

        # inspect.Signature rejects a parameter without a default placed after
        # one with a default, so list required parameters first. The wrapper is
        # only ever called with keyword arguments, so order has no other effect.
        param_defs.sort(key=lambda p: p[2] is not inspect.Parameter.empty)

        # Create the function dynamically
        def create_wrapped_function(original_func, func_name, param_defs):
            params = [
                inspect.Parameter(
                    name,
                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
                    annotation=typ,
                    default=default
                )
                for name, typ, default in param_defs
            ]
            sig = inspect.Signature(params)

            async def wrapper(**kwargs):
                import asyncio

                if inspect.iscoroutinefunction(original_func):
                    return await original_func(**kwargs)
                # run sync code in thread to avoid blocking
                return await asyncio.to_thread(original_func, **kwargs)

            wrapper.__name__ = func_name
            wrapper.__qualname__ = func_name
            wrapper.__signature__ = sig
            return wrapper

        wrapped_func = create_wrapped_function(func, cls.name, param_defs)

        # Step 3: Create ADK FunctionTool
        orig_tool = FunctionTool(func=wrapped_func)
        orig_tool.description = cls.description

        # Step 4: Schema fixer merge logic
        def merge(adk_prop, pschema, info_def=None):
            """Copy type/description/required info from `pschema` onto `adk_prop`."""
            enum_cls = type(adk_prop.type)
            if "anyOf" in pschema:
                # Read without popping: prop_schemas is shared by every
                # _get_declaration() call and must stay intact between calls.
                branches = pschema["anyOf"]
                if any(b.get("type") == "null" for b in branches):
                    adk_prop.nullable = True
                arr = next((b for b in branches if b.get("type") == "array"), None)
                chosen = arr or next((b for b in branches if b.get("type") != "null"), {})
                pschema = chosen

            if isinstance(info_def, dict):
                if info_def.get("name"):
                    adk_prop.title = info_def["name"]
                if not getattr(adk_prop, "description", None) and info_def.get("description"):
                    adk_prop.description = info_def["description"]

            if "description" in pschema:
                adk_prop.description = pschema["description"]

            if "items" in pschema or pschema.get("type") == "array":
                adk_prop.type = enum_cls.ARRAY
            elif "type" in pschema:
                t = pschema["type"].upper()
                if hasattr(enum_cls, t):
                    adk_prop.type = getattr(enum_cls, t)

            if adk_prop.type == enum_cls.OBJECT:
                adk_prop.properties = adk_prop.properties or {}
                # always allow any dict under info_def[key] as child_info
                for key, child_ps in pschema.get("properties", {}).items():
                    child_info = {}
                    if isinstance(info_def, dict) and isinstance(info_def.get(key), dict):
                        child_info = info_def[key]
                    if key not in adk_prop.properties:
                        Placeholder = type(adk_prop)
                        adk_prop.properties[key] = Placeholder.model_construct(
                            __pydantic_initialised__=True,
                            name=key,
                            title=key,
                            description="",
                            properties=None,
                            items=None,
                            required=[],
                            type=adk_prop.type,
                        )
                    merge(adk_prop.properties[key], child_ps, child_info)

                # first try JSON-schema's own 'required', otherwise fall back to
                # any dict-defined 'required' in info_def
                req = pschema.get("required")
                if req is None:
                    req = []
                    if isinstance(info_def, dict):
                        req = [k for k, v in info_def.items() if isinstance(v, dict) and v.get("required")]
                adk_prop.required = req or []

            elif adk_prop.type == enum_cls.ARRAY:
                if adk_prop.items is None:
                    Placeholder = type(adk_prop)
                    adk_prop.items = Placeholder.model_construct(
                        __pydantic_initialised__=True,
                        name=adk_prop.title or "",
                        title=adk_prop.title or "",
                        description=pschema.get("items", {}).get("description", ""),
                        properties=None,
                        items=None,
                        required=[],
                        type=adk_prop.type,
                    )
                merge(adk_prop.items, pschema.get("items", {}), None)

        #### LITELLM ####
        def normalize_schema_decl(schema):
            """Recursively replace Enum-valued `type` attributes by their lower-case name."""
            from enum import Enum
            # `schema` is a google.genai.types.Schema or similar
            # 1) normalize this node's type
            if hasattr(schema, "type") and isinstance(schema.type, Enum):
                schema.type = schema.type.name.lower()
            # 2) normalize any nested properties
            if getattr(schema, "properties", None):
                for prop in schema.properties.values():
                    normalize_schema_decl(prop)
            # 3) normalize array items
            if getattr(schema, "items", None):
                normalize_schema_decl(schema.items)
            return schema
        #### LITELLM ####

        # Step 5: Wrap in ADK BaseTool with fixed declaration
        class RequiredFieldsFixTool(BaseTool):
            def __init__(self, orig_tool):
                self._orig_tool = orig_tool
                super().__init__(name=orig_tool.name, description=orig_tool.description)

            async def run_async(self, **kwargs):
                """Run the wrapped tool and convert whatever it returns to a ToolResult."""
                try:
                    # 1) invoke the underlying tool (may be sync or async)
                    raw = self._orig_tool.run_async(**kwargs)
                    result = await raw if inspect.isawaitable(raw) else raw

                    # 2) if it's already a ToolResult (CLIResult is a subclass), return it
                    if isinstance(result, ToolResult):
                        return result

                    # 3) if it's a dict (ADK function tool), dig into the 'response' / 'result' fields
                    if isinstance(result, dict):
                        # ADK wraps tool output under result["response"]["result"]
                        resp = result.get("response", result)
                        payload = resp.get("result", resp) if isinstance(resp, dict) else resp

                        # payload may itself be a dict containing output, error, base64_image, etc.
                        if isinstance(payload, dict):
                            return ToolResult(
                                output=payload.get("output", ""),
                                error=payload.get("error", None),
                                base64_image=payload.get("base64_image", None),
                                system=payload.get("system", None),
                            )
                        # otherwise treat payload as a plain string
                        return ToolResult(output=str(payload))

                    # 4) if it's just a string, wrap it
                    if isinstance(result, str):
                        return ToolResult(output=result)

                    # 5) fallback for anything else
                    return ToolResult(output=str(result))

                except ToolError as e:
                    return ToolResult(error=str(e))
                except Exception as e:
                    return ToolResult(error=f"Unhandled error in tool '{self.name}': {e}")

            def _get_declaration(self):
                """Return the wrapped tool's declaration, patched from the flattened schema."""
                decl = self._orig_tool._get_declaration()
                # Guard before touching attributes: the check below already
                # anticipates a falsy declaration.
                if not decl:
                    return decl

                decl.description = self.description
                if not (decl.parameters and decl.parameters.properties):
                    return decl

                decl.parameters.required = top_required

                for key, ps in prop_schemas.items():
                    prop = decl.parameters.properties.get(key)
                    if not prop:
                        continue
                    merge(prop, ps, original_defs.get(key))

                ### LITE LLM
                # Post-process: lower-case enum types into JSON-schema strings
                normalize_schema_decl(decl.parameters)
                ###
                return decl

        return RequiredFieldsFixTool(orig_tool)

    @classmethod
    def create(cls, adk: bool = False):
        """Create a tool instance using class attributes"""
        # with open(tool_log_path, 'a') as f:
        #     f.write("\n\ncreate entered!\n")

        # Create an instance of the schema

schema_instance = cls.args_schema() 1055 1056 # # Use your existing Pydantic schema generator 1057 1058 pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments) 1059 1060 1061 1062 if adk: 1063 1064 return cls.create_adk_tool(cls.func) # new method 1065 1066 #### LANGCHAIN 1067 else: 1068 1069 schema_instance = cls.args_schema() 1070 pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments) 1071 if cls.is_async: 1072 def sync_stub(**kwargs): 1073 raise NotImplementedError("This tool is async only") 1074 1075 base_tool = Tool( 1076 name=cls.name, 1077 description=cls.description, 1078 func=sync_stub, 1079 coroutine=cls.func, 1080 args_schema=pydantic_schema 1081 ) 1082 else: 1083 base_tool = Tool( 1084 name=cls.name, 1085 description=cls.description, 1086 func=cls.func, 1087 args_schema=pydantic_schema 1088 ) 1089 1090 1091 # return instance 1092 return cls( 1093 base_tool=base_tool, 1094 args_schema=cls.args_schema, 1095 is_async=cls.is_async 1096 ) 1097 1098 1099 1100 1101### EXAMPLE ### 1102 1103# ✅ Define Calculator function 1104def calculator_func(a: int, b: int) -> str: 1105 return str(a * b) 1106 1107# ✅ Define ToolArgsSchema for Calculator 1108class CalculatorArgsSchema(ToolArgsSchema): 1109 arguments: Dict[str, Dict[str, Any]] = { 1110 'a': { 1111 'name': 'a', 1112 'type': 'int', 1113 'description': 'First number to multiply', 1114 'required': True 1115 }, 1116 'b': { 1117 'name': 'b', 1118 'type': 'int', 1119 'description': 'Second number to multiply', 1120 'required': True 1121 } 1122 } 1123 1124# Calculator Tool 1125class CalculatorTool(BaseHeavenTool): 1126 name = "calculator" 1127 description = "A tool that multiplies two numbers." 
1128 func = calculator_func 1129 args_schema = CalculatorArgsSchema 1130 is_async = False 1131 1132 1133def test_tool_func(message: str) -> str: 1134 return "The test tool returns this str:`ASDON!@DASN(IJ K@!EDWQACKNSXCBUO!LASDKANSLDALSDLK!@`\n\nMake sure to follow the original instructions while responding to this." 1135 1136# Updated Test Tool Args Schema with required flags 1137class TestToolArgsSchema(ToolArgsSchema): 1138 arguments: Dict[str, Dict[str, Any]] = { 1139 'message': { 1140 'name': 'message', 1141 'type': 'str', 1142 'description': 'Any message to send to the test tool', 1143 'required': True 1144 } 1145 } 1146 1147class TestTool(BaseHeavenTool): 1148 name = "TestTool" 1149 description = "A test tool that performs a test the user needs performed." 1150 func = test_tool_func 1151 args_schema = TestToolArgsSchema 1152 is_async = False 1153 1154 1155# Example tool definition 1156# if you need to, add imports 1157# 1158def example_util_func(example_arg_return_dict: bool = False) -> str: 1159 if return_dict: 1160 return {"example_result_dict": "example text output"} 1161 return "Congrats! You and the user successfully tested a tool! 
This is an example tool result that returns an example string.\n\nThis is example text after 2 line breaks.\n\nThis is an example injected instruction in an example markdown fence:\n\n```markdown\n# Example\n\nContinue\n```" 1162 1163# from .baseheaventool import BaseHeavenTool, ToolArgsSchema 1164# from heaven_base.tools.tool_utils.example_util import example_util_func ## Doesnt exist outside of documentation purposes 1165# Updated Test Tool Args Schema with required flags 1166class ExampleToolArgsSchema(ToolArgsSchema): 1167 arguments: Dict[str, Dict[str, Any]] = { 1168 'example_arg_return_dict': { 1169 'name': 'example_arg_return_dict', 1170 'type': 'bool', 1171 'description': 'Set to True to receive a dict and False to get a string', 1172 'required': True 1173 } 1174 } 1175 1176class ExampleTool(BaseHeavenTool): 1177 name = "ExampleTool" 1178 description = "An example tool that returns an example result" 1179 func = example_util_func 1180 args_schema = ExampleToolArgsSchema 1181 is_async = False 1182 1183# Then on the agent side ie using the tool: 1184# import the tool 1185# append it to the HeavenAgentConfig.tools 1186# Initialize the agent 1187# Pass that agent to the hermes step in heaven_base.tool_utils.hermes_utils 1188# Run it or create a HermesConfig and run it 1189# Now you understand tools in HEAVEN SDK! 1190 1191# This is one possible way to allow openAi provider binding to work with tools 1192class StrictDict(BaseModel): 1193 class Config: 1194 extra = Extra.forbid 1195 1196# ADK Attempt... doesnt work 1197# class StrictDict(BaseModel): 1198# model_config = {"extra": "forbid"}
def schema_to_pydantic_model(model_name: str, schema: dict) -> type:
    """
    Build a Pydantic model class from a cleaned JSON schema.

    Only the basic keywords are understood.  Every entry of ``properties``
    becomes one field: its ``type`` is mapped to a Python type (a missing or
    unrecognised ``type`` becomes ``Any``), its ``description`` is copied onto
    the field, and membership in ``required`` decides whether the field is
    mandatory or defaults to ``None``.

    Args:
        model_name: Name given to the generated model class.
        schema: JSON-schema dict to read ``properties`` and ``required`` from.

    Returns:
        The model class produced by ``create_model``.
    """
    json_to_python = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }
    properties = schema.get("properties", {})
    mandatory = set(schema.get("required", []))

    # Each definition is the (annotation, FieldInfo) pair create_model expects.
    field_definitions = {
        prop_name: (
            json_to_python.get(prop_schema.get("type"), Any),
            Field(
                ... if prop_name in mandatory else None,
                description=prop_schema.get("description", ""),
            ),
        )
        for prop_name, prop_schema in properties.items()
    }
    return create_model(model_name, **field_definitions)
Convert a cleaned JSON schema back into a Pydantic model. NOTE: Supports only basic 'type', 'properties', 'required', 'description'.
@dataclass(frozen=True)
class UserMessage:
    """Immutable record describing a single message coming from a user."""

    # Text of the message; the only mandatory field.
    content: str
    # Optional identifier of the sending user.
    user_id: Optional[str] = None
    # Optional timestamp kept as a string; no format is enforced here.
    timestamp: Optional[str] = None
    # NOTE(review): presumably flags a synthetic (non-human) user turn - confirm with callers.
    is_pseudo: Optional[bool] = False
@dataclass(frozen=True)
class AgentMessage:
    """Immutable record describing a single message produced by an agent."""

    # Text of the message; the only mandatory field.
    content: str
    # Optional identifier of the agent that produced the message.
    agent_id: Optional[str] = None
    # Tool invocation attached to the message, if a tool was triggered in the content.
    tool_call: Optional["ToolUse"] = None
    # Optional timestamp kept as a string; no format is enforced here.
    timestamp: Optional[str] = None
@dataclass(frozen=True)
class ToolUse:
    """Immutable description of one tool invocation: which tool, with which arguments."""

    # Name of the tool being invoked.
    tool_name: str
    # Arguments for the invocation; the dict itself stays mutable even though
    # the dataclass is frozen.
    arguments: dict
    # Optional identifier of this particular call.
    tool_call_id: Optional[str] = None
    # Optional identifier of the agent associated with the call.
    agent_id: Optional[str] = None
88@dataclass(kw_only=True, frozen=True) 89class ToolResult: 90 """Represents the result of a tool execution.""" 91 output: Optional[str] = None 92 error: Optional[str] = None 93 base64_image: Optional[str] = None 94 system: Optional[str] = None 95 96 def __bool__(self): 97 return any(getattr(self, field.name) for field in fields(self)) 98 99 def __add__(self, other: "ToolResult"): 100 def combine_fields( 101 field: Optional[str], other_field: Optional[str], concatenate: bool = True 102 ): 103 if field and other_field: 104 if concatenate: 105 return field + other_field 106 raise ValueError("Cannot combine tool results") 107 return field or other_field 108 109 return ToolResult( 110 output=combine_fields(self.output, other.output), 111 error=combine_fields(self.error, other.error), 112 base64_image=combine_fields(self.base64_image, other.base64_image, False), 113 system=combine_fields(self.system, other.system), 114 ) 115 116 def replace(self, **kwargs): 117 return replace(self, **kwargs)
Represents the result of a tool execution.
A ToolResult that can be rendered as a CLI output.
Inherited Members
A ToolResult that represents a failure.
Inherited Members
class ToolError(Exception):
    """Exception a tool raises to report that it failed.

    ``str(exc)`` and ``exc.args`` carry the message exactly as it was given;
    the ``message`` attribute holds the same text prefixed with ``"ERROR!!! "``.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = f"ERROR!!! {message}"
Raised when a tool encounters an error.
def fix_ref_paths(schema: dict) -> dict:
    """Return a deep copy of ``schema`` whose ``$ref`` strings use ``/defs/`` instead of ``/$defs/``.

    Only string-valued ``$ref`` entries are rewritten; keys are never renamed
    and the schema passed in is left untouched.
    """

    def _rewrite(node):
        if isinstance(node, dict):
            target = node.get("$ref")
            if isinstance(target, str):
                node["$ref"] = target.replace("/$defs/", "/defs/")
            children = list(node.values())
        elif isinstance(node, list):
            children = node
        else:
            return
        # Only containers can hold further "$ref" entries.
        for child in children:
            if isinstance(child, (dict, list)):
                _rewrite(child)

    fixed = deepcopy(schema)
    _rewrite(fixed)
    return fixed
Fix $ref paths in schema by replacing #/$defs/ with #/defs/
def flatten_array_anyof(schema: dict) -> dict:
    """
    Collapse a nullable-array ``anyOf`` into a single array schema.

    When ``schema["anyOf"]`` is a list holding a branch with ``"type": "array"``
    and a branch with ``"type": "null"``, return a shallow copy of ``schema``
    in which ``anyOf`` is replaced by ``"type": "array"``, the array branch's
    ``items`` (``{}`` when it has none) and ``"nullable": True``.  Every other
    key of ``schema`` (``default``, ``description``, ...) is carried over
    unchanged.  If several array branches are present the last one is used;
    all remaining branches are dropped.

    In every other case the very same ``schema`` object is returned.  The
    input is never modified.

    Args:
        schema: JSON-schema dict to inspect.

    Returns:
        The flattened copy, or ``schema`` itself when nothing applies.
    """
    branches = schema.get("anyOf")
    if not isinstance(branches, list):
        return schema

    array_branch = None
    has_null = False
    for branch in branches:
        # JSON Schema also allows boolean schemas (true / false) as anyOf
        # members; they have no "type", so anything that is not a mapping is
        # skipped instead of crashing on `.get`.
        if not isinstance(branch, dict):
            continue
        branch_type = branch.get("type")
        if branch_type == "array":
            array_branch = branch
        elif branch_type == "null":
            has_null = True

    if array_branch is None or not has_null:
        return schema

    # Copying every key except "anyOf" already carries "default" and
    # "description" across, so no separate re-assignment is needed.
    flattened = {key: value for key, value in schema.items() if key != "anyOf"}
    flattened["type"] = "array"
    flattened["items"] = array_branch.get("items", {})
    flattened["nullable"] = True
    return flattened
If the schema has an 'anyOf' that contains one branch with type "array" and another with type "null", flatten it to a single array schema with 'nullable': true.
def recursive_flatten(schema: Union[dict, list]) -> Union[dict, list]:
    """
    Apply ``flatten_array_anyof`` to a schema and to everything nested inside it.

    A dict is flattened first and then rebuilt with each of its dict/list
    values processed recursively.  A list is rebuilt with its dict items
    processed; other items (including lists nested directly inside a list) are
    kept as they are.  Any other value is returned unchanged.

    A new container is produced at every dict and list level, so the schema
    passed in is never modified.

    Args:
        schema: JSON-schema fragment (dict or list) to process.

    Returns:
        The processed copy of ``schema``.
    """
    if isinstance(schema, dict):
        flattened = flatten_array_anyof(schema)
        # Build a fresh dict rather than assigning into `flattened`:
        # flatten_array_anyof hands back the caller's own object when there is
        # nothing to flatten, and writing into it would alter the input.
        return {
            key: recursive_flatten(value) if isinstance(value, (dict, list)) else value
            for key, value in flattened.items()
        }
    if isinstance(schema, list):
        return [recursive_flatten(item) if isinstance(item, dict) else item for item in schema]
    return schema
def fix_empty_object_properties(schema: Union[dict, list]) -> Union[dict, list]:
    """
    Recursively replace empty ``properties`` on object schemas.

    For every dict with ``"type": "object"`` whose ``properties`` entry is
    present but empty, ``properties`` is removed and
    ``"additionalProperties": True`` is set instead, so the object accepts
    arbitrary keys.  Dicts and lists are rebuilt at every level and the schema
    passed in is left untouched.

    Args:
        schema: JSON-schema fragment (dict or list) to process.  Any other
            value is returned unchanged.

    Returns:
        A new structure with the fix applied throughout.
    """
    if isinstance(schema, list):
        return [
            fix_empty_object_properties(item) if isinstance(item, (dict, list)) else item
            for item in schema
        ]
    if not isinstance(schema, dict):
        return schema

    # Work on a shallow copy: the rewrite below must not leak into the
    # caller's dict, which used to lose "properties" as a side effect.
    node = dict(schema)
    if node.get("type") == "object" and "properties" in node and not node["properties"]:
        del node["properties"]
        node["additionalProperties"] = True

    return {
        key: fix_empty_object_properties(value) if isinstance(value, (dict, list)) else value
        for key, value in node.items()
    }
Recursively fixes any object-type schema that has an empty 'properties' dict by removing 'properties' and adding 'additionalProperties': True.
def generate_dereferenced_schema(schema: Union[dict, Type[BaseModel]]) -> dict:
    """
    Produce a JSON schema with every ``$ref`` inlined.

    A dict is taken to be a JSON schema already; anything else is treated as a
    Pydantic model and asked for its schema via ``model_json_schema``.  The
    ``$ref`` paths and the ``$defs`` section are renamed to ``defs`` before
    ``dereference_refs`` inlines them, the now unneeded ``defs`` section is
    dropped, and empty object ``properties`` are fixed for Gemini support.

    Flattening of array ``anyOf`` schemas (``recursive_flatten``) is currently
    not applied here.
    """
    if isinstance(schema, dict):
        source = schema
    else:
        source = schema.model_json_schema(ref_template="#/defs/{model}")

    # Added for ADK compliance: rewrite the $ref paths first, then rename the
    # definitions section they point to.  fix_ref_paths returns a copy, so the
    # renaming below does not touch `source`.
    working = fix_ref_paths(source)
    if "$defs" in working:
        working["defs"] = working.pop("$defs")

    resolved = dereference_refs(working)
    resolved.pop("defs", None)
    return fix_empty_object_properties(resolved)
Returns a fully dereferenced (flattened) JSON schema. If a Pydantic model is passed, generate its JSON schema; if a dict is passed, assume it's already a JSON schema. Additionally, fixes empty object properties to support Gemini. (Flattening of array schemas that use an "anyOf" is currently disabled in the code.)
class ToolArgsSchema(BaseModel):
    """Meta-validator for tool arguments ensuring LangChain compatibility.

    ``arguments`` maps each argument name to a specification dict.  The
    metadata keys recognised by this class are ``name``, ``type``,
    ``description``, ``required``, ``default``, ``items`` (list/array types),
    ``additionalProperties`` and ``nested`` (groups of sub-properties).  For
    dict/object types, any other key whose value is a dict containing ``type``
    is treated as an inline sub-property.
    """
    arguments: Dict[str, Dict[str, Any]] = Field(
        ..., description="Validated tool argument specifications"
    )

    # V6
    @classmethod
    def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
        """Create a ``list`` subclass whose JSON schema always describes its items.

        The returned type validates exactly like a plain ``list``; only the
        generated JSON schema differs: ``items.type`` is filled in when absent
        and ``items.description`` is always set.

        Args:
            item_type: Python type of the items; ``str``, ``int``, ``float``
                and ``bool`` map to their JSON names, anything else to
                ``"string"``.
            item_type_str: Type name used in the items description.

        Returns:
            The generated ``CustomList`` class.
        """
        mapping = {str: "string", int: "integer", float: "number", bool: "boolean"}
        json_item_type = mapping.get(item_type, "string")

        class CustomList(list):
            @classmethod
            def __get_pydantic_core_schema__(cls, source: Any, handler: Any) -> Any:
                # Use the default core schema for lists.
                return handler.generate_schema(list)

            @classmethod
            def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> Dict[str, Any]:
                # Get the default JSON schema from the core schema.
                json_schema = handler(core_schema)
                # Ensure that the "items" property exists.
                json_schema.setdefault("items", {})
                # Add the "type" to the items if it isn't already provided.
                if "type" not in json_schema["items"]:
                    json_schema["items"]["type"] = json_item_type
                # Add the custom description.
                json_schema["items"]["description"] = f"Item of type {item_type_str}"
                return json_schema

        return CustomList

    @classmethod
    def to_pydantic_schema(cls, arguments: Dict[str, Dict[str, Any]]) -> Type[BaseModel]:
        """
        Converts argument definitions into a dynamic Pydantic model.

        For list fields with primitive items, a custom list type is used so
        that the "items" schema field is populated properly.  dict/object
        arguments, and lists of dict/object items, become nested models.
        Entries of ``arguments`` that are not dicts are skipped.

        Side effect: when an argument has a non-``None`` ``default``, the
        default is appended to the field description and the ``default`` key
        is removed from that argument's dict in ``arguments``.

        Args:
            arguments: Mapping of argument name to its specification dict.

        Returns:
            A new model class configured via ``ForbidExtraConfig``.
        """
        schema_fields = {}
        type_mapping = {
            'int': int, 'str': str, 'float': float, 'bool': bool,
            'integer': int, 'string': str, 'number': float, 'boolean': bool,
            'list': list, 'array': list,  # We'll override list fields below.
            'dict': Dict[str, Any], 'object': Dict[str, Any],
        }

        for arg_name, arg_details in arguments.items():
            if not isinstance(arg_details, dict):
                continue

            arg_type_str = arg_details.get('type', 'string')
            description = arg_details.get('description', '')
            is_required = arg_details.get('required', True)
            default_value = arg_details.get('default', None)

            # Determine the field type.
            if arg_type_str in ('dict', 'object'):
                schema_field_type = cls._create_nested_model_recursive(f"Nested_{arg_name}", arg_details)
            elif arg_type_str in ('list', 'array'):
                item_info = arg_details.get('items', {})
                item_type_str = item_info.get('type', 'any')
                if item_type_str == 'any':
                    item_type_str = 'str'
                if item_type_str in ('dict', 'object'):
                    item_model = cls._create_nested_model_recursive(f"ListItem_{arg_name}", item_info)
                    schema_field_type = List[item_model]  # Use standard list of nested model.
                else:
                    primitive_type = type_mapping.get(item_type_str, str)
                    # Instead of List[primitive_type], use a custom list type.
                    schema_field_type = cls.custom_list_type(primitive_type, item_type_str)
            else:
                schema_field_type = type_mapping.get(arg_type_str, str)

            # Append default info into description.
            final_description = description
            if default_value is not None:
                final_description += f" (Defaults to {repr(default_value)})"
                arg_details.pop('default', None)

            field_kwargs = {"description": final_description}
            if not is_required:
                field_kwargs["default"] = None
                schema_field_type = Optional[schema_field_type]

            schema_fields[arg_name] = (schema_field_type, Field(**field_kwargs))

        model_name = f"DynamicArgsSchema_{id(arguments)}"

        # `__config__` is the create_model keyword for model configuration in
        # every supported Pydantic version.  `__config_class__` is not a
        # create_model parameter, so passing the config under that name left
        # the model without extra='forbid'.
        return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)

    @classmethod
    def _create_nested_model_recursive(cls, model_name: str, arg_definition: Dict[str, Any]) -> Type[BaseModel]:
        """
        Creates a nested Pydantic model from the provided definition.

        Inline entries (non-metadata keys whose value is a dict with a
        ``type``) become fields typed from ``type``; list/array entries use
        ``custom_list_type``.  Each group under ``nested`` becomes its own
        sub-model, built by calling this method recursively, rather than
        being flattened into the parent.

        Args:
            model_name: Name for the generated model; sub-models are named
                ``{model_name}_{group_name}``.
            arg_definition: Specification dict for this level.

        Returns:
            A new model class configured via ``ForbidExtraConfig``.
        """
        known_meta = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
        type_map = {
            'integer': int, 'int': int,
            'string': str, 'str': str,
            'number': float, 'float': float,
            'boolean': bool, 'bool': bool,
            'list': list, 'array': list,
            'dict': dict, 'object': dict,
        }

        # 1) Handle any direct inline fields (outside of nested)
        schema_fields: Dict[str, Any] = {}
        for key, val in dict(arg_definition).items():
            if key in known_meta:
                continue
            if isinstance(val, dict) and 'type' in val:
                # primitive or list/object without a nested sub-block
                field_type = type_map.get(val['type'], str)
                if val['type'] in ('list', 'array'):
                    # list of primitives or dicts
                    items = val.get('items', {})
                    sub_type_str = items.get('type', 'string')
                    sub_py = type_map.get(sub_type_str, str)
                    # use the custom list type to keep the items schema
                    field_type = cls.custom_list_type(sub_py, sub_type_str)
                is_required = val.get('required', True)
                schema_fields[key] = (
                    field_type if is_required else Optional[field_type],
                    Field(
                        default=... if is_required else None,
                        description=val.get('description', '')
                    )
                )

        # 2) Now build sub-models for each group in 'nested'
        nested = arg_definition.get('nested', {}) or {}
        for group_name, group_props in nested.items():
            sub_model = cls._create_nested_model_recursive(f"{model_name}_{group_name}", group_props)
            # include it as a required or optional field
            required = group_props.get('required', True)
            schema_fields[group_name] = (
                sub_model if required else Optional[sub_model],
                Field(
                    default=... if required else None,
                    description=group_props.get('description', '')
                )
            )

        # 3) Create & return the Pydantic model for this level.
        # `__config__` is the create_model keyword for model configuration;
        # `__config_class__` is not a create_model parameter and left the
        # model without extra='forbid'.
        return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)

    @classmethod
    def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """
        Validates the argument definitions, ensuring they have required metadata
        and supported types.

        The definitions are normalised in place: a missing ``required`` is
        filled in (``True`` for top-level arguments, ``False`` for nested
        ones), ``default`` values are moved into the description, and list
        types without a usable ``items`` type get ``{'type': 'any'}``.

        Args:
            arguments: Mapping of argument name to its specification dict.

        Returns:
            The same ``arguments`` object after validation.

        Raises:
            ValueError: on a non-dict definition, missing metadata, a
                non-boolean ``required`` or an unsupported type.
        """
        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']

        for arg_name, arg_details in arguments.items():
            if not isinstance(arg_details, dict):
                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} must be a dictionary")

            # Ensure required keys are present
            missing = [k for k in ['name', 'type', 'description'] if k not in arg_details]
            if missing:
                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} missing required metadata: {missing}")

            # Add default 'required=True' if missing
            if 'required' not in arg_details:
                arg_details['required'] = True

            # Validate that 'required' is a boolean
            if not isinstance(arg_details['required'], bool):
                raise ValueError(f"ToolArgsSchema Error: The 'required' field for {arg_name} must be a boolean")

            # Validate type is supported
            if arg_details['type'] not in supported_types:
                raise ValueError(f"ToolArgsSchema Error: Unsupported type for {arg_name}: {arg_details['type']}")

            # Move default to description if present
            if 'default' in arg_details:
                default_value = arg_details['default']
                arg_details['description'] += f" (Defaults to {repr(default_value)})"
                del arg_details['default']

            # Process any nested structure the same way
            if 'nested' in arg_details:
                nested = arg_details['nested']
                if not isinstance(nested, dict):
                    raise ValueError(f"ToolArgsSchema Error: 'nested' in {arg_name} must be a dictionary")

                # Validate each nested item's arguments
                for item_key, item_args in nested.items():
                    if not isinstance(item_args, dict):
                        raise ValueError(f"ToolArgsSchema Error: Arguments for '{item_key}' in {arg_name} must be a dictionary")

                    # Validate each argument
                    for sub_arg_name, sub_arg_def in item_args.items():
                        # Dotted path used in every message for this sub-argument.
                        sub_path = f"{arg_name}.{item_key}.{sub_arg_name}"

                        if not isinstance(sub_arg_def, dict):
                            raise ValueError(f"ToolArgsSchema Error: Definition for '{sub_path}' must be a dictionary")

                        # Ensure required metadata is present
                        if 'type' not in sub_arg_def:
                            raise ValueError(f"ToolArgsSchema Error: '{sub_path}' missing required 'type'")

                        if 'description' not in sub_arg_def:
                            raise ValueError(f"ToolArgsSchema Error: '{sub_path}' missing required 'description'")

                        # Validate type is supported
                        if sub_arg_def['type'] not in supported_types:
                            raise ValueError(f"ToolArgsSchema Error: Unsupported type for '{sub_path}': {sub_arg_def['type']}")

                        # Add default 'required=False' if missing for nested properties
                        if 'required' not in sub_arg_def:
                            sub_arg_def['required'] = False

                        # Validate that 'required' is a boolean
                        if not isinstance(sub_arg_def['required'], bool):
                            raise ValueError(f"ToolArgsSchema Error: The 'required' field for '{sub_path}' must be a boolean")

                        # Move default to description if present
                        if 'default' in sub_arg_def:
                            default_value = sub_arg_def['default']
                            sub_arg_def['description'] += f" (Defaults to {repr(default_value)})"
                            del sub_arg_def['default']

                        # Recursively process any nested structure within this
                        if sub_arg_def['type'] in ['dict', 'object'] and 'nested' in sub_arg_def:
                            cls.validate_arguments({sub_path: sub_arg_def})

                        # Handle nested validations for complex types
                        if sub_arg_def['type'] in ['list', 'array'] and 'items' in sub_arg_def:
                            items = sub_arg_def['items']
                            if not isinstance(items, dict):
                                raise ValueError(f"ToolArgsSchema Error: 'items' for '{sub_path}' must be a dictionary")

                            if 'type' in items and items['type'] not in supported_types and items['type'] != 'any':
                                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for '{sub_path}': {items['type']}")

            # Recursively validate nested structures (regular pattern)
            if arg_details['type'] in ['dict', 'object']:
                # Check for nested definitions and validate them
                for key, value in arg_details.items():
                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                        # This is a nested property - validate it as well
                        cls._validate_nested_property(f"{arg_name}.{key}", value)

            # Validate list item type if it's a list
            if arg_details['type'] in ['list', 'array']:
                if 'items' not in arg_details:
                    arg_details['items'] = {'type': 'any'}  # Default to Any if not specified
                elif not isinstance(arg_details['items'], dict):
                    raise ValueError(f"ToolArgsSchema Error: 'items' for {arg_name} must be a dictionary")
                elif 'type' not in arg_details['items']:
                    arg_details['items']['type'] = 'any'  # Default to Any if type not specified

                items = arg_details['items']
                item_type = items.get('type')

                if item_type not in supported_types and item_type != 'any':
                    raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {arg_name}: {item_type}")

                if item_type in ['dict', 'object']:
                    # Add description to items if missing
                    if 'description' not in items:
                        items['description'] = f"Item of {arg_name} list"

                    # Recursively validate nested item properties if they exist
                    for key, value in items.items():
                        if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                            cls._validate_nested_property(f"{arg_name}[].{key}", value)

        return arguments

    @classmethod
    def _validate_nested_property(cls, path: str, prop_details: Dict[str, Any]):
        """
        Validates a nested property definition recursively.

        ``prop_details`` is normalised in place: a missing ``required``
        becomes ``False``, ``default`` is moved into the description, the
        sub-properties listed under ``nested`` are hoisted directly onto the
        property (and ``nested`` is removed), and list types without a usable
        ``items`` type get ``{'type': 'any'}``.

        Args:
            path: The path to this property (for error messages)
            prop_details: The property details to validate

        Raises:
            ValueError: on missing ``type``/``description``, a non-boolean
                ``required``, an unsupported type, or a non-dict ``items``.
        """
        # Check required metadata
        if 'type' not in prop_details:
            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'type'")

        # Check for description
        if 'description' not in prop_details:
            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'description'")

        # Validate type is supported
        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']
        if prop_details['type'] not in supported_types:
            raise ValueError(f"ToolArgsSchema Error: Unsupported type for {path}: {prop_details['type']}")

        # Add default 'required=False' if missing for nested properties
        if 'required' not in prop_details:
            prop_details['required'] = False

        # Validate that 'required' is a boolean
        if not isinstance(prop_details['required'], bool):
            raise ValueError(f"ToolArgsSchema Error: The 'required' field for {path} must be a boolean")

        # Move default to description if present
        if 'default' in prop_details:
            default_value = prop_details['default']
            prop_details['description'] += f" (Defaults to {repr(default_value)})"
            del prop_details['default']

        # Known metadata keys to skip when looking for nested properties
        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}

        # Handle nested structure within this property
        if 'nested' in prop_details and isinstance(prop_details['nested'], dict):
            nested = prop_details['nested']
            for item_key, item_args in nested.items():
                if isinstance(item_args, dict):
                    for sub_arg_name, sub_arg_def in item_args.items():
                        if isinstance(sub_arg_def, dict) and 'type' in sub_arg_def:
                            # Add directly to the parent object without prefixing
                            prop_details[sub_arg_name] = sub_arg_def
            # Once processed, remove the nested key to avoid double-processing
            del prop_details['nested']

        # If this is a dict/object, recursively validate its properties
        if prop_details['type'] in ['dict', 'object']:
            # Check for nested definitions and validate them
            for key, value in prop_details.items():
                if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                    cls._validate_nested_property(f"{path}.{key}", value)

        # If this is a list/array, validate its items
        if prop_details['type'] in ['list', 'array']:
            if 'items' not in prop_details:
                prop_details['items'] = {'type': 'any'}
            elif not isinstance(prop_details['items'], dict):
                raise ValueError(f"ToolArgsSchema Error: 'items' for {path} must be a dictionary")
            elif 'type' not in prop_details['items']:
                prop_details['items']['type'] = 'any'

            items = prop_details['items']
            item_type = items.get('type')

            if item_type not in supported_types and item_type != 'any':
                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {path}: {item_type}")

            if item_type in ['dict', 'object']:
                # Add description to items if missing
                if 'description' not in items:
                    items['description'] = f"Item of {path}"

                # Recursively validate nested item properties if they exist
                for key, value in items.items():
                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                        cls._validate_nested_property(f"{path}[].{key}", value)
Meta-validator for tool arguments ensuring LangChain compatibility
@classmethod
def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
    """
    Build a ``list`` subclass whose JSON schema always describes its items.

    Args:
        item_type: Python type of the list elements. ``str``, ``int``,
            ``float`` and ``bool`` map to their JSON-schema names; any other
            type falls back to ``"string"``.
        item_type_str: Type name as written in the argument definition; it is
            only used in the generated item description.

    Returns:
        A new ``CustomList`` class (subclass of ``list``) carrying the two
        Pydantic schema hooks.
    """
    json_names = {str: "string", int: "integer", float: "number", bool: "boolean"}
    json_item_type = json_names.get(item_type, "string")

    class CustomList(list):
        @classmethod
        def __get_pydantic_core_schema__(cls, source: Any, handler: Any) -> Any:
            # Validation is delegated to the core schema of a plain list.
            return handler.generate_schema(list)

        @classmethod
        def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> Dict[str, Any]:
            # Start from the default JSON schema, then make sure "items" exists,
            # has a type, and carries the custom description.
            schema = handler(core_schema)
            item_schema = schema.setdefault("items", {})
            item_schema.setdefault("type", json_item_type)
            item_schema["description"] = f"Item of type {item_type_str}"
            return schema

    return CustomList
@classmethod
def to_pydantic_schema(cls, arguments: Dict[str, Dict[str, Any]]) -> Type[BaseModel]:
    """
    Convert argument definitions into a dynamic Pydantic model.

    For list fields with primitive items, a custom list type is used so that
    the "items" entry of the JSON schema is populated properly. Dict/object
    arguments and dict/object list items are turned into nested models via
    ``_create_nested_model_recursive``.

    Args:
        arguments: Mapping of argument name to its definition dict. Entries
            whose value is not a dict are skipped.

    Returns:
        A model created with ``create_model`` that forbids extra fields.

    Side effects:
        When a definition carries a non-None ``default``, the default is
        appended to the field description and the ``default`` key is removed
        from the definition dict in place.
    """
    schema_fields = {}
    type_mapping = {
        'int': int, 'str': str, 'float': float, 'bool': bool,
        'integer': int, 'string': str, 'number': float, 'boolean': bool,
        'list': list, 'array': list,  # List fields are overridden below.
        'dict': Dict[str, Any], 'object': Dict[str, Any],
    }

    for arg_name, arg_details in arguments.items():
        if not isinstance(arg_details, dict):
            continue

        arg_type_str = arg_details.get('type', 'string')
        description = arg_details.get('description', '')
        is_required = arg_details.get('required', True)
        default_value = arg_details.get('default', None)

        # Determine the field type.
        if arg_type_str in ('dict', 'object'):
            schema_field_type = cls._create_nested_model_recursive(f"Nested_{arg_name}", arg_details)
        elif arg_type_str in ('list', 'array'):
            item_info = arg_details.get('items', {})
            item_type_str = item_info.get('type', 'any')
            if item_type_str == 'any':
                item_type_str = 'str'
            if item_type_str in ('dict', 'object'):
                item_model = cls._create_nested_model_recursive(f"ListItem_{arg_name}", item_info)
                schema_field_type = List[item_model]  # Standard list of nested model.
            else:
                primitive_type = type_mapping.get(item_type_str, str)
                # Instead of List[primitive_type], use a custom list type.
                schema_field_type = cls.custom_list_type(primitive_type, item_type_str)
        else:
            schema_field_type = type_mapping.get(arg_type_str, str)

        # Append default info into description.
        final_description = description
        if default_value is not None:
            final_description += f" (Defaults to {default_value!r})"
            arg_details.pop('default', None)

        field_kwargs = {"description": final_description}
        if not is_required:
            field_kwargs["default"] = None
            schema_field_type = Optional[schema_field_type]

        schema_fields[arg_name] = (schema_field_type, Field(**field_kwargs))

    model_name = f"DynamicArgsSchema_{id(arguments)}"

    # `create_model` has no `__config_class__` parameter; any unknown keyword is
    # treated as a field definition, so the forbid-extra config was never applied
    # on Pydantic 2.11+. `__config__` is the supported keyword and accepts both
    # forms ForbidExtraConfig can take (a ConfigDict, or the legacy config class).
    return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)
Converts argument definitions into a dynamic Pydantic model. For list fields with primitive items, we use a custom list type that populates the "items" schema field properly.
@classmethod
def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Check argument definitions for required metadata and supported types.

    The definitions are normalised in place: a missing ``required`` flag is
    filled in (True at the top level, False inside ``nested``), a ``default``
    is folded into the description and removed, and list arguments always end
    up with an ``items`` dict that has a ``type``.

    Args:
        arguments: Mapping of argument name to definition dict.

    Returns:
        The same ``arguments`` mapping, after in-place normalisation.

    Raises:
        ValueError: On the first definition that is malformed.
    """
    meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
    allowed_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']
    mandatory_keys = ['name', 'type', 'description']
    object_types = ['dict', 'object']
    sequence_types = ['list', 'array']

    for arg_name, spec in arguments.items():
        if not isinstance(spec, dict):
            raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} must be a dictionary")

        # Every top-level argument needs name, type and description.
        missing = [k for k in mandatory_keys if k not in spec]
        if missing:
            raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} missing required metadata: {missing}")

        # Top-level arguments are required unless stated otherwise.
        spec.setdefault('required', True)
        if not isinstance(spec['required'], bool):
            raise ValueError(f"ToolArgsSchema Error: The 'required' field for {arg_name} must be a boolean")

        if spec['type'] not in allowed_types:
            raise ValueError(f"ToolArgsSchema Error: Unsupported type for {arg_name}: {spec['type']}")

        # Fold the default into the description, then drop the key.
        if 'default' in spec:
            fallback = spec['default']
            spec['description'] += f" (Defaults to {fallback!r})"
            del spec['default']

        # Explicit 'nested' structure: {group: {sub_arg: definition}}.
        if 'nested' in spec:
            groups = spec['nested']
            if not isinstance(groups, dict):
                raise ValueError(f"ToolArgsSchema Error: 'nested' in {arg_name} must be a dictionary")

            for group_key, group_args in groups.items():
                if not isinstance(group_args, dict):
                    raise ValueError(f"ToolArgsSchema Error: Arguments for '{group_key}' in {arg_name} must be a dictionary")

                for sub_name, sub_spec in group_args.items():
                    dotted = f"{arg_name}.{group_key}.{sub_name}"

                    if not isinstance(sub_spec, dict):
                        raise ValueError(f"ToolArgsSchema Error: Definition for '{dotted}' must be a dictionary")
                    if 'type' not in sub_spec:
                        raise ValueError(f"ToolArgsSchema Error: '{dotted}' missing required 'type'")
                    if 'description' not in sub_spec:
                        raise ValueError(f"ToolArgsSchema Error: '{dotted}' missing required 'description'")
                    if sub_spec['type'] not in allowed_types:
                        raise ValueError(f"ToolArgsSchema Error: Unsupported type for '{dotted}': {sub_spec['type']}")

                    # Nested properties are optional unless stated otherwise.
                    sub_spec.setdefault('required', False)
                    if not isinstance(sub_spec['required'], bool):
                        raise ValueError(f"ToolArgsSchema Error: The 'required' field for '{dotted}' must be a boolean")

                    if 'default' in sub_spec:
                        sub_fallback = sub_spec['default']
                        sub_spec['description'] += f" (Defaults to {sub_fallback!r})"
                        del sub_spec['default']

                    # A nested object with its own 'nested' block is validated
                    # through the same entry point.
                    if sub_spec['type'] in object_types and 'nested' in sub_spec:
                        cls.validate_arguments({dotted: sub_spec})

                    if sub_spec['type'] in sequence_types and 'items' in sub_spec:
                        sub_items = sub_spec['items']
                        if not isinstance(sub_items, dict):
                            raise ValueError(f"ToolArgsSchema Error: 'items' for '{dotted}' must be a dictionary")
                        if 'type' in sub_items and not (sub_items['type'] in allowed_types or sub_items['type'] == 'any'):
                            raise ValueError(f"ToolArgsSchema Error: Unsupported item type for '{dotted}': {sub_items['type']}")

        # Regular pattern: properties declared directly on a dict/object argument.
        if spec['type'] in object_types:
            for key, value in spec.items():
                if key in meta_keys:
                    continue
                if isinstance(value, dict) and 'type' in value:
                    cls._validate_nested_property(f"{arg_name}.{key}", value)

        # List arguments: make sure 'items' is a dict with a type, then check it.
        if spec['type'] in sequence_types:
            if 'items' in spec:
                if not isinstance(spec['items'], dict):
                    raise ValueError(f"ToolArgsSchema Error: 'items' for {arg_name} must be a dictionary")
                spec['items'].setdefault('type', 'any')
            else:
                spec['items'] = {'type': 'any'}

            element_spec = spec['items']
            element_type = element_spec.get('type')

            if not (element_type in allowed_types or element_type == 'any'):
                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {arg_name}: {element_type}")

            if element_type in object_types:
                if 'description' not in element_spec:
                    element_spec['description'] = f"Item of {arg_name} list"

                for key, value in element_spec.items():
                    if key in meta_keys:
                        continue
                    if isinstance(value, dict) and 'type' in value:
                        cls._validate_nested_property(f"{arg_name}[].{key}", value)

    return arguments
Validates the argument definitions, ensuring they have required metadata and supported types. Returns the validated arguments with defaults moved to descriptions.
class BaseHeavenTool(ABC):
    """Provider-agnostic tool base class with standardized results.

    Subclasses declare ``name``, ``description``, ``func``, ``args_schema`` and
    ``is_async`` as class attributes and are instantiated through
    :meth:`create`, which wraps ``func`` either in a LangChain ``Tool`` or,
    with ``adk=True``, in a Google ADK tool. ``_run`` / ``_arun`` always
    return a ``ToolResult``; exceptions raised by the wrapped tool are
    reported through ``ToolResult.error`` instead of propagating.
    """
    # Required class attributes that tools must define
    name: str
    description: str
    func: Callable
    args_schema: Type[ToolArgsSchema]
    base_tool: BaseTool
    is_async: bool

    def __init__(
        self,
        base_tool: BaseTool,
        args_schema: Type[ToolArgsSchema],
        is_async: bool = False
    ):
        """Store the wrapped tool and validate the argument definitions.

        Args:
            base_tool: The wrapped tool that actually executes the call.
            args_schema: Schema class; it is instantiated without arguments
                and its ``arguments`` dict is validated.
            is_async: Whether the wrapped function must be awaited.

        Raises:
            ValueError: If ``ToolArgsSchema.validate_arguments`` rejects the
                argument definitions.
        """
        # No super().__init__() call: this class does not inherit from BaseTool.
        self.args_schema = args_schema
        self.base_tool = base_tool
        self.is_async = is_async

        # Create instance and validate arguments schema
        schema_instance = args_schema()
        ToolArgsSchema.validate_arguments(schema_instance.arguments)

    def _clean_kwargs(self, obj: Any) -> Any:
        """
        Recursively return a *copy* of `obj` in which every mapping key that looks
        like "'some_key'" is replaced by "some_key".

        - Mappings are rebuilt with their own type when that type accepts a plain
          dict, otherwise as a plain dict.
        - Lists, tuples and sets keep their type.
        - Other non-string iterables are rebuilt with their own type when
          possible; if the type cannot be built from a list of items
          (generators, ranges, dict views, ...), a plain list is returned.
        - Everything else is returned untouched.
        """
        # ---- Handle any mapping (dict-like) ---------------------------
        if isinstance(obj, Mapping):
            cleaned_items = {}
            for k, v in obj.items():
                new_k = k
                # len >= 2: a lone "'" is not a quoted key, it is just a quote.
                if (
                    isinstance(k, str)
                    and len(k) >= 2
                    and k.startswith("'")
                    and k.endswith("'")
                ):
                    new_k = k[1:-1]
                cleaned_items[new_k] = self._clean_kwargs(v)  # recurse on value

            # Rebuild using the same mapping type when possible
            try:
                return type(obj)(cleaned_items)
            except Exception:
                return cleaned_items

        # ---- Handle iterables that can hold nested mappings -----------
        if isinstance(obj, (list, tuple, set)):
            cleaned_iter = [self._clean_kwargs(item) for item in obj]
            return type(obj)(cleaned_iter)

        # ---- Walk other iterables (e.g., generators) ------------------
        if isinstance(obj, Iterable) and not isinstance(obj, (str, bytes)):
            # Materialise first: one-shot iterators can only be walked once,
            # and many iterable types (generator, range, dict views, map)
            # cannot be constructed from a sequence of items.
            cleaned_iter = [self._clean_kwargs(item) for item in obj]
            try:
                return type(obj)(cleaned_iter)
            except Exception:
                return cleaned_iter

        # ---- Base case ------------------------------------------------
        return obj

    def _run(
        self,
        run_manager: Optional[CallbackManagerForToolRun] = None,
        config: Optional[RunnableConfig] = None,
        **kwargs
    ) -> ToolResult:
        """Synchronous execution path using the wrapped base tool.

        Returns the wrapped tool's ``ToolResult`` unchanged, wraps any other
        return value in ``ToolResult(output=str(result))``, and converts
        exceptions into ``ToolResult(error=...)``.
        """
        kwargs = self._clean_kwargs(kwargs) or kwargs
        try:
            if self.is_async:
                raise ValueError("Tool marked as async but _run was called")

            if hasattr(self.base_tool, '_run'):
                result = self.base_tool._run(run_manager=run_manager, config=config, **kwargs)
            else:
                # Fallback to func if _run is not available
                result = self.base_tool.func(**kwargs)

            if isinstance(result, ToolResult):
                return result  # Already a ToolResult (includes CLIResult)
            return ToolResult(output=str(result))
        except ToolError as e:
            return ToolResult(error=str(e))
        except Exception as e:
            return ToolResult(error=f"Error in tool '{self.name}': {e}")

    async def _arun(
        self,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
        config: Optional[RunnableConfig] = None,
        **kwargs
    ) -> ToolResult:
        """Asynchronous execution path using the wrapped base tool.

        Sync tools run ``base_tool.func`` in a worker thread. Async tools use
        ``base_tool._arun`` / ``base_tool.arun`` when present, otherwise
        ``base_tool.func`` is awaited (or run in a thread if it is not a
        coroutine function). Results and errors are normalised as in ``_run``.
        """
        import asyncio
        import inspect

        kwargs = self._clean_kwargs(kwargs) or kwargs
        try:
            if not self.is_async:
                # Use asyncio.to_thread for sync functions
                result = await asyncio.to_thread(self.base_tool.func, **kwargs)
            elif hasattr(self.base_tool, '_arun'):
                result = await self.base_tool._arun(run_manager=run_manager, config=config, **kwargs)
            elif hasattr(self.base_tool, 'arun'):
                result = await self.base_tool.arun(**kwargs)
            elif inspect.iscoroutinefunction(self.base_tool.func):
                result = await self.base_tool.func(**kwargs)
            else:
                # Fallback to thread-based execution only for sync functions
                result = await asyncio.to_thread(self.base_tool.func, **kwargs)

            if isinstance(result, ToolResult):
                return result  # Already a ToolResult (includes CLIResult)
            return ToolResult(output=str(result))
        except ToolError as e:
            return ToolResult(error=str(e))
        except Exception as e:
            return ToolResult(error=f"Error in tool '{self.name}': {e}")

    def get_spec(self) -> dict:
        """Get this tool's specification: name, description and raw argument dict."""
        return {
            "name": self.name,
            "description": self.description,
            "args": self.args_schema().arguments,
        }

    @classmethod
    def to_openai_function(cls):
        """Convert the tool's schema to OpenAI function format."""
        schema_instance = cls.args_schema()
        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
        # Use LangChain's built-in converter
        return convert_to_openai_tool(
            Tool(
                name=cls.name,
                description=cls.description,
                func=cls.func,
                args_schema=pydantic_schema
            )
        )

    def get_openai_function(self):
        """Get the OpenAI function specification."""
        return self.__class__.to_openai_function()

    @classmethod
    def create_adk_tool(cls, func):
        """Wrap ``func`` in a Google ADK tool whose declaration mirrors ``args_schema``.

        Args:
            func: Sync or async callable that implements the tool; it is
                invoked with keyword arguments only.

        Returns:
            An ADK ``BaseTool`` subclass instance that delegates execution to
            an ADK ``FunctionTool`` and rewrites that tool's declaration from
            the dereferenced JSON schema of the dynamic Pydantic model.
        """
        import asyncio
        import inspect
        from enum import Enum
        from typing import Optional

        # NOTE: inside this method `BaseTool` is the ADK class, not LangChain's.
        from google.adk.tools import FunctionTool, BaseTool

        # Step 1: Get original schema and dynamic Pydantic model
        schema_instance = cls.args_schema()
        DynamicModel = schema_instance.to_pydantic_schema(schema_instance.arguments)
        flat_schema = generate_dereferenced_schema(DynamicModel)
        prop_schemas = flat_schema.get("properties", {})
        top_required = flat_schema.get("required", [])
        original_defs = schema_instance.arguments

        # Step 2: Build wrapped function with proper signature (from flattened schema)
        type_map = {
            "string": str,
            "integer": int,
            "number": float,
            "boolean": bool,
            "array": list,
            "object": dict,
        }
        param_defs = []
        for name, definition in prop_schemas.items():
            resolved_type = type_map.get(definition.get("type", "string"), str)
            if name in top_required:
                default = inspect.Parameter.empty
            else:
                resolved_type = Optional[resolved_type]
                default = None
            param_defs.append((name, resolved_type, default))

        # inspect.Signature raises ValueError when a parameter without a default
        # follows one with a default, so required parameters go first. The sort
        # is stable and the wrapper is keyword-only, so order has no other effect.
        param_defs.sort(key=lambda d: d[2] is not inspect.Parameter.empty)

        def create_wrapped_function(original_func, func_name, param_defs):
            """Return an async keyword-only wrapper advertising the given signature."""
            sig = inspect.Signature([
                inspect.Parameter(
                    p_name,
                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
                    annotation=p_type,
                    default=p_default,
                )
                for p_name, p_type, p_default in param_defs
            ])

            async def wrapper(**kwargs):
                if inspect.iscoroutinefunction(original_func):
                    return await original_func(**kwargs)
                # run sync code in thread to avoid blocking
                return await asyncio.to_thread(original_func, **kwargs)

            wrapper.__name__ = func_name
            wrapper.__qualname__ = func_name
            wrapper.__signature__ = sig
            return wrapper

        wrapped_func = create_wrapped_function(func, cls.name, param_defs)

        # Step 3: Create ADK FunctionTool
        orig_tool = FunctionTool(func=wrapped_func)
        orig_tool.description = cls.description

        # Step 4: Schema fixer merge logic
        def merge(adk_prop, pschema, info_def=None):
            """Copy type, description, nesting and required info from `pschema` onto `adk_prop`."""
            enum_cls = type(adk_prop.type)
            if "anyOf" in pschema:
                # Read without popping: `pschema` belongs to the shared
                # `prop_schemas`, and _get_declaration() may run more than once.
                branches = pschema["anyOf"]
                if any(b.get("type") == "null" for b in branches):
                    adk_prop.nullable = True
                arr = next((b for b in branches if b.get("type") == "array"), None)
                pschema = arr or next((b for b in branches if b.get("type") != "null"), {})

            if isinstance(info_def, dict):
                if info_def.get("name"):
                    adk_prop.title = info_def["name"]
                if not getattr(adk_prop, "description", None) and info_def.get("description"):
                    adk_prop.description = info_def["description"]

            if "description" in pschema:
                adk_prop.description = pschema["description"]

            if "items" in pschema or pschema.get("type") == "array":
                adk_prop.type = enum_cls.ARRAY
            elif "type" in pschema:
                t = pschema["type"].upper()
                if hasattr(enum_cls, t):
                    adk_prop.type = getattr(enum_cls, t)

            if adk_prop.type == enum_cls.OBJECT:
                adk_prop.properties = adk_prop.properties or {}
                # Any dict stored under info_def[key] is used as the child's info.
                for key, child_ps in pschema.get("properties", {}).items():
                    child_info = {}
                    if isinstance(info_def, dict) and isinstance(info_def.get(key), dict):
                        child_info = info_def[key]
                    if key not in adk_prop.properties:
                        Placeholder = type(adk_prop)
                        adk_prop.properties[key] = Placeholder.model_construct(
                            __pydantic_initialised__=True,
                            name=key,
                            title=key,
                            description="",
                            properties=None,
                            items=None,
                            required=[],
                            type=adk_prop.type,
                        )
                    merge(adk_prop.properties[key], child_ps, child_info)

                # Prefer the JSON schema's own 'required'; otherwise fall back to
                # children flagged 'required' in info_def.
                req = pschema.get("required")
                if req is None:
                    req = []
                    if isinstance(info_def, dict):
                        req = [k for k, v in info_def.items() if isinstance(v, dict) and v.get("required")]
                adk_prop.required = req or []

            elif adk_prop.type == enum_cls.ARRAY:
                if adk_prop.items is None:
                    Placeholder = type(adk_prop)
                    adk_prop.items = Placeholder.model_construct(
                        __pydantic_initialised__=True,
                        name=adk_prop.title or "",
                        title=adk_prop.title or "",
                        description=pschema.get("items", {}).get("description", ""),
                        properties=None,
                        items=None,
                        required=[],
                        type=adk_prop.type,
                    )
                merge(adk_prop.items, pschema.get("items", {}), None)

        def normalize_schema_decl(schema):
            """Recursively replace Enum-valued `type` attributes by their lower-case names."""
            if hasattr(schema, "type") and isinstance(schema.type, Enum):
                schema.type = schema.type.name.lower()
            if getattr(schema, "properties", None):
                for prop in schema.properties.values():
                    normalize_schema_decl(prop)
            if getattr(schema, "items", None):
                normalize_schema_decl(schema.items)
            return schema

        # Step 5: Wrap in ADK BaseTool with fixed declaration
        class RequiredFieldsFixTool(BaseTool):
            def __init__(self, orig_tool):
                self._orig_tool = orig_tool
                super().__init__(name=orig_tool.name, description=orig_tool.description)

            async def run_async(self, **kwargs):
                try:
                    # 1) invoke the underlying tool (may be sync or async)
                    raw = self._orig_tool.run_async(**kwargs)
                    result = await raw if inspect.isawaitable(raw) else raw

                    # 2) already a ToolResult (CLIResult is a subclass): return as is
                    if isinstance(result, ToolResult):
                        return result

                    # 3) dict result: look under 'response' / 'result'
                    if isinstance(result, dict):
                        resp = result.get("response", result)
                        payload = resp.get("result", resp) if isinstance(resp, dict) else resp

                        # payload may itself be a dict containing output, error, base64_image, etc.
                        if isinstance(payload, dict):
                            return ToolResult(
                                output=payload.get("output", ""),
                                error=payload.get("error", None),
                                base64_image=payload.get("base64_image", None),
                                system=payload.get("system", None),
                            )
                        # otherwise treat payload as a plain string
                        return ToolResult(output=str(payload))

                    # 4) strings and anything else are wrapped as output
                    if isinstance(result, str):
                        return ToolResult(output=result)
                    return ToolResult(output=str(result))

                except ToolError as e:
                    return ToolResult(error=str(e))
                except Exception as e:
                    return ToolResult(error=f"Unhandled error in tool '{self.name}': {e}")

            def _get_declaration(self):
                decl = self._orig_tool._get_declaration()
                # Check before touching attributes: a missing declaration used
                # to fail on `decl.description` ahead of the guard below.
                if not decl:
                    return decl

                decl.description = self.description
                if not (decl.parameters and decl.parameters.properties):
                    return decl

                decl.parameters.required = list(top_required)

                for key, ps in prop_schemas.items():
                    prop = decl.parameters.properties.get(key)
                    if not prop:
                        continue
                    merge(prop, ps, original_defs.get(key))

                # Post-process: lower-case enum types into JSON-schema strings.
                normalize_schema_decl(decl.parameters)
                return decl

        return RequiredFieldsFixTool(orig_tool)

    @classmethod
    def create(cls, adk: bool = False):
        """Create a tool instance using class attributes.

        Args:
            adk: When True, return the Google ADK tool built by
                ``create_adk_tool`` instead of an instance of this class.

        Returns:
            An ADK tool when ``adk`` is True; otherwise an instance of ``cls``
            wrapping a LangChain ``Tool``. Async tools get a sync stub that
            raises ``NotImplementedError`` and ``cls.func`` as the coroutine.
        """
        if adk:
            # create_adk_tool builds the schema itself; no need to do it here.
            return cls.create_adk_tool(cls.func)

        schema_instance = cls.args_schema()
        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)

        if cls.is_async:
            def sync_stub(**kwargs):
                raise NotImplementedError("This tool is async only")

            base_tool = Tool(
                name=cls.name,
                description=cls.description,
                func=sync_stub,
                coroutine=cls.func,
                args_schema=pydantic_schema
            )
        else:
            base_tool = Tool(
                name=cls.name,
                description=cls.description,
                func=cls.func,
                args_schema=pydantic_schema
            )

        return cls(
            base_tool=base_tool,
            args_schema=cls.args_schema,
            is_async=cls.is_async
        )
Provider-agnostic tool base class with standardized results
def __init__(
    self,
    base_tool: BaseTool,
    args_schema: Type[ToolArgsSchema],
    is_async: bool = False
):
    """Store the wrapped tool and validate the argument definitions.

    Args:
        base_tool: The wrapped tool that actually executes the call.
        args_schema: Schema class; it is instantiated without arguments and
            its ``arguments`` dict is validated.
        is_async: Whether the wrapped function must be awaited.
    """
    # Plain attribute assignment: this class does not inherit from BaseTool,
    # so there is no parent initialiser to call.
    self.args_schema, self.base_tool, self.is_async = args_schema, base_tool, is_async

    # Fail early if the argument definitions are malformed.
    ToolArgsSchema.validate_arguments(args_schema().arguments)
def get_spec(self) -> dict:
    """Return the tool's name, description and raw argument definitions."""
    spec = {"name": self.name, "description": self.description}
    # The schema instance already holds the argument dictionary; expose it as is.
    spec["args"] = self.args_schema().arguments
    return spec
Get this tool's specification
@classmethod
def to_openai_function(cls):
    """Return the OpenAI tool description derived from this tool's argument schema."""
    # Build the dynamic Pydantic model from the declared argument definitions.
    arg_definitions = cls.args_schema().arguments
    args_model = cls.args_schema.to_pydantic_schema(arg_definitions)

    # A throwaway LangChain Tool lets LangChain's converter do the formatting.
    langchain_tool = Tool(
        name=cls.name,
        description=cls.description,
        func=cls.func,
        args_schema=args_model
    )
    return convert_to_openai_tool(langchain_tool)
Convert the tool's schema to OpenAI function format
def get_openai_function(self):
    """Return the OpenAI function specification of this instance's class."""
    tool_cls = self.__class__
    return tool_cls.to_openai_function()
Get the OpenAI function specification
@classmethod
def create_adk_tool(cls, func):
    """Wrap ``func`` in a Google ADK tool whose declaration mirrors ``args_schema``.

    Args:
        func: Sync or async callable that implements the tool; it is invoked
            with keyword arguments only.

    Returns:
        An ADK ``BaseTool`` subclass instance that delegates execution to an
        ADK ``FunctionTool`` and rewrites that tool's declaration from the
        dereferenced JSON schema of the dynamic Pydantic model.
    """
    import asyncio
    import inspect
    from enum import Enum
    from typing import Optional

    # NOTE: inside this method `BaseTool` is the ADK class, not LangChain's.
    from google.adk.tools import FunctionTool, BaseTool

    # Step 1: Get original schema and dynamic Pydantic model
    schema_instance = cls.args_schema()
    DynamicModel = schema_instance.to_pydantic_schema(schema_instance.arguments)
    flat_schema = generate_dereferenced_schema(DynamicModel)
    prop_schemas = flat_schema.get("properties", {})
    top_required = flat_schema.get("required", [])
    original_defs = schema_instance.arguments

    # Step 2: Build wrapped function with proper signature (from flattened schema)
    type_map = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }
    param_defs = []
    for name, definition in prop_schemas.items():
        resolved_type = type_map.get(definition.get("type", "string"), str)
        if name in top_required:
            default = inspect.Parameter.empty
        else:
            resolved_type = Optional[resolved_type]
            default = None
        param_defs.append((name, resolved_type, default))

    # inspect.Signature raises ValueError when a parameter without a default
    # follows one with a default, so required parameters go first. The sort is
    # stable and the wrapper is keyword-only, so order has no other effect.
    param_defs.sort(key=lambda d: d[2] is not inspect.Parameter.empty)

    def create_wrapped_function(original_func, func_name, param_defs):
        """Return an async keyword-only wrapper advertising the given signature."""
        sig = inspect.Signature([
            inspect.Parameter(
                p_name,
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                annotation=p_type,
                default=p_default,
            )
            for p_name, p_type, p_default in param_defs
        ])

        async def wrapper(**kwargs):
            if inspect.iscoroutinefunction(original_func):
                return await original_func(**kwargs)
            # run sync code in thread to avoid blocking
            return await asyncio.to_thread(original_func, **kwargs)

        wrapper.__name__ = func_name
        wrapper.__qualname__ = func_name
        wrapper.__signature__ = sig
        return wrapper

    wrapped_func = create_wrapped_function(func, cls.name, param_defs)

    # Step 3: Create ADK FunctionTool
    orig_tool = FunctionTool(func=wrapped_func)
    orig_tool.description = cls.description

    # Step 4: Schema fixer merge logic
    def merge(adk_prop, pschema, info_def=None):
        """Copy type, description, nesting and required info from `pschema` onto `adk_prop`."""
        enum_cls = type(adk_prop.type)
        if "anyOf" in pschema:
            # Read without popping: `pschema` belongs to the shared
            # `prop_schemas`, and _get_declaration() may run more than once.
            branches = pschema["anyOf"]
            if any(b.get("type") == "null" for b in branches):
                adk_prop.nullable = True
            arr = next((b for b in branches if b.get("type") == "array"), None)
            pschema = arr or next((b for b in branches if b.get("type") != "null"), {})

        if isinstance(info_def, dict):
            if info_def.get("name"):
                adk_prop.title = info_def["name"]
            if not getattr(adk_prop, "description", None) and info_def.get("description"):
                adk_prop.description = info_def["description"]

        if "description" in pschema:
            adk_prop.description = pschema["description"]

        if "items" in pschema or pschema.get("type") == "array":
            adk_prop.type = enum_cls.ARRAY
        elif "type" in pschema:
            t = pschema["type"].upper()
            if hasattr(enum_cls, t):
                adk_prop.type = getattr(enum_cls, t)

        if adk_prop.type == enum_cls.OBJECT:
            adk_prop.properties = adk_prop.properties or {}
            # Any dict stored under info_def[key] is used as the child's info.
            for key, child_ps in pschema.get("properties", {}).items():
                child_info = {}
                if isinstance(info_def, dict) and isinstance(info_def.get(key), dict):
                    child_info = info_def[key]
                if key not in adk_prop.properties:
                    Placeholder = type(adk_prop)
                    adk_prop.properties[key] = Placeholder.model_construct(
                        __pydantic_initialised__=True,
                        name=key,
                        title=key,
                        description="",
                        properties=None,
                        items=None,
                        required=[],
                        type=adk_prop.type,
                    )
                merge(adk_prop.properties[key], child_ps, child_info)

            # Prefer the JSON schema's own 'required'; otherwise fall back to
            # children flagged 'required' in info_def.
            req = pschema.get("required")
            if req is None:
                req = []
                if isinstance(info_def, dict):
                    req = [k for k, v in info_def.items() if isinstance(v, dict) and v.get("required")]
            adk_prop.required = req or []

        elif adk_prop.type == enum_cls.ARRAY:
            if adk_prop.items is None:
                Placeholder = type(adk_prop)
                adk_prop.items = Placeholder.model_construct(
                    __pydantic_initialised__=True,
                    name=adk_prop.title or "",
                    title=adk_prop.title or "",
                    description=pschema.get("items", {}).get("description", ""),
                    properties=None,
                    items=None,
                    required=[],
                    type=adk_prop.type,
                )
            merge(adk_prop.items, pschema.get("items", {}), None)

    def normalize_schema_decl(schema):
        """Recursively replace Enum-valued `type` attributes by their lower-case names."""
        if hasattr(schema, "type") and isinstance(schema.type, Enum):
            schema.type = schema.type.name.lower()
        if getattr(schema, "properties", None):
            for prop in schema.properties.values():
                normalize_schema_decl(prop)
        if getattr(schema, "items", None):
            normalize_schema_decl(schema.items)
        return schema

    # Step 5: Wrap in ADK BaseTool with fixed declaration
    class RequiredFieldsFixTool(BaseTool):
        def __init__(self, orig_tool):
            self._orig_tool = orig_tool
            super().__init__(name=orig_tool.name, description=orig_tool.description)

        async def run_async(self, **kwargs):
            try:
                # 1) invoke the underlying tool (may be sync or async)
                raw = self._orig_tool.run_async(**kwargs)
                result = await raw if inspect.isawaitable(raw) else raw

                # 2) already a ToolResult (CLIResult is a subclass): return as is
                if isinstance(result, ToolResult):
                    return result

                # 3) dict result: look under 'response' / 'result'
                if isinstance(result, dict):
                    resp = result.get("response", result)
                    payload = resp.get("result", resp) if isinstance(resp, dict) else resp

                    # payload may itself be a dict containing output, error, base64_image, etc.
                    if isinstance(payload, dict):
                        return ToolResult(
                            output=payload.get("output", ""),
                            error=payload.get("error", None),
                            base64_image=payload.get("base64_image", None),
                            system=payload.get("system", None),
                        )
                    # otherwise treat payload as a plain string
                    return ToolResult(output=str(payload))

                # 4) strings and anything else are wrapped as output
                if isinstance(result, str):
                    return ToolResult(output=result)
                return ToolResult(output=str(result))

            except ToolError as e:
                return ToolResult(error=str(e))
            except Exception as e:
                return ToolResult(error=f"Unhandled error in tool '{self.name}': {e}")

        def _get_declaration(self):
            decl = self._orig_tool._get_declaration()
            # Check before touching attributes: a missing declaration used to
            # fail on `decl.description` ahead of the guard below.
            if not decl:
                return decl

            decl.description = self.description
            if not (decl.parameters and decl.parameters.properties):
                return decl

            decl.parameters.required = list(top_required)

            for key, ps in prop_schemas.items():
                prop = decl.parameters.properties.get(key)
                if not prop:
                    continue
                merge(prop, ps, original_defs.get(key))

            # Post-process: lower-case enum types into JSON-schema strings.
            normalize_schema_decl(decl.parameters)
            return decl

    return RequiredFieldsFixTool(orig_tool)
@classmethod
def create(cls, adk: bool = False):
    """Create a tool instance using class attributes.

    Args:
        adk: When True, build and return an ADK tool via
            ``cls.create_adk_tool(cls.func)``. When False (default), wrap
            ``cls.func`` in a LangChain ``Tool`` and return an instance of
            ``cls`` around it.

    Returns:
        The object returned by ``cls.create_adk_tool`` when ``adk`` is True,
        otherwise ``cls(base_tool=..., args_schema=..., is_async=...)``.
    """
    # Build the schema exactly once, before branching, so a broken
    # ``args_schema`` still fails early on both the ADK and LangChain paths.
    schema_instance = cls.args_schema()
    pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)

    if adk:
        return cls.create_adk_tool(cls.func)

    # LangChain path: settings shared by the sync and async variants.
    tool_kwargs = {
        "name": cls.name,
        "description": cls.description,
        "args_schema": pydantic_schema,
    }

    if cls.is_async:
        def sync_stub(*args, **kwargs):
            # Accept positional arguments too, so the caller always gets the
            # intended NotImplementedError rather than a TypeError about the
            # stub's signature.
            raise NotImplementedError("This tool is async only")

        base_tool = Tool(func=sync_stub, coroutine=cls.func, **tool_kwargs)
    else:
        base_tool = Tool(func=cls.func, **tool_kwargs)

    return cls(
        base_tool=base_tool,
        args_schema=cls.args_schema,
        is_async=cls.is_async,
    )
Create a tool instance using class attributes
class CalculatorArgsSchema(ToolArgsSchema):
    # Argument specification for the calculator tool: two required
    # integer operands, "a" and "b".
    arguments: Dict[str, Dict[str, Any]] = {
        "a": {
            "name": "a",
            "type": "int",
            "description": "First number to multiply",
            "required": True,
        },
        "b": {
            "name": "b",
            "type": "int",
            "description": "Second number to multiply",
            "required": True,
        },
    }
Meta-validator for tool arguments ensuring LangChain compatibility
Inherited Members
class CalculatorTool(BaseHeavenTool):
    # Tool identity as exposed to the model.
    name = "calculator"
    description = "A tool that multiplies two numbers."

    # Callable executed by the tool (defined elsewhere in this module) and
    # the schema describing its arguments.
    func = calculator_func
    args_schema = CalculatorArgsSchema

    # The callable is registered as synchronous.
    is_async = False
Provider-agnostic tool base class with standardized results
class TestToolArgsSchema(ToolArgsSchema):
    # Argument specification for the test tool: a single required
    # string argument, "message".
    arguments: Dict[str, Dict[str, Any]] = {
        "message": {
            "name": "message",
            "type": "str",
            "description": "Any message to send to the test tool",
            "required": True,
        },
    }
Meta-validator for tool arguments ensuring LangChain compatibility
Inherited Members
class TestTool(BaseHeavenTool):
    # Tool identity as exposed to the model.
    name = "TestTool"
    description = "A test tool that performs a test the user needs performed."

    # Callable executed by the tool (defined elsewhere in this module) and
    # the schema describing its arguments.
    func = test_tool_func
    args_schema = TestToolArgsSchema

    # The callable is registered as synchronous.
    is_async = False
Provider-agnostic tool base class with standardized results
def example_util_func(example_arg_return_dict: bool = False) -> Union[str, Dict[str, str]]:
    """Return a canned example tool result.

    Args:
        example_arg_return_dict: When True, return a small example dict;
            when False (default), return the example string.

    Returns:
        ``{"example_result_dict": "example text output"}`` if
        ``example_arg_return_dict`` is truthy, otherwise a fixed multi-line
        example string ending in a markdown code fence.
    """
    # The flag must be read from the declared parameter; the previous
    # spelling referenced an undefined name and raised NameError on every call.
    if example_arg_return_dict:
        return {"example_result_dict": "example text output"}
    return "Congrats! You and the user successfully tested a tool! This is an example tool result that returns an example string.\n\nThis is example text after 2 line breaks.\n\nThis is an example injected instruction in an example markdown fence:\n\n```markdown\n# Example\n\nContinue\n```"
class ExampleToolArgsSchema(ToolArgsSchema):
    # Argument specification for the example tool: one required boolean
    # flag selecting between a dict result and a string result.
    arguments: Dict[str, Dict[str, Any]] = {
        "example_arg_return_dict": {
            "name": "example_arg_return_dict",
            "type": "bool",
            "description": "Set to True to receive a dict and False to get a string",
            "required": True,
        },
    }
Meta-validator for tool arguments ensuring LangChain compatibility
Inherited Members
class ExampleTool(BaseHeavenTool):
    # Tool identity as exposed to the model.
    name = "ExampleTool"
    description = "An example tool that returns an example result"

    # Callable executed by the tool and the schema describing its arguments.
    func = example_util_func
    args_schema = ExampleToolArgsSchema

    # The callable is registered as synchronous.
    is_async = False
Provider-agnostic tool base class with standardized results
def example_util_func(example_arg_return_dict: bool = False) -> Union[str, Dict[str, str]]:
    """Return a canned example tool result.

    Args:
        example_arg_return_dict: When True, return a small example dict;
            when False (default), return the example string.

    Returns:
        ``{"example_result_dict": "example text output"}`` if
        ``example_arg_return_dict`` is truthy, otherwise a fixed multi-line
        example string ending in a markdown code fence.
    """
    # The flag must be read from the declared parameter; the previous
    # spelling referenced an undefined name and raised NameError on every call.
    if example_arg_return_dict:
        return {"example_result_dict": "example text output"}
    return "Congrats! You and the user successfully tested a tool! This is an example tool result that returns an example string.\n\nThis is example text after 2 line breaks.\n\nThis is an example injected instruction in an example markdown fence:\n\n```markdown\n# Example\n\nContinue\n```"
Usage Documentation: Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.