nrp_cmd.converter
import json as _json
import logging
from datetime import datetime
from functools import partial
from typing import (
    Any,
    Callable,
    Optional,
    Protocol,
    Self,
    TypeVar,
    cast,
    dataclass_transform,
    get_args,
    get_origin,
)

from attrs import Attribute, define, fields
from cattrs import Converter
from cattrs.dispatch import StructureHook, UnstructureHook
from cattrs.gen import (
    AttributeOverride,
    make_dict_structure_fn,
    make_dict_unstructure_fn,
    override,
)
from yarl import URL

from .errors import StructureError, UnstructureError


def _remove_nulls(data):
    """Strip ``None`` values from nested dicts and lists, in place.

    Dict entries whose value is ``None`` are deleted and ``None`` items are
    removed from lists; every remaining value or item is processed
    recursively. Objects of any other type are left untouched.
    """
    if isinstance(data, dict):
        null_keys = [key for key, value in data.items() if value is None]
        for key in null_keys:
            del data[key]
        for value in data.values():
            _remove_nulls(value)
    elif isinstance(data, list):
        # Slice assignment keeps the same list object, as the caller expects.
        data[:] = [item for item in data if item is not None]
        for item in data:
            _remove_nulls(item)


class NullRemovingConverter(Converter):
    """Converter that drops ``None`` values from unstructured output.

    Exceptions raised while converting are re-raised as ``UnstructureError``
    or ``StructureError`` with the original exception chained as the cause.
    """

    def unstructure(self, obj: Any, unstructure_as: Any = None) -> Any:
        """Unstructure ``obj`` and remove ``None`` values from the result."""
        try:
            result = super().unstructure(obj, unstructure_as)
            _remove_nulls(result)
        except Exception as exc:
            raise UnstructureError(str(exc)) from exc
        return result

    def structure(self, obj: Any, type_: type) -> Any:
        """Structure ``obj`` into ``type_``."""
        try:
            result = super().structure(obj, type_)
        except Exception as exc:
            raise StructureError(str(exc)) from exc
        return result


# Module-wide converter instance used by all hooks registered in this module.
converter = NullRemovingConverter(omit_if_default=True)


class TypeHookStructureWrapper(Protocol):
    """Callable wrapping a structure hook.

    It is called with the raw data, the target type and the wrapped hook
    (``previous``).
    """

    def __call__(self, data: Any, type_: type, previous: StructureHook) -> Any:
        """Structure ``data`` into ``type_``; ``previous`` is the wrapped hook."""
        ...


class TypeHookUnstructureWrapper(Protocol):
    """Callable wrapping an unstructure hook.

    It is called with the object to unstructure and the wrapped hook
    (``previous``).
    """

    def __call__(self, data: Any, previous: UnstructureHook) -> Any:
        """Unstructure ``data``; ``previous`` is the wrapped hook."""
        ...
79 80 81def structure_extra_data_hook(data: Any, type_: type, previous: StructureHook) -> Any: 82 """Structure hook for extra data that moves extra data to a separate _extra_data attribute.""" 83 ret = previous(data, type) 84 keys: set[str] = set() 85 86 fld: Attribute 87 for fld in fields(type_): 88 if fld.alias: 89 keys.add(fld.alias) 90 else: 91 keys.add(fld.name) 92 extra_keys = data.keys() - keys 93 extra_data = {k: data[k] for k in extra_keys} 94 ret._extra_data = extra_data 95 return ret 96 97 98def unstructure_extra_data_hook(data: Any, previous: UnstructureHook) -> Any: 99 """Unstructure hook for extra data that merges extra data from _extra_data attribute.""" 100 if "_extra_data" not in data.__dict__: 101 return previous(data) 102 extra_data = data._extra_data 103 data = previous(data) 104 data.pop("_extra_data", None) 105 data.update(extra_data) 106 return data 107 108 109class TypeHookBuilder: 110 """Builder for type hooks.""" 111 112 def __init__(self, t: type, allow_extra_data: bool) -> None: 113 """Initialize the builder.""" 114 self._type = t 115 self._structure_omitted_fields: list[str] = [] 116 self._unstructure_omitted_fields: list[str] = [] 117 self._rename_fields: dict[str, str] = {} 118 self._structure_wrappers: list[TypeHookStructureWrapper] = [] 119 self._unstructure_wrappers: list[TypeHookUnstructureWrapper] = [] 120 self._allow_extra_data = allow_extra_data 121 122 def omit_from_structure(self, *fields: str) -> Self: 123 """Omit fields from structuring (deserialization).""" 124 self._structure_omitted_fields.extend(fields) 125 return self 126 127 def omit_from_unstructure(self, *fields: str) -> Self: 128 """Omit fields from unstructuring (serialization).""" 129 self._unstructure_omitted_fields.extend(fields) 130 return self 131 132 def omit(self, *fields: str) -> Self: 133 """Omit fields from both structuring and unstructuring.""" 134 self.omit_from_structure(*fields) 135 self.omit_from_unstructure(*fields) 136 return self 137 138 def 
rename_field(self, serialized_name: str, model_name: str) -> Self: 139 """Rename a field.""" 140 self._rename_fields[serialized_name] = model_name 141 return self 142 143 def add_structure_wrapper(self, wrapper: TypeHookStructureWrapper) -> Self: 144 """Add a custom function for structuring.""" 145 self._structure_wrappers.append(wrapper) 146 return self 147 148 def add_unstructure_wrapper(self, wrapper: TypeHookUnstructureWrapper) -> Self: 149 """Add a custom function for unstructuring.""" 150 self._unstructure_wrappers.append(wrapper) 151 return self 152 153 def build_type_hook(self) -> None: 154 """Build and register the type hook.""" 155 if self._allow_extra_data: 156 self._structure_wrappers.append(structure_extra_data_hook) 157 self._unstructure_wrappers.append(unstructure_extra_data_hook) 158 159 structure_overrides: dict[str, AttributeOverride] = {} 160 for serialized_name, model_name in self._rename_fields.items(): 161 structure_overrides[model_name] = override(rename=serialized_name) 162 for field in self._structure_omitted_fields: 163 structure_overrides[field] = override(omit=True) 164 165 unstructure_overrides: dict[str, AttributeOverride] = {} 166 for serialized_name, model_name in self._rename_fields.items(): 167 unstructure_overrides[model_name] = override(rename=serialized_name) 168 for field in self._unstructure_omitted_fields: 169 unstructure_overrides[field] = override(omit=True) 170 171 st_hook: StructureHook = make_dict_structure_fn( 172 self._type, converter, **structure_overrides 173 ) 174 175 unst_hook: UnstructureHook = make_dict_unstructure_fn( 176 self._type, converter, **unstructure_overrides 177 ) 178 if self._structure_wrappers: 179 structure_wrappers: list[StructureHook] = [] 180 for structure_wrapper in self._structure_wrappers: 181 if structure_wrappers: 182 structure_wrappers.append( 183 partial(structure_wrapper, previous=structure_wrappers[-1]) 184 ) 185 else: 186 structure_wrappers.append( 187 partial(structure_wrapper, 
previous=st_hook) 188 ) 189 st_hook = structure_wrappers[-1] 190 191 if self._unstructure_wrappers: 192 unstructure_wrappers: list[UnstructureHook] = [] 193 for unstructure_wrapper in self._unstructure_wrappers: 194 if unstructure_wrappers: 195 unstructure_wrappers.append( 196 partial(unstructure_wrapper, previous=unstructure_wrappers[-1]) 197 ) 198 else: 199 unstructure_wrappers.append( 200 partial(unstructure_wrapper, previous=unst_hook) 201 ) 202 unst_hook = unstructure_wrappers[-1] 203 204 converter.register_structure_hook(self._type, st_hook) 205 converter.register_unstructure_hook(self._type, unst_hook) 206 207 208class SerializationExtension: 209 """Base class for serialization extensions.""" 210 211 def apply(self, builder: TypeHookBuilder) -> None: 212 """Apply the extension to the builder.""" 213 pass 214 215 216@define 217class Omit(SerializationExtension): 218 """Omit a field from serialization.""" 219 220 field: str 221 from_structure: bool = True 222 from_unstructure: bool = True 223 224 def apply(self, builder: TypeHookBuilder) -> None: 225 """Apply the extension to the builder.""" 226 if self.from_structure: 227 builder.omit_from_structure(self.field) 228 if self.from_unstructure: 229 builder.omit_from_unstructure(self.field) 230 231 232@define 233class Rename(SerializationExtension): 234 """Rename a field in serialization.""" 235 236 serialized_name: str 237 model_name: str 238 239 def apply(self, builder: TypeHookBuilder) -> None: 240 """Apply the extension to the builder.""" 241 builder.rename_field(self.serialized_name, self.model_name) 242 243 244@define 245class WrapStructure(SerializationExtension): 246 """Wrap a structure hook.""" 247 248 wrapper: TypeHookStructureWrapper 249 250 def apply(self, builder: TypeHookBuilder) -> None: 251 """Apply the extension to the builder.""" 252 builder.add_structure_wrapper(self.wrapper) 253 254 255@define 256class WrapUnstructure(SerializationExtension): 257 """Wrap an unstructure hook.""" 258 259 wrapper: 
TypeHookUnstructureWrapper 260 261 def apply(self, builder: TypeHookBuilder) -> None: 262 """Apply the extension to the builder.""" 263 builder.add_unstructure_wrapper(self.wrapper) 264 265 266C = TypeVar("C", bound=type) 267 268 269@dataclass_transform() 270def extend_serialization( 271 *modifications: SerializationExtension, 272 allow_extra_data: bool = False, 273) -> Callable[[C], C]: 274 """Add serialization rules to an attrs class.""" 275 276 def wrapper(t: C) -> C: 277 """Wrap the class and register type for that.""" 278 th = TypeHookBuilder(t, allow_extra_data) 279 for modification in modifications: 280 modification.apply(th) 281 th.build_type_hook() 282 return t 283 284 return wrapper 285 286 287def structure_url_hook(value: str, type: type) -> URL: 288 """Cattrs converter for URL.""" 289 return URL(value) 290 291 292def unstructure_url_hook(value: URL) -> str: 293 """Cattrs converter for URL.""" 294 return str(value) 295 296 297converter.register_structure_hook(URL, structure_url_hook) 298converter.register_unstructure_hook(URL, unstructure_url_hook) 299 300 301def structure_datetime_hook(value: str, type: type) -> datetime: 302 """Cattrs converter for URL.""" 303 return datetime.fromisoformat(value) 304 305 306def unstructure_datetime_hook(value: datetime) -> str: 307 """Cattrs converter for URL.""" 308 return value.isoformat() 309 310 311converter.register_structure_hook(datetime, structure_datetime_hook) 312converter.register_unstructure_hook(datetime, unstructure_datetime_hook) 313 314 315log = logging.getLogger("invenio_nrp.client.deserialize") 316 317 318def deserialize_rest_response[T]( 319 connection: Any, 320 communication_log: logging.Logger, 321 json_payload: bytes, 322 result_class: type[T], 323 etag: Optional[str], 324) -> T: 325 try: 326 if communication_log.isEnabledFor(logging.INFO): 327 communication_log.info("%s", _json.dumps(_json.loads(json_payload))) 328 if get_origin(result_class) is list: 329 arg_type = get_args(result_class)[0] 330 
return cast( 331 result_class, 332 [ 333 converter.structure( 334 { 335 **x, 336 }, 337 arg_type, 338 ) 339 for x in _json.loads(json_payload) 340 ], 341 ) 342 ret = converter.structure( 343 { 344 **_json.loads(json_payload), 345 # "context": result_context, 346 }, 347 result_class, 348 ) 349 ret._etag = etag 350 return ret 351 except Exception as e: 352 import traceback 353 traceback.print_exc() 354 log.error("Error validating %s with %s", json_payload, result_class) 355 raise e
class NullRemovingConverter(Converter):
    """Converter that drops ``None`` values from unstructured output.

    Exceptions raised while converting are re-raised as ``UnstructureError``
    or ``StructureError`` with the original exception chained as the cause.
    """

    def unstructure(self, obj: Any, unstructure_as: Any = None) -> Any:
        """Unstructure ``obj`` and remove ``None`` values from the result."""
        try:
            result = super().unstructure(obj, unstructure_as)
            _remove_nulls(result)
        except Exception as exc:
            raise UnstructureError(str(exc)) from exc
        return result

    def structure(self, obj: Any, type_: type) -> Any:
        """Structure ``obj`` into ``type_``."""
        try:
            result = super().structure(obj, type_)
        except Exception as exc:
            raise StructureError(str(exc)) from exc
        return result
A converter which generates specialized un/structuring functions.
class TypeHookStructureWrapper(Protocol):
    """Callable wrapping a structure hook.

    It is called with the raw data, the target type and the wrapped hook
    (``previous``).
    """

    def __call__(self, data: Any, type_: type, previous: StructureHook) -> Any:
        """Structure ``data`` into ``type_``; ``previous`` is the wrapped hook."""
        ...
Wrapper for structure hooks.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that resolves the real initializer on first use.

    Raises ``TypeError`` when the instance's class has ``_is_protocol`` set.
    Otherwise it looks through the MRO for the first ``__init__`` that is not
    this placeholder, installs it on the class and calls it with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
class TypeHookUnstructureWrapper(Protocol):
    """Callable wrapping an unstructure hook.

    It is called with the object to unstructure and the wrapped hook
    (``previous``).
    """

    def __call__(self, data: Any, previous: UnstructureHook) -> Any:
        """Unstructure ``data``; ``previous`` is the wrapped hook."""
        ...
Wrapper for unstructure hooks.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that resolves the real initializer on first use.

    Raises ``TypeError`` when the instance's class has ``_is_protocol`` set.
    Otherwise it looks through the MRO for the first ``__init__`` that is not
    this placeholder, installs it on the class and calls it with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
def structure_extra_data_hook(data: Any, type_: type, previous: StructureHook) -> Any:
    """Structure ``data`` and keep unknown keys on the result as ``_extra_data``.

    Args:
        data: Mapping being structured.
        type_: attrs class that ``data`` is structured into.
        previous: The wrapped structure hook that builds the instance.

    Returns:
        The instance produced by ``previous`` with ``_extra_data`` set to the
        entries of ``data`` whose key is neither the alias nor the name of
        an attrs field of ``type_``.
    """
    # Forward the target type; the builtin ``type`` was passed here before.
    ret = previous(data, type_)
    known_keys = {fld.alias or fld.name for fld in fields(type_)}
    # NOTE(review): only attrs aliases/names are treated as known, so keys
    # renamed through a cattrs override also end up in ``_extra_data``.
    ret._extra_data = {
        key: value for key, value in data.items() if key not in known_keys
    }
    return ret
Structure hook for extra data that moves extra data to a separate _extra_data attribute.
def unstructure_extra_data_hook(data: Any, previous: UnstructureHook) -> Any:
    """Unstructure ``data`` and merge its ``_extra_data`` into the result.

    Args:
        data: Instance being unstructured.
        previous: The wrapped unstructure hook producing the base dict.

    Returns:
        The dict from ``previous``; when the instance has an ``_extra_data``
        attribute, the ``"_extra_data"`` key is removed and the extra
        entries are merged on top.
    """
    missing = object()
    # ``getattr`` also works for slotted instances, which have no ``__dict__``.
    extra_data = getattr(data, "_extra_data", missing)
    if extra_data is missing:
        return previous(data)
    result = previous(data)
    result.pop("_extra_data", None)
    if extra_data:
        result.update(extra_data)
    return result
Unstructure hook for extra data that merges extra data from _extra_data attribute.
110class TypeHookBuilder: 111 """Builder for type hooks.""" 112 113 def __init__(self, t: type, allow_extra_data: bool) -> None: 114 """Initialize the builder.""" 115 self._type = t 116 self._structure_omitted_fields: list[str] = [] 117 self._unstructure_omitted_fields: list[str] = [] 118 self._rename_fields: dict[str, str] = {} 119 self._structure_wrappers: list[TypeHookStructureWrapper] = [] 120 self._unstructure_wrappers: list[TypeHookUnstructureWrapper] = [] 121 self._allow_extra_data = allow_extra_data 122 123 def omit_from_structure(self, *fields: str) -> Self: 124 """Omit fields from structuring (deserialization).""" 125 self._structure_omitted_fields.extend(fields) 126 return self 127 128 def omit_from_unstructure(self, *fields: str) -> Self: 129 """Omit fields from unstructuring (serialization).""" 130 self._unstructure_omitted_fields.extend(fields) 131 return self 132 133 def omit(self, *fields: str) -> Self: 134 """Omit fields from both structuring and unstructuring.""" 135 self.omit_from_structure(*fields) 136 self.omit_from_unstructure(*fields) 137 return self 138 139 def rename_field(self, serialized_name: str, model_name: str) -> Self: 140 """Rename a field.""" 141 self._rename_fields[serialized_name] = model_name 142 return self 143 144 def add_structure_wrapper(self, wrapper: TypeHookStructureWrapper) -> Self: 145 """Add a custom function for structuring.""" 146 self._structure_wrappers.append(wrapper) 147 return self 148 149 def add_unstructure_wrapper(self, wrapper: TypeHookUnstructureWrapper) -> Self: 150 """Add a custom function for unstructuring.""" 151 self._unstructure_wrappers.append(wrapper) 152 return self 153 154 def build_type_hook(self) -> None: 155 """Build and register the type hook.""" 156 if self._allow_extra_data: 157 self._structure_wrappers.append(structure_extra_data_hook) 158 self._unstructure_wrappers.append(unstructure_extra_data_hook) 159 160 structure_overrides: dict[str, AttributeOverride] = {} 161 for serialized_name, 
model_name in self._rename_fields.items(): 162 structure_overrides[model_name] = override(rename=serialized_name) 163 for field in self._structure_omitted_fields: 164 structure_overrides[field] = override(omit=True) 165 166 unstructure_overrides: dict[str, AttributeOverride] = {} 167 for serialized_name, model_name in self._rename_fields.items(): 168 unstructure_overrides[model_name] = override(rename=serialized_name) 169 for field in self._unstructure_omitted_fields: 170 unstructure_overrides[field] = override(omit=True) 171 172 st_hook: StructureHook = make_dict_structure_fn( 173 self._type, converter, **structure_overrides 174 ) 175 176 unst_hook: UnstructureHook = make_dict_unstructure_fn( 177 self._type, converter, **unstructure_overrides 178 ) 179 if self._structure_wrappers: 180 structure_wrappers: list[StructureHook] = [] 181 for structure_wrapper in self._structure_wrappers: 182 if structure_wrappers: 183 structure_wrappers.append( 184 partial(structure_wrapper, previous=structure_wrappers[-1]) 185 ) 186 else: 187 structure_wrappers.append( 188 partial(structure_wrapper, previous=st_hook) 189 ) 190 st_hook = structure_wrappers[-1] 191 192 if self._unstructure_wrappers: 193 unstructure_wrappers: list[UnstructureHook] = [] 194 for unstructure_wrapper in self._unstructure_wrappers: 195 if unstructure_wrappers: 196 unstructure_wrappers.append( 197 partial(unstructure_wrapper, previous=unstructure_wrappers[-1]) 198 ) 199 else: 200 unstructure_wrappers.append( 201 partial(unstructure_wrapper, previous=unst_hook) 202 ) 203 unst_hook = unstructure_wrappers[-1] 204 205 converter.register_structure_hook(self._type, st_hook) 206 converter.register_unstructure_hook(self._type, unst_hook)
Builder for type hooks.
113 def __init__(self, t: type, allow_extra_data: bool) -> None: 114 """Initialize the builder.""" 115 self._type = t 116 self._structure_omitted_fields: list[str] = [] 117 self._unstructure_omitted_fields: list[str] = [] 118 self._rename_fields: dict[str, str] = {} 119 self._structure_wrappers: list[TypeHookStructureWrapper] = [] 120 self._unstructure_wrappers: list[TypeHookUnstructureWrapper] = [] 121 self._allow_extra_data = allow_extra_data
Initialize the builder.
123 def omit_from_structure(self, *fields: str) -> Self: 124 """Omit fields from structuring (deserialization).""" 125 self._structure_omitted_fields.extend(fields) 126 return self
Omit fields from structuring (deserialization).
128 def omit_from_unstructure(self, *fields: str) -> Self: 129 """Omit fields from unstructuring (serialization).""" 130 self._unstructure_omitted_fields.extend(fields) 131 return self
Omit fields from unstructuring (serialization).
133 def omit(self, *fields: str) -> Self: 134 """Omit fields from both structuring and unstructuring.""" 135 self.omit_from_structure(*fields) 136 self.omit_from_unstructure(*fields) 137 return self
Omit fields from both structuring and unstructuring.
139 def rename_field(self, serialized_name: str, model_name: str) -> Self: 140 """Rename a field.""" 141 self._rename_fields[serialized_name] = model_name 142 return self
Rename a field.
144 def add_structure_wrapper(self, wrapper: TypeHookStructureWrapper) -> Self: 145 """Add a custom function for structuring.""" 146 self._structure_wrappers.append(wrapper) 147 return self
Add a custom function for structuring.
149 def add_unstructure_wrapper(self, wrapper: TypeHookUnstructureWrapper) -> Self: 150 """Add a custom function for unstructuring.""" 151 self._unstructure_wrappers.append(wrapper) 152 return self
Add a custom function for unstructuring.
def build_type_hook(self) -> None:
    """Build the hooks from the collected rules and register them.

    Structure and unstructure functions are generated for the builder's type
    with the rename/omit overrides, wrapped by the registered wrappers (plus
    the extra-data wrappers when enabled) and registered on the module-level
    ``converter``.
    """
    # Work on copies so a repeated build does not stack the extra-data
    # wrappers a second time on the builder's own lists.
    structure_wrappers = list(self._structure_wrappers)
    unstructure_wrappers = list(self._unstructure_wrappers)
    if self._allow_extra_data:
        structure_wrappers.append(structure_extra_data_hook)
        unstructure_wrappers.append(unstructure_extra_data_hook)

    def overrides_for(omitted: list[str]) -> dict[str, AttributeOverride]:
        # Renames first; an omission of the same field replaces the rename.
        result = {
            model_name: override(rename=serialized_name)
            for serialized_name, model_name in self._rename_fields.items()
        }
        result.update({name: override(omit=True) for name in omitted})
        return result

    st_hook: StructureHook = make_dict_structure_fn(
        self._type, converter, **overrides_for(self._structure_omitted_fields)
    )
    unst_hook: UnstructureHook = make_dict_unstructure_fn(
        self._type, converter, **overrides_for(self._unstructure_omitted_fields)
    )

    # Each wrapper receives the hook built so far as ``previous``, so the
    # last wrapper added ends up outermost.
    for structure_wrapper in structure_wrappers:
        st_hook = partial(structure_wrapper, previous=st_hook)
    for unstructure_wrapper in unstructure_wrappers:
        unst_hook = partial(unstructure_wrapper, previous=unst_hook)

    converter.register_structure_hook(self._type, st_hook)
    converter.register_unstructure_hook(self._type, unst_hook)
Build and register the type hook.
class SerializationExtension:
    """Common base for rules that configure a ``TypeHookBuilder``."""

    def apply(self, builder: TypeHookBuilder) -> None:
        """Apply this rule to ``builder``; the base implementation does nothing."""
        return None
Base class for serialization extensions.
@define
class Omit(SerializationExtension):
    """Omit one field from structuring, unstructuring, or both (the default)."""

    field: str
    from_structure: bool = True
    from_unstructure: bool = True

    def apply(self, builder: TypeHookBuilder) -> None:
        """Register the omission on ``builder`` for each enabled direction."""
        name = self.field
        if self.from_structure:
            builder.omit_from_structure(name)
        if self.from_unstructure:
            builder.omit_from_unstructure(name)
Omit a field from serialization.
def __init__(self, field, from_structure=attr_dict['from_structure'].default, from_unstructure=attr_dict['from_unstructure'].default):
    # Initializer generated by attrs for ``Omit``: stores the arguments on the instance.
    # NOTE(review): ``attr_dict`` is not defined in this listing; presumably attrs
    # provides it in the generated function's namespace - confirm.
    self.field = field
    self.from_structure = from_structure
    self.from_unstructure = from_unstructure
Method generated by attrs for class Omit.
@define
class Rename(SerializationExtension):
    """Use ``serialized_name`` as the serialized key for field ``model_name``."""

    serialized_name: str
    model_name: str

    def apply(self, builder: TypeHookBuilder) -> None:
        """Register the rename on ``builder``."""
        serialized, model = self.serialized_name, self.model_name
        builder.rename_field(serialized, model)
Rename a field in serialization.
def __init__(self, serialized_name, model_name):
    # Initializer generated by attrs for ``Rename``: stores both names on the instance.
    self.serialized_name = serialized_name
    self.model_name = model_name
Method generated by attrs for class Rename.
@define
class WrapStructure(SerializationExtension):
    """Add a wrapper around the structure hook of the decorated type."""

    wrapper: TypeHookStructureWrapper

    def apply(self, builder: TypeHookBuilder) -> None:
        """Register the structure wrapper on ``builder``."""
        structure_wrapper = self.wrapper
        builder.add_structure_wrapper(structure_wrapper)
Wrap a structure hook.
@define
class WrapUnstructure(SerializationExtension):
    """Add a wrapper around the unstructure hook of the decorated type."""

    wrapper: TypeHookUnstructureWrapper

    def apply(self, builder: TypeHookBuilder) -> None:
        """Register the unstructure wrapper on ``builder``."""
        unstructure_wrapper = self.wrapper
        builder.add_unstructure_wrapper(unstructure_wrapper)
Wrap an unstructure hook.
Method generated by attrs for class WrapUnstructure.
@dataclass_transform()
def extend_serialization(
    *modifications: SerializationExtension,
    allow_extra_data: bool = False,
) -> Callable[[C], C]:
    """Return a class decorator that registers serialization rules.

    Each extension in ``modifications`` is applied, in order, to a fresh
    ``TypeHookBuilder`` for the decorated class; the hooks are then built and
    registered. ``allow_extra_data`` is passed through to the builder.
    """

    def wrapper(t: C) -> C:
        """Build and register the hooks for ``t`` and return it unchanged."""
        builder = TypeHookBuilder(t, allow_extra_data)
        for extension in modifications:
            extension.apply(builder)
        builder.build_type_hook()
        return t

    return wrapper
Add serialization rules to an attrs class.
def structure_url_hook(value: str, type: type) -> URL:
    """Cattrs structure hook: build a ``URL`` from its string form.

    The ``type`` argument is part of the hook signature and is not used.
    """
    url = URL(value)
    return url
Cattrs converter for URL.
def unstructure_url_hook(value: URL) -> str:
    """Cattrs unstructure hook: convert a ``URL`` to its string form."""
    text = str(value)
    return text
Cattrs converter for URL.
def structure_datetime_hook(value: str, type: type) -> datetime:
    """Cattrs structure hook: parse an ISO 8601 string into a ``datetime``.

    The ``type`` argument is part of the hook signature and is not used.
    """
    parsed = datetime.fromisoformat(value)
    return parsed
Cattrs converter for datetime.
def unstructure_datetime_hook(value: datetime) -> str:
    """Cattrs unstructure hook: format a ``datetime`` as an ISO 8601 string."""
    text = value.isoformat()
    return text
Cattrs converter for datetime.
319def deserialize_rest_response[T]( 320 connection: Any, 321 communication_log: logging.Logger, 322 json_payload: bytes, 323 result_class: type[T], 324 etag: Optional[str], 325) -> T: 326 try: 327 if communication_log.isEnabledFor(logging.INFO): 328 communication_log.info("%s", _json.dumps(_json.loads(json_payload))) 329 if get_origin(result_class) is list: 330 arg_type = get_args(result_class)[0] 331 return cast( 332 result_class, 333 [ 334 converter.structure( 335 { 336 **x, 337 }, 338 arg_type, 339 ) 340 for x in _json.loads(json_payload) 341 ], 342 ) 343 ret = converter.structure( 344 { 345 **_json.loads(json_payload), 346 # "context": result_context, 347 }, 348 result_class, 349 ) 350 ret._etag = etag 351 return ret 352 except Exception as e: 353 import traceback 354 traceback.print_exc() 355 log.error("Error validating %s with %s", json_payload, result_class) 356 raise e