docs for zanj v0.6.0
View Source on GitHub

zanj.serializing


  1from __future__ import annotations
  2
  3import json
  4import sys
  5from dataclasses import dataclass
  6from typing import IO, Any, Callable, Iterable, Sequence
  7import warnings
  8
  9import numpy as np
 10from muutils.json_serialize.array import arr_metadata
 11from muutils.json_serialize.json_serialize import (  # JsonSerializer,
 12    DEFAULT_HANDLERS,
 13    ObjectPath,
 14    SerializerHandler,
 15)
 16
 17from zanj.consts import JSONdict, JSONitem, MonoTuple, _FORMAT_KEY, _REF_KEY
 18
 19from zanj.externals import ExternalItem, ExternalItemType, _ZANJ_pre
 20
 21KW_ONLY_KWARGS: dict = dict()
 22if sys.version_info >= (3, 10):
 23    KW_ONLY_KWARGS["kw_only"] = True
 24
 25# pylint: disable=unused-argument, protected-access, unexpected-keyword-arg
 26# for some reason pylint complains about kwargs to ZANJSerializerHandler
 27
 28
 29def jsonl_metadata(data: list[JSONdict]) -> dict[str, Any]:
 30    """metadata about a jsonl object"""
 31    all_cols: set[str] = set([col for item in data for col in item.keys()])
 32    output: dict[str, Any] = {
 33        "len(data)": len(data),
 34        "columns": {
 35            col: {
 36                "types": list(
 37                    set([type(item[col]).__name__ for item in data if col in item])
 38                ),
 39                "len": len([item[col] for item in data if col in item]),
 40            }
 41            for col in all_cols
 42            if col != _FORMAT_KEY
 43        },
 44    }
 45    if len(data) > 0:
 46        output["data[0]"] = data[0]
 47    return output
 48
 49
 50def store_npy(self: _ZANJ_pre, fp: IO[bytes], data: np.ndarray) -> None:
 51    """store numpy array to given file as .npy"""
 52    # TODO: Type `<module 'numpy.lib'>` has no attribute `format` --> zanj/serializing.py:54:5
 53    # info: rule `unresolved-attribute` is enabled by default
 54    np.lib.format.write_array(  # ty: ignore[unresolved-attribute]
 55        fp=fp,
 56        array=np.asanyarray(data),
 57        allow_pickle=False,
 58    )
 59
 60
 61def store_jsonl(self: _ZANJ_pre, fp: IO[bytes], data: Sequence[JSONitem]) -> None:
 62    """store sequence to given file as .jsonl"""
 63
 64    for item in data:
 65        fp.write(json.dumps(item).encode("utf-8"))
 66        fp.write("\n".encode("utf-8"))
 67
 68
# maps each external item type to the function that writes such an item to a
# binary file object; per the annotation every function takes
# `(zanj_instance, fp, data)` and returns `None`
EXTERNAL_STORE_FUNCS: dict[
    ExternalItemType, Callable[[_ZANJ_pre, IO[bytes], Any], None]
] = {
    "npy": store_npy,
    "jsonl": store_jsonl,
}
 75
 76
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(SerializerHandler):
    """a handler for ZANJ serialization

    extends `SerializerHandler` with a `source_pckg` field, and re-declares
    `check` and `serialize_func` so that their first argument is annotated as
    a `_ZANJ_pre` instance. `uid` and `desc` are not declared here (see the
    commented-out lines below), so they come from the parent class.

    the dataclass is created with `kw_only=True` when `KW_ONLY_KWARGS`
    contains it, i.e. on Python 3.10+
    """

    # unique identifier for the handler, saved in _FORMAT_KEY field
    # uid: str
    # source package of the handler -- note that this might be overridden by ZANJ
    source_pckg: str
    # (self_config, object, path) -> whether to use this handler
    check: Callable[[_ZANJ_pre, Any, ObjectPath], bool]
    # (self_config, object, path) -> serialized object
    serialize_func: Callable[[_ZANJ_pre, Any, ObjectPath], JSONitem]
    # optional description of how this serializer works
    # desc: str = "(no description)"
 91
 92
 93def zanj_external_serialize(
 94    jser: _ZANJ_pre,
 95    data: Any,
 96    path: ObjectPath,
 97    item_type: ExternalItemType,
 98    _format: str,
 99) -> JSONitem:
100    """stores a numpy array or jsonl externally in a ZANJ object
101
102    # Parameters:
103     - `jser: ZANJ`
104     - `data: Any`
105     - `path: ObjectPath`
106     - `item_type: ExternalItemType`
107
108    # Returns:
109     - `JSONitem`
110       json data with reference
111
112    # Modifies:
113     - modifies `jser._externals`
114    """
115    # get the path, make sure its unique
116    assert isinstance(path, tuple), (
117        f"path must be a tuple, got {type(path) = } {path = }"
118    )
119    joined_path: str = "/".join([str(p) for p in path])
120    archive_path: str = f"{joined_path}.{item_type}"
121
122    # TODO: somehow need to control whether a failure here causes a fallback to other handlers, or whether the except should propagate
123    # this will probably require changes to the upstream muutils.json_serialize code
124    if archive_path in jser._externals:
125        err_msg = f"external path {archive_path} already exists!"
126        warnings.warn(err_msg)
127        raise ValueError(err_msg)
128    # Check for true path prefix conflicts (not just string prefix)
129    # Only flag when one path is a directory ancestor of another (contains "/" separator)
130    for p in jser._externals.keys():
131        # Remove the file extension to get the joined_path
132        existing_joined_path = p.rsplit(".", 1)[0]
133        # Check if one is a true path prefix with "/" separator
134        if existing_joined_path.startswith(joined_path + "/") or joined_path.startswith(
135            existing_joined_path + "/"
136        ):
137            err_msg = (
138                f"external path {joined_path} is a prefix of another path {p}!\n"
139                + f"{jser._externals.keys() = }\n{joined_path = }\n{path = }\n{p = }\n{existing_joined_path = }\n{archive_path = }\n{_format = }"
140            )
141            warnings.warn(err_msg)
142            raise ValueError(err_msg)
143
144    # process the data if needed, assemble metadata
145    data_new: Any = data
146    output: dict = {
147        _FORMAT_KEY: _format,
148        _REF_KEY: archive_path,
149    }
150    if item_type == "npy":
151        # check type
152        data_type_str: str = str(type(data))
153        if data_type_str == "<class 'torch.Tensor'>":
154            # detach and convert
155            data_new = data.detach().cpu().numpy()
156        elif data_type_str == "<class 'numpy.ndarray'>":
157            pass
158        else:
159            # if not a numpy array, except
160            raise TypeError(f"expected numpy.ndarray, got {data_type_str}")
161        # get metadata
162        output.update(arr_metadata(data))
163    elif item_type.startswith("jsonl"):
164        # check via module and class name to avoid importing pandas (works with pandas 3.0+)
165        dataframe_columns = None
166        if (
167            "pandas" in data.__class__.__module__
168            and data.__class__.__name__ == "DataFrame"
169        ):
170            dataframe_columns = data.columns.tolist()
171            data_new = data.to_dict(orient="records")
172        elif (
173            "polars" in data.__class__.__module__
174            and data.__class__.__name__ == "DataFrame"
175        ):
176            dataframe_columns = data.columns
177            data_new = data.to_dicts()
178        elif isinstance(data, (list, tuple, Iterable, Sequence)):
179            data_new = [
180                jser.json_serialize(item, tuple(path) + (i,))
181                for i, item in enumerate(data)
182            ]
183        else:
184            raise TypeError(
185                f"expected list or pandas.DataFrame for jsonl, got {type(data)}"
186            )
187
188        if all([isinstance(item, dict) for item in data_new]):
189            output.update(jsonl_metadata(data_new))
190
191        # set DataFrame columns after jsonl_metadata to avoid being overwritten
192        if dataframe_columns is not None:
193            output["columns"] = dataframe_columns
194
195    # store the item for external serialization
196    jser._externals[archive_path] = ExternalItem(
197        item_type=item_type,
198        data=data_new,
199        path=path,
200    )
201
202    return output
203
204
# default handlers for ZANJ: the externally-storing handlers defined here come
# first in the tuple, followed by every handler from muutils' `DEFAULT_HANDLERS`
# NOTE(review): presumably handlers are tried in tuple order, so the external
# handlers take priority over the muutils ones -- confirm against the upstream
# `muutils.json_serialize` implementation
DEFAULT_SERIALIZER_HANDLERS_ZANJ: MonoTuple[ZANJSerializerHandler] = tuple(
    [
        # numpy arrays with at least `external_array_threshold` elements -> external .npy
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                isinstance(obj, np.ndarray)
                and obj.size >= self.external_array_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="npy", _format="numpy.ndarray:external"
            ),
            uid="numpy.ndarray:external",
            source_pckg="zanj",
            desc="external numpy array",
        ),
        # torch tensors (matched by type name, torch is never imported here)
        # with at least `external_array_threshold` elements -> external .npy
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                str(type(obj)) == "<class 'torch.Tensor'>"
                and int(obj.nelement()) >= self.external_array_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="npy", _format="torch.Tensor:external"
            ),
            uid="torch.Tensor:external",
            source_pckg="zanj",
            desc="external torch tensor",
        ),
        # lists with at least `external_list_threshold` items -> external .jsonl
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                isinstance(obj, list) and len(obj) >= self.external_list_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="list:external"
            ),
            uid="list:external",
            source_pckg="zanj",
            desc="external list",
        ),
        # tuples with at least `external_list_threshold` items -> external .jsonl
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                isinstance(obj, tuple) and len(obj) >= self.external_list_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="tuple:external"
            ),
            uid="tuple:external",
            source_pckg="zanj",
            desc="external tuple",
        ),
        # pandas DataFrames (matched by module and class name) -> external .jsonl,
        # with no size threshold
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                "pandas" in obj.__class__.__module__
                and obj.__class__.__name__ == "DataFrame"
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="pandas.DataFrame:external"
            ),
            uid="pandas.DataFrame:external",
            source_pckg="zanj",
            desc="external pandas DataFrame",
        ),
        # polars DataFrames (matched by module and class name) -> external .jsonl,
        # with no size threshold
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                "polars" in obj.__class__.__module__
                and obj.__class__.__name__ == "DataFrame"
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="polars.DataFrame:external"
            ),
            uid="polars.DataFrame:external",
            source_pckg="zanj",
            desc="external polars DataFrame",
        ),
        # disabled handler for `torch.nn.Module` objects, kept for reference
        # ZANJSerializerHandler(
        #     check=lambda self, obj, path: "<class 'torch.nn.modules.module.Module'>"
        #     in [str(t) for t in obj.__class__.__mro__],
        #     serialize_func=lambda self, obj, path: zanj_serialize_torchmodule(
        #         self, obj, path,
        #     ),
        #     uid="torch.nn.Module",
        #     source_pckg="zanj",
        #     desc="fallback torch serialization",
        # ),
    ]
) + tuple(
    DEFAULT_HANDLERS  # type: ignore[arg-type]
)
291
292# the complaint above is:
293# error: Argument 1 to "tuple" has incompatible type "Sequence[SerializerHandler]"; expected "Iterable[ZANJSerializerHandler]"  [arg-type]

KW_ONLY_KWARGS: dict = {'kw_only': True}
def jsonl_metadata( data: list[typing.Dict[str, typing.Union[bool, int, float, str, NoneType, typing.Sequence[typing.Union[bool, int, float, str, NoneType, typing.Sequence[ForwardRef('JSONitem')], typing.Dict[str, ForwardRef('JSONitem')]]], typing.Dict[str, typing.Union[bool, int, float, str, NoneType, typing.Sequence[ForwardRef('JSONitem')], typing.Dict[str, ForwardRef('JSONitem')]]]]]]) -> dict[str, typing.Any]:
def jsonl_metadata(data: list[JSONdict]) -> dict[str, Any]:
    """metadata about a jsonl object

    # Parameters:
     - `data: list[JSONdict]`
       rows of the jsonl object; rows may have differing keys

    # Returns:
     - `dict[str, Any]`
       - `"len(data)"`: number of rows
       - `"columns"`: for every key seen in any row (except `_FORMAT_KEY`),
         a dict with the sorted names of the value types found under that key
         (`"types"`) and the number of rows containing the key (`"len"`).
         columns appear in the order in which they are first seen
       - `"data[0]"`: the first row, only present when `data` is non-empty
    """
    # collect keys in first-seen order; a plain `set` of strings would iterate
    # in an order that changes between runs under hash randomization
    all_cols: dict[str, None] = dict.fromkeys(
        col for item in data for col in item
    )
    output: dict[str, Any] = {
        "len(data)": len(data),
        "columns": {
            col: {
                # sorted for the same reason: reproducible output across runs
                "types": sorted(
                    {type(item[col]).__name__ for item in data if col in item}
                ),
                "len": sum(1 for item in data if col in item),
            }
            for col in all_cols
            if col != _FORMAT_KEY
        },
    }
    if len(data) > 0:
        output["data[0]"] = data[0]
    return output

metadata about a jsonl object

def store_npy(self: Any, fp: IO[bytes], data: numpy.ndarray) -> None:
def store_npy(self: _ZANJ_pre, fp: IO[bytes], data: np.ndarray) -> None:
    """write `data` to the binary file object `fp` in .npy format

    `data` is passed through `np.asanyarray` before writing, and pickling
    is disabled (`allow_pickle=False`). `self` is not used.
    """
    # TODO: Type `<module 'numpy.lib'>` has no attribute `format` --> zanj/serializing.py:54:5
    # info: rule `unresolved-attribute` is enabled by default
    write_array = np.lib.format.write_array  # ty: ignore[unresolved-attribute]
    write_array(fp, np.asanyarray(data), allow_pickle=False)

store numpy array to given file as .npy

def store_jsonl( self: Any, fp: IO[bytes], data: Sequence[Union[bool, int, float, str, NoneType, Sequence[Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]], Dict[str, Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]]]]) -> None:
def store_jsonl(self: _ZANJ_pre, fp: IO[bytes], data: Sequence[JSONitem]) -> None:
    """write every element of `data` to `fp` as one utf-8 encoded JSON line

    each item is dumped with `json.dumps` and followed by a single newline
    character. `self` is not used.
    """
    newline: bytes = "\n".encode("utf-8")
    for entry in data:
        encoded: bytes = json.dumps(entry).encode("utf-8")
        fp.write(encoded)
        fp.write(newline)

store sequence to given file as .jsonl

EXTERNAL_STORE_FUNCS: dict[typing.Literal['jsonl', 'npy'], typing.Callable[[typing.Any, typing.IO[bytes], typing.Any], NoneType]] = {'npy': <function store_npy>, 'jsonl': <function store_jsonl>}
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(muutils.json_serialize.json_serialize.SerializerHandler):
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(SerializerHandler):
    """a handler for ZANJ serialization

    extends `SerializerHandler` with a `source_pckg` field, and re-declares
    `check` and `serialize_func` so that their first argument is annotated as
    a `_ZANJ_pre` instance. `uid` and `desc` are not declared here (see the
    commented-out lines below), so they come from the parent class.

    the dataclass is created with `kw_only=True` when `KW_ONLY_KWARGS`
    contains it, i.e. on Python 3.10+
    """

    # unique identifier for the handler, saved in _FORMAT_KEY field
    # uid: str
    # source package of the handler -- note that this might be overridden by ZANJ
    source_pckg: str
    # (self_config, object, path) -> whether to use this handler
    check: Callable[[_ZANJ_pre, Any, ObjectPath], bool]
    # (self_config, object, path) -> serialized object
    serialize_func: Callable[[_ZANJ_pre, Any, ObjectPath], JSONitem]
    # optional description of how this serializer works
    # desc: str = "(no description)"

a handler for ZANJ serialization

ZANJSerializerHandler( uid: str, desc: str, *, check: Callable[[Any, Any, tuple[Union[str, int], ...]], bool], serialize_func: Callable[[Any, Any, tuple[Union[str, int], ...]], Union[bool, int, float, str, NoneType, Sequence[Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]], Dict[str, Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]]]], source_pckg: str)
source_pckg: str
check: Callable[[Any, Any, tuple[Union[str, int], ...]], bool]
serialize_func: Callable[[Any, Any, tuple[Union[str, int], ...]], Union[bool, int, float, str, NoneType, Sequence[Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]], Dict[str, Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]]]]
Inherited Members
muutils.json_serialize.json_serialize.SerializerHandler
uid
desc
serialize
def zanj_external_serialize( jser: Any, data: Any, path: tuple[typing.Union[str, int], ...], item_type: Literal['jsonl', 'npy'], _format: str) -> Union[bool, int, float, str, NoneType, Sequence[ForwardRef('JSONitem')], Dict[str, ForwardRef('JSONitem')]]:
 94def zanj_external_serialize(
 95    jser: _ZANJ_pre,
 96    data: Any,
 97    path: ObjectPath,
 98    item_type: ExternalItemType,
 99    _format: str,
100) -> JSONitem:
101    """stores a numpy array or jsonl externally in a ZANJ object
102
103    # Parameters:
104     - `jser: ZANJ`
105     - `data: Any`
106     - `path: ObjectPath`
107     - `item_type: ExternalItemType`
108
109    # Returns:
110     - `JSONitem`
111       json data with reference
112
113    # Modifies:
114     - modifies `jser._externals`
115    """
116    # get the path, make sure its unique
117    assert isinstance(path, tuple), (
118        f"path must be a tuple, got {type(path) = } {path = }"
119    )
120    joined_path: str = "/".join([str(p) for p in path])
121    archive_path: str = f"{joined_path}.{item_type}"
122
123    # TODO: somehow need to control whether a failure here causes a fallback to other handlers, or whether the except should propagate
124    # this will probably require changes to the upstream muutils.json_serialize code
125    if archive_path in jser._externals:
126        err_msg = f"external path {archive_path} already exists!"
127        warnings.warn(err_msg)
128        raise ValueError(err_msg)
129    # Check for true path prefix conflicts (not just string prefix)
130    # Only flag when one path is a directory ancestor of another (contains "/" separator)
131    for p in jser._externals.keys():
132        # Remove the file extension to get the joined_path
133        existing_joined_path = p.rsplit(".", 1)[0]
134        # Check if one is a true path prefix with "/" separator
135        if existing_joined_path.startswith(joined_path + "/") or joined_path.startswith(
136            existing_joined_path + "/"
137        ):
138            err_msg = (
139                f"external path {joined_path} is a prefix of another path {p}!\n"
140                + f"{jser._externals.keys() = }\n{joined_path = }\n{path = }\n{p = }\n{existing_joined_path = }\n{archive_path = }\n{_format = }"
141            )
142            warnings.warn(err_msg)
143            raise ValueError(err_msg)
144
145    # process the data if needed, assemble metadata
146    data_new: Any = data
147    output: dict = {
148        _FORMAT_KEY: _format,
149        _REF_KEY: archive_path,
150    }
151    if item_type == "npy":
152        # check type
153        data_type_str: str = str(type(data))
154        if data_type_str == "<class 'torch.Tensor'>":
155            # detach and convert
156            data_new = data.detach().cpu().numpy()
157        elif data_type_str == "<class 'numpy.ndarray'>":
158            pass
159        else:
160            # if not a numpy array, except
161            raise TypeError(f"expected numpy.ndarray, got {data_type_str}")
162        # get metadata
163        output.update(arr_metadata(data))
164    elif item_type.startswith("jsonl"):
165        # check via module and class name to avoid importing pandas (works with pandas 3.0+)
166        dataframe_columns = None
167        if (
168            "pandas" in data.__class__.__module__
169            and data.__class__.__name__ == "DataFrame"
170        ):
171            dataframe_columns = data.columns.tolist()
172            data_new = data.to_dict(orient="records")
173        elif (
174            "polars" in data.__class__.__module__
175            and data.__class__.__name__ == "DataFrame"
176        ):
177            dataframe_columns = data.columns
178            data_new = data.to_dicts()
179        elif isinstance(data, (list, tuple, Iterable, Sequence)):
180            data_new = [
181                jser.json_serialize(item, tuple(path) + (i,))
182                for i, item in enumerate(data)
183            ]
184        else:
185            raise TypeError(
186                f"expected list or pandas.DataFrame for jsonl, got {type(data)}"
187            )
188
189        if all([isinstance(item, dict) for item in data_new]):
190            output.update(jsonl_metadata(data_new))
191
192        # set DataFrame columns after jsonl_metadata to avoid being overwritten
193        if dataframe_columns is not None:
194            output["columns"] = dataframe_columns
195
196    # store the item for external serialization
197    jser._externals[archive_path] = ExternalItem(
198        item_type=item_type,
199        data=data_new,
200        path=path,
201    )
202
203    return output

stores a numpy array or jsonl externally in a ZANJ object

Parameters:

  • jser: ZANJ
  • data: Any
  • path: ObjectPath
  • item_type: ExternalItemType

Returns:

  • JSONitem json data with reference

Modifies:

  • modifies jser._externals
DEFAULT_SERIALIZER_HANDLERS_ZANJ: None = (ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='numpy.ndarray:external', desc='external numpy array', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='torch.Tensor:external', desc='external torch tensor', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='list:external', desc='external list', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='tuple:external', desc='external tuple', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='pandas.DataFrame:external', desc='external pandas DataFrame', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='polars.DataFrame:external', desc='external polars DataFrame', source_pckg='zanj'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='base types', desc='base types (bool, int, float, str, None)'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='dictionaries', desc='dictionaries'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='namedtuple -> dict', desc='namedtuples as dicts'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='(list, tuple) -> list', desc='lists and tuples as lists'), SerializerHandler(check=<function <lambda>>, serialize_func=<function _serialize_override_serialize_func>, uid='.serialize override', desc='objects with .serialize method'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='dataclass -> dict', desc='dataclasses as dicts'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='path -> str', desc='Path 
objects as posix strings'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='obj -> str(obj)', desc='directly serialize objects in `SERIALIZE_DIRECT_AS_STR` to strings'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='numpy.ndarray', desc='numpy arrays'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='torch.Tensor', desc='pytorch tensors'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='pandas.DataFrame', desc='pandas DataFrames'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid="set -> dict[_FORMAT_KEY: 'set', data: list(...)]", desc='sets as dicts with format key'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='Iterable -> list', desc='Iterables (not lists/tuples/strings) as lists'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='fallback', desc='fallback handler -- serialize object attributes and special functions as strings'))