Coverage for intelligence_toolkit/helpers/decorators.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4import random 

5import time 

6from collections.abc import Callable 

7from functools import wraps 

8from typing import Any, TypeVar 

9 

10from intelligence_toolkit.helpers.constants import ( 

11 VECTOR_STORE_MAX_RETRIES, 

12 VECTOR_STORE_MAX_RETRIES_WAIT_TIME, 

13) 

14 

15T = TypeVar("T") 

16 

17 

def retry_with_backoff(
    retries: int = VECTOR_STORE_MAX_RETRIES,
    backoff_in_seconds: float = VECTOR_STORE_MAX_RETRIES_WAIT_TIME,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Build a decorator that retries a failing call with exponential backoff.

    The decorated function is invoked; if it raises an ``Exception``, the
    wrapper sleeps and calls it again. The function is therefore called at
    most ``retries + 1`` times. Once the retry budget is used up, the last
    exception is re-raised unchanged.

    The delay before retry number ``n`` (counting from 0) is
    ``backoff_in_seconds * 2**n`` plus a random jitter drawn uniformly from
    ``[0, 1]`` seconds.

    Args:
        retries: Maximum number of retries after the first failed call.
            A value of zero or less disables retrying.
        backoff_in_seconds: Base delay that is doubled on every retry.

    Returns:
        A decorator that wraps a callable with the retry behaviour.

    Raises:
        Exception: Whatever the wrapped callable raised on its final
            attempt. Exceptions that do not derive from ``Exception``
            (e.g. ``KeyboardInterrupt``) are never retried.
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # `>=` rather than `==`: with a negative or non-integral
                    # `retries` the counter would never be *equal* to it and
                    # a persistently failing call would be retried forever.
                    if attempt >= retries:
                        raise
                    delay = backoff_in_seconds * 2**attempt + random.uniform(0, 1)
                    time.sleep(delay)
                    attempt += 1

        return wrapper

    return decorator