Coverage for src\funcall\funcall.py: 76%

134 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-21 12:46 +0900

1import asyncio 

2import concurrent.futures 

3import inspect 

4import json 

5from collections.abc import Callable 

6from logging import getLogger 

7from typing import Literal, Union, get_type_hints 

8 

9import litellm 

10from openai.types.responses import ( 

11 FunctionToolParam, 

12 ResponseFunctionToolCall, 

13) 

14from pydantic import BaseModel 

15 

16from funcall.types import is_context_type 

17 

18from .metadata import generate_function_metadata 

19 

20 

21def _convert_argument_type(value: list, hint: type) -> object: 

22 """ 

23 Convert argument values to match expected types. 

24 

25 Args: 

26 value: The value to convert 

27 hint: The type hint to convert to 

28 

29 Returns: 

30 Converted value 

31 """ 

32 origin = getattr(hint, "__origin__", None) 

33 result = value 

34 if origin in (list, set, tuple): 

35 args = getattr(hint, "__args__", []) 

36 item_type = args[0] if args else str 

37 result = [_convert_argument_type(v, item_type) for v in value] 

38 elif origin is dict: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true

39 result = value 

40 elif getattr(hint, "__origin__", None) is Union: 

41 args = getattr(hint, "__args__", []) 

42 non_none_types = [a for a in args if a is not type(None)] 

43 result = _convert_argument_type(value, non_none_types[0]) if len(non_none_types) == 1 else value 

44 elif isinstance(hint, type) and BaseModel and issubclass(hint, BaseModel): 

45 if isinstance(value, dict): 45 ↛ 50line 45 didn't jump to line 50 because the condition on line 45 was always true

46 fields = hint.model_fields 

47 converted_data = {k: _convert_argument_type(v, fields[k].annotation) if k in fields else v for k, v in value.items()} # type: ignore 

48 result = hint(**converted_data) 

49 else: 

50 result = value 

51 elif hasattr(hint, "__dataclass_fields__"): 

52 if isinstance(value, dict): 52 ↛ 57line 52 didn't jump to line 57 because the condition on line 52 was always true

53 field_types = {f: t.type for f, t in hint.__dataclass_fields__.items()} 

54 converted_data = {k: _convert_argument_type(v, field_types.get(k, type(v))) for k, v in value.items()} 

55 result = hint(**converted_data) 

56 else: 

57 result = value 

58 return result 

59 

60 

61def _is_async_function(func: object) -> bool: 

62 """Check if a function is asynchronous.""" 

63 return inspect.iscoroutinefunction(func) 

64 

65 

# Module-level logger, registered under the name "funcall".
logger = getLogger("funcall")

67 

68 

class Funcall:
    """
    Handler for function calling in LLM interactions.

    Keeps a name -> callable registry, exposes tool definitions for the
    supported platforms, and dispatches tool calls (OpenAI or LiteLLM) to the
    registered Python functions, both synchronously and asynchronously.
    """

    def __init__(self, functions: list[Callable] | None = None) -> None:
        """
        Initialize the function call handler.

        Args:
            functions: List of functions to register
        """
        self.functions = functions or []
        # Lookup table keyed by ``__name__``; a later function with the same
        # name replaces an earlier one.
        self.function_registry = {func.__name__: func for func in self.functions}

    def get_tools(self, target: Literal["openai", "litellm"] = "openai") -> list[FunctionToolParam]:
        """
        Get tool definitions for the specified target platform.

        Args:
            target: Target platform ("openai" or "litellm")

        Returns:
            List of function tool parameters
        """
        return [generate_function_metadata(func, target) for func in self.functions]  # type: ignore

    def _prepare_function_execution(
        self,
        func_name: str,
        args: str,
        context: object = None,
    ) -> tuple[Callable, dict]:
        """
        Prepare function call arguments and context injection.

        Args:
            func_name: Name of the function to call
            args: JSON string of function arguments
            context: Context object to inject

        Returns:
            Tuple of (function, prepared_kwargs)

        Raises:
            ValueError: If function is not found
            json.JSONDecodeError: If arguments are not valid JSON
        """
        if func_name not in self.function_registry:
            msg = f"Function {func_name} not found"
            raise ValueError(msg)

        func = self.function_registry[func_name]
        signature = inspect.signature(func)
        type_hints = get_type_hints(func)
        arguments = json.loads(args)

        # Find non-context parameters (unannotated parameters are treated as str)
        non_context_params = [name for name in signature.parameters if not is_context_type(type_hints.get(name, str))]

        # Handle single parameter case: when the payload is not already keyed by
        # exactly that parameter name, the whole payload becomes its value
        if len(non_context_params) == 1 and (not isinstance(arguments, dict) or set(arguments.keys()) != set(non_context_params)):
            arguments = {non_context_params[0]: arguments}

        # Prepare final kwargs with type conversion and context injection
        prepared_kwargs = {}
        for param_name in signature.parameters:
            hint = type_hints.get(param_name, str)

            if is_context_type(hint):
                prepared_kwargs[param_name] = context
            elif param_name in arguments:
                prepared_kwargs[param_name] = _convert_argument_type(arguments[param_name], hint)  # type: ignore

        return func, prepared_kwargs

    def _execute_sync_in_async_context(self, func: Callable, kwargs: dict) -> object:
        """
        Run an async function to completion from synchronous code.

        Args:
            func: Coroutine function to execute
            kwargs: Keyword arguments to call it with

        Returns:
            The value produced by the coroutine
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread; handled below
            pass
        else:
            # A loop is already running here, so it cannot be re-entered. Drive
            # the coroutine on a private loop in a worker thread instead. The
            # calling thread blocks until the coroutine has finished.
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                return executor.submit(lambda: asyncio.run(func(**kwargs))).result()

        # Only the loop lookup is guarded: a RuntimeError raised by ``func``
        # itself must propagate instead of triggering a second execution.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None

        if loop is None or loop.is_closed():
            # No usable event loop exists, create new one
            return asyncio.run(func(**kwargs))
        return loop.run_until_complete(func(**kwargs))

    def call_function(
        self,
        name: str,
        arguments: str,
        context: object = None,
    ) -> object:
        """
        Call a function by name with JSON arguments synchronously.

        Args:
            name: Name of the function to call
            arguments: JSON string of function arguments
            context: Context object to inject (optional)

        Returns:
            Function execution result

        Raises:
            ValueError: If function is not found
            json.JSONDecodeError: If arguments are not valid JSON
        """
        func, kwargs = self._prepare_function_execution(name, arguments, context)

        if _is_async_function(func):
            logger.warning(
                "Function %s is async but being called synchronously. Consider using call_function_async.",
                name,
            )
            return self._execute_sync_in_async_context(func, kwargs)

        return func(**kwargs)

    async def call_function_async(
        self,
        name: str,
        arguments: str,
        context: object = None,
    ) -> object:
        """
        Call a function by name with JSON arguments asynchronously.

        Args:
            name: Name of the function to call
            arguments: JSON string of function arguments
            context: Context object to inject (optional)

        Returns:
            Function execution result

        Raises:
            ValueError: If function is not found
            json.JSONDecodeError: If arguments are not valid JSON
        """
        func, kwargs = self._prepare_function_execution(name, arguments, context)

        if _is_async_function(func):
            return await func(**kwargs)

        # Run sync function in thread pool to avoid blocking event loop.
        # Inside a coroutine a loop is always running, so ask for it directly.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(**kwargs))

    @staticmethod
    def _unpack_litellm_call(call: object) -> tuple[str, str]:
        """
        Validate a LiteLLM tool call and extract what is needed to dispatch it.

        Args:
            call: LiteLLM function tool call

        Returns:
            Tuple of (function name, JSON argument string)

        Raises:
            TypeError: If call is not a litellm.ChatCompletionMessageToolCall
            ValueError: If call.function is missing or has no name
        """
        if not isinstance(call, litellm.ChatCompletionMessageToolCall):
            msg = "call must be an instance of litellm.ChatCompletionMessageToolCall"
            raise TypeError(msg)
        if not call.function:
            msg = "call.function must not be None"
            raise ValueError(msg)
        if not call.function.name:
            msg = "call.function.name must not be empty"
            raise ValueError(msg)
        return call.function.name, call.function.arguments

    def handle_openai_function_call(
        self,
        call: ResponseFunctionToolCall,
        context: object = None,
    ) -> object:
        """
        Handle OpenAI function call synchronously.

        Args:
            call: OpenAI function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a ResponseFunctionToolCall
        """
        if not isinstance(call, ResponseFunctionToolCall):
            msg = "call must be an instance of ResponseFunctionToolCall"
            raise TypeError(msg)

        return self.call_function(call.name, call.arguments, context)

    async def handle_openai_function_call_async(
        self,
        call: ResponseFunctionToolCall,
        context: object = None,
    ) -> object:
        """
        Handle OpenAI function call asynchronously.

        Args:
            call: OpenAI function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a ResponseFunctionToolCall
        """
        if not isinstance(call, ResponseFunctionToolCall):
            msg = "call must be an instance of ResponseFunctionToolCall"
            raise TypeError(msg)

        return await self.call_function_async(call.name, call.arguments, context)

    def handle_litellm_function_call(
        self,
        call: litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle LiteLLM function call synchronously.

        Args:
            call: LiteLLM function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a litellm.ChatCompletionMessageToolCall
            ValueError: If call.function is missing or has no name
        """
        name, arguments = self._unpack_litellm_call(call)
        return self.call_function(name, arguments, context)

    async def handle_litellm_function_call_async(
        self,
        call: litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle LiteLLM function call asynchronously.

        Args:
            call: LiteLLM function tool call
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is not a litellm.ChatCompletionMessageToolCall
            ValueError: If call.function is missing or has no name
        """
        name, arguments = self._unpack_litellm_call(call)
        return await self.call_function_async(name, arguments, context)

    def handle_function_call(
        self,
        call: ResponseFunctionToolCall | litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle function call synchronously (unified interface).

        Args:
            call: Function tool call (OpenAI or LiteLLM)
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is neither an OpenAI nor a LiteLLM tool call
        """
        if isinstance(call, ResponseFunctionToolCall):
            return self.handle_openai_function_call(call, context)
        if isinstance(call, litellm.ChatCompletionMessageToolCall):
            return self.handle_litellm_function_call(call, context)
        msg = "call must be an instance of ResponseFunctionToolCall or litellm.ChatCompletionMessageToolCall"
        raise TypeError(msg)

    async def handle_function_call_async(
        self,
        call: ResponseFunctionToolCall | litellm.ChatCompletionMessageToolCall,
        context: object = None,
    ) -> object:
        """
        Handle function call asynchronously (unified interface).

        Args:
            call: Function tool call (OpenAI or LiteLLM)
            context: Context object to inject

        Returns:
            Function execution result

        Raises:
            TypeError: If call is neither an OpenAI nor a LiteLLM tool call
        """
        if isinstance(call, ResponseFunctionToolCall):
            return await self.handle_openai_function_call_async(call, context)
        if isinstance(call, litellm.ChatCompletionMessageToolCall):
            return await self.handle_litellm_function_call_async(call, context)
        msg = "call must be an instance of ResponseFunctionToolCall or litellm.ChatCompletionMessageToolCall"
        raise TypeError(msg)