tsemekwes.abctools

  1# Copyright (c) 2017-2026 Juancarlo AƱez (apalala@gmail.com)
  2# SPDX-License-Identifier: BSD-4-Clause
  3from __future__ import annotations
  4
  5import functools
  6import operator
  7from collections.abc import Callable, Iterable
  8from itertools import zip_longest
  9from typing import Any, NamedTuple
 10
 11type Predicate[K, V] = Callable[[K, V], bool]
 12
 13
 14def true(*_: Any, **__: Any) -> bool:
 15    return True
 16
 17
 18def to_list(o: Any) -> list[Any]:
 19    if o is None:
 20        return []
 21    elif isinstance(o, list):
 22        return o
 23    else:
 24        return [o]
 25
 26
 27def is_namedtuple(obj) -> bool:
 28    return (
 29        len(type(obj).__bases__) == 1
 30        and isinstance(obj, tuple)
 31        and hasattr(obj, "_asdict")
 32        and hasattr(obj, "_fields")
 33        and all(isinstance(f, str) for f in getattr(obj, "_fields", ()))
 34    )
 35
 36
 37def as_namedtuple(obj: Any) -> NamedTuple | None:
 38    if is_namedtuple(obj):
 39        return obj
 40    else:
 41        return None
 42
 43
 44def simplify_list(x) -> list[Any] | Any:
 45    if isinstance(x, list) and len(x) == 1:
 46        return simplify_list(x[0])
 47    return x
 48
 49
 50def extend_list(x: list[Any], n: int, default=None) -> None:
 51    def _null():
 52        pass
 53
 54    default = default or _null
 55
 56    missing = max(0, 1 + n - len(x))
 57    x.extend(default() for _ in range(missing))
 58
 59
 60def contains_sublist(lst: list[Any], sublst: list[Any]) -> bool:
 61    n = len(sublst)
 62    return any(sublst == lst[i : i + n] for i in range(1 + len(lst) - n))
 63
 64
 65def join_lists(*lists: list[Any]) -> list[Any]:
 66    return list(functools.reduce(operator.iadd, lists, []))
 67
 68
 69def flatten(o: Iterable[Any] | Any) -> list[Any]:
 70    def iterate(x: Any) -> Iterable[Any]:
 71        if not isinstance(o, list | tuple):
 72            yield x
 73            return
 74
 75        for item in x:
 76            yield from flatten(item)
 77
 78    return list(iterate(o))
 79
 80
 81def compress_seq(seq: Iterable[Any]) -> list[Any]:
 82    seen = set()
 83    result = []
 84    for x in seq:
 85        if x not in seen:
 86            result.append(x)
 87            seen.add(x)
 88    return result
 89
 90
 91def isiter(obj: Any) -> bool:
 92    """
 93    Check if an object is a non-string-like iterable suitable for JSON array conversion.
 94    # by Gemini (2026-01-26)
 95    # by [apalala@gmail.com](https://github.com/apalala)
 96    """
 97    if isinstance(obj, str | bytes | bytearray):
 98        return False
 99    if isinstance(obj, list | set | tuple | dict):
100        return True
101    try:
102        iter(obj)
103        return True
104    except (TypeError, AttributeError):
105        return False
106
107
108def prune_dict[K, V](d: dict[K, V], predicate: Callable[[K, V], bool]):
109    """Remove all items x where predicate(x, d[x])"""
110
111    keys = [k for k, v in d.items() if predicate(k, v)]
112    for k in keys:
113        del d[k]
114
115
116def chunks(iterable, size, fillvalue=None):
117    return zip_longest(*[iter(iterable)] * size, fillvalue=fillvalue)
118
119
120def left_assoc(elements) -> Any:
121    if not elements:
122        return ()
123
124    it = iter(elements)
125    expre = next(it)
126    for e in it:  # type: ignore
127        op = e
128        expre = [op, expre, next(it)]  # type: ignore
129    return expre
130
131
132def right_assoc(elements) -> Any:
133    if not elements:
134        return ()
135
136    def assoc(it) -> Any:
137        left = next(it)
138        try:
139            op = next(it)
140        except StopIteration:
141            return left
142        else:
143            return [op, left, assoc(it)]
144
145    return assoc(iter(elements))
146
147
148def rowselect[K, V](
149    keys: Iterable[K],
150    row: dict[K, V],
151    *,
152    where: Predicate[K, V] = true,
153) -> dict[K, V]:
154    # by [apalala@gmail.com](https://github.com/apalala)
155    # by Gemini (2026-02-05)
156    return {k: row[k] for k in keys if k in row and where(k, row[k])}
157
158
159def select[K, V](
160    keys: Iterable[K],
161    *rows: dict[K, V],
162    where: Predicate[K, V] = true,
163) -> list[dict[K, V]]:
164    return [rowselect(keys, row, where=where) for row in rows]
type Predicate = Callable[[K, V], bool]
def true(*_: Any, **__: Any) -> bool:
15def true(*_: Any, **__: Any) -> bool:
16    return True
def to_list(o: Any) -> list[typing.Any]:
19def to_list(o: Any) -> list[Any]:
20    if o is None:
21        return []
22    elif isinstance(o, list):
23        return o
24    else:
25        return [o]
def is_namedtuple(obj) -> bool:
28def is_namedtuple(obj) -> bool:
29    return (
30        len(type(obj).__bases__) == 1
31        and isinstance(obj, tuple)
32        and hasattr(obj, "_asdict")
33        and hasattr(obj, "_fields")
34        and all(isinstance(f, str) for f in getattr(obj, "_fields", ()))
35    )
def as_namedtuple(obj: Any) -> 'NamedTuple | None':
38def as_namedtuple(obj: Any) -> NamedTuple | None:
39    if is_namedtuple(obj):
40        return obj
41    else:
42        return None
def simplify_list(x) -> list[Any] | Any:
45def simplify_list(x) -> list[Any] | Any:
46    if isinstance(x, list) and len(x) == 1:
47        return simplify_list(x[0])
48    return x
def extend_list(x: list[typing.Any], n: int, default=None) -> None:
51def extend_list(x: list[Any], n: int, default=None) -> None:
52    def _null():
53        pass
54
55    default = default or _null
56
57    missing = max(0, 1 + n - len(x))
58    x.extend(default() for _ in range(missing))
def contains_sublist(lst: list[typing.Any], sublst: list[typing.Any]) -> bool:
61def contains_sublist(lst: list[Any], sublst: list[Any]) -> bool:
62    n = len(sublst)
63    return any(sublst == lst[i : i + n] for i in range(1 + len(lst) - n))
def join_lists(*lists: list[typing.Any]) -> list[typing.Any]:
66def join_lists(*lists: list[Any]) -> list[Any]:
67    return list(functools.reduce(operator.iadd, lists, []))
def flatten(o: Iterable[Any] | Any) -> list[typing.Any]:
70def flatten(o: Iterable[Any] | Any) -> list[Any]:
71    def iterate(x: Any) -> Iterable[Any]:
72        if not isinstance(o, list | tuple):
73            yield x
74            return
75
76        for item in x:
77            yield from flatten(item)
78
79    return list(iterate(o))
def compress_seq(seq: Iterable[typing.Any]) -> list[typing.Any]:
82def compress_seq(seq: Iterable[Any]) -> list[Any]:
83    seen = set()
84    result = []
85    for x in seq:
86        if x not in seen:
87            result.append(x)
88            seen.add(x)
89    return result
def isiter(obj: Any) -> bool:
 92def isiter(obj: Any) -> bool:
 93    """
 94    Check if an object is a non-string-like iterable suitable for JSON array conversion.
 95    # by Gemini (2026-01-26)
 96    # by [apalala@gmail.com](https://github.com/apalala)
 97    """
 98    if isinstance(obj, str | bytes | bytearray):
 99        return False
100    if isinstance(obj, list | set | tuple | dict):
101        return True
102    try:
103        iter(obj)
104        return True
105    except (TypeError, AttributeError):
106        return False

Check if an object is a non-string-like iterable suitable for JSON array conversion.

by Gemini (2026-01-26)

by apalala@gmail.com

def prune_dict(d: 'dict[K, V]', predicate: 'Callable[[K, V], bool]'):
109def prune_dict[K, V](d: dict[K, V], predicate: Callable[[K, V], bool]):
110    """Remove all items x where predicate(x, d[x])"""
111
112    keys = [k for k, v in d.items() if predicate(k, v)]
113    for k in keys:
114        del d[k]

Remove all items x where predicate(x, d[x])

def chunks(iterable, size, fillvalue=None):
117def chunks(iterable, size, fillvalue=None):
118    return zip_longest(*[iter(iterable)] * size, fillvalue=fillvalue)
def left_assoc(elements) -> Any:
121def left_assoc(elements) -> Any:
122    if not elements:
123        return ()
124
125    it = iter(elements)
126    expre = next(it)
127    for e in it:  # type: ignore
128        op = e
129        expre = [op, expre, next(it)]  # type: ignore
130    return expre
def right_assoc(elements) -> Any:
133def right_assoc(elements) -> Any:
134    if not elements:
135        return ()
136
137    def assoc(it) -> Any:
138        left = next(it)
139        try:
140            op = next(it)
141        except StopIteration:
142            return left
143        else:
144            return [op, left, assoc(it)]
145
146    return assoc(iter(elements))
def rowselect( keys: 'Iterable[K]', row: 'dict[K, V]', *, where: 'Predicate[K, V]' = <function true>) -> 'dict[K, V]':
149def rowselect[K, V](
150    keys: Iterable[K],
151    row: dict[K, V],
152    *,
153    where: Predicate[K, V] = true,
154) -> dict[K, V]:
155    # by [apalala@gmail.com](https://github.com/apalala)
156    # by Gemini (2026-02-05)
157    return {k: row[k] for k in keys if k in row and where(k, row[k])}
def select( keys: 'Iterable[K]', *rows: 'dict[K, V]', where: 'Predicate[K, V]' = <function true>) -> 'list[dict[K, V]]':
160def select[K, V](
161    keys: Iterable[K],
162    *rows: dict[K, V],
163    where: Predicate[K, V] = true,
164) -> list[dict[K, V]]:
165    return [rowselect(keys, row, where=where) for row in rows]