Coverage for src\funcall\funcall.py: 76%
134 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 12:46 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 12:46 +0900
1import asyncio
2import concurrent.futures
3import inspect
4import json
5from collections.abc import Callable
6from logging import getLogger
7from typing import Literal, Union, get_type_hints
9import litellm
10from openai.types.responses import (
11 FunctionToolParam,
12 ResponseFunctionToolCall,
13)
14from pydantic import BaseModel
16from funcall.types import is_context_type
18from .metadata import generate_function_metadata
21def _convert_argument_type(value: list, hint: type) -> object:
22 """
23 Convert argument values to match expected types.
25 Args:
26 value: The value to convert
27 hint: The type hint to convert to
29 Returns:
30 Converted value
31 """
32 origin = getattr(hint, "__origin__", None)
33 result = value
34 if origin in (list, set, tuple):
35 args = getattr(hint, "__args__", [])
36 item_type = args[0] if args else str
37 result = [_convert_argument_type(v, item_type) for v in value]
38 elif origin is dict: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true
39 result = value
40 elif getattr(hint, "__origin__", None) is Union:
41 args = getattr(hint, "__args__", [])
42 non_none_types = [a for a in args if a is not type(None)]
43 result = _convert_argument_type(value, non_none_types[0]) if len(non_none_types) == 1 else value
44 elif isinstance(hint, type) and BaseModel and issubclass(hint, BaseModel):
45 if isinstance(value, dict): 45 ↛ 50line 45 didn't jump to line 50 because the condition on line 45 was always true
46 fields = hint.model_fields
47 converted_data = {k: _convert_argument_type(v, fields[k].annotation) if k in fields else v for k, v in value.items()} # type: ignore
48 result = hint(**converted_data)
49 else:
50 result = value
51 elif hasattr(hint, "__dataclass_fields__"):
52 if isinstance(value, dict): 52 ↛ 57line 52 didn't jump to line 57 because the condition on line 52 was always true
53 field_types = {f: t.type for f, t in hint.__dataclass_fields__.items()}
54 converted_data = {k: _convert_argument_type(v, field_types.get(k, type(v))) for k, v in value.items()}
55 result = hint(**converted_data)
56 else:
57 result = value
58 return result
61def _is_async_function(func: object) -> bool:
62 """Check if a function is asynchronous."""
63 return inspect.iscoroutinefunction(func)
# Module-wide logger, registered under the name "funcall".
logger = getLogger("funcall")
class Funcall:
    """
    Handler for function calling in LLM interactions.

    Holds a list of Python callables, exposes them as tool definitions via
    ``get_tools`` and dispatches tool calls (by name plus a JSON argument
    string) to the matching callable, either synchronously or asynchronously.
    """

    def __init__(self, functions: list[Callable] | None = None) -> None:
        """
        Initialize the function call handler.

        Args:
            functions: List of functions to register
        """
        self.functions = functions or []
        # Lookup table used by the call_* methods; keyed by ``__name__``, so
        # a later function with the same name replaces an earlier one.
        self.function_registry = {func.__name__: func for func in self.functions}

    def get_tools(self, target: Literal["openai", "litellm"] = "openai") -> list[FunctionToolParam]:
        """
        Get tool definitions for the specified target platform.

        Args:
            target: Target platform ("openai" or "litellm")

        Returns:
            List of function tool parameters
        """
        return [generate_function_metadata(func, target) for func in self.functions]  # type: ignore

    def _prepare_function_execution(
        self,
        func_name: str,
        args: str,
        context: object = None,
    ) -> tuple[Callable, dict]:
        """
        Prepare function call arguments and context injection.

        Args:
            func_name: Name of the function to call
            args: JSON string of function arguments; an empty string is
                treated as "no arguments"
            context: Context object to inject

        Returns:
            Tuple of (function, prepared_kwargs)

        Raises:
            ValueError: If function is not found
            json.JSONDecodeError: If args is non-empty and not valid JSON
        """
        if func_name not in self.function_registry:
            msg = f"Function {func_name} not found"
            raise ValueError(msg)

        func = self.function_registry[func_name]
        signature = inspect.signature(func)
        type_hints = get_type_hints(func)
        # An empty payload means "no arguments" rather than a JSON error.
        arguments = json.loads(args) if args else {}

        # Parameters whose hint marks them as context are filled from
        # ``context`` and never from the JSON payload.
        context_params = {name for name in signature.parameters if is_context_type(type_hints.get(name, str))}
        non_context_params = [name for name in signature.parameters if name not in context_params]

        if len(non_context_params) == 1 and (
            not isinstance(arguments, dict) or set(arguments) != set(non_context_params)
        ):
            # Single-parameter case: the whole payload is the value of that
            # one parameter unless it is already keyed by its name.
            arguments = {non_context_params[0]: arguments}
        elif not isinstance(arguments, dict):
            # A bare scalar or array cannot be matched to parameter names.
            logger.warning(
                "Ignoring non-object arguments for function %s.",
                func_name,
            )
            arguments = {}

        # Prepare final kwargs with type conversion and context injection.
        # Parameters missing from the payload are omitted so that the
        # function's own defaults apply.
        prepared_kwargs: dict = {}
        for param_name in signature.parameters:
            if param_name in context_params:
                prepared_kwargs[param_name] = context
            elif param_name in arguments:
                hint = type_hints.get(param_name, str)
                prepared_kwargs[param_name] = _convert_argument_type(arguments[param_name], hint)  # type: ignore

        return func, prepared_kwargs

    def _execute_sync_in_async_context(self, func: Callable, kwargs: dict) -> object:
        """
        Run a coroutine function to completion from synchronous code.

        When no event loop is running in the current thread the coroutine is
        driven with ``asyncio.run``. When a loop is already running here,
        ``asyncio.run`` cannot be used in this thread, so the coroutine is
        driven by a fresh event loop inside a worker thread while this thread
        blocks on the result. The running loop is therefore stalled until the
        coroutine finishes, and a coroutine that needs that loop to make
        progress will not complete.

        Args:
            func: Coroutine function to execute
            kwargs: Keyword arguments passed to func

        Returns:
            The value the coroutine returns
        """
        # Keep the try body to the probe only, so a RuntimeError raised by
        # the user's coroutine is never mistaken for "no running loop".
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(func(**kwargs))

        def _run_in_fresh_loop() -> object:
            # Create and await the coroutine inside the worker thread.
            return asyncio.run(func(**kwargs))

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            return executor.submit(_run_in_fresh_loop).result()

    def call_function(
        self,
        name: str,
        arguments: str,
        context: object = None,
    ) -> object:
        """
        Call a function by name with JSON arguments synchronously.

        Args:
            name: Name of the function to call
            arguments: JSON string of function arguments
            context: Context object to inject (optional)

        Returns:
            Function execution result

        Raises:
            ValueError: If function is not found
            json.JSONDecodeError: If arguments are not valid JSON
        """
        func, kwargs = self._prepare_function_execution(name, arguments, context)

        if _is_async_function(func):
            logger.warning(
                "Function %s is async but being called synchronously. Consider using call_function_async.",
                name,
            )
            return self._execute_sync_in_async_context(func, kwargs)

        return func(**kwargs)

    async def call_function_async(
        self,
        name: str,
        arguments: str,
        context: object = None,
    ) -> object:
        """
        Call a function by name with JSON arguments asynchronously.

        Args:
            name: Name of the function to call
            arguments: JSON string of function arguments
            context: Context object to inject (optional)

        Returns:
            Function execution result

        Raises:
            ValueError: If function is not found
            json.JSONDecodeError: If arguments are not valid JSON
        """
        func, kwargs = self._prepare_function_execution(name, arguments, context)

        if _is_async_function(func):
            return await func(**kwargs)

        # Run sync function in a worker thread to avoid blocking the event loop.
        return await asyncio.to_thread(func, **kwargs)

    @staticmethod
    def _unpack_openai_call(call: object) -> tuple[str, str]:
        """Validate an OpenAI tool call and return its (name, arguments)."""
        if not isinstance(call, ResponseFunctionToolCall):
            msg = "call must be an instance of ResponseFunctionToolCall"
            raise TypeError(msg)
        return call.name, call.arguments

    @staticmethod
    def _unpack_litellm_call(call: object) -> tuple[str, str]:
        """Validate a LiteLLM tool call and return its (name, arguments)."""
        if not isinstance(call, litellm.ChatCompletionMessageToolCall):
            msg = "call must be an instance of litellm.ChatCompletionMessageToolCall"
            raise TypeError(msg)
        if not call.function:
            msg = "call.function must not be None"
            raise ValueError(msg)
        if not call.function.name:
            msg = "call.function.name must not be empty"
            raise ValueError(msg)
        return call.function.name, call.function.arguments

    def handle_openai_function_call(
        self,
        call: ResponseFunctionToolCall,
        context: object = None,
    ) -> object:
        """
        Handle OpenAI function call synchronously.

        Args:
            call: OpenAI function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a ResponseFunctionToolCall
        """
        name, arguments = self._unpack_openai_call(call)
        return self.call_function(name, arguments, context)

    async def handle_openai_function_call_async(
        self,
        call: ResponseFunctionToolCall,
        context: object = None,
    ) -> object:
        """
        Handle OpenAI function call asynchronously.

        Args:
            call: OpenAI function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a ResponseFunctionToolCall
        """
        name, arguments = self._unpack_openai_call(call)
        return await self.call_function_async(name, arguments, context)

    def handle_litellm_function_call(
        self,
        call: litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle LiteLLM function call synchronously.

        Args:
            call: LiteLLM function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a litellm.ChatCompletionMessageToolCall
            ValueError: If call.function is missing or its name is empty
        """
        name, arguments = self._unpack_litellm_call(call)
        return self.call_function(name, arguments, context)

    async def handle_litellm_function_call_async(
        self,
        call: litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle LiteLLM function call asynchronously.

        Args:
            call: LiteLLM function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a litellm.ChatCompletionMessageToolCall
            ValueError: If call.function is missing or its name is empty
        """
        name, arguments = self._unpack_litellm_call(call)
        return await self.call_function_async(name, arguments, context)

    def handle_function_call(
        self,
        call: ResponseFunctionToolCall | litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle function call synchronously (unified interface).

        Args:
            call: Function tool call (OpenAI or LiteLLM)
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is neither supported tool-call type
        """
        if isinstance(call, ResponseFunctionToolCall):
            return self.handle_openai_function_call(call, context)
        if isinstance(call, litellm.ChatCompletionMessageToolCall):
            return self.handle_litellm_function_call(call, context)
        msg = "call must be an instance of ResponseFunctionToolCall or litellm.ChatCompletionMessageToolCall"
        raise TypeError(msg)

    async def handle_function_call_async(
        self,
        call: ResponseFunctionToolCall | litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle function call asynchronously (unified interface).

        Args:
            call: Function tool call (OpenAI or LiteLLM)
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is neither supported tool-call type
        """
        if isinstance(call, ResponseFunctionToolCall):
            return await self.handle_openai_function_call_async(call, context)
        if isinstance(call, litellm.ChatCompletionMessageToolCall):
            return await self.handle_litellm_function_call_async(call, context)
        msg = "call must be an instance of ResponseFunctionToolCall or litellm.ChatCompletionMessageToolCall"
        raise TypeError(msg)