heaven_base.baseheaventool

   1# A metaprogramming wrapper for Langchain BaseTool and Tool construction
   2from copy import deepcopy
   3from dataclasses import dataclass, fields, replace
   4from typing import Any, Dict, Optional, Type, Callable, ClassVar, Literal, List, Union
   5from langchain_core.callbacks import CallbackManagerForToolRun, AsyncCallbackManagerForToolRun
   6from langchain_core.runnables import RunnableConfig
   7from pydantic import BaseModel, Field, create_model, ConfigDict, VERSION
   8try:
   9    from pydantic import Extra  # For Pydantic 2.10.6
  10except ImportError:
  11    Extra = None  # For Pydantic 2.11+
  12
  13# Check if we're using Pydantic 2.11+ (where Extra is deprecated)
  14PYDANTIC_2_11_PLUS = tuple(map(int, VERSION.split('.')[:2])) >= (2, 11)
  15from langchain_core.tools import BaseTool, Tool
  16from langchain_core.utils.function_calling import convert_to_openai_tool
  17from abc import abstractmethod, ABC
  18from langchain_core.utils.json_schema import dereference_refs
  19import importlib
  20from collections.abc import Mapping, Iterable
  21
  22
# Filesystem path of a tool debug log. Nothing in the visible part of this
# module reads or writes it — presumably consumed further down the file or by
# importers; confirm before removing.
tool_log_path = "/tmp/tool_debug.log"
  24
  25def schema_to_pydantic_model(model_name: str, schema: dict) -> type:
  26    """
  27    Convert a cleaned JSON schema back into a Pydantic model.
  28    NOTE: Supports only basic 'type', 'properties', 'required', 'description'.
  29    """
  30    fields = {}
  31    props = schema.get("properties", {})
  32    required = set(schema.get("required", []))
  33
  34    type_map = {
  35        "string": str,
  36        "integer": int,
  37        "number": float,
  38        "boolean": bool,
  39        "array": list,
  40        "object": dict,
  41    }
  42
  43    for field_name, field_schema in props.items():
  44        t = type_map.get(field_schema.get("type"), Any)
  45        desc = field_schema.get("description", "")
  46        is_required = field_name in required
  47        field_def = (t, Field(... if is_required else None, description=desc))
  48        fields[field_name] = field_def
  49
  50    return create_model(model_name, **fields)
  51
  52# These will need a Message class...
  53
  54@dataclass(frozen=True)
  55class UserMessage:
  56    content: str
  57    user_id: Optional[str] = None
  58    timestamp: Optional[str] = None
  59    is_pseudo: Optional[bool] = False
  60  
  61@dataclass(frozen=True)
  62class AgentMessage:
  63    content: str
  64    agent_id: Optional[str] = None
  65    tool_call: Optional["ToolUse"] = None  # If tool triggered in content
  66    timestamp: Optional[str] = None
  67
  68@dataclass(frozen=True)
  69class ToolUse:
  70    tool_name: str
  71    arguments: dict
  72    tool_call_id: Optional[str] = None
  73    agent_id: Optional[str] = None
  74
  75
  76
  77
# Model configuration that rejects unknown fields. Its *kind* depends on the
# installed Pydantic version: a ConfigDict (plain dict) on 2.11+, otherwise a
# legacy config class. Code consuming ForbidExtraConfig must cope with both.
if PYDANTIC_2_11_PLUS:
    # Pydantic 2.11+ style - use ConfigDict
    ForbidExtraConfig = ConfigDict(extra='forbid')
else:
    # Pydantic 2.10.6 style - use class config.
    # NOTE: this branch dereferences `Extra`; it would fail if the guarded
    # import above had fallen back to `Extra = None`.
    class ForbidExtraConfig:
        extra = Extra.forbid
  85    
  86
  87@dataclass(kw_only=True, frozen=True)
  88class ToolResult:
  89    """Represents the result of a tool execution."""
  90    output: Optional[str] = None
  91    error: Optional[str] = None
  92    base64_image: Optional[str] = None
  93    system: Optional[str] = None
  94
  95    def __bool__(self):
  96        return any(getattr(self, field.name) for field in fields(self))
  97
  98    def __add__(self, other: "ToolResult"):
  99        def combine_fields(
 100            field: Optional[str], other_field: Optional[str], concatenate: bool = True
 101        ):
 102            if field and other_field:
 103                if concatenate:
 104                    return field + other_field
 105                raise ValueError("Cannot combine tool results")
 106            return field or other_field
 107
 108        return ToolResult(
 109            output=combine_fields(self.output, other.output),
 110            error=combine_fields(self.error, other.error),
 111            base64_image=combine_fields(self.base64_image, other.base64_image, False),
 112            system=combine_fields(self.system, other.system),
 113        )
 114
 115    def replace(self, **kwargs):
 116        return replace(self, **kwargs)
 117
class CLIResult(ToolResult):
    """A ToolResult that can be rendered as a CLI output.

    Marker subclass: it adds no fields or methods of its own.
    """
 120
class ToolFailure(ToolResult):
    """A ToolResult that represents a failure.

    Marker subclass: it adds no fields or methods of its own.
    """
 123
 124class ToolError(Exception):
 125    """Raised when a tool encounters an error."""
 126    def __init__(self, message):
 127        self.message = f"ERROR!!! {message}"
 128        super().__init__(message)
 129
 130
 131def fix_ref_paths(schema: dict) -> dict:
 132    """Fix $ref paths in schema by replacing #/$defs/ with #/defs/"""
 133    schema_copy = deepcopy(schema)
 134
 135    def _fix_refs_recursive(obj):
 136        if isinstance(obj, dict):
 137            if "$ref" in obj and isinstance(obj["$ref"], str):
 138                obj["$ref"] = obj["$ref"].replace("/$defs/", "/defs/")
 139            for k, v in list(obj.items()):
 140                if isinstance(v, (dict, list)):
 141                    _fix_refs_recursive(v)
 142        elif isinstance(obj, list):
 143            for item in obj:
 144                if isinstance(item, (dict, list)):
 145                    _fix_refs_recursive(item)
 146
 147    _fix_refs_recursive(schema_copy)
 148    return schema_copy
 149
 150def flatten_array_anyof(schema: dict) -> dict:
 151    """
 152    If the schema has an 'anyOf' that contains one branch with type "array"
 153    and another with type "null", flatten it to a single array schema with
 154    'nullable': true.
 155    """
 156    if "anyOf" in schema and isinstance(schema["anyOf"], list):
 157        array_branch = None
 158        null_branch = False
 159        for branch in schema["anyOf"]:
 160            if branch.get("type") == "array":
 161                array_branch = branch
 162            elif branch.get("type") == "null":
 163                null_branch = True
 164        if array_branch and null_branch:
 165            new_schema = dict(schema)
 166            new_schema.pop("anyOf")
 167            new_schema["type"] = "array"
 168            new_schema["items"] = array_branch.get("items", {})
 169            if "default" in schema:
 170                new_schema["default"] = schema["default"]
 171            new_schema["nullable"] = True
 172            if "description" in schema:
 173                new_schema["description"] = schema["description"]
 174            return new_schema
 175    return schema
 176
 177def recursive_flatten(schema: Union[dict, list]) -> Union[dict, list]:
 178    if isinstance(schema, dict):
 179        new_schema = flatten_array_anyof(schema)
 180        for key, value in new_schema.items():
 181            if isinstance(value, dict) or isinstance(value, list):
 182                new_schema[key] = recursive_flatten(value)
 183        return new_schema
 184    elif isinstance(schema, list):
 185        return [recursive_flatten(item) if isinstance(item, dict) else item for item in schema]
 186    else:
 187        return schema
 188
 189def fix_empty_object_properties(schema: Union[dict, list]) -> Union[dict, list]:
 190    """
 191    Recursively fixes any object-type schema that has an empty 'properties'
 192    dict by removing 'properties' and adding 'additionalProperties': True.
 193    """
 194    if isinstance(schema, dict):
 195        # Check if this is an object with empty properties.
 196        if schema.get("type") == "object":
 197            if "properties" in schema and not schema["properties"]:
 198                # Remove the empty properties and allow arbitrary keys.
 199                del schema["properties"]
 200                schema["additionalProperties"] = True
 201        # Recurse over dictionary values.
 202        new_schema = {}
 203        for key, value in schema.items():
 204            new_schema[key] = fix_empty_object_properties(value) if isinstance(value, (dict, list)) else value
 205        return new_schema
 206    elif isinstance(schema, list):
 207        return [fix_empty_object_properties(item) if isinstance(item, (dict, list)) else item for item in schema]
 208    return schema
 209
 210def generate_dereferenced_schema(schema: Union[dict, Type[BaseModel]]) -> dict:
 211    """
 212    Returns a fully dereferenced (flattened) JSON schema.
 213    If a Pydantic model is passed, generate its JSON schema;
 214    if a dict is passed, assume it's already a JSON schema.
 215    Additionally, flatten array schemas that use an "anyOf" and fix empty
 216    object properties to support Gemini.
 217    """
 218    if isinstance(schema, dict):
 219        raw_schema = schema
 220    else:
 221        raw_schema = schema.model_json_schema(ref_template="#/defs/{model}")
 222    # ADDED FOR ADK COMPLIANCE
 223    # Fix $ref paths before renaming $defs to defs
 224    raw_schema = fix_ref_paths(raw_schema)
 225    ########
 226    if "$defs" in raw_schema:
 227        raw_schema["defs"] = raw_schema.pop("$defs")
 228    inlined = dereference_refs(raw_schema)
 229    inlined.pop("defs", None)
 230    # flattened = recursive_flatten(inlined)
 231    # fixed = fix_empty_object_properties(flattened)
 232    fixed = fix_empty_object_properties(inlined)
 233    return fixed
 234
 235
 236
 237class ToolArgsSchema(BaseModel):
 238    """Meta-validator for tool arguments ensuring LangChain compatibility"""
 239    arguments: Dict[str, Dict[str, Any]] = Field(
 240        ..., description="Validated tool argument specifications"
 241    )
 242  
 243# V6
 244    @classmethod
 245    def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
 246        mapping = {str: "string", int: "integer", float: "number", bool: "boolean"}
 247        json_item_type = mapping.get(item_type, "string")
 248        
 249        class CustomList(list):
 250            @classmethod
 251            def __get_pydantic_core_schema__(cls, source: Any, handler: Any) -> Any:
 252                # Use the default core schema for lists.
 253                return handler.generate_schema(list)
 254    
 255            @classmethod
 256            def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> Dict[str, Any]:
 257                # Get the default JSON schema from the core schema.
 258                json_schema = handler(core_schema)
 259                # Ensure that the "items" property exists.
 260                json_schema.setdefault("items", {})
 261                # Add the "type" to the items if it isn't already provided.
 262                if "type" not in json_schema["items"]:
 263                    json_schema["items"]["type"] = json_item_type
 264                # Add the custom description.
 265                json_schema["items"]["description"] = f"Item of type {item_type_str}"
 266                return json_schema
 267    
 268        return CustomList
 269
 270
 271
 272    @classmethod
 273    def to_pydantic_schema(cls, arguments: Dict[str, Dict[str, Any]]) -> Type[BaseModel]:
 274        """
 275        Converts argument definitions into a dynamic Pydantic model.
 276        For list fields with primitive items, we use a custom list type that populates
 277        the "items" schema field properly.
 278        """
 279        schema_fields = {}
 280        type_mapping = {
 281            'int': int, 'str': str, 'float': float, 'bool': bool,
 282            'integer': int, 'string': str, 'number': float, 'boolean': bool,
 283            'list': list, 'array': list,  # We'll override list fields below.
 284            'dict': Dict[str, Any], 'object': Dict[str, Any],
 285        }
 286
 287        for arg_name, arg_details in arguments.items():
 288            if not isinstance(arg_details, dict):
 289                continue
 290
 291            arg_type_str = arg_details.get('type', 'string')
 292            description = arg_details.get('description', '')
 293            is_required = arg_details.get('required', True)
 294            default_value = arg_details.get('default', None)
 295
 296            # Determine the field type.
 297            if arg_type_str in ('dict', 'object'):
 298                schema_field_type = cls._create_nested_model_recursive(f"Nested_{arg_name}", arg_details)
 299            elif arg_type_str in ('list', 'array'):
 300                item_info = arg_details.get('items', {})
 301                item_type_str = item_info.get('type', 'any')
 302                if item_type_str == 'any':
 303                    item_type_str = 'str'
 304                if item_type_str in ('dict', 'object'):
 305                    item_model = cls._create_nested_model_recursive(f"ListItem_{arg_name}", item_info)
 306                    schema_field_type = List[item_model]  # Use standard list of nested model.
 307                else:
 308                    primitive_type = type_mapping.get(item_type_str, str)
 309                    # Instead of List[primitive_type], use a custom list type.
 310                    schema_field_type = cls.custom_list_type(primitive_type, item_type_str)
 311            else:
 312                schema_field_type = type_mapping.get(arg_type_str, str)
 313
 314            # Append default info into description.
 315            final_description = description
 316            if default_value is not None:
 317                final_description += f" (Defaults to {repr(default_value)})"
 318                arg_details.pop('default', None)
 319
 320            field_kwargs = {"description": final_description}
 321            if not is_required:
 322                field_kwargs["default"] = None
 323                schema_field_type = Optional[schema_field_type]
 324
 325            schema_fields[arg_name] = (schema_field_type, Field(**field_kwargs))
 326
 327        model_name = f"DynamicArgsSchema_{id(arguments)}"
 328        
 329        # Handle both Pydantic 2.10.6 and 2.11+ syntax
 330        if PYDANTIC_2_11_PLUS:
 331            # Pydantic 2.11+ - use __config_class__ with ConfigDict
 332            return create_model(model_name, __config_class__=ForbidExtraConfig, **schema_fields)
 333        else:
 334            # Pydantic 2.10.6 - use __config__ with class
 335            return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)
 336    
 337    @classmethod
 338    def _create_nested_model_recursive(cls, model_name: str, arg_definition: Dict[str, Any]) -> Type[BaseModel]:
 339        """
 340        Recursively creates a nested Pydantic model from the provided definition.
 341        Now treats 'nested' blocks as true sub-models instead of flattening them.
 342        """
 343        from pydantic import create_model, Field, BaseModel
 344        from typing import Dict, Any, Optional, List
 345
 346        known_meta = {'name','type','description','required','default','items','additionalProperties','nested'}
 347        type_map = {
 348            'integer': int, 'int': int,
 349            'string': str, 'str': str,
 350            'number': float, 'float': float,
 351            'boolean': bool,'bool': bool,
 352            'list':  list,'array': list,
 353            'dict':  dict,'object': dict,
 354        }
 355
 356        # 1) Handle any direct inline fields (outside of nested)
 357        schema_fields: Dict[str, Any] = {}
 358        for key, val in dict(arg_definition).items():
 359            if key in known_meta: 
 360                continue
 361            if isinstance(val, dict) and 'type' in val:
 362                # primitive or list/object without a nested sub‐block
 363                field_type = type_map.get(val['type'], str)
 364                if val['type'] in ('list','array'):
 365                    # list of primitives or dicts
 366                    items = val.get('items', {})
 367                    sub_type_str = items.get('type','string')
 368                    sub_py = type_map.get(sub_type_str, str)
 369                    # use your custom list type to keep items schema
 370                    field_type = cls.custom_list_type(sub_py, sub_type_str)
 371                schema_fields[key] = ( 
 372                    Optional[field_type] if not val.get('required',True) else field_type,
 373                    Field(
 374                        default=None if not val.get('required',True) else ...,
 375                        description=val.get('description','')
 376                    )
 377                )
 378
 379        # 2) Now build sub‐models for each group in 'nested'
 380        nested = arg_definition.get('nested', {}) or {}
 381        for group_name, group_props in nested.items():
 382            sub_model = cls._create_nested_model_recursive(f"{model_name}_{group_name}", group_props)
 383            # include it as a required or optional field
 384            required = group_props.get('required', True)
 385            schema_fields[group_name] = (
 386                Optional[sub_model] if not required else sub_model,
 387                Field(
 388                    default=None if not required else ...,
 389                    description=group_props.get('description','')
 390                )
 391            )
 392
 393        # 3) Create & return the Pydantic model for this level
 394        # Handle both Pydantic 2.10.6 and 2.11+ syntax
 395        if PYDANTIC_2_11_PLUS:
 396            # Pydantic 2.11+ - use __config_class__ with ConfigDict
 397            return create_model(
 398                model_name,
 399                __config_class__ = ForbidExtraConfig,
 400                **schema_fields
 401            )
 402        else:
 403            # Pydantic 2.10.6 - use __config__ with class
 404            return create_model(
 405                model_name,
 406                __config__ = ForbidExtraConfig,
 407                **schema_fields
 408            )
 409   
    @classmethod
    def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """
        Validates the argument definitions, ensuring they have required metadata
        and supported types. Returns the validated arguments with defaults moved to descriptions.

        The definitions are normalised IN PLACE and the same `arguments` dict
        is returned:
          * a missing 'required' becomes True for top-level arguments and
            False for entries inside 'nested' groups / nested properties;
          * a 'default' key is appended to 'description' as
            "(Defaults to ...)" and then deleted;
          * list/array arguments without 'items' (or without an item 'type')
            get an item type of 'any'; object items get a default description.

        Raises:
            ValueError: If a definition is not a dict, lacks 'name', 'type'
                or 'description' (top level) / 'type' or 'description'
                (nested), uses an unsupported type, or has a non-boolean
                'required' or a non-dict 'nested' / 'items'.
        """
        # Keys that describe an argument itself; any other dict-valued key
        # with a 'type' is treated as an inline nested property.
        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']

        for arg_name, arg_details in arguments.items():
            if not isinstance(arg_details, dict):
                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} must be a dictionary")

            # Ensure required keys are present
            if not all(key in arg_details for key in ['name', 'type', 'description']):
                missing = [k for k in ['name', 'type', 'description'] if k not in arg_details]
                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} missing required metadata: {missing}")

            # Add default 'required=True' if missing
            if 'required' not in arg_details:
                arg_details['required'] = True

            # Validate that 'required' is a boolean
            if not isinstance(arg_details['required'], bool):
                raise ValueError(f"ToolArgsSchema Error: The 'required' field for {arg_name} must be a boolean")

            # Validate type is supported
            if arg_details['type'] not in supported_types:
                raise ValueError(f"ToolArgsSchema Error: Unsupported type for {arg_name}: {arg_details['type']}")

            # Move default to description if present
            if 'default' in arg_details:
                default_value = arg_details['default']
                arg_details['description'] += f" (Defaults to {repr(default_value)})"
                del arg_details['default']

            # Process any nested structure the same way.
            # Expected shape: nested = {group_key: {sub_arg_name: sub_arg_def}}.
            if 'nested' in arg_details:
                # Validate the nested structure
                nested = arg_details['nested']
                if not isinstance(nested, dict):
                    raise ValueError(f"ToolArgsSchema Error: 'nested' in {arg_name} must be a dictionary")

                # Validate each nested item's arguments
                for item_key, item_args in nested.items():
                    if not isinstance(item_args, dict):
                        raise ValueError(f"ToolArgsSchema Error: Arguments for '{item_key}' in {arg_name} must be a dictionary")

                    # Validate each argument
                    for sub_arg_name, sub_arg_def in item_args.items():
                        if not isinstance(sub_arg_def, dict):
                            raise ValueError(f"ToolArgsSchema Error: Definition for '{arg_name}.{item_key}.{sub_arg_name}' must be a dictionary")

                        # Ensure required metadata is present
                        if 'type' not in sub_arg_def:
                            raise ValueError(f"ToolArgsSchema Error: '{arg_name}.{item_key}.{sub_arg_name}' missing required 'type'")

                        if 'description' not in sub_arg_def:
                            raise ValueError(f"ToolArgsSchema Error: '{arg_name}.{item_key}.{sub_arg_name}' missing required 'description'")

                        # Validate type is supported
                        if sub_arg_def['type'] not in supported_types:
                            raise ValueError(f"ToolArgsSchema Error: Unsupported type for '{arg_name}.{item_key}.{sub_arg_name}': {sub_arg_def['type']}")

                        # Add default 'required=False' if missing for nested properties
                        if 'required' not in sub_arg_def:
                            sub_arg_def['required'] = False

                        # Validate that 'required' is a boolean
                        if not isinstance(sub_arg_def['required'], bool):
                            raise ValueError(f"ToolArgsSchema Error: The 'required' field for '{arg_name}.{item_key}.{sub_arg_name}' must be a boolean")

                        # Move default to description if present
                        if 'default' in sub_arg_def:
                            default_value = sub_arg_def['default']
                            sub_arg_def['description'] += f" (Defaults to {repr(default_value)})"
                            del sub_arg_def['default']

                        # Recursively process any nested structure within this.
                        # NOTE(review): the recursive call applies the top-level
                        # checks, which also demand a 'name' key — confirm that
                        # nested object definitions are expected to carry one.
                        if sub_arg_def['type'] in ['dict', 'object'] and 'nested' in sub_arg_def:
                            cls.validate_arguments({f"{arg_name}.{item_key}.{sub_arg_name}": sub_arg_def})

                        # Handle nested validations for complex types
                        if sub_arg_def['type'] in ['list', 'array'] and 'items' in sub_arg_def:
                            items = sub_arg_def['items']
                            if not isinstance(items, dict):
                                raise ValueError(f"ToolArgsSchema Error: 'items' for '{arg_name}.{item_key}.{sub_arg_name}' must be a dictionary")

                            if 'type' in items and items['type'] not in supported_types and items['type'] != 'any':
                                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for '{arg_name}.{item_key}.{sub_arg_name}': {items['type']}")

            # Recursively validate nested structures (regular pattern)
            if arg_details['type'] in ['dict', 'object']:
                # Check for nested definitions and validate them
                for key, value in arg_details.items():
                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                        # This is a nested property - validate it as well
                        cls._validate_nested_property(f"{arg_name}.{key}", value)

            # Validate list item type if it's a list
            if arg_details['type'] in ['list', 'array']:
                if 'items' not in arg_details:
                    arg_details['items'] = {'type': 'any'}  # Default to Any if not specified
                elif not isinstance(arg_details['items'], dict):
                    raise ValueError(f"ToolArgsSchema Error: 'items' for {arg_name} must be a dictionary")
                elif 'type' not in arg_details['items']:
                    arg_details['items']['type'] = 'any'  # Default to Any if type not specified

                items = arg_details['items']
                item_type = items.get('type')

                if item_type not in supported_types and item_type != 'any':
                    raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {arg_name}: {item_type}")

                if item_type in ['dict', 'object']:
                    # Add description to items if missing
                    if 'description' not in items:
                        items['description'] = f"Item of {arg_name} list"

                    # Recursively validate nested item properties if they exist
                    for key, value in items.items():
                        if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                            cls._validate_nested_property(f"{arg_name}[].{key}", value)

        return arguments
 535
    @classmethod
    def _validate_nested_property(cls, path: str, prop_details: Dict[str, Any]):
        """
        Validates a nested property definition recursively.

        The definition is normalised IN PLACE: a missing 'required' becomes
        False, a 'default' is folded into 'description' and deleted, list
        properties get an 'items' type of 'any' when unspecified, and a
        'nested' block is flattened into the property itself (see below).

        Args:
            path: The path to this property (for error messages)
            prop_details: The property details to validate

        Raises:
            ValueError: If 'type' or 'description' is missing, the type (or a
                list item type) is unsupported, 'required' is not a boolean,
                or 'items' is not a dictionary.
        """
        # Check required metadata
        if 'type' not in prop_details:
            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'type'")

        # Check for description
        if 'description' not in prop_details:
            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'description'")

        # Validate type is supported
        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']
        if prop_details['type'] not in supported_types:
            raise ValueError(f"ToolArgsSchema Error: Unsupported type for {path}: {prop_details['type']}")

        # Add default 'required=False' if missing for nested properties
        if 'required' not in prop_details:
            prop_details['required'] = False

        # Validate that 'required' is a boolean
        if not isinstance(prop_details['required'], bool):
            raise ValueError(f"ToolArgsSchema Error: The 'required' field for {path} must be a boolean")

        # Move default to description if present
        if 'default' in prop_details:
            default_value = prop_details['default']
            prop_details['description'] += f" (Defaults to {repr(default_value)})"
            del prop_details['default']

        # Known metadata keys to skip when looking for nested properties
        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}

        # Handle nested structure within this property: every typed entry of
        # every group in 'nested' is hoisted onto prop_details under its own
        # name (group names are discarded; same-named entries overwrite).
        if 'nested' in prop_details and isinstance(prop_details['nested'], dict):
            nested = prop_details['nested']
            for item_key, item_args in nested.items():
                if isinstance(item_args, dict):
                    for sub_arg_name, sub_arg_def in item_args.items():
                        if isinstance(sub_arg_def, dict) and 'type' in sub_arg_def:
                            # Add directly to the parent object without prefixing
                            prop_details[sub_arg_name] = sub_arg_def
            # Once processed, remove the nested key to avoid double-processing
            del prop_details['nested']

        # If this is a dict/object, recursively validate its properties
        # (including any entries hoisted from 'nested' above).
        if prop_details['type'] in ['dict', 'object']:
            # Check for nested definitions and validate them
            for key, value in prop_details.items():
                if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                    cls._validate_nested_property(f"{path}.{key}", value)

        # If this is a list/array, validate its items
        if prop_details['type'] in ['list', 'array']:
            if 'items' not in prop_details:
                prop_details['items'] = {'type': 'any'}
            elif not isinstance(prop_details['items'], dict):
                raise ValueError(f"ToolArgsSchema Error: 'items' for {path} must be a dictionary")
            elif 'type' not in prop_details['items']:
                prop_details['items']['type'] = 'any'

            items = prop_details['items']
            item_type = items.get('type')

            if item_type not in supported_types and item_type != 'any':
                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {path}: {item_type}")

            if item_type in ['dict', 'object']:
                # Add description to items if missing
                if 'description' not in items:
                    items['description'] = f"Item of {path}"

                # Recursively validate nested item properties if they exist
                for key, value in items.items():
                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
                        cls._validate_nested_property(f"{path}[].{key}", value)
 618
 619
 620  
 621class BaseHeavenTool(ABC):
 622    """Provider-agnostic tool base class with standardized results"""
 623    # Required class attributes that tools must define
 624    name: str
 625    description: str
 626    func: Callable
 627    args_schema: Type[ToolArgsSchema] 
 628    base_tool: BaseTool 
 629    is_async: bool 
 630
 631    def __init__(
 632        self, 
 633        base_tool: BaseTool,
 634        args_schema: Type[ToolArgsSchema],
 635        is_async: bool = False
 636    ):
 637        # Set properties (no more super() call since we're not inheriting from BaseTool)
 638        self.args_schema = args_schema
 639        self.base_tool = base_tool
 640        self.is_async = is_async
 641
 642        # Create instance and validate arguments schema
 643        schema_instance = args_schema()
 644        ToolArgsSchema.validate_arguments(schema_instance.arguments)
 645    
 646    def _clean_kwargs(self, obj: Any) -> Any:
 647        """
 648        Recursively return a *copy* of `obj` in which every mapping key that looks
 649        like "'some_key'" is replaced by "some_key".
 650
 651        • Works for any Mapping subclass (dict, OrderedDict, defaultdict, …).
 652        • Keeps list / tuple / set semantics identical.
 653        • Primitives and values are left untouched.
 654        """
 655        # ---- Handle any mapping (dict-like) ---------------------------
 656        if isinstance(obj, Mapping):
 657            cleaned_items = {}
 658            for k, v in obj.items():
 659                new_k = k
 660                if (
 661                    isinstance(k, str)
 662                    and k.startswith("'")
 663                    and k.endswith("'")
 664                ):
 665                    stripped = k[1:-1]
 666                    if stripped != k:
 667                        # logging.warning(
 668                        #     "Sanitizing malformed tool-argument key: %r → %r",
 669                        #     k, stripped
 670                        # )
 671                        new_k = stripped
 672
 673                cleaned_items[new_k] = self._clean_kwargs(v)  # recurse on value
 674
 675            # Rebuild using the same mapping type when possible
 676            try:
 677                return type(obj)(cleaned_items)
 678            except Exception:
 679                return cleaned_items
 680
 681        # ---- Handle iterables that can hold nested mappings -----------
 682        if isinstance(obj, (list, tuple, set)):
 683            cleaned_iter = [self._clean_kwargs(item) for item in obj]
 684            return type(obj)(cleaned_iter)
 685
 686        # ---- Walk other iterables (e.g., generators) ------------------
 687        if isinstance(obj, Iterable) and not isinstance(obj, (str, bytes)):
 688            return type(obj)(self._clean_kwargs(item) for item in obj)
 689
 690        # ---- Base case ------------------------------------------------
 691        return obj
 692
 693        
 694    def _run(
 695        self,
 696        run_manager: Optional[CallbackManagerForToolRun] = None,
 697        config: Optional[RunnableConfig] = None,
 698        **kwargs
 699    ) -> ToolResult:
 700        """Synchronous execution path using wrapped base tool"""
 701        cleaned_kwargs = self._clean_kwargs(kwargs)
 702        if cleaned_kwargs:
 703            kwargs = cleaned_kwargs
 704        try:
 705            if self.is_async:
 706                raise ValueError("Tool marked as async but _run was called")
 707
 708            if hasattr(self.base_tool, '_run'):
 709                result = self.base_tool._run(run_manager=run_manager, config=config, **kwargs)
 710            else:
 711                # Fallback to func if _run is not available
 712                result = self.base_tool.func(**kwargs)
 713
 714   
 715            if isinstance(result, ToolResult):
 716                return result  # Already a ToolResult (includes CLIResult)
 717            return ToolResult(output=str(result))
 718        except ToolError as e:
 719            return ToolResult(error=str(e))
 720        except Exception as e:
 721            return ToolResult(error=f"Error in tool '{self.name}': {e}")
 722          
 723    async def _arun(
 724        self,
 725        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
 726        config: Optional[RunnableConfig] = None,
 727        **kwargs
 728    ) -> ToolResult:
 729        """Asynchronous execution path using wrapped base tool"""
 730        cleaned_kwargs = self._clean_kwargs(kwargs)
 731        if cleaned_kwargs:
 732            kwargs = cleaned_kwargs
 733        try:
 734          
 735            # Improved async handling
 736            if not self.is_async:
 737                # Use asyncio.to_thread for sync functions
 738                import asyncio
 739                
 740                result = await asyncio.to_thread(self.base_tool.func, **kwargs)
 741            else:
 742                # Check for native async methods
 743                if hasattr(self.base_tool, '_arun'):
 744                    result = await self.base_tool._arun(run_manager=run_manager, config=config, **kwargs)
 745                elif hasattr(self.base_tool, 'arun'):
 746                    result = await self.base_tool.arun(**kwargs)
 747                else:
 748                    
 749                    # Check if the function itself is async
 750                    import inspect
 751                    if inspect.iscoroutinefunction(self.base_tool.func):
 752                        result = await self.base_tool.func(**kwargs)
 753                    else:
 754                        # Fallback to thread-based execution only for sync functions
 755                        import asyncio
 756                        result = await asyncio.to_thread(self.base_tool.func, **kwargs)
 757   
 758              # Handle result types
 759            if isinstance(result, ToolResult):
 760                return result  # Already a ToolResult (includes CLIResult)
 761            return ToolResult(output=str(result))
 762        except ToolError as e:
 763            return ToolResult(error=str(e))
 764        except Exception as e:
 765            return ToolResult(error=f"Error in tool '{self.name}': {e}")
 766   
 767    def get_spec(self) -> dict:
 768        """Get this tool's specification"""
 769        return {
 770            "name": self.name,
 771            "description": self.description,
 772            "args": self.args_schema().arguments  # That's it. The dictionary is already there.
 773        }
 774
 775    @classmethod
 776    def to_openai_function(cls):
 777        """Convert the tool's schema to OpenAI function format"""
 778        # Create the Pydantic schema
 779        schema_instance = cls.args_schema()
 780        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
 781        # Use LangChain's built-in converter
 782        openai_function = convert_to_openai_tool(
 783            Tool(
 784                name=cls.name,
 785                description=cls.description,
 786                func=cls.func,
 787                args_schema=pydantic_schema
 788            )
 789        )
 790        
 791        return openai_function
 792      
 793    def get_openai_function(self):
 794        """Get the OpenAI function specification"""
 795        return self.__class__.to_openai_function()   
 796  
 797    @classmethod
 798    def create_adk_tool(cls, func):
 799        from google.adk.tools import FunctionTool, BaseTool
 800    
 801        # Step 1: Get original schema and dynamic Pydantic model
 802        schema_instance = cls.args_schema()
 803        DynamicModel = schema_instance.to_pydantic_schema(schema_instance.arguments)
 804        flat_schema = generate_dereferenced_schema(DynamicModel)
 805        prop_schemas = flat_schema.get("properties", {})
 806        top_required = flat_schema.get("required", [])
 807        original_defs = schema_instance.arguments
 808    
 809        # Step 2: Build wrapped function with proper signature (from flattened schema)
 810        import inspect
 811        from typing import Optional, get_args, get_origin
 812    
 813        param_defs = []
 814        for name, definition in prop_schemas.items():
 815            typ = definition.get("type", "string")
 816            is_required = name in top_required
 817            default = None if not is_required else inspect._empty
 818    
 819            # Very basic type mapping
 820            type_map = {
 821                "string": str,
 822                "integer": int,
 823                "number": float,
 824                "boolean": bool,
 825                "array": list,
 826                "object": dict
 827            }
 828            resolved_type = type_map.get(typ, str)
 829            if not is_required:
 830                resolved_type = Optional[resolved_type]
 831    
 832            param_defs.append((name, resolved_type, default))
 833    
 834        # Create the function dynamically
 835        def create_wrapped_function(original_func, func_name, param_defs):
 836            params = []
 837            for name, typ, default in param_defs:
 838                param = inspect.Parameter(
 839                    name,
 840                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
 841                    annotation=typ,
 842                    default=default
 843                )
 844                params.append(param)
 845    
 846            sig = inspect.Signature(params)
 847    
 848            async def wrapper(**kwargs):
 849                import inspect, asyncio
 850
 851                if inspect.iscoroutinefunction(original_func):
 852                    return await original_func(**kwargs)
 853                else:
 854                    # run sync code in thread to avoid blocking
 855                    return await asyncio.to_thread(original_func, **kwargs)
 856                
 857    
 858            wrapper.__name__ = func_name
 859            wrapper.__qualname__ = func_name
 860            wrapper.__signature__ = sig
 861            return wrapper
 862    
 863        wrapped_func = create_wrapped_function(func, cls.name, param_defs)
 864    
 865        # Step 3: Create ADK FunctionTool
 866        orig_tool = FunctionTool(func=wrapped_func)
 867        orig_tool.description = cls.description
 868        # Step 4: Schema fixer merge logic
 869        def merge(adk_prop, pschema, info_def=None):
 870            enum_cls = type(adk_prop.type)
 871            if "anyOf" in pschema:
 872                branches = pschema.pop("anyOf")
 873                if any(b.get("type") == "null" for b in branches):
 874                    adk_prop.nullable = True
 875                arr = next((b for b in branches if b.get("type") == "array"), None)
 876                chosen = arr or next((b for b in branches if b.get("type") != "null"), {})
 877                pschema = chosen
 878    
 879            if isinstance(info_def, dict):
 880                if info_def.get("name"):
 881                    adk_prop.title = info_def["name"]
 882                if not getattr(adk_prop, "description", None) and info_def.get("description"):
 883                    adk_prop.description = info_def["description"]
 884    
 885            if "description" in pschema:
 886                adk_prop.description = pschema["description"]
 887    
 888            if "items" in pschema or pschema.get("type") == "array":
 889                adk_prop.type = enum_cls.ARRAY
 890            elif "type" in pschema:
 891                t = pschema["type"].upper()
 892                if hasattr(enum_cls, t):
 893                    adk_prop.type = getattr(enum_cls, t)
 894    
 895            if adk_prop.type == enum_cls.OBJECT:
 896                adk_prop.properties = adk_prop.properties or {}
 897                # nested_defs = {}
 898                # if isinstance(info_def, dict):
 899                #     if isinstance(info_def.get("nested"), dict):
 900                #         nested_defs = info_def["nested"]
 901                #     else:
 902                #         nested_defs = {
 903                #             k: v for k, v in info_def.items()
 904                #             if isinstance(v, dict) and ("type" in v or "nested" in v)
 905                #         }
 906                # always allow any dict under info_def[key] as child_info
 907                for key, child_ps in pschema.get("properties", {}).items():
 908                    child_info = {}
 909                    if isinstance(info_def, dict) and isinstance(info_def.get(key), dict):
 910                        child_info = info_def[key]
 911                # for key, child_ps in pschema.get("properties", {}).items():
 912                #     child_info = nested_defs.get(key, {})
 913                    if key not in adk_prop.properties:
 914                        Placeholder = type(adk_prop)
 915                        adk_prop.properties[key] = Placeholder.model_construct(
 916                            __pydantic_initialised__=True,
 917                            name=key,
 918                            title=key,
 919                            description="",
 920                            properties=None,
 921                            items=None,
 922                            required=[],
 923                            type=adk_prop.type,
 924                        )
 925                    merge(adk_prop.properties[key], child_ps, child_info)
 926    
 927                # req = pschema.get("required")
 928                # if req is None:
 929                #     req = [k for k, v in nested_defs.items() if isinstance(v, dict) and v.get("required")]
 930                # adk_prop.required = req or []
 931                # first try JSON‑schema’s own 'required', otherwise fall back to any dict‑defined 'required' in info_def
 932                req = pschema.get("required")
 933                if req is None:
 934                        req = []
 935                        if isinstance(info_def, dict):
 936                                req = [k for k, v in info_def.items() if isinstance(v, dict) and v.get("required")]
 937                adk_prop.required = req or []
 938    
 939            elif adk_prop.type == enum_cls.ARRAY:
 940                if adk_prop.items is None:
 941                    Placeholder = type(adk_prop)
 942                    adk_prop.items = Placeholder.model_construct(
 943                        __pydantic_initialised__=True,
 944                        name=adk_prop.title or "",
 945                        title=adk_prop.title or "",
 946                        description=pschema.get("items", {}).get("description", ""),
 947                        properties=None,
 948                        items=None,
 949                        required=[],
 950                        type=adk_prop.type,
 951                    )
 952                merge(adk_prop.items, pschema.get("items", {}), None)
 953#### LITELLM ###
 954        def normalize_schema_decl(schema):
 955            from enum import Enum
 956            # `schema` is a google.genai.types.Schema or similar
 957            # 1️⃣ normalize this node’s type
 958            if hasattr(schema, "type") and isinstance(schema.type, Enum):
 959                schema.type = schema.type.name.lower()
 960            # 2️⃣ normalize any nested properties
 961            if getattr(schema, "properties", None):
 962                for prop in schema.properties.values():
 963                    normalize_schema_decl(prop)
 964            # 3️⃣ normalize array items
 965            if getattr(schema, "items", None):
 966                normalize_schema_decl(schema.items)
 967            return schema
 968#### LITELLM ####
 969        # Step 5: Wrap in ADK BaseTool with fixed declaration
 970        class RequiredFieldsFixTool(BaseTool):
 971            def __init__(self, orig_tool):
 972                self._orig_tool = orig_tool
 973                super().__init__(name=orig_tool.name, description=orig_tool.description)
 974    
 975            async def run_async(self, **kwargs):
 976                try:
 977                    # 1) invoke the underlying tool (may be sync or async)
 978                    raw = self._orig_tool.run_async(**kwargs)
 979                    result = await raw if inspect.isawaitable(raw) else raw
 980        
 981                    # 2) if it's already a ToolResult or CLIResult, just return it
 982                    if isinstance(result, ToolResult) or isinstance(result, CLIResult):
 983                        return result
 984        
 985                    # 3) if it's a dict (ADK function tool), dig into the 'response' / 'result' fields
 986                    if isinstance(result, dict):
 987                        # ADK wraps tool output under result["response"]["result"]
 988                        resp = result.get("response", result)
 989                        payload = resp.get("result", resp) if isinstance(resp, dict) else resp
 990        
 991                        # payload may itself be a dict containing output, error, base64_image, etc.
 992                        if isinstance(payload, dict):
 993                            return ToolResult(
 994                                output=payload.get("output", ""),
 995                                error=payload.get("error", None),
 996                                base64_image=payload.get("base64_image", None),
 997                                system=payload.get("system", None),
 998                            )
 999                        # otherwise treat payload as a plain string
1000                        return ToolResult(output=str(payload))
1001        
1002                    # 4) if it's just a string, wrap it
1003                    if isinstance(result, str):
1004                        return ToolResult(output=result)
1005        
1006                    # 5) fallback for anything else
1007                    return ToolResult(output=str(result))
1008        
1009                except ToolError as e:
1010                    return ToolResult(error=str(e))
1011                except Exception as e:
1012                    return ToolResult(error=f"Unhandled error in tool '{self.name}': {e}")
1013                  
1014            def _get_declaration(self):
1015                decl = self._orig_tool._get_declaration()
1016                
1017                decl.description = self.description
1018                if not (decl and decl.parameters and decl.parameters.properties):
1019                    return decl
1020    
1021                decl.parameters.required = top_required
1022    
1023                for key, ps in prop_schemas.items():
1024                    prop = decl.parameters.properties.get(key)
1025                    if not prop:
1026                        continue
1027                    merge(prop, ps, original_defs.get(key))
1028                ### LITE LLM
1029                # 2) **Post‑process**: lower‑case enum types into JSON‑schema strings
1030                # from enum import Enum
1031                # for prop in decl.parameters.properties.values():
1032                #    # only convert ADK enum values
1033                #     if isinstance(prop.type, Enum):
1034                #        prop.type = prop.type.name.lower()
1035                #    # handle array items too
1036                #     if getattr(prop, "items", None) and isinstance(prop.items.type, Enum):
1037                #         prop.items.type = prop.items.type.name.lower()
1038                normalize_schema_decl(decl.parameters)
1039                ###
1040                return decl
1041    
1042        return RequiredFieldsFixTool(orig_tool)
1043
1044
1045  
1046    @classmethod
1047    def create(cls, adk: bool = False):
1048        """Create a tool instance using class attributes"""
1049        # with open(tool_log_path, 'a') as f:
1050        #         f.write("\n\ncreate entered!\n")
1051       
1052        # Create an instance of the schema
1053        
1054        schema_instance = cls.args_schema()
1055    
1056        # # Use your existing Pydantic schema generator
1057        
1058        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
1059        
1060        
1061        
1062        if adk:
1063            
1064            return cls.create_adk_tool(cls.func) # new method
1065                   
1066        #### LANGCHAIN
1067        else:
1068            
1069            schema_instance = cls.args_schema()
1070            pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
1071            if cls.is_async:
1072                def sync_stub(**kwargs):
1073                    raise NotImplementedError("This tool is async only")
1074    
1075                base_tool = Tool(
1076                    name=cls.name,
1077                    description=cls.description,
1078                    func=sync_stub,
1079                    coroutine=cls.func,
1080                    args_schema=pydantic_schema
1081                )
1082            else:
1083                base_tool = Tool(
1084                    name=cls.name,
1085                    description=cls.description,
1086                    func=cls.func,
1087                    args_schema=pydantic_schema
1088                )
1089    
1090           
1091            # return instance
1092            return cls(
1093                base_tool=base_tool,
1094                args_schema=cls.args_schema,
1095                is_async=cls.is_async
1096            )
1097
1098
1099
1100
1101### EXAMPLE ###
1102
1103# ✅ Define Calculator function
def calculator_func(a: int, b: int) -> str:
    """Multiply ``a`` by ``b`` and return the product rendered as a string."""
    product = a * b
    return str(product)
1106
1107# ✅ Define ToolArgsSchema for Calculator
class CalculatorArgsSchema(ToolArgsSchema):
    # Argument definitions for the calculator example: two required ints.
    arguments: Dict[str, Dict[str, Any]] = {
        'a': dict(
            name='a',
            type='int',
            description='First number to multiply',
            required=True,
        ),
        'b': dict(
            name='b',
            type='int',
            description='Second number to multiply',
            required=True,
        ),
    }
1123
1124# Calculator Tool
class CalculatorTool(BaseHeavenTool):
    # Example tool: wires calculator_func and its argument schema together
    # through the class attributes BaseHeavenTool reads.
    name = "calculator"
    description = "A tool that multiplies two numbers."
    func = calculator_func  # plain sync function
    args_schema = CalculatorArgsSchema
    is_async = False  # func is synchronous
1131
1132
1133def test_tool_func(message: str) -> str:
1134    return "The test tool returns this str:`ASDON!@DASN(IJ K@!EDWQACKNSXCBUO!LASDKANSLDALSDLK!@`\n\nMake sure to follow the original instructions while responding to this."
1135
1136# Updated Test Tool Args Schema with required flags
class TestToolArgsSchema(ToolArgsSchema):
    # Argument definitions for the test tool: one required string, `message`.
    arguments: Dict[str, Dict[str, Any]] = {
        'message': dict(
            name='message',
            type='str',
            description='Any message to send to the test tool',
            required=True,
        ),
    }
1146
class TestTool(BaseHeavenTool):
    # Test tool: wires test_tool_func and its argument schema together
    # through the class attributes BaseHeavenTool reads.
    name = "TestTool"
    description = "A test tool that performs a test the user needs performed."
    func = test_tool_func  # plain sync function
    args_schema = TestToolArgsSchema
    is_async = False  # func is synchronous
1153
1154
1155# Example tool definition
1156# if you need to, add imports
1157# 
def example_util_func(example_arg_return_dict: bool = False) -> Union[str, dict]:
    """Return a canned example tool result.

    Args:
        example_arg_return_dict: When truthy, return the example dict instead
            of the example string.

    Returns:
        ``{"example_result_dict": "example text output"}`` when
        ``example_arg_return_dict`` is truthy, otherwise a fixed example string.
    """
    # The flag is the function's own parameter; the former check read an
    # undefined name and raised NameError on every call.
    if example_arg_return_dict:
        return {"example_result_dict": "example text output"}
    return "Congrats! You and the user successfully tested a tool! This is an example tool result that returns an example string.\n\nThis is example text after 2 line breaks.\n\nThis is an example injected instruction in an example markdown fence:\n\n```markdown\n# Example\n\nContinue\n```"
1162
1163# from .baseheaventool import BaseHeavenTool, ToolArgsSchema
# from heaven_base.tools.tool_utils.example_util import example_util_func  ## Doesn't exist outside of documentation purposes
1165# Updated Test Tool Args Schema with required flags
class ExampleToolArgsSchema(ToolArgsSchema):
    # Argument definitions for the example tool: one required bool flag.
    arguments: Dict[str, Dict[str, Any]] = {
        'example_arg_return_dict': dict(
            name='example_arg_return_dict',
            type='bool',
            description='Set to True to receive a dict and False to get a string',
            required=True,
        ),
    }
1175
class ExampleTool(BaseHeavenTool):
    # Example tool: wires example_util_func and its argument schema together
    # through the class attributes BaseHeavenTool reads.
    name = "ExampleTool"
    description = "An example tool that returns an example result"
    func = example_util_func  # plain sync function
    args_schema = ExampleToolArgsSchema
    is_async = False  # func is synchronous
1182
1183# Then on the agent side ie using the tool:
1184# import the tool
1185# append it to the HeavenAgentConfig.tools
1186# Initialize the agent
1187# Pass that agent to the hermes step in heaven_base.tool_utils.hermes_utils
1188# Run it or create a HermesConfig and run it
1189# Now you understand tools in HEAVEN SDK!
1190
# This is one possible way to allow OpenAI provider binding to work with tools
class StrictDict(BaseModel):
    # Model base that rejects unknown fields.
    # Configured via ConfigDict with the literal "forbid": the module-level
    # `Extra` import falls back to None when pydantic does not export it, and
    # `Extra.forbid` would then raise AttributeError while this module loads.
    model_config = ConfigDict(extra="forbid")
1195
# ADK attempt... doesn't work
1197# class StrictDict(BaseModel):
1198#     model_config = {"extra": "forbid"}
PYDANTIC_2_11_PLUS = True
tool_log_path = '/tmp/tool_debug.log'
def schema_to_pydantic_model(model_name: str, schema: dict) -> type:
def schema_to_pydantic_model(model_name: str, schema: dict) -> type:
    """
    Build a Pydantic model named `model_name` from a cleaned JSON schema.

    Only the basic keys are honoured: 'type', 'properties', 'required' and
    'description'. Properties whose 'type' is missing or not one of the six
    basic JSON types are typed as `Any`; properties not listed under
    'required' default to None.
    """
    json_to_python = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }

    properties = schema.get("properties", {})
    required_names = set(schema.get("required", []))

    field_defs = {}
    for prop_name, prop_schema in properties.items():
        python_type = json_to_python.get(prop_schema.get("type"), Any)
        description = prop_schema.get("description", "")
        # Ellipsis marks the field as required for pydantic's Field()
        default = ... if prop_name in required_names else None
        field_defs[prop_name] = (python_type, Field(default, description=description))

    return create_model(model_name, **field_defs)

Convert a cleaned JSON schema back into a Pydantic model. NOTE: Supports only basic 'type', 'properties', 'required', 'description'.

@dataclass(frozen=True)
class UserMessage:
@dataclass(frozen=True)
class UserMessage:
    # Immutable (frozen) record for a user message; only `content` is required.
    content: str
    user_id: Optional[str] = None
    timestamp: Optional[str] = None  # stored as a string; format is not defined here
    # NOTE(review): the meaning of "pseudo" is not visible in this chunk -- confirm with callers
    is_pseudo: Optional[bool] = False
UserMessage( content: str, user_id: Optional[str] = None, timestamp: Optional[str] = None, is_pseudo: Optional[bool] = False)
content: str
user_id: Optional[str] = None
timestamp: Optional[str] = None
is_pseudo: Optional[bool] = False
@dataclass(frozen=True)
class AgentMessage:
@dataclass(frozen=True)
class AgentMessage:
    # Immutable (frozen) record for an agent message; only `content` is required.
    content: str
    agent_id: Optional[str] = None
    # Forward reference: ToolUse is declared after this class.
    tool_call: Optional["ToolUse"] = None  # If tool triggered in content
    timestamp: Optional[str] = None  # stored as a string; format is not defined here
AgentMessage( content: str, agent_id: Optional[str] = None, tool_call: Optional[ToolUse] = None, timestamp: Optional[str] = None)
content: str
agent_id: Optional[str] = None
tool_call: Optional[ToolUse] = None
timestamp: Optional[str] = None
@dataclass(frozen=True)
class ToolUse:
@dataclass(frozen=True)
class ToolUse:
    # Immutable (frozen) record of a tool invocation request:
    # which tool, with which arguments, plus optional identifiers.
    tool_name: str
    arguments: dict  # frozen=True blocks rebinding the field, not mutating the dict itself
    tool_call_id: Optional[str] = None
    agent_id: Optional[str] = None
ToolUse( tool_name: str, arguments: dict, tool_call_id: Optional[str] = None, agent_id: Optional[str] = None)
tool_name: str
arguments: dict
tool_call_id: Optional[str] = None
agent_id: Optional[str] = None
@dataclass(kw_only=True, frozen=True)
class ToolResult:
 88@dataclass(kw_only=True, frozen=True)
 89class ToolResult:
 90    """Represents the result of a tool execution."""
 91    output: Optional[str] = None
 92    error: Optional[str] = None
 93    base64_image: Optional[str] = None
 94    system: Optional[str] = None
 95
 96    def __bool__(self):
 97        return any(getattr(self, field.name) for field in fields(self))
 98
 99    def __add__(self, other: "ToolResult"):
100        def combine_fields(
101            field: Optional[str], other_field: Optional[str], concatenate: bool = True
102        ):
103            if field and other_field:
104                if concatenate:
105                    return field + other_field
106                raise ValueError("Cannot combine tool results")
107            return field or other_field
108
109        return ToolResult(
110            output=combine_fields(self.output, other.output),
111            error=combine_fields(self.error, other.error),
112            base64_image=combine_fields(self.base64_image, other.base64_image, False),
113            system=combine_fields(self.system, other.system),
114        )
115
116    def replace(self, **kwargs):
117        return replace(self, **kwargs)

Represents the result of a tool execution.

ToolResult( *, output: Optional[str] = None, error: Optional[str] = None, base64_image: Optional[str] = None, system: Optional[str] = None)
output: Optional[str] = None
error: Optional[str] = None
base64_image: Optional[str] = None
system: Optional[str] = None
def replace(self, **kwargs):
116    def replace(self, **kwargs):
117        return replace(self, **kwargs)
class CLIResult(ToolResult):
# Adds no fields or methods of its own; it differs from ToolResult only by type.
class CLIResult(ToolResult):
    """A ToolResult that can be rendered as a CLI output."""

A ToolResult that can be rendered as a CLI output.

class ToolFailure(ToolResult):
# Adds no fields or methods of its own; it differs from ToolResult only by type.
class ToolFailure(ToolResult):
    """A ToolResult that represents a failure."""

A ToolResult that represents a failure.

class ToolError(builtins.Exception):
class ToolError(Exception):
    """Exception raised by a tool to signal a failure.

    `str(error)` / `error.args` carry the message exactly as given, while the
    `message` attribute carries it with an "ERROR!!! " prefix.
    """
    def __init__(self, message):
        super().__init__(message)
        prefixed = f"ERROR!!! {message}"
        self.message = prefixed

Raised when a tool encounters an error.

ToolError(message)
127    def __init__(self, message):
128        self.message = f"ERROR!!! {message}"
129        super().__init__(message)
message
def fix_ref_paths(schema: dict) -> dict:
def fix_ref_paths(schema: dict) -> dict:
    """Return a deep copy of `schema` whose string "$ref" values have every
    "/$defs/" segment replaced by "/defs/".

    The input is never modified. Only "$ref" *values* are rewritten; keys
    (including a "$defs" key) are left as they are.
    """
    fixed = deepcopy(schema)

    # Iterative walk over every nested dict/list of the copy.
    pending = [fixed] if isinstance(fixed, (dict, list)) else []
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            ref = node.get("$ref")
            if isinstance(ref, str):
                node["$ref"] = ref.replace("/$defs/", "/defs/")
            children = list(node.values())
        else:
            children = node
        pending.extend(child for child in children if isinstance(child, (dict, list)))

    return fixed

Fix $ref paths in schema by replacing #/$defs/ with #/defs/

def flatten_array_anyof(schema: dict) -> dict:
def flatten_array_anyof(schema: dict) -> dict:
    """
    Collapse an 'anyOf' of [array, null] into one nullable array schema.

    When `schema["anyOf"]` is a list containing a branch with type "array" and
    a branch with type "null", a new dict is returned: a shallow copy of
    `schema` without 'anyOf', with 'type' set to "array", 'items' taken from
    the array branch (default {}), and 'nullable' set to True. Every other key
    (e.g. 'default', 'description') carries over from the copy.

    In all other cases the very same `schema` object is returned unchanged.
    """
    if "anyOf" not in schema or not isinstance(schema["anyOf"], list):
        return schema

    array_branch = None
    saw_null = False
    for branch in schema["anyOf"]:
        kind = branch.get("type")
        if kind == "array":
            array_branch = branch  # the last array branch wins
        elif kind == "null":
            saw_null = True

    if not (array_branch and saw_null):
        return schema

    flattened = {key: value for key, value in schema.items() if key != "anyOf"}
    flattened["type"] = "array"
    flattened["items"] = array_branch.get("items", {})
    flattened["nullable"] = True
    return flattened

If the schema has an 'anyOf' that contains one branch with type "array" and another with type "null", flatten it to a single array schema with 'nullable': true.

def recursive_flatten(schema: Union[dict, list]) -> Union[dict, list]:
def recursive_flatten(schema: Union[dict, list]) -> Union[dict, list]:
    """
    Apply ``flatten_array_anyof`` to a schema and to everything nested in it.

    Dicts are flattened first, then each dict/list value is processed
    recursively and stored back under its key. Lists are rebuilt with their
    dict items processed; non-dict list items are kept untouched. Any other
    value is returned unchanged.

    Note: child results are written into whatever dict
    ``flatten_array_anyof`` hands back, which is the input dict itself when
    no flattening took place.
    """
    if isinstance(schema, list):
        return [
            recursive_flatten(entry) if isinstance(entry, dict) else entry
            for entry in schema
        ]
    if not isinstance(schema, dict):
        return schema

    result = flatten_array_anyof(schema)
    # Only existing keys are reassigned, so the key set stays fixed.
    for name in list(result):
        child = result[name]
        if isinstance(child, (dict, list)):
            result[name] = recursive_flatten(child)
    return result
def fix_empty_object_properties(schema: Union[dict, list]) -> Union[dict, list]:
def fix_empty_object_properties(schema: Union[dict, list]) -> Union[dict, list]:
    """
    Recursively fix object-type schemas that have an empty 'properties'.

    For every dict with ``"type": "object"`` whose 'properties' entry is
    present but empty, the result omits 'properties' and sets
    ``"additionalProperties": True`` (overriding any existing value).
    Nested dicts and lists are processed the same way.

    The input is left untouched: a new dict/list structure is returned and
    no key is deleted from, or added to, the caller's schema.

    Args:
        schema: A JSON-schema fragment (dict or list); any other value is
            returned unchanged.

    Returns:
        A rebuilt copy of the dict/list structure with the fix applied.
    """
    if isinstance(schema, list):
        return [
            fix_empty_object_properties(item) if isinstance(item, (dict, list)) else item
            for item in schema
        ]
    if not isinstance(schema, dict):
        return schema

    has_empty_properties = (
        schema.get("type") == "object"
        and "properties" in schema
        and not schema["properties"]
    )

    new_schema = {}
    for key, value in schema.items():
        if has_empty_properties and key == "properties":
            # Dropped in the copy instead of deleted from the caller's dict.
            continue
        if isinstance(value, (dict, list)):
            new_schema[key] = fix_empty_object_properties(value)
        else:
            new_schema[key] = value

    if has_empty_properties:
        # Allow arbitrary keys in place of the empty property list.
        new_schema["additionalProperties"] = True
    return new_schema

Recursively fixes any object-type schema that has an empty 'properties' dict by removing 'properties' and adding 'additionalProperties': True.

def generate_dereferenced_schema(schema: Union[dict, Type[pydantic.main.BaseModel]]) -> dict:
def generate_dereferenced_schema(schema: Union[dict, Type[BaseModel]]) -> dict:
    """
    Return a fully dereferenced (inlined) JSON schema.

    A dict argument is taken to be a JSON schema already; anything else is
    expected to expose ``model_json_schema`` (a Pydantic model class) and is
    asked for its schema with refs of the form ``#/defs/{model}``.

    Processing steps, in order:
      1. ``fix_ref_paths`` rewrites ``$ref`` targets (added for ADK
         compliance); this must happen before the definitions are renamed.
      2. A top-level ``$defs`` section is renamed to ``defs``.
      3. ``dereference_refs`` inlines the references, after which the
         ``defs`` section is dropped.
      4. ``fix_empty_object_properties`` patches empty-object schemas
         (Gemini support).

    The array 'anyOf' flattening pass (``recursive_flatten``) is currently
    not applied.
    """
    if isinstance(schema, dict):
        schema_dict = schema
    else:
        schema_dict = schema.model_json_schema(ref_template="#/defs/{model}")

    # ADK compliance: repair $ref paths before $defs is renamed to defs.
    schema_dict = fix_ref_paths(schema_dict)

    if "$defs" in schema_dict:
        schema_dict["defs"] = schema_dict.pop("$defs")

    resolved = dereference_refs(schema_dict)
    resolved.pop("defs", None)

    return fix_empty_object_properties(resolved)

Returns a fully dereferenced (flattened) JSON schema. If a Pydantic model is passed, generate its JSON schema; if a dict is passed, assume it's already a JSON schema. Additionally, fix empty object properties to support Gemini. (Flattening of array schemas that use an "anyOf" is currently disabled: the recursive_flatten call is commented out in the implementation.)

class ToolArgsSchema(pydantic.main.BaseModel):
238class ToolArgsSchema(BaseModel):
239    """Meta-validator for tool arguments ensuring LangChain compatibility"""
240    arguments: Dict[str, Dict[str, Any]] = Field(
241        ..., description="Validated tool argument specifications"
242    )
243  
244# V6
245    @classmethod
246    def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
247        mapping = {str: "string", int: "integer", float: "number", bool: "boolean"}
248        json_item_type = mapping.get(item_type, "string")
249        
250        class CustomList(list):
251            @classmethod
252            def __get_pydantic_core_schema__(cls, source: Any, handler: Any) -> Any:
253                # Use the default core schema for lists.
254                return handler.generate_schema(list)
255    
256            @classmethod
257            def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> Dict[str, Any]:
258                # Get the default JSON schema from the core schema.
259                json_schema = handler(core_schema)
260                # Ensure that the "items" property exists.
261                json_schema.setdefault("items", {})
262                # Add the "type" to the items if it isn't already provided.
263                if "type" not in json_schema["items"]:
264                    json_schema["items"]["type"] = json_item_type
265                # Add the custom description.
266                json_schema["items"]["description"] = f"Item of type {item_type_str}"
267                return json_schema
268    
269        return CustomList
270
271
272
273    @classmethod
274    def to_pydantic_schema(cls, arguments: Dict[str, Dict[str, Any]]) -> Type[BaseModel]:
275        """
276        Converts argument definitions into a dynamic Pydantic model.
277        For list fields with primitive items, we use a custom list type that populates
278        the "items" schema field properly.
279        """
280        schema_fields = {}
281        type_mapping = {
282            'int': int, 'str': str, 'float': float, 'bool': bool,
283            'integer': int, 'string': str, 'number': float, 'boolean': bool,
284            'list': list, 'array': list,  # We'll override list fields below.
285            'dict': Dict[str, Any], 'object': Dict[str, Any],
286        }
287
288        for arg_name, arg_details in arguments.items():
289            if not isinstance(arg_details, dict):
290                continue
291
292            arg_type_str = arg_details.get('type', 'string')
293            description = arg_details.get('description', '')
294            is_required = arg_details.get('required', True)
295            default_value = arg_details.get('default', None)
296
297            # Determine the field type.
298            if arg_type_str in ('dict', 'object'):
299                schema_field_type = cls._create_nested_model_recursive(f"Nested_{arg_name}", arg_details)
300            elif arg_type_str in ('list', 'array'):
301                item_info = arg_details.get('items', {})
302                item_type_str = item_info.get('type', 'any')
303                if item_type_str == 'any':
304                    item_type_str = 'str'
305                if item_type_str in ('dict', 'object'):
306                    item_model = cls._create_nested_model_recursive(f"ListItem_{arg_name}", item_info)
307                    schema_field_type = List[item_model]  # Use standard list of nested model.
308                else:
309                    primitive_type = type_mapping.get(item_type_str, str)
310                    # Instead of List[primitive_type], use a custom list type.
311                    schema_field_type = cls.custom_list_type(primitive_type, item_type_str)
312            else:
313                schema_field_type = type_mapping.get(arg_type_str, str)
314
315            # Append default info into description.
316            final_description = description
317            if default_value is not None:
318                final_description += f" (Defaults to {repr(default_value)})"
319                arg_details.pop('default', None)
320
321            field_kwargs = {"description": final_description}
322            if not is_required:
323                field_kwargs["default"] = None
324                schema_field_type = Optional[schema_field_type]
325
326            schema_fields[arg_name] = (schema_field_type, Field(**field_kwargs))
327
328        model_name = f"DynamicArgsSchema_{id(arguments)}"
329        
330        # Handle both Pydantic 2.10.6 and 2.11+ syntax
331        if PYDANTIC_2_11_PLUS:
332            # Pydantic 2.11+ - use __config_class__ with ConfigDict
333            return create_model(model_name, __config_class__=ForbidExtraConfig, **schema_fields)
334        else:
335            # Pydantic 2.10.6 - use __config__ with class
336            return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)
337    
338    @classmethod
339    def _create_nested_model_recursive(cls, model_name: str, arg_definition: Dict[str, Any]) -> Type[BaseModel]:
340        """
341        Recursively creates a nested Pydantic model from the provided definition.
342        Now treats 'nested' blocks as true sub-models instead of flattening them.
343        """
344        from pydantic import create_model, Field, BaseModel
345        from typing import Dict, Any, Optional, List
346
347        known_meta = {'name','type','description','required','default','items','additionalProperties','nested'}
348        type_map = {
349            'integer': int, 'int': int,
350            'string': str, 'str': str,
351            'number': float, 'float': float,
352            'boolean': bool,'bool': bool,
353            'list':  list,'array': list,
354            'dict':  dict,'object': dict,
355        }
356
357        # 1) Handle any direct inline fields (outside of nested)
358        schema_fields: Dict[str, Any] = {}
359        for key, val in dict(arg_definition).items():
360            if key in known_meta: 
361                continue
362            if isinstance(val, dict) and 'type' in val:
363                # primitive or list/object without a nested sub‐block
364                field_type = type_map.get(val['type'], str)
365                if val['type'] in ('list','array'):
366                    # list of primitives or dicts
367                    items = val.get('items', {})
368                    sub_type_str = items.get('type','string')
369                    sub_py = type_map.get(sub_type_str, str)
370                    # use your custom list type to keep items schema
371                    field_type = cls.custom_list_type(sub_py, sub_type_str)
372                schema_fields[key] = ( 
373                    Optional[field_type] if not val.get('required',True) else field_type,
374                    Field(
375                        default=None if not val.get('required',True) else ...,
376                        description=val.get('description','')
377                    )
378                )
379
380        # 2) Now build sub‐models for each group in 'nested'
381        nested = arg_definition.get('nested', {}) or {}
382        for group_name, group_props in nested.items():
383            sub_model = cls._create_nested_model_recursive(f"{model_name}_{group_name}", group_props)
384            # include it as a required or optional field
385            required = group_props.get('required', True)
386            schema_fields[group_name] = (
387                Optional[sub_model] if not required else sub_model,
388                Field(
389                    default=None if not required else ...,
390                    description=group_props.get('description','')
391                )
392            )
393
394        # 3) Create & return the Pydantic model for this level
395        # Handle both Pydantic 2.10.6 and 2.11+ syntax
396        if PYDANTIC_2_11_PLUS:
397            # Pydantic 2.11+ - use __config_class__ with ConfigDict
398            return create_model(
399                model_name,
400                __config_class__ = ForbidExtraConfig,
401                **schema_fields
402            )
403        else:
404            # Pydantic 2.10.6 - use __config__ with class
405            return create_model(
406                model_name,
407                __config__ = ForbidExtraConfig,
408                **schema_fields
409            )
410   
411    @classmethod
412    def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
413        """
414        Validates the argument definitions, ensuring they have required metadata
415        and supported types. Returns the validated arguments with defaults moved to descriptions.
416        """
417        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
418        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']
419        
420        for arg_name, arg_details in arguments.items():
421            if not isinstance(arg_details, dict):
422                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} must be a dictionary")
423                
424            # Ensure required keys are present
425            if not all(key in arg_details for key in ['name', 'type', 'description']):
426                missing = [k for k in ['name', 'type', 'description'] if k not in arg_details]
427                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} missing required metadata: {missing}")
428
429            # Add default 'required=True' if missing
430            if 'required' not in arg_details:
431                arg_details['required'] = True
432                
433            # Validate that 'required' is a boolean
434            if not isinstance(arg_details['required'], bool):
435                raise ValueError(f"ToolArgsSchema Error: The 'required' field for {arg_name} must be a boolean")
436                
437            # Validate type is supported
438            if arg_details['type'] not in supported_types:
439                raise ValueError(f"ToolArgsSchema Error: Unsupported type for {arg_name}: {arg_details['type']}")
440                
441            # Move default to description if present
442            if 'default' in arg_details:
443                default_value = arg_details['default']
444                arg_details['description'] += f" (Defaults to {repr(default_value)})"
445                del arg_details['default']
446                
447            # Process any nested structure the same way
448            if 'nested' in arg_details:
449                # Validate the nested structure
450                nested = arg_details['nested']
451                if not isinstance(nested, dict):
452                    raise ValueError(f"ToolArgsSchema Error: 'nested' in {arg_name} must be a dictionary")
453                
454                # Validate each nested item's arguments
455                for item_key, item_args in nested.items():
456                    if not isinstance(item_args, dict):
457                        raise ValueError(f"ToolArgsSchema Error: Arguments for '{item_key}' in {arg_name} must be a dictionary")
458                    
459                    # Validate each argument
460                    for sub_arg_name, sub_arg_def in item_args.items():
461                        if not isinstance(sub_arg_def, dict):
462                            raise ValueError(f"ToolArgsSchema Error: Definition for '{arg_name}.{item_key}.{sub_arg_name}' must be a dictionary")
463                        
464                        # Ensure required metadata is present
465                        if 'type' not in sub_arg_def:
466                            raise ValueError(f"ToolArgsSchema Error: '{arg_name}.{item_key}.{sub_arg_name}' missing required 'type'")
467                        
468                        if 'description' not in sub_arg_def:
469                            raise ValueError(f"ToolArgsSchema Error: '{arg_name}.{item_key}.{sub_arg_name}' missing required 'description'")
470                        
471                        # Validate type is supported
472                        if sub_arg_def['type'] not in supported_types:
473                            raise ValueError(f"ToolArgsSchema Error: Unsupported type for '{arg_name}.{item_key}.{sub_arg_name}': {sub_arg_def['type']}")
474                        
475                        # Add default 'required=False' if missing for nested properties
476                        if 'required' not in sub_arg_def:
477                            sub_arg_def['required'] = False
478                            
479                        # Validate that 'required' is a boolean
480                        if not isinstance(sub_arg_def['required'], bool):
481                            raise ValueError(f"ToolArgsSchema Error: The 'required' field for '{arg_name}.{item_key}.{sub_arg_name}' must be a boolean")
482                        
483                        # Move default to description if present
484                        if 'default' in sub_arg_def:
485                            default_value = sub_arg_def['default']
486                            sub_arg_def['description'] += f" (Defaults to {repr(default_value)})"
487                            del sub_arg_def['default']
488                        
489                        # Recursively process any nested structure within this
490                        if sub_arg_def['type'] in ['dict', 'object'] and 'nested' in sub_arg_def:
491                            cls.validate_arguments({f"{arg_name}.{item_key}.{sub_arg_name}": sub_arg_def})
492                        
493                        # Handle nested validations for complex types
494                        if sub_arg_def['type'] in ['list', 'array'] and 'items' in sub_arg_def:
495                            items = sub_arg_def['items']
496                            if not isinstance(items, dict):
497                                raise ValueError(f"ToolArgsSchema Error: 'items' for '{arg_name}.{item_key}.{sub_arg_name}' must be a dictionary")
498                                
499                            if 'type' in items and items['type'] not in supported_types and items['type'] != 'any':
500                                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for '{arg_name}.{item_key}.{sub_arg_name}': {items['type']}")
501                
502            # Recursively validate nested structures (regular pattern)
503            if arg_details['type'] in ['dict', 'object']:
504                # Check for nested definitions and validate them
505                for key, value in arg_details.items():
506                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
507                        # This is a nested property - validate it as well
508                        cls._validate_nested_property(f"{arg_name}.{key}", value)
509                        
510            # Validate list item type if it's a list
511            if arg_details['type'] in ['list', 'array']:
512                if 'items' not in arg_details:
513                    arg_details['items'] = {'type': 'any'}  # Default to Any if not specified
514                elif not isinstance(arg_details['items'], dict):
515                    raise ValueError(f"ToolArgsSchema Error: 'items' for {arg_name} must be a dictionary")
516                elif 'type' not in arg_details['items']:
517                    arg_details['items']['type'] = 'any'  # Default to Any if type not specified
518                
519                items = arg_details['items']
520                item_type = items.get('type')
521                
522                if item_type not in supported_types and item_type != 'any':
523                    raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {arg_name}: {item_type}")
524                
525                if item_type in ['dict', 'object']:
526                    # Add description to items if missing
527                    if 'description' not in items:
528                        items['description'] = f"Item of {arg_name} list"
529                    
530                    # Recursively validate nested item properties if they exist
531                    for key, value in items.items():
532                        if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
533                            cls._validate_nested_property(f"{arg_name}[].{key}", value)
534        
535        return arguments
536
537    @classmethod
538    def _validate_nested_property(cls, path: str, prop_details: Dict[str, Any]):
539        """
540        Validates a nested property definition recursively.
541        
542        Args:
543            path: The path to this property (for error messages)
544            prop_details: The property details to validate
545        """
546        # Check required metadata
547        if 'type' not in prop_details:
548            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'type'")
549            
550        # Check for description
551        if 'description' not in prop_details:
552            raise ValueError(f"ToolArgsSchema Error: Nested property {path} missing required 'description'")
553            
554        # Validate type is supported
555        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']
556        if prop_details['type'] not in supported_types:
557            raise ValueError(f"ToolArgsSchema Error: Unsupported type for {path}: {prop_details['type']}")
558            
559        # Add default 'required=False' if missing for nested properties
560        if 'required' not in prop_details:
561            prop_details['required'] = False
562        
563        # Validate that 'required' is a boolean
564        if not isinstance(prop_details['required'], bool):
565            raise ValueError(f"ToolArgsSchema Error: The 'required' field for {path} must be a boolean")
566            
567        # Move default to description if present
568        if 'default' in prop_details:
569            default_value = prop_details['default']
570            prop_details['description'] += f" (Defaults to {repr(default_value)})"
571            del prop_details['default']
572            
573        # Known metadata keys to skip when looking for nested properties
574        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
575            
576        # Handle nested structure within this property
577        if 'nested' in prop_details and isinstance(prop_details['nested'], dict):
578            nested = prop_details['nested']
579            for item_key, item_args in nested.items():
580                if isinstance(item_args, dict):
581                    for sub_arg_name, sub_arg_def in item_args.items():
582                        if isinstance(sub_arg_def, dict) and 'type' in sub_arg_def:
583                            # Add directly to the parent object without prefixing
584                            prop_details[sub_arg_name] = sub_arg_def
585            # Once processed, remove the nested key to avoid double-processing
586            del prop_details['nested']
587            
588        # If this is a dict/object, recursively validate its properties
589        if prop_details['type'] in ['dict', 'object']:
590            # Check for nested definitions and validate them
591            for key, value in prop_details.items():
592                if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
593                    cls._validate_nested_property(f"{path}.{key}", value)
594            
595        # If this is a list/array, validate its items
596        if prop_details['type'] in ['list', 'array']:
597            if 'items' not in prop_details:
598                prop_details['items'] = {'type': 'any'}
599            elif not isinstance(prop_details['items'], dict):
600                raise ValueError(f"ToolArgsSchema Error: 'items' for {path} must be a dictionary")
601            elif 'type' not in prop_details['items']:
602                prop_details['items']['type'] = 'any'
603                
604            items = prop_details['items']
605            item_type = items.get('type')
606            
607            if item_type not in supported_types and item_type != 'any':
608                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {path}: {item_type}")
609                
610            if item_type in ['dict', 'object']:
611                # Add description to items if missing
612                if 'description' not in items:
613                    items['description'] = f"Item of {path}"
614                
615                # Recursively validate nested item properties if they exist
616                for key, value in items.items():
617                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
618                        cls._validate_nested_property(f"{path}[].{key}", value)

Meta-validator for tool arguments ensuring LangChain compatibility

arguments: Dict[str, Dict[str, Any]] = PydanticUndefined

Validated tool argument specifications

@classmethod
def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
245    @classmethod
246    def custom_list_type(cls, item_type: Type, item_type_str: str) -> Type:
247        mapping = {str: "string", int: "integer", float: "number", bool: "boolean"}
248        json_item_type = mapping.get(item_type, "string")
249        
250        class CustomList(list):
251            @classmethod
252            def __get_pydantic_core_schema__(cls, source: Any, handler: Any) -> Any:
253                # Use the default core schema for lists.
254                return handler.generate_schema(list)
255    
256            @classmethod
257            def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> Dict[str, Any]:
258                # Get the default JSON schema from the core schema.
259                json_schema = handler(core_schema)
260                # Ensure that the "items" property exists.
261                json_schema.setdefault("items", {})
262                # Add the "type" to the items if it isn't already provided.
263                if "type" not in json_schema["items"]:
264                    json_schema["items"]["type"] = json_item_type
265                # Add the custom description.
266                json_schema["items"]["description"] = f"Item of type {item_type_str}"
267                return json_schema
268    
269        return CustomList
@classmethod
def to_pydantic_schema( cls, arguments: Dict[str, Dict[str, Any]]) -> Type[pydantic.main.BaseModel]:
273    @classmethod
274    def to_pydantic_schema(cls, arguments: Dict[str, Dict[str, Any]]) -> Type[BaseModel]:
275        """
276        Converts argument definitions into a dynamic Pydantic model.
277        For list fields with primitive items, we use a custom list type that populates
278        the "items" schema field properly.
279        """
280        schema_fields = {}
281        type_mapping = {
282            'int': int, 'str': str, 'float': float, 'bool': bool,
283            'integer': int, 'string': str, 'number': float, 'boolean': bool,
284            'list': list, 'array': list,  # We'll override list fields below.
285            'dict': Dict[str, Any], 'object': Dict[str, Any],
286        }
287
288        for arg_name, arg_details in arguments.items():
289            if not isinstance(arg_details, dict):
290                continue
291
292            arg_type_str = arg_details.get('type', 'string')
293            description = arg_details.get('description', '')
294            is_required = arg_details.get('required', True)
295            default_value = arg_details.get('default', None)
296
297            # Determine the field type.
298            if arg_type_str in ('dict', 'object'):
299                schema_field_type = cls._create_nested_model_recursive(f"Nested_{arg_name}", arg_details)
300            elif arg_type_str in ('list', 'array'):
301                item_info = arg_details.get('items', {})
302                item_type_str = item_info.get('type', 'any')
303                if item_type_str == 'any':
304                    item_type_str = 'str'
305                if item_type_str in ('dict', 'object'):
306                    item_model = cls._create_nested_model_recursive(f"ListItem_{arg_name}", item_info)
307                    schema_field_type = List[item_model]  # Use standard list of nested model.
308                else:
309                    primitive_type = type_mapping.get(item_type_str, str)
310                    # Instead of List[primitive_type], use a custom list type.
311                    schema_field_type = cls.custom_list_type(primitive_type, item_type_str)
312            else:
313                schema_field_type = type_mapping.get(arg_type_str, str)
314
315            # Append default info into description.
316            final_description = description
317            if default_value is not None:
318                final_description += f" (Defaults to {repr(default_value)})"
319                arg_details.pop('default', None)
320
321            field_kwargs = {"description": final_description}
322            if not is_required:
323                field_kwargs["default"] = None
324                schema_field_type = Optional[schema_field_type]
325
326            schema_fields[arg_name] = (schema_field_type, Field(**field_kwargs))
327
328        model_name = f"DynamicArgsSchema_{id(arguments)}"
329        
330        # Handle both Pydantic 2.10.6 and 2.11+ syntax
331        if PYDANTIC_2_11_PLUS:
332            # Pydantic 2.11+ - use __config_class__ with ConfigDict
333            return create_model(model_name, __config_class__=ForbidExtraConfig, **schema_fields)
334        else:
335            # Pydantic 2.10.6 - use __config__ with class
336            return create_model(model_name, __config__=ForbidExtraConfig, **schema_fields)

Converts argument definitions into a dynamic Pydantic model. For list fields with primitive items, we use a custom list type that populates the "items" schema field properly.

@classmethod
def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
411    @classmethod
412    def validate_arguments(cls, arguments: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
413        """
414        Validates the argument definitions, ensuring they have required metadata
415        and supported types. Returns the validated arguments with defaults moved to descriptions.
416        """
417        known_meta_keys = {'name', 'type', 'description', 'required', 'default', 'items', 'additionalProperties', 'nested'}
418        supported_types = ['int', 'str', 'float', 'bool', 'list', 'dict', 'integer', 'string', 'number', 'boolean', 'array', 'object']
419        
420        for arg_name, arg_details in arguments.items():
421            if not isinstance(arg_details, dict):
422                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} must be a dictionary")
423                
424            # Ensure required keys are present
425            if not all(key in arg_details for key in ['name', 'type', 'description']):
426                missing = [k for k in ['name', 'type', 'description'] if k not in arg_details]
427                raise ValueError(f"ToolArgsSchema Error: Argument {arg_name} missing required metadata: {missing}")
428
429            # Add default 'required=True' if missing
430            if 'required' not in arg_details:
431                arg_details['required'] = True
432                
433            # Validate that 'required' is a boolean
434            if not isinstance(arg_details['required'], bool):
435                raise ValueError(f"ToolArgsSchema Error: The 'required' field for {arg_name} must be a boolean")
436                
437            # Validate type is supported
438            if arg_details['type'] not in supported_types:
439                raise ValueError(f"ToolArgsSchema Error: Unsupported type for {arg_name}: {arg_details['type']}")
440                
441            # Move default to description if present
442            if 'default' in arg_details:
443                default_value = arg_details['default']
444                arg_details['description'] += f" (Defaults to {repr(default_value)})"
445                del arg_details['default']
446                
447            # Process any nested structure the same way
448            if 'nested' in arg_details:
449                # Validate the nested structure
450                nested = arg_details['nested']
451                if not isinstance(nested, dict):
452                    raise ValueError(f"ToolArgsSchema Error: 'nested' in {arg_name} must be a dictionary")
453                
454                # Validate each nested item's arguments
455                for item_key, item_args in nested.items():
456                    if not isinstance(item_args, dict):
457                        raise ValueError(f"ToolArgsSchema Error: Arguments for '{item_key}' in {arg_name} must be a dictionary")
458                    
459                    # Validate each argument
460                    for sub_arg_name, sub_arg_def in item_args.items():
461                        if not isinstance(sub_arg_def, dict):
462                            raise ValueError(f"ToolArgsSchema Error: Definition for '{arg_name}.{item_key}.{sub_arg_name}' must be a dictionary")
463                        
464                        # Ensure required metadata is present
465                        if 'type' not in sub_arg_def:
466                            raise ValueError(f"ToolArgsSchema Error: '{arg_name}.{item_key}.{sub_arg_name}' missing required 'type'")
467                        
468                        if 'description' not in sub_arg_def:
469                            raise ValueError(f"ToolArgsSchema Error: '{arg_name}.{item_key}.{sub_arg_name}' missing required 'description'")
470                        
471                        # Validate type is supported
472                        if sub_arg_def['type'] not in supported_types:
473                            raise ValueError(f"ToolArgsSchema Error: Unsupported type for '{arg_name}.{item_key}.{sub_arg_name}': {sub_arg_def['type']}")
474                        
475                        # Add default 'required=False' if missing for nested properties
476                        if 'required' not in sub_arg_def:
477                            sub_arg_def['required'] = False
478                            
479                        # Validate that 'required' is a boolean
480                        if not isinstance(sub_arg_def['required'], bool):
481                            raise ValueError(f"ToolArgsSchema Error: The 'required' field for '{arg_name}.{item_key}.{sub_arg_name}' must be a boolean")
482                        
483                        # Move default to description if present
484                        if 'default' in sub_arg_def:
485                            default_value = sub_arg_def['default']
486                            sub_arg_def['description'] += f" (Defaults to {repr(default_value)})"
487                            del sub_arg_def['default']
488                        
489                        # Recursively process any nested structure within this
490                        if sub_arg_def['type'] in ['dict', 'object'] and 'nested' in sub_arg_def:
491                            cls.validate_arguments({f"{arg_name}.{item_key}.{sub_arg_name}": sub_arg_def})
492                        
493                        # Handle nested validations for complex types
494                        if sub_arg_def['type'] in ['list', 'array'] and 'items' in sub_arg_def:
495                            items = sub_arg_def['items']
496                            if not isinstance(items, dict):
497                                raise ValueError(f"ToolArgsSchema Error: 'items' for '{arg_name}.{item_key}.{sub_arg_name}' must be a dictionary")
498                                
499                            if 'type' in items and items['type'] not in supported_types and items['type'] != 'any':
500                                raise ValueError(f"ToolArgsSchema Error: Unsupported item type for '{arg_name}.{item_key}.{sub_arg_name}': {items['type']}")
501                
502            # Recursively validate nested structures (regular pattern)
503            if arg_details['type'] in ['dict', 'object']:
504                # Check for nested definitions and validate them
505                for key, value in arg_details.items():
506                    if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
507                        # This is a nested property - validate it as well
508                        cls._validate_nested_property(f"{arg_name}.{key}", value)
509                        
510            # Validate list item type if it's a list
511            if arg_details['type'] in ['list', 'array']:
512                if 'items' not in arg_details:
513                    arg_details['items'] = {'type': 'any'}  # Default to Any if not specified
514                elif not isinstance(arg_details['items'], dict):
515                    raise ValueError(f"ToolArgsSchema Error: 'items' for {arg_name} must be a dictionary")
516                elif 'type' not in arg_details['items']:
517                    arg_details['items']['type'] = 'any'  # Default to Any if type not specified
518                
519                items = arg_details['items']
520                item_type = items.get('type')
521                
522                if item_type not in supported_types and item_type != 'any':
523                    raise ValueError(f"ToolArgsSchema Error: Unsupported item type for {arg_name}: {item_type}")
524                
525                if item_type in ['dict', 'object']:
526                    # Add description to items if missing
527                    if 'description' not in items:
528                        items['description'] = f"Item of {arg_name} list"
529                    
530                    # Recursively validate nested item properties if they exist
531                    for key, value in items.items():
532                        if key not in known_meta_keys and isinstance(value, dict) and 'type' in value:
533                            cls._validate_nested_property(f"{arg_name}[].{key}", value)
534        
535        return arguments

Validates the argument definitions, ensuring they have required metadata and supported types. Returns the validated arguments with defaults moved to descriptions.

class BaseHeavenTool(abc.ABC):
 622class BaseHeavenTool(ABC):
 623    """Provider-agnostic tool base class with standardized results"""
 624    # Required class attributes that tools must define
 625    name: str
 626    description: str
 627    func: Callable
 628    args_schema: Type[ToolArgsSchema] 
 629    base_tool: BaseTool 
 630    is_async: bool 
 631
 632    def __init__(
 633        self, 
 634        base_tool: BaseTool,
 635        args_schema: Type[ToolArgsSchema],
 636        is_async: bool = False
 637    ):
 638        # Set properties (no more super() call since we're not inheriting from BaseTool)
 639        self.args_schema = args_schema
 640        self.base_tool = base_tool
 641        self.is_async = is_async
 642
 643        # Create instance and validate arguments schema
 644        schema_instance = args_schema()
 645        ToolArgsSchema.validate_arguments(schema_instance.arguments)
 646    
 647    def _clean_kwargs(self, obj: Any) -> Any:
 648        """
 649        Recursively return a *copy* of `obj` in which every mapping key that looks
 650        like "'some_key'" is replaced by "some_key".
 651
 652        • Works for any Mapping subclass (dict, OrderedDict, defaultdict, …).
 653        • Keeps list / tuple / set semantics identical.
 654        • Primitives and values are left untouched.
 655        """
 656        # ---- Handle any mapping (dict-like) ---------------------------
 657        if isinstance(obj, Mapping):
 658            cleaned_items = {}
 659            for k, v in obj.items():
 660                new_k = k
 661                if (
 662                    isinstance(k, str)
 663                    and k.startswith("'")
 664                    and k.endswith("'")
 665                ):
 666                    stripped = k[1:-1]
 667                    if stripped != k:
 668                        # logging.warning(
 669                        #     "Sanitizing malformed tool-argument key: %r → %r",
 670                        #     k, stripped
 671                        # )
 672                        new_k = stripped
 673
 674                cleaned_items[new_k] = self._clean_kwargs(v)  # recurse on value
 675
 676            # Rebuild using the same mapping type when possible
 677            try:
 678                return type(obj)(cleaned_items)
 679            except Exception:
 680                return cleaned_items
 681
 682        # ---- Handle iterables that can hold nested mappings -----------
 683        if isinstance(obj, (list, tuple, set)):
 684            cleaned_iter = [self._clean_kwargs(item) for item in obj]
 685            return type(obj)(cleaned_iter)
 686
 687        # ---- Walk other iterables (e.g., generators) ------------------
 688        if isinstance(obj, Iterable) and not isinstance(obj, (str, bytes)):
 689            return type(obj)(self._clean_kwargs(item) for item in obj)
 690
 691        # ---- Base case ------------------------------------------------
 692        return obj
 693
 694        
 695    def _run(
 696        self,
 697        run_manager: Optional[CallbackManagerForToolRun] = None,
 698        config: Optional[RunnableConfig] = None,
 699        **kwargs
 700    ) -> ToolResult:
 701        """Synchronous execution path using wrapped base tool"""
 702        cleaned_kwargs = self._clean_kwargs(kwargs)
 703        if cleaned_kwargs:
 704            kwargs = cleaned_kwargs
 705        try:
 706            if self.is_async:
 707                raise ValueError("Tool marked as async but _run was called")
 708
 709            if hasattr(self.base_tool, '_run'):
 710                result = self.base_tool._run(run_manager=run_manager, config=config, **kwargs)
 711            else:
 712                # Fallback to func if _run is not available
 713                result = self.base_tool.func(**kwargs)
 714
 715   
 716            if isinstance(result, ToolResult):
 717                return result  # Already a ToolResult (includes CLIResult)
 718            return ToolResult(output=str(result))
 719        except ToolError as e:
 720            return ToolResult(error=str(e))
 721        except Exception as e:
 722            return ToolResult(error=f"Error in tool '{self.name}': {e}")
 723          
 724    async def _arun(
 725        self,
 726        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
 727        config: Optional[RunnableConfig] = None,
 728        **kwargs
 729    ) -> ToolResult:
 730        """Asynchronous execution path using wrapped base tool"""
 731        cleaned_kwargs = self._clean_kwargs(kwargs)
 732        if cleaned_kwargs:
 733            kwargs = cleaned_kwargs
 734        try:
 735          
 736            # Improved async handling
 737            if not self.is_async:
 738                # Use asyncio.to_thread for sync functions
 739                import asyncio
 740                
 741                result = await asyncio.to_thread(self.base_tool.func, **kwargs)
 742            else:
 743                # Check for native async methods
 744                if hasattr(self.base_tool, '_arun'):
 745                    result = await self.base_tool._arun(run_manager=run_manager, config=config, **kwargs)
 746                elif hasattr(self.base_tool, 'arun'):
 747                    result = await self.base_tool.arun(**kwargs)
 748                else:
 749                    
 750                    # Check if the function itself is async
 751                    import inspect
 752                    if inspect.iscoroutinefunction(self.base_tool.func):
 753                        result = await self.base_tool.func(**kwargs)
 754                    else:
 755                        # Fallback to thread-based execution only for sync functions
 756                        import asyncio
 757                        result = await asyncio.to_thread(self.base_tool.func, **kwargs)
 758   
 759              # Handle result types
 760            if isinstance(result, ToolResult):
 761                return result  # Already a ToolResult (includes CLIResult)
 762            return ToolResult(output=str(result))
 763        except ToolError as e:
 764            return ToolResult(error=str(e))
 765        except Exception as e:
 766            return ToolResult(error=f"Error in tool '{self.name}': {e}")
 767   
 768    def get_spec(self) -> dict:
 769        """Get this tool's specification"""
 770        return {
 771            "name": self.name,
 772            "description": self.description,
 773            "args": self.args_schema().arguments  # That's it. The dictionary is already there.
 774        }
 775
 776    @classmethod
 777    def to_openai_function(cls):
 778        """Convert the tool's schema to OpenAI function format"""
 779        # Create the Pydantic schema
 780        schema_instance = cls.args_schema()
 781        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
 782        # Use LangChain's built-in converter
 783        openai_function = convert_to_openai_tool(
 784            Tool(
 785                name=cls.name,
 786                description=cls.description,
 787                func=cls.func,
 788                args_schema=pydantic_schema
 789            )
 790        )
 791        
 792        return openai_function
 793      
 794    def get_openai_function(self):
 795        """Get the OpenAI function specification"""
 796        return self.__class__.to_openai_function()   
 797  
 798    @classmethod
 799    def create_adk_tool(cls, func):
 800        from google.adk.tools import FunctionTool, BaseTool
 801    
 802        # Step 1: Get original schema and dynamic Pydantic model
 803        schema_instance = cls.args_schema()
 804        DynamicModel = schema_instance.to_pydantic_schema(schema_instance.arguments)
 805        flat_schema = generate_dereferenced_schema(DynamicModel)
 806        prop_schemas = flat_schema.get("properties", {})
 807        top_required = flat_schema.get("required", [])
 808        original_defs = schema_instance.arguments
 809    
 810        # Step 2: Build wrapped function with proper signature (from flattened schema)
 811        import inspect
 812        from typing import Optional, get_args, get_origin
 813    
 814        param_defs = []
 815        for name, definition in prop_schemas.items():
 816            typ = definition.get("type", "string")
 817            is_required = name in top_required
 818            default = None if not is_required else inspect._empty
 819    
 820            # Very basic type mapping
 821            type_map = {
 822                "string": str,
 823                "integer": int,
 824                "number": float,
 825                "boolean": bool,
 826                "array": list,
 827                "object": dict
 828            }
 829            resolved_type = type_map.get(typ, str)
 830            if not is_required:
 831                resolved_type = Optional[resolved_type]
 832    
 833            param_defs.append((name, resolved_type, default))
 834    
 835        # Create the function dynamically
 836        def create_wrapped_function(original_func, func_name, param_defs):
 837            params = []
 838            for name, typ, default in param_defs:
 839                param = inspect.Parameter(
 840                    name,
 841                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
 842                    annotation=typ,
 843                    default=default
 844                )
 845                params.append(param)
 846    
 847            sig = inspect.Signature(params)
 848    
 849            async def wrapper(**kwargs):
 850                import inspect, asyncio
 851
 852                if inspect.iscoroutinefunction(original_func):
 853                    return await original_func(**kwargs)
 854                else:
 855                    # run sync code in thread to avoid blocking
 856                    return await asyncio.to_thread(original_func, **kwargs)
 857                
 858    
 859            wrapper.__name__ = func_name
 860            wrapper.__qualname__ = func_name
 861            wrapper.__signature__ = sig
 862            return wrapper
 863    
 864        wrapped_func = create_wrapped_function(func, cls.name, param_defs)
 865    
 866        # Step 3: Create ADK FunctionTool
 867        orig_tool = FunctionTool(func=wrapped_func)
 868        orig_tool.description = cls.description
 869        # Step 4: Schema fixer merge logic
 870        def merge(adk_prop, pschema, info_def=None):
 871            enum_cls = type(adk_prop.type)
 872            if "anyOf" in pschema:
 873                branches = pschema.pop("anyOf")
 874                if any(b.get("type") == "null" for b in branches):
 875                    adk_prop.nullable = True
 876                arr = next((b for b in branches if b.get("type") == "array"), None)
 877                chosen = arr or next((b for b in branches if b.get("type") != "null"), {})
 878                pschema = chosen
 879    
 880            if isinstance(info_def, dict):
 881                if info_def.get("name"):
 882                    adk_prop.title = info_def["name"]
 883                if not getattr(adk_prop, "description", None) and info_def.get("description"):
 884                    adk_prop.description = info_def["description"]
 885    
 886            if "description" in pschema:
 887                adk_prop.description = pschema["description"]
 888    
 889            if "items" in pschema or pschema.get("type") == "array":
 890                adk_prop.type = enum_cls.ARRAY
 891            elif "type" in pschema:
 892                t = pschema["type"].upper()
 893                if hasattr(enum_cls, t):
 894                    adk_prop.type = getattr(enum_cls, t)
 895    
 896            if adk_prop.type == enum_cls.OBJECT:
 897                adk_prop.properties = adk_prop.properties or {}
 898                # nested_defs = {}
 899                # if isinstance(info_def, dict):
 900                #     if isinstance(info_def.get("nested"), dict):
 901                #         nested_defs = info_def["nested"]
 902                #     else:
 903                #         nested_defs = {
 904                #             k: v for k, v in info_def.items()
 905                #             if isinstance(v, dict) and ("type" in v or "nested" in v)
 906                #         }
 907                # always allow any dict under info_def[key] as child_info
 908                for key, child_ps in pschema.get("properties", {}).items():
 909                    child_info = {}
 910                    if isinstance(info_def, dict) and isinstance(info_def.get(key), dict):
 911                        child_info = info_def[key]
 912                # for key, child_ps in pschema.get("properties", {}).items():
 913                #     child_info = nested_defs.get(key, {})
 914                    if key not in adk_prop.properties:
 915                        Placeholder = type(adk_prop)
 916                        adk_prop.properties[key] = Placeholder.model_construct(
 917                            __pydantic_initialised__=True,
 918                            name=key,
 919                            title=key,
 920                            description="",
 921                            properties=None,
 922                            items=None,
 923                            required=[],
 924                            type=adk_prop.type,
 925                        )
 926                    merge(adk_prop.properties[key], child_ps, child_info)
 927    
 928                # req = pschema.get("required")
 929                # if req is None:
 930                #     req = [k for k, v in nested_defs.items() if isinstance(v, dict) and v.get("required")]
 931                # adk_prop.required = req or []
 932                # first try JSON‑schema’s own 'required', otherwise fall back to any dict‑defined 'required' in info_def
 933                req = pschema.get("required")
 934                if req is None:
 935                        req = []
 936                        if isinstance(info_def, dict):
 937                                req = [k for k, v in info_def.items() if isinstance(v, dict) and v.get("required")]
 938                adk_prop.required = req or []
 939    
 940            elif adk_prop.type == enum_cls.ARRAY:
 941                if adk_prop.items is None:
 942                    Placeholder = type(adk_prop)
 943                    adk_prop.items = Placeholder.model_construct(
 944                        __pydantic_initialised__=True,
 945                        name=adk_prop.title or "",
 946                        title=adk_prop.title or "",
 947                        description=pschema.get("items", {}).get("description", ""),
 948                        properties=None,
 949                        items=None,
 950                        required=[],
 951                        type=adk_prop.type,
 952                    )
 953                merge(adk_prop.items, pschema.get("items", {}), None)
 954#### LITELLM ###
 955        def normalize_schema_decl(schema):
 956            from enum import Enum
 957            # `schema` is a google.genai.types.Schema or similar
 958            # 1️⃣ normalize this node’s type
 959            if hasattr(schema, "type") and isinstance(schema.type, Enum):
 960                schema.type = schema.type.name.lower()
 961            # 2️⃣ normalize any nested properties
 962            if getattr(schema, "properties", None):
 963                for prop in schema.properties.values():
 964                    normalize_schema_decl(prop)
 965            # 3️⃣ normalize array items
 966            if getattr(schema, "items", None):
 967                normalize_schema_decl(schema.items)
 968            return schema
 969#### LITELLM ####
 970        # Step 5: Wrap in ADK BaseTool with fixed declaration
 971        class RequiredFieldsFixTool(BaseTool):
 972            def __init__(self, orig_tool):
 973                self._orig_tool = orig_tool
 974                super().__init__(name=orig_tool.name, description=orig_tool.description)
 975    
 976            async def run_async(self, **kwargs):
 977                try:
 978                    # 1) invoke the underlying tool (may be sync or async)
 979                    raw = self._orig_tool.run_async(**kwargs)
 980                    result = await raw if inspect.isawaitable(raw) else raw
 981        
 982                    # 2) if it's already a ToolResult or CLIResult, just return it
 983                    if isinstance(result, ToolResult) or isinstance(result, CLIResult):
 984                        return result
 985        
 986                    # 3) if it's a dict (ADK function tool), dig into the 'response' / 'result' fields
 987                    if isinstance(result, dict):
 988                        # ADK wraps tool output under result["response"]["result"]
 989                        resp = result.get("response", result)
 990                        payload = resp.get("result", resp) if isinstance(resp, dict) else resp
 991        
 992                        # payload may itself be a dict containing output, error, base64_image, etc.
 993                        if isinstance(payload, dict):
 994                            return ToolResult(
 995                                output=payload.get("output", ""),
 996                                error=payload.get("error", None),
 997                                base64_image=payload.get("base64_image", None),
 998                                system=payload.get("system", None),
 999                            )
1000                        # otherwise treat payload as a plain string
1001                        return ToolResult(output=str(payload))
1002        
1003                    # 4) if it's just a string, wrap it
1004                    if isinstance(result, str):
1005                        return ToolResult(output=result)
1006        
1007                    # 5) fallback for anything else
1008                    return ToolResult(output=str(result))
1009        
1010                except ToolError as e:
1011                    return ToolResult(error=str(e))
1012                except Exception as e:
1013                    return ToolResult(error=f"Unhandled error in tool '{self.name}': {e}")
1014                  
1015            def _get_declaration(self):
1016                decl = self._orig_tool._get_declaration()
1017                
1018                decl.description = self.description
1019                if not (decl and decl.parameters and decl.parameters.properties):
1020                    return decl
1021    
1022                decl.parameters.required = top_required
1023    
1024                for key, ps in prop_schemas.items():
1025                    prop = decl.parameters.properties.get(key)
1026                    if not prop:
1027                        continue
1028                    merge(prop, ps, original_defs.get(key))
1029                ### LITE LLM
1030                # 2) **Post‑process**: lower‑case enum types into JSON‑schema strings
1031                # from enum import Enum
1032                # for prop in decl.parameters.properties.values():
1033                #    # only convert ADK enum values
1034                #     if isinstance(prop.type, Enum):
1035                #        prop.type = prop.type.name.lower()
1036                #    # handle array items too
1037                #     if getattr(prop, "items", None) and isinstance(prop.items.type, Enum):
1038                #         prop.items.type = prop.items.type.name.lower()
1039                normalize_schema_decl(decl.parameters)
1040                ###
1041                return decl
1042    
1043        return RequiredFieldsFixTool(orig_tool)
1044
1045
1046  
1047    @classmethod
1048    def create(cls, adk: bool = False):
1049        """Create a tool instance using class attributes"""
1050        # with open(tool_log_path, 'a') as f:
1051        #         f.write("\n\ncreate entered!\n")
1052       
1053        # Create an instance of the schema
1054        
1055        schema_instance = cls.args_schema()
1056    
1057        # # Use your existing Pydantic schema generator
1058        
1059        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
1060        
1061        
1062        
1063        if adk:
1064            
1065            return cls.create_adk_tool(cls.func) # new method
1066                   
1067        #### LANGCHAIN
1068        else:
1069            
1070            schema_instance = cls.args_schema()
1071            pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
1072            if cls.is_async:
1073                def sync_stub(**kwargs):
1074                    raise NotImplementedError("This tool is async only")
1075    
1076                base_tool = Tool(
1077                    name=cls.name,
1078                    description=cls.description,
1079                    func=sync_stub,
1080                    coroutine=cls.func,
1081                    args_schema=pydantic_schema
1082                )
1083            else:
1084                base_tool = Tool(
1085                    name=cls.name,
1086                    description=cls.description,
1087                    func=cls.func,
1088                    args_schema=pydantic_schema
1089                )
1090    
1091           
1092            # return instance
1093            return cls(
1094                base_tool=base_tool,
1095                args_schema=cls.args_schema,
1096                is_async=cls.is_async
1097            )

Provider-agnostic tool base class with standardized results

BaseHeavenTool( base_tool: langchain_core.tools.base.BaseTool, args_schema: Type[ToolArgsSchema], is_async: bool = False)
632    def __init__(
633        self, 
634        base_tool: BaseTool,
635        args_schema: Type[ToolArgsSchema],
636        is_async: bool = False
637    ):
638        # Set properties (no more super() call since we're not inheriting from BaseTool)
639        self.args_schema = args_schema
640        self.base_tool = base_tool
641        self.is_async = is_async
642
643        # Create instance and validate arguments schema
644        schema_instance = args_schema()
645        ToolArgsSchema.validate_arguments(schema_instance.arguments)
name: str
description: str
func: Callable
args_schema: Type[ToolArgsSchema]
base_tool: langchain_core.tools.base.BaseTool
is_async: bool
def get_spec(self) -> dict:
768    def get_spec(self) -> dict:
769        """Get this tool's specification"""
770        return {
771            "name": self.name,
772            "description": self.description,
773            "args": self.args_schema().arguments  # That's it. The dictionary is already there.
774        }

Get this tool's specification

@classmethod
def to_openai_function(cls):
776    @classmethod
777    def to_openai_function(cls):
778        """Convert the tool's schema to OpenAI function format"""
779        # Create the Pydantic schema
780        schema_instance = cls.args_schema()
781        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
782        # Use LangChain's built-in converter
783        openai_function = convert_to_openai_tool(
784            Tool(
785                name=cls.name,
786                description=cls.description,
787                func=cls.func,
788                args_schema=pydantic_schema
789            )
790        )
791        
792        return openai_function

Convert the tool's schema to OpenAI function format

def get_openai_function(self):
794    def get_openai_function(self):
795        """Get the OpenAI function specification"""
796        return self.__class__.to_openai_function()   

Get the OpenAI function specification

@classmethod
def create_adk_tool(cls, func):
 798    @classmethod
 799    def create_adk_tool(cls, func):
 800        from google.adk.tools import FunctionTool, BaseTool
 801    
 802        # Step 1: Get original schema and dynamic Pydantic model
 803        schema_instance = cls.args_schema()
 804        DynamicModel = schema_instance.to_pydantic_schema(schema_instance.arguments)
 805        flat_schema = generate_dereferenced_schema(DynamicModel)
 806        prop_schemas = flat_schema.get("properties", {})
 807        top_required = flat_schema.get("required", [])
 808        original_defs = schema_instance.arguments
 809    
 810        # Step 2: Build wrapped function with proper signature (from flattened schema)
 811        import inspect
 812        from typing import Optional, get_args, get_origin
 813    
 814        param_defs = []
 815        for name, definition in prop_schemas.items():
 816            typ = definition.get("type", "string")
 817            is_required = name in top_required
 818            default = None if not is_required else inspect._empty
 819    
 820            # Very basic type mapping
 821            type_map = {
 822                "string": str,
 823                "integer": int,
 824                "number": float,
 825                "boolean": bool,
 826                "array": list,
 827                "object": dict
 828            }
 829            resolved_type = type_map.get(typ, str)
 830            if not is_required:
 831                resolved_type = Optional[resolved_type]
 832    
 833            param_defs.append((name, resolved_type, default))
 834    
 835        # Create the function dynamically
 836        def create_wrapped_function(original_func, func_name, param_defs):
 837            params = []
 838            for name, typ, default in param_defs:
 839                param = inspect.Parameter(
 840                    name,
 841                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
 842                    annotation=typ,
 843                    default=default
 844                )
 845                params.append(param)
 846    
 847            sig = inspect.Signature(params)
 848    
 849            async def wrapper(**kwargs):
 850                import inspect, asyncio
 851
 852                if inspect.iscoroutinefunction(original_func):
 853                    return await original_func(**kwargs)
 854                else:
 855                    # run sync code in thread to avoid blocking
 856                    return await asyncio.to_thread(original_func, **kwargs)
 857                
 858    
 859            wrapper.__name__ = func_name
 860            wrapper.__qualname__ = func_name
 861            wrapper.__signature__ = sig
 862            return wrapper
 863    
 864        wrapped_func = create_wrapped_function(func, cls.name, param_defs)
 865    
 866        # Step 3: Create ADK FunctionTool
 867        orig_tool = FunctionTool(func=wrapped_func)
 868        orig_tool.description = cls.description
 869        # Step 4: Schema fixer merge logic
 870        def merge(adk_prop, pschema, info_def=None):
 871            enum_cls = type(adk_prop.type)
 872            if "anyOf" in pschema:
 873                branches = pschema.pop("anyOf")
 874                if any(b.get("type") == "null" for b in branches):
 875                    adk_prop.nullable = True
 876                arr = next((b for b in branches if b.get("type") == "array"), None)
 877                chosen = arr or next((b for b in branches if b.get("type") != "null"), {})
 878                pschema = chosen
 879    
 880            if isinstance(info_def, dict):
 881                if info_def.get("name"):
 882                    adk_prop.title = info_def["name"]
 883                if not getattr(adk_prop, "description", None) and info_def.get("description"):
 884                    adk_prop.description = info_def["description"]
 885    
 886            if "description" in pschema:
 887                adk_prop.description = pschema["description"]
 888    
 889            if "items" in pschema or pschema.get("type") == "array":
 890                adk_prop.type = enum_cls.ARRAY
 891            elif "type" in pschema:
 892                t = pschema["type"].upper()
 893                if hasattr(enum_cls, t):
 894                    adk_prop.type = getattr(enum_cls, t)
 895    
 896            if adk_prop.type == enum_cls.OBJECT:
 897                adk_prop.properties = adk_prop.properties or {}
 898                # nested_defs = {}
 899                # if isinstance(info_def, dict):
 900                #     if isinstance(info_def.get("nested"), dict):
 901                #         nested_defs = info_def["nested"]
 902                #     else:
 903                #         nested_defs = {
 904                #             k: v for k, v in info_def.items()
 905                #             if isinstance(v, dict) and ("type" in v or "nested" in v)
 906                #         }
 907                # always allow any dict under info_def[key] as child_info
 908                for key, child_ps in pschema.get("properties", {}).items():
 909                    child_info = {}
 910                    if isinstance(info_def, dict) and isinstance(info_def.get(key), dict):
 911                        child_info = info_def[key]
 912                # for key, child_ps in pschema.get("properties", {}).items():
 913                #     child_info = nested_defs.get(key, {})
 914                    if key not in adk_prop.properties:
 915                        Placeholder = type(adk_prop)
 916                        adk_prop.properties[key] = Placeholder.model_construct(
 917                            __pydantic_initialised__=True,
 918                            name=key,
 919                            title=key,
 920                            description="",
 921                            properties=None,
 922                            items=None,
 923                            required=[],
 924                            type=adk_prop.type,
 925                        )
 926                    merge(adk_prop.properties[key], child_ps, child_info)
 927    
 928                # req = pschema.get("required")
 929                # if req is None:
 930                #     req = [k for k, v in nested_defs.items() if isinstance(v, dict) and v.get("required")]
 931                # adk_prop.required = req or []
 932                # first try JSON‑schema’s own 'required', otherwise fall back to any dict‑defined 'required' in info_def
 933                req = pschema.get("required")
 934                if req is None:
 935                        req = []
 936                        if isinstance(info_def, dict):
 937                                req = [k for k, v in info_def.items() if isinstance(v, dict) and v.get("required")]
 938                adk_prop.required = req or []
 939    
 940            elif adk_prop.type == enum_cls.ARRAY:
 941                if adk_prop.items is None:
 942                    Placeholder = type(adk_prop)
 943                    adk_prop.items = Placeholder.model_construct(
 944                        __pydantic_initialised__=True,
 945                        name=adk_prop.title or "",
 946                        title=adk_prop.title or "",
 947                        description=pschema.get("items", {}).get("description", ""),
 948                        properties=None,
 949                        items=None,
 950                        required=[],
 951                        type=adk_prop.type,
 952                    )
 953                merge(adk_prop.items, pschema.get("items", {}), None)
 954#### LITELLM ###
 955        def normalize_schema_decl(schema):
 956            from enum import Enum
 957            # `schema` is a google.genai.types.Schema or similar
 958            # 1️⃣ normalize this node’s type
 959            if hasattr(schema, "type") and isinstance(schema.type, Enum):
 960                schema.type = schema.type.name.lower()
 961            # 2️⃣ normalize any nested properties
 962            if getattr(schema, "properties", None):
 963                for prop in schema.properties.values():
 964                    normalize_schema_decl(prop)
 965            # 3️⃣ normalize array items
 966            if getattr(schema, "items", None):
 967                normalize_schema_decl(schema.items)
 968            return schema
 969#### LITELLM ####
 970        # Step 5: Wrap in ADK BaseTool with fixed declaration
 971        class RequiredFieldsFixTool(BaseTool):
 972            def __init__(self, orig_tool):
 973                self._orig_tool = orig_tool
 974                super().__init__(name=orig_tool.name, description=orig_tool.description)
 975    
 976            async def run_async(self, **kwargs):
 977                try:
 978                    # 1) invoke the underlying tool (may be sync or async)
 979                    raw = self._orig_tool.run_async(**kwargs)
 980                    result = await raw if inspect.isawaitable(raw) else raw
 981        
 982                    # 2) if it's already a ToolResult or CLIResult, just return it
 983                    if isinstance(result, ToolResult) or isinstance(result, CLIResult):
 984                        return result
 985        
 986                    # 3) if it's a dict (ADK function tool), dig into the 'response' / 'result' fields
 987                    if isinstance(result, dict):
 988                        # ADK wraps tool output under result["response"]["result"]
 989                        resp = result.get("response", result)
 990                        payload = resp.get("result", resp) if isinstance(resp, dict) else resp
 991        
 992                        # payload may itself be a dict containing output, error, base64_image, etc.
 993                        if isinstance(payload, dict):
 994                            return ToolResult(
 995                                output=payload.get("output", ""),
 996                                error=payload.get("error", None),
 997                                base64_image=payload.get("base64_image", None),
 998                                system=payload.get("system", None),
 999                            )
1000                        # otherwise treat payload as a plain string
1001                        return ToolResult(output=str(payload))
1002        
1003                    # 4) if it's just a string, wrap it
1004                    if isinstance(result, str):
1005                        return ToolResult(output=result)
1006        
1007                    # 5) fallback for anything else
1008                    return ToolResult(output=str(result))
1009        
1010                except ToolError as e:
1011                    return ToolResult(error=str(e))
1012                except Exception as e:
1013                    return ToolResult(error=f"Unhandled error in tool '{self.name}': {e}")
1014                  
1015            def _get_declaration(self):
1016                decl = self._orig_tool._get_declaration()
1017                
1018                decl.description = self.description
1019                if not (decl and decl.parameters and decl.parameters.properties):
1020                    return decl
1021    
1022                decl.parameters.required = top_required
1023    
1024                for key, ps in prop_schemas.items():
1025                    prop = decl.parameters.properties.get(key)
1026                    if not prop:
1027                        continue
1028                    merge(prop, ps, original_defs.get(key))
1029                ### LITE LLM
1030                # 2) **Post‑process**: lower‑case enum types into JSON‑schema strings
1031                # from enum import Enum
1032                # for prop in decl.parameters.properties.values():
1033                #    # only convert ADK enum values
1034                #     if isinstance(prop.type, Enum):
1035                #        prop.type = prop.type.name.lower()
1036                #    # handle array items too
1037                #     if getattr(prop, "items", None) and isinstance(prop.items.type, Enum):
1038                #         prop.items.type = prop.items.type.name.lower()
1039                normalize_schema_decl(decl.parameters)
1040                ###
1041                return decl
1042    
1043        return RequiredFieldsFixTool(orig_tool)
@classmethod
def create(cls, adk: bool = False):
1047    @classmethod
1048    def create(cls, adk: bool = False):
1049        """Create a tool instance using class attributes"""
1050        # with open(tool_log_path, 'a') as f:
1051        #         f.write("\n\ncreate entered!\n")
1052       
1053        # Create an instance of the schema
1054        
1055        schema_instance = cls.args_schema()
1056    
1057        # # Use your existing Pydantic schema generator
1058        
1059        pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
1060        
1061        
1062        
1063        if adk:
1064            
1065            return cls.create_adk_tool(cls.func) # new method
1066                   
1067        #### LANGCHAIN
1068        else:
1069            
1070            schema_instance = cls.args_schema()
1071            pydantic_schema = cls.args_schema.to_pydantic_schema(schema_instance.arguments)
1072            if cls.is_async:
1073                def sync_stub(**kwargs):
1074                    raise NotImplementedError("This tool is async only")
1075    
1076                base_tool = Tool(
1077                    name=cls.name,
1078                    description=cls.description,
1079                    func=sync_stub,
1080                    coroutine=cls.func,
1081                    args_schema=pydantic_schema
1082                )
1083            else:
1084                base_tool = Tool(
1085                    name=cls.name,
1086                    description=cls.description,
1087                    func=cls.func,
1088                    args_schema=pydantic_schema
1089                )
1090    
1091           
1092            # return instance
1093            return cls(
1094                base_tool=base_tool,
1095                args_schema=cls.args_schema,
1096                is_async=cls.is_async
1097            )

Create a tool instance using class attributes

def calculator_func(a: int, b: int) -> str:
def calculator_func(a: int, b: int) -> str:
    """Multiply ``a`` by ``b`` and return the product rendered as a string."""
    product = a * b
    return str(product)
class CalculatorArgsSchema(ToolArgsSchema):
class CalculatorArgsSchema(ToolArgsSchema):
    # Argument definitions for the calculator tool: two required operands,
    # each declared with the type name 'int'.
    arguments: Dict[str, Dict[str, Any]] = {
        "a": dict(
            name="a",
            type="int",
            description="First number to multiply",
            required=True,
        ),
        "b": dict(
            name="b",
            type="int",
            description="Second number to multiply",
            required=True,
        ),
    }

Meta-validator for tool arguments ensuring LangChain compatibility

arguments: Dict[str, Dict[str, Any]] = {'a': {'name': 'a', 'type': 'int', 'description': 'First number to multiply', 'required': True}, 'b': {'name': 'b', 'type': 'int', 'description': 'Second number to multiply', 'required': True}}
class CalculatorTool(BaseHeavenTool):
class CalculatorTool(BaseHeavenTool):
    # Synchronous tool that exposes calculator_func under the name 'calculator'.
    name = 'calculator'
    description = 'A tool that multiplies two numbers.'
    args_schema = CalculatorArgsSchema
    func = calculator_func
    is_async = False

Provider-agnostic tool base class with standardized results

name = 'calculator'
description = 'A tool that multiplies two numbers.'
def func(a: int, b: int) -> str:
def calculator_func(a: int, b: int) -> str:
    """Multiply ``a`` by ``b`` and return the product rendered as a string."""
    product = a * b
    return str(product)
args_schema = <class 'CalculatorArgsSchema'>
is_async = False
def test_tool_func(message: str) -> str:
1134def test_tool_func(message: str) -> str:
1135    return "The test tool returns this str:`ASDON!@DASN(IJ K@!EDWQACKNSXCBUO!LASDKANSLDALSDLK!@`\n\nMake sure to follow the original instructions while responding to this."
class TestToolArgsSchema(ToolArgsSchema):
class TestToolArgsSchema(ToolArgsSchema):
    # Argument definitions for the test tool: one required 'message' entry
    # declared with the type name 'str'.
    arguments: Dict[str, Dict[str, Any]] = {
        "message": dict(
            name="message",
            type="str",
            description="Any message to send to the test tool",
            required=True,
        ),
    }

Meta-validator for tool arguments ensuring LangChain compatibility

arguments: Dict[str, Dict[str, Any]] = {'message': {'name': 'message', 'type': 'str', 'description': 'Any message to send to the test tool', 'required': True}}
class TestTool(BaseHeavenTool):
class TestTool(BaseHeavenTool):
    # Synchronous tool that exposes test_tool_func under the name 'TestTool'.
    name = 'TestTool'
    description = 'A test tool that performs a test the user needs performed.'
    args_schema = TestToolArgsSchema
    func = test_tool_func
    is_async = False

Provider-agnostic tool base class with standardized results

name = 'TestTool'
description = 'A test tool that performs a test the user needs performed.'
def func(message: str) -> str:
1134def test_tool_func(message: str) -> str:
1135    return "The test tool returns this str:`ASDON!@DASN(IJ K@!EDWQACKNSXCBUO!LASDKANSLDALSDLK!@`\n\nMake sure to follow the original instructions while responding to this."
args_schema = <class 'TestToolArgsSchema'>
is_async = False
def example_util_func(example_arg_return_dict: bool = False) -> str:
def example_util_func(example_arg_return_dict: bool = False) -> Union[str, Dict[str, str]]:
    """Return an example tool result.

    Args:
        example_arg_return_dict: When truthy, return the example dict;
            otherwise return the example string.

    Returns:
        ``{"example_result_dict": "example text output"}`` or the fixed
        example string.
    """
    # The flag is the function's own parameter; the earlier code read an
    # undefined name ``return_dict`` here and raised NameError on every call.
    if example_arg_return_dict:
        return {"example_result_dict": "example text output"}
    return "Congrats! You and the user successfully tested a tool! This is an example tool result that returns an example string.\n\nThis is example text after 2 line breaks.\n\nThis is an example injected instruction in an example markdown fence:\n\n```markdown\n# Example\n\nContinue\n```"
class ExampleToolArgsSchema(ToolArgsSchema):
class ExampleToolArgsSchema(ToolArgsSchema):
    # Argument definitions for the example tool: one required flag declared
    # with the type name 'bool'.
    arguments: Dict[str, Dict[str, Any]] = {
        "example_arg_return_dict": dict(
            name="example_arg_return_dict",
            type="bool",
            description="Set to True to receive a dict and False to get a string",
            required=True,
        ),
    }

Meta-validator for tool arguments ensuring LangChain compatibility

arguments: Dict[str, Dict[str, Any]] = {'example_arg_return_dict': {'name': 'example_arg_return_dict', 'type': 'bool', 'description': 'Set to True to receive a dict and False to get a string', 'required': True}}
class ExampleTool(BaseHeavenTool):
class ExampleTool(BaseHeavenTool):
    # Synchronous tool that exposes example_util_func under the name 'ExampleTool'.
    name = 'ExampleTool'
    description = 'An example tool that returns an example result'
    args_schema = ExampleToolArgsSchema
    func = example_util_func
    is_async = False

Provider-agnostic tool base class with standardized results

name = 'ExampleTool'
description = 'An example tool that returns an example result'
def func(example_arg_return_dict: bool = False) -> str:
def example_util_func(example_arg_return_dict: bool = False) -> Union[str, Dict[str, str]]:
    """Return an example tool result.

    Args:
        example_arg_return_dict: When truthy, return the example dict;
            otherwise return the example string.

    Returns:
        ``{"example_result_dict": "example text output"}`` or the fixed
        example string.
    """
    # The flag is the function's own parameter; the earlier code read an
    # undefined name ``return_dict`` here and raised NameError on every call.
    if example_arg_return_dict:
        return {"example_result_dict": "example text output"}
    return "Congrats! You and the user successfully tested a tool! This is an example tool result that returns an example string.\n\nThis is example text after 2 line breaks.\n\nThis is an example injected instruction in an example markdown fence:\n\n```markdown\n# Example\n\nContinue\n```"
args_schema = <class 'ExampleToolArgsSchema'>
is_async = False
class StrictDict(pydantic.main.BaseModel):
class StrictDict(BaseModel):
    # Forbid extra fields. The module's import fallback sets ``Extra`` to None
    # when pydantic no longer exports it, so ``Extra.forbid`` would raise
    # AttributeError at class creation; ConfigDict with the literal "forbid"
    # does not depend on ``Extra`` at all.
    model_config = ConfigDict(extra="forbid")

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes:
    __class_vars__: The names of the class variables defined on the model.
    __private_attributes__: Metadata about the private attributes of the model.
    __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
class StrictDict.Config:
1194    class Config:
1195        extra = Extra.forbid
extra = 'forbid'