Coverage for src/extratools_core/itertools.py: 98%

57 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-06 23:52 -0700

1from collections.abc import Callable, Iterable, Iterator, Sequence 

2from itertools import chain, count, repeat 

3from typing import Mapping, cast 

4 

5from toolz.itertoolz import sliding_window 

6 

7from .dict import invert 

8 

9from .seq import iter_to_seq 

10from .typing import Comparable 

11 

12 

13def iter_to_grams[T]( 

14 data: Iterable[T], 

15 *, 

16 n: int, 

17 pad: T | None = None, 

18) -> Iterable[Sequence[T]]: 

19 if pad is not None: 

20 data = chain( 

21 repeat(pad, n - 1), 

22 data, 

23 repeat(pad, n - 1), 

24 ) 

25 

26 return sliding_window(n, data) 

27 

28 

29def is_sorted[T]( 

30 data: Iterable[T], 

31 *, 

32 key: Callable[[T], Comparable] | None = None, 

33 reverse: bool = False, 

34) -> bool: 

35 local_key: Callable[[T], Comparable] 

36 if key is None: 

37 def default_key(v: T) -> Comparable: 

38 return cast("Comparable", v) 

39 

40 local_key = default_key 

41 else: 

42 local_key = key 

43 

44 return all( 

45 ( 

46 local_key(prev) >= local_key(curr) if reverse 

47 else local_key(prev) <= local_key(curr) 

48 ) 

49 for prev, curr in sliding_window(2, data) 

50 ) 

51 

52 

53def filter_by_positions[T](poss: Iterable[int], data: Iterable[T]) -> Iterable[T]: 

54 p: Iterator[int] = iter(poss) 

55 

56 pos: int | None = next(p, None) 

57 if pos is None: 

58 return 

59 

60 for i, v in enumerate(data): 

61 if i == pos: 

62 yield v 

63 

64 pos = next(p, None) 

65 if pos is None: 

66 return 

67 

68 

69def filter_by_others[T](func: Callable[[T, T], bool], data: Iterable[T]) -> Iterable[T]: 

70 seq: Sequence[T] = iter_to_seq(data) 

71 

72 filtered_ids: set[int] = set(range(len(seq))) 

73 

74 for i, x in enumerate(seq): 

75 remove: bool = False 

76 for j in filtered_ids: 

77 if i == j: 

78 continue 

79 

80 if not func(x, seq[j]): 

81 remove = True 

82 break 

83 

84 if remove: 

85 filtered_ids.remove(i) 

86 

87 for i in filtered_ids: 

88 yield seq[i] 

89 

90 

91def remap[KT, VT]( 

92 data: Iterable[KT], 

93 mapping: Mapping[KT, VT], 

94 *, 

95 key: Callable[[KT], VT] | None = None, 

96) -> Iterable[VT]: 

97 local_key: Callable[[KT], VT] 

98 if key is None: 

99 inverted_mapping: Mapping[VT, KT] = invert(mapping) 

100 c = count(start=0) 

101 

102 def default_key(_: KT) -> VT: 

103 while True: 

104 v: int = next(c) 

105 if v not in inverted_mapping: 

106 return cast("VT", v) 

107 

108 local_key = default_key 

109 else: 

110 local_key = key 

111 

112 k: KT 

113 for k in data: 

114 yield mapping.setdefault(k, local_key(k))