Coverage for src/extratools_core/itertools.py: 98%
57 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-06 23:52 -0700
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-06 23:52 -0700
1from collections.abc import Callable, Iterable, Iterator, Sequence
2from itertools import chain, count, repeat
3from typing import Mapping, cast
5from toolz.itertoolz import sliding_window
7from .dict import invert
9from .seq import iter_to_seq
10from .typing import Comparable
13def iter_to_grams[T](
14 data: Iterable[T],
15 *,
16 n: int,
17 pad: T | None = None,
18) -> Iterable[Sequence[T]]:
19 if pad is not None:
20 data = chain(
21 repeat(pad, n - 1),
22 data,
23 repeat(pad, n - 1),
24 )
26 return sliding_window(n, data)
29def is_sorted[T](
30 data: Iterable[T],
31 *,
32 key: Callable[[T], Comparable] | None = None,
33 reverse: bool = False,
34) -> bool:
35 local_key: Callable[[T], Comparable]
36 if key is None:
37 def default_key(v: T) -> Comparable:
38 return cast("Comparable", v)
40 local_key = default_key
41 else:
42 local_key = key
44 return all(
45 (
46 local_key(prev) >= local_key(curr) if reverse
47 else local_key(prev) <= local_key(curr)
48 )
49 for prev, curr in sliding_window(2, data)
50 )
53def filter_by_positions[T](poss: Iterable[int], data: Iterable[T]) -> Iterable[T]:
54 p: Iterator[int] = iter(poss)
56 pos: int | None = next(p, None)
57 if pos is None:
58 return
60 for i, v in enumerate(data):
61 if i == pos:
62 yield v
64 pos = next(p, None)
65 if pos is None:
66 return
69def filter_by_others[T](func: Callable[[T, T], bool], data: Iterable[T]) -> Iterable[T]:
70 seq: Sequence[T] = iter_to_seq(data)
72 filtered_ids: set[int] = set(range(len(seq)))
74 for i, x in enumerate(seq):
75 remove: bool = False
76 for j in filtered_ids:
77 if i == j:
78 continue
80 if not func(x, seq[j]):
81 remove = True
82 break
84 if remove:
85 filtered_ids.remove(i)
87 for i in filtered_ids:
88 yield seq[i]
91def remap[KT, VT](
92 data: Iterable[KT],
93 mapping: Mapping[KT, VT],
94 *,
95 key: Callable[[KT], VT] | None = None,
96) -> Iterable[VT]:
97 local_key: Callable[[KT], VT]
98 if key is None:
99 inverted_mapping: Mapping[VT, KT] = invert(mapping)
100 c = count(start=0)
102 def default_key(_: KT) -> VT:
103 while True:
104 v: int = next(c)
105 if v not in inverted_mapping:
106 return cast("VT", v)
108 local_key = default_key
109 else:
110 local_key = key
112 k: KT
113 for k in data:
114 yield mapping.setdefault(k, local_key(k))