Coverage for src/extratools_cloud/common/router.py: 0%
15 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 09:29 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 09:29 -0700
1from abc import ABC, abstractmethod
2from collections.abc import Iterable
3from typing import Any, cast
6class BaseRouter[RT: Any, TT: Any](ABC):
7 def __init__(
8 self,
9 *,
10 default_target_resource: RT,
11 ) -> None:
12 self._default_target_resource: RT = default_target_resource
14 self.targets: dict[TT, RT] = {}
16 def register_targets(
17 self,
18 resource: RT,
19 targets: Iterable[TT],
20 ) -> None:
21 self.targets.update(dict.fromkeys(targets, resource))
23 @abstractmethod
24 def _route_to_resource[DT: Any](
25 self,
26 data: Iterable[DT],
27 resource: RT,
28 target: TT,
29 ) -> Iterable[Any]:
30 ...
32 def route_to_target[DT: Any](
33 self,
34 data: Iterable[DT],
35 target: TT,
36 ) -> Iterable[Any]:
37 resource: RT = cast("RT", self.targets.get(target, self._default_target_resource))
39 return self._route_to_resource(data, resource, target)