docs for zanj v0.5.2
View Source on GitHub

zanj.serializing


  1from __future__ import annotations
  2
  3import json
  4import sys
  5from dataclasses import dataclass
  6from typing import IO, Any, Callable, Iterable, Sequence
  7import warnings
  8
  9import numpy as np
 10from muutils.json_serialize.array import arr_metadata
 11from muutils.json_serialize.json_serialize import (  # JsonSerializer,
 12    DEFAULT_HANDLERS,
 13    ObjectPath,
 14    SerializerHandler,
 15)
 16from muutils.json_serialize.util import (
 17    JSONdict,
 18    JSONitem,
 19    MonoTuple,
 20    _FORMAT_KEY,
 21    _REF_KEY,
 22)
 23
 24from zanj.externals import ExternalItem, ExternalItemType, _ZANJ_pre
 25
 26KW_ONLY_KWARGS: dict = dict()
 27if sys.version_info >= (3, 10):
 28    KW_ONLY_KWARGS["kw_only"] = True
 29
 30# pylint: disable=unused-argument, protected-access, unexpected-keyword-arg
 31# for some reason pylint complains about kwargs to ZANJSerializerHandler
 32
 33
 34def jsonl_metadata(data: list[JSONdict]) -> dict:
 35    """metadata about a jsonl object"""
 36    all_cols: set[str] = set([col for item in data for col in item.keys()])
 37    return {
 38        "data[0]": data[0],
 39        "len(data)": len(data),
 40        "columns": {
 41            col: {
 42                "types": list(
 43                    set([type(item[col]).__name__ for item in data if col in item])
 44                ),
 45                "len": len([item[col] for item in data if col in item]),
 46            }
 47            for col in all_cols
 48            if col != _FORMAT_KEY
 49        },
 50    }
 51
 52
 53def store_npy(self: _ZANJ_pre, fp: IO[bytes], data: np.ndarray) -> None:
 54    """store numpy array to given file as .npy"""
 55    # TODO: Type `<module 'numpy.lib'>` has no attribute `format` --> zanj/serializing.py:54:5
 56    # info: rule `unresolved-attribute` is enabled by default
 57    np.lib.format.write_array(  # ty: ignore[unresolved-attribute]
 58        fp=fp,
 59        array=np.asanyarray(data),
 60        allow_pickle=False,
 61    )
 62
 63
 64def store_jsonl(self: _ZANJ_pre, fp: IO[bytes], data: Sequence[JSONitem]) -> None:
 65    """store sequence to given file as .jsonl"""
 66
 67    for item in data:
 68        fp.write(json.dumps(item).encode("utf-8"))
 69        fp.write("\n".encode("utf-8"))
 70
 71
# maps each external item type to the function that writes an item of that type
# to an open binary file; every function takes `(zanj_object, file, data)`
EXTERNAL_STORE_FUNCS: dict[
    ExternalItemType, Callable[[_ZANJ_pre, IO[bytes], Any], None]
] = {
    "npy": store_npy,
    "jsonl": store_jsonl,
}
 78
 79
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(SerializerHandler):
    """a handler for ZANJ serialization

    subclass of `SerializerHandler` which adds a `source_pckg` field and
    annotates `check` and `serialize_func` as taking the ZANJ object as their
    first argument. the fields declared here are keyword-only when
    `KW_ONLY_KWARGS` contains `kw_only` (python >= 3.10).
    """

    # unique identifier for the handler, saved in _FORMAT_KEY field
    # (commented out: not redeclared here)
    # uid: str
    # source package of the handler -- note that this might be overridden by ZANJ
    source_pckg: str
    # (self_config, object, path) -> whether to use this handler
    check: Callable[[_ZANJ_pre, Any, ObjectPath], bool]
    # (self_config, object, path) -> serialized object
    serialize_func: Callable[[_ZANJ_pre, Any, ObjectPath], JSONitem]
    # optional description of how this serializer works
    # (commented out: not redeclared here)
    # desc: str = "(no description)"
 94
 95
 96def zanj_external_serialize(
 97    jser: _ZANJ_pre,
 98    data: Any,
 99    path: ObjectPath,
100    item_type: ExternalItemType,
101    _format: str,
102) -> JSONitem:
103    """stores a numpy array or jsonl externally in a ZANJ object
104
105    # Parameters:
106     - `jser: ZANJ`
107     - `data: Any`
108     - `path: ObjectPath`
109     - `item_type: ExternalItemType`
110
111    # Returns:
112     - `JSONitem`
113       json data with reference
114
115    # Modifies:
116     - modifies `jser._externals`
117    """
118    # get the path, make sure its unique
119    assert isinstance(path, tuple), (
120        f"path must be a tuple, got {type(path) = } {path = }"
121    )
122    joined_path: str = "/".join([str(p) for p in path])
123    archive_path: str = f"{joined_path}.{item_type}"
124
125    # TODO: somehow need to control whether a failure here causes a fallback to other handlers, or whether the except should propagate
126    # this will probably require changes to the upstream muutils.json_serialize code
127    if archive_path in jser._externals:
128        err_msg = f"external path {archive_path} already exists!"
129        warnings.warn(err_msg)
130        raise ValueError(err_msg)
131    # Check for true path prefix conflicts (not just string prefix)
132    # Only flag when one path is a directory ancestor of another (contains "/" separator)
133    for p in jser._externals.keys():
134        # Remove the file extension to get the joined_path
135        existing_joined_path = p.rsplit(".", 1)[0]
136        # Check if one is a true path prefix with "/" separator
137        if existing_joined_path.startswith(joined_path + "/") or joined_path.startswith(
138            existing_joined_path + "/"
139        ):
140            err_msg = (
141                f"external path {joined_path} is a prefix of another path {p}!\n"
142                + f"{jser._externals.keys() = }\n{joined_path = }\n{path = }\n{p = }\n{existing_joined_path = }\n{archive_path = }\n{_format = }"
143            )
144            warnings.warn(err_msg)
145            raise ValueError(err_msg)
146
147    # process the data if needed, assemble metadata
148    data_new: Any = data
149    output: dict = {
150        _FORMAT_KEY: _format,
151        _REF_KEY: archive_path,
152    }
153    if item_type == "npy":
154        # check type
155        data_type_str: str = str(type(data))
156        if data_type_str == "<class 'torch.Tensor'>":
157            # detach and convert
158            data_new = data.detach().cpu().numpy()
159        elif data_type_str == "<class 'numpy.ndarray'>":
160            pass
161        else:
162            # if not a numpy array, except
163            raise TypeError(f"expected numpy.ndarray, got {data_type_str}")
164        # get metadata
165        output.update(arr_metadata(data))
166    elif item_type.startswith("jsonl"):
167        # check via mro to avoid importing pandas
168        if any("pandas.core.frame.DataFrame" in str(t) for t in data.__class__.__mro__):
169            output["columns"] = data.columns.tolist()
170            data_new = data.to_dict(orient="records")
171        elif isinstance(data, (list, tuple, Iterable, Sequence)):
172            data_new = [
173                jser.json_serialize(item, tuple(path) + (i,))
174                for i, item in enumerate(data)
175            ]
176        else:
177            raise TypeError(
178                f"expected list or pandas.DataFrame for jsonl, got {type(data)}"
179            )
180
181        if all([isinstance(item, dict) for item in data_new]):
182            output.update(jsonl_metadata(data_new))
183
184    # store the item for external serialization
185    jser._externals[archive_path] = ExternalItem(
186        item_type=item_type,
187        data=data_new,
188        path=path,
189    )
190
191    return output
192
193
# handlers used by ZANJ: the external-storage handlers defined here come first,
# followed by the `DEFAULT_HANDLERS` from muutils.
# NOTE(review): presumably the first handler whose `check` passes is the one
# used, which would make this ordering significant -- confirm against muutils
DEFAULT_SERIALIZER_HANDLERS_ZANJ: MonoTuple[ZANJSerializerHandler] = tuple(
    [
        # numpy arrays with at least `external_array_threshold` elements -> .npy
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                isinstance(obj, np.ndarray)
                and obj.size >= self.external_array_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="npy", _format="numpy.ndarray:external"
            ),
            uid="numpy.ndarray:external",
            source_pckg="zanj",
            desc="external numpy array",
        ),
        # torch tensors (matched by type name) with at least
        # `external_array_threshold` elements -> .npy
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                str(type(obj)) == "<class 'torch.Tensor'>"
                and int(obj.nelement()) >= self.external_array_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="npy", _format="torch.Tensor:external"
            ),
            uid="torch.Tensor:external",
            source_pckg="zanj",
            desc="external torch tensor",
        ),
        # lists with at least `external_list_threshold` elements -> .jsonl
        ZANJSerializerHandler(
            check=lambda self, obj, path: isinstance(obj, list)
            and len(obj) >= self.external_list_threshold,
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="list:external"
            ),
            uid="list:external",
            source_pckg="zanj",
            desc="external list",
        ),
        # tuples with at least `external_list_threshold` elements -> .jsonl
        ZANJSerializerHandler(
            check=lambda self, obj, path: isinstance(obj, tuple)
            and len(obj) >= self.external_list_threshold,
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="tuple:external"
            ),
            uid="tuple:external",
            source_pckg="zanj",
            desc="external tuple",
        ),
        # pandas DataFrames (matched by name in the mro) with at least
        # `external_list_threshold` rows -> .jsonl
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                any(
                    "pandas.core.frame.DataFrame" in str(t)
                    for t in obj.__class__.__mro__
                )
                and len(obj) >= self.external_list_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="pandas.DataFrame:external"
            ),
            uid="pandas.DataFrame:external",
            source_pckg="zanj",
            desc="external pandas DataFrame",
        ),
        # disabled handler for torch modules, kept for reference
        # ZANJSerializerHandler(
        #     check=lambda self, obj, path: "<class 'torch.nn.modules.module.Module'>"
        #     in [str(t) for t in obj.__class__.__mro__],
        #     serialize_func=lambda self, obj, path: zanj_serialize_torchmodule(
        #         self, obj, path,
        #     ),
        #     uid="torch.nn.Module",
        #     source_pckg="zanj",
        #     desc="fallback torch serialization",
        # ),
    ]
) + tuple(
    DEFAULT_HANDLERS  # type: ignore[arg-type]
)

# the complaint above is:
# error: Argument 1 to "tuple" has incompatible type "Sequence[SerializerHandler]"; expected "Iterable[ZANJSerializerHandler]"  [arg-type]

KW_ONLY_KWARGS: dict = {'kw_only': True}
def jsonl_metadata( data: list[typing.Dict[str, typing.Union[bool, int, float, str, NoneType, typing.List[typing.Union[bool, int, float, str, NoneType, typing.List[typing.Any], typing.Dict[str, typing.Any]]], typing.Dict[str, typing.Union[bool, int, float, str, NoneType, typing.List[typing.Any], typing.Dict[str, typing.Any]]]]]]) -> dict:
def jsonl_metadata(data: list[JSONdict]) -> dict:
    """metadata about a jsonl object

    # Parameters:
     - `data: list[JSONdict]`
       rows of the jsonl object, each a dict. may be empty

    # Returns:
     - `dict` with keys:
       - `"data[0]"`: the first row, or `None` if `data` is empty
       - `"len(data)"`: number of rows
       - `"columns"`: for every key seen in any row (except `_FORMAT_KEY`),
         a dict with the sorted type names of its values (`"types"`)
         and the number of rows containing that key (`"len"`)
    """
    # single pass over all rows, instead of rescanning `data` once per column
    columns: dict[str, dict] = {}
    for item in data:
        for col, value in item.items():
            if col == _FORMAT_KEY:
                continue
            info = columns.setdefault(col, {"types": set(), "len": 0})
            info["types"].add(type(value).__name__)
            info["len"] += 1

    # sets are not json-serializable and have no stable order -- emit sorted lists
    for info in columns.values():
        info["types"] = sorted(info["types"])

    return {
        # an empty sequence has no first row; indexing it would raise `IndexError`
        "data[0]": data[0] if data else None,
        "len(data)": len(data),
        "columns": columns,
    }

metadata about a jsonl object

def store_npy(self: Any, fp: IO[bytes], data: numpy.ndarray) -> None:
def store_npy(self: _ZANJ_pre, fp: IO[bytes], data: np.ndarray) -> None:
    """write `data` to the binary file object `fp` in .npy format, with pickling disabled"""
    # TODO: Type `<module 'numpy.lib'>` has no attribute `format` --> zanj/serializing.py:54:5
    # info: rule `unresolved-attribute` is enabled by default
    write_array = np.lib.format.write_array  # ty: ignore[unresolved-attribute]
    # `asanyarray` converts array-likes, and passes ndarray subclasses through
    as_array = np.asanyarray(data)
    write_array(fp=fp, array=as_array, allow_pickle=False)

store numpy array to given file as .npy

def store_jsonl( self: Any, fp: IO[bytes], data: Sequence[Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]]) -> None:
def store_jsonl(self: _ZANJ_pre, fp: IO[bytes], data: Sequence[JSONitem]) -> None:
    """write each element of `data` to `fp` as one utf-8 encoded json line (.jsonl)"""
    line_end: bytes = "\n".encode("utf-8")
    for record in data:
        encoded: bytes = json.dumps(record).encode("utf-8")
        fp.write(encoded)
        fp.write(line_end)

store sequence to given file as .jsonl

EXTERNAL_STORE_FUNCS: dict[typing.Literal['jsonl', 'npy'], typing.Callable[[typing.Any, typing.IO[bytes], typing.Any], NoneType]] = {'npy': <function store_npy>, 'jsonl': <function store_jsonl>}
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(muutils.json_serialize.json_serialize.SerializerHandler):
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(SerializerHandler):
    """a handler for ZANJ serialization

    subclass of `SerializerHandler` which adds a `source_pckg` field and
    annotates `check` and `serialize_func` as taking the ZANJ object as their
    first argument. the fields declared here are keyword-only when
    `KW_ONLY_KWARGS` contains `kw_only` (python >= 3.10).
    """

    # unique identifier for the handler, saved in _FORMAT_KEY field
    # (commented out: not redeclared here)
    # uid: str
    # source package of the handler -- note that this might be overridden by ZANJ
    source_pckg: str
    # (self_config, object, path) -> whether to use this handler
    check: Callable[[_ZANJ_pre, Any, ObjectPath], bool]
    # (self_config, object, path) -> serialized object
    serialize_func: Callable[[_ZANJ_pre, Any, ObjectPath], JSONitem]
    # optional description of how this serializer works
    # (commented out: not redeclared here)
    # desc: str = "(no description)"

a handler for ZANJ serialization

ZANJSerializerHandler( uid: str, desc: str, *, check: Callable[[Any, Any, tuple[Union[str, int], ...]], bool], serialize_func: Callable[[Any, Any, tuple[Union[str, int], ...]], Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]], source_pckg: str)
source_pckg: str
check: Callable[[Any, Any, tuple[Union[str, int], ...]], bool]
serialize_func: Callable[[Any, Any, tuple[Union[str, int], ...]], Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]]
Inherited Members
muutils.json_serialize.json_serialize.SerializerHandler
uid
desc
serialize
def zanj_external_serialize( jser: Any, data: Any, path: tuple[typing.Union[str, int], ...], item_type: Literal['jsonl', 'npy'], _format: str) -> Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]:
 97def zanj_external_serialize(
 98    jser: _ZANJ_pre,
 99    data: Any,
100    path: ObjectPath,
101    item_type: ExternalItemType,
102    _format: str,
103) -> JSONitem:
104    """stores a numpy array or jsonl externally in a ZANJ object
105
106    # Parameters:
107     - `jser: ZANJ`
108     - `data: Any`
109     - `path: ObjectPath`
110     - `item_type: ExternalItemType`
111
112    # Returns:
113     - `JSONitem`
114       json data with reference
115
116    # Modifies:
117     - modifies `jser._externals`
118    """
119    # get the path, make sure its unique
120    assert isinstance(path, tuple), (
121        f"path must be a tuple, got {type(path) = } {path = }"
122    )
123    joined_path: str = "/".join([str(p) for p in path])
124    archive_path: str = f"{joined_path}.{item_type}"
125
126    # TODO: somehow need to control whether a failure here causes a fallback to other handlers, or whether the except should propagate
127    # this will probably require changes to the upstream muutils.json_serialize code
128    if archive_path in jser._externals:
129        err_msg = f"external path {archive_path} already exists!"
130        warnings.warn(err_msg)
131        raise ValueError(err_msg)
132    # Check for true path prefix conflicts (not just string prefix)
133    # Only flag when one path is a directory ancestor of another (contains "/" separator)
134    for p in jser._externals.keys():
135        # Remove the file extension to get the joined_path
136        existing_joined_path = p.rsplit(".", 1)[0]
137        # Check if one is a true path prefix with "/" separator
138        if existing_joined_path.startswith(joined_path + "/") or joined_path.startswith(
139            existing_joined_path + "/"
140        ):
141            err_msg = (
142                f"external path {joined_path} is a prefix of another path {p}!\n"
143                + f"{jser._externals.keys() = }\n{joined_path = }\n{path = }\n{p = }\n{existing_joined_path = }\n{archive_path = }\n{_format = }"
144            )
145            warnings.warn(err_msg)
146            raise ValueError(err_msg)
147
148    # process the data if needed, assemble metadata
149    data_new: Any = data
150    output: dict = {
151        _FORMAT_KEY: _format,
152        _REF_KEY: archive_path,
153    }
154    if item_type == "npy":
155        # check type
156        data_type_str: str = str(type(data))
157        if data_type_str == "<class 'torch.Tensor'>":
158            # detach and convert
159            data_new = data.detach().cpu().numpy()
160        elif data_type_str == "<class 'numpy.ndarray'>":
161            pass
162        else:
163            # if not a numpy array, except
164            raise TypeError(f"expected numpy.ndarray, got {data_type_str}")
165        # get metadata
166        output.update(arr_metadata(data))
167    elif item_type.startswith("jsonl"):
168        # check via mro to avoid importing pandas
169        if any("pandas.core.frame.DataFrame" in str(t) for t in data.__class__.__mro__):
170            output["columns"] = data.columns.tolist()
171            data_new = data.to_dict(orient="records")
172        elif isinstance(data, (list, tuple, Iterable, Sequence)):
173            data_new = [
174                jser.json_serialize(item, tuple(path) + (i,))
175                for i, item in enumerate(data)
176            ]
177        else:
178            raise TypeError(
179                f"expected list or pandas.DataFrame for jsonl, got {type(data)}"
180            )
181
182        if all([isinstance(item, dict) for item in data_new]):
183            output.update(jsonl_metadata(data_new))
184
185    # store the item for external serialization
186    jser._externals[archive_path] = ExternalItem(
187        item_type=item_type,
188        data=data_new,
189        path=path,
190    )
191
192    return output

stores a numpy array or jsonl externally in a ZANJ object

Parameters:

  • jser: ZANJ
  • data: Any
  • path: ObjectPath
  • item_type: ExternalItemType
  • _format: str

Returns:

  • JSONitem json data with reference

Modifies:

  • modifies jser._externals
DEFAULT_SERIALIZER_HANDLERS_ZANJ: None = (ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='numpy.ndarray:external', desc='external numpy array', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='torch.Tensor:external', desc='external torch tensor', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='list:external', desc='external list', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='tuple:external', desc='external tuple', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='pandas.DataFrame:external', desc='external pandas DataFrame', source_pckg='zanj'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='base types', desc='base types (bool, int, float, str, None)'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='dictionaries', desc='dictionaries'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='(list, tuple) -> list', desc='lists and tuples as lists'), SerializerHandler(check=<function <lambda>>, serialize_func=<function _serialize_override_serialize_func>, uid='.serialize override', desc='objects with .serialize method'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='namedtuple -> dict', desc='namedtuples as dicts'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='dataclass -> dict', desc='dataclasses as dicts'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='path -> str', desc='Path objects as posix strings'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='obj -> str(obj)', desc='directly serialize objects in 
`SERIALIZE_DIRECT_AS_STR` to strings'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='numpy.ndarray', desc='numpy arrays'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='torch.Tensor', desc='pytorch tensors'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='pandas.DataFrame', desc='pandas DataFrames'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='(set, list, tuple, Iterable) -> list', desc='sets, lists, tuples, and Iterables as lists'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='fallback', desc='fallback handler -- serialize object attributes and special functions as strings'))