Coverage for src/extratools_cloud/common/router.py: 0%

15 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-21 09:29 -0700

1from abc import ABC, abstractmethod 

2from collections.abc import Iterable 

3from typing import Any, cast 

4 

5 

6class BaseRouter[RT: Any, TT: Any](ABC): 

7 def __init__( 

8 self, 

9 *, 

10 default_target_resource: RT, 

11 ) -> None: 

12 self._default_target_resource: RT = default_target_resource 

13 

14 self.targets: dict[TT, RT] = {} 

15 

16 def register_targets( 

17 self, 

18 resource: RT, 

19 targets: Iterable[TT], 

20 ) -> None: 

21 self.targets.update(dict.fromkeys(targets, resource)) 

22 

23 @abstractmethod 

24 def _route_to_resource[DT: Any]( 

25 self, 

26 data: Iterable[DT], 

27 resource: RT, 

28 target: TT, 

29 ) -> Iterable[Any]: 

30 ... 

31 

32 def route_to_target[DT: Any]( 

33 self, 

34 data: Iterable[DT], 

35 target: TT, 

36 ) -> Iterable[Any]: 

37 resource: RT = cast("RT", self.targets.get(target, self._default_target_resource)) 

38 

39 return self._route_to_resource(data, resource, target)