Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/kombu/serialization.py : 3%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Serialization utilities."""
2from __future__ import absolute_import, unicode_literals
4import codecs
5import os
6import sys
8import pickle as pypickle
9try:
10 import cPickle as cpickle
11except ImportError: # pragma: no cover
12 cpickle = None # noqa
14from collections import namedtuple
15from contextlib import contextmanager
16from io import BytesIO
18from .exceptions import (
19 ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
20)
21from .five import reraise, text_t
22from .utils.compat import entrypoints
23from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t
__all__ = ('pickle', 'loads', 'dumps', 'register', 'unregister')

#: Content encodings for which ``SerializerRegistry.loads`` does not run
#: the fallback codec decode step.
SKIP_DECODE = frozenset(['binary', 'ascii-8bit'])
#: Default ``_trusted_content`` of ``SerializerRegistry.loads``: content
#: types that pass the ``accept`` check even when not listed in ``accept``.
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])

if sys.platform.startswith('java'):  # pragma: no cover

    def _decode(t, coding):
        # Look the decoder up explicitly and keep only the decoded object
        # (first item of the tuple the decoder returns).
        return codecs.getdecoder(coding)(t)[0]
else:
    _decode = codecs.decode

# ``cpickle`` is None when the ``cPickle`` import failed, in which case
# the pure ``pickle`` module is used.
pickle = cpickle or pypickle
pickle_load = pickle.load

#: Kombu requires Python 2.5 or later so we use protocol 2 by default.
#: There's a new protocol (3) but this is only supported by Python 3.
#: The ``PICKLE_PROTOCOL`` environment variable overrides the default.
pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))

#: Record stored for each encoder by ``SerializerRegistry.register``.
codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))
@contextmanager
def _reraise_errors(wrapper,
                    include=(Exception,), exclude=(SerializerNotInstalled,)):
    """Re-raise errors from the managed block as `wrapper` exceptions.

    Arguments:
        wrapper (type): Exception class; it is called with the caught
            exception to build the replacement error.
        include (Tuple[type]): Exception types that get wrapped.
        exclude (Tuple[type]): Exception types that propagate unchanged.
            These are checked before `include`.
    """
    try:
        yield
    except exclude:
        # Excluded errors pass through untouched.
        raise
    except include as error:
        # Hand the original traceback to ``reraise`` together with the
        # wrapping exception instance.
        traceback = sys.exc_info()[2]
        reraise(wrapper, wrapper(error), traceback)
def pickle_loads(s, load=pickle_load):
    """Unpickle the byte payload `s` by reading it from a ``BytesIO``.

    Arguments:
        s: Pickled payload accepted by ``BytesIO``.
        load (Callable): Function that reads one pickled object from a
            file-like object; defaults to the module's ``pickle_load``.
    """
    # used to support buffer objects
    stream = BytesIO(s)
    return load(stream)
def parenthesize_alias(first, second):
    """Return ``'first (second)'``, or just `second` if `first` is falsy."""
    if not first:
        return second
    return '%s (%s)' % (first, second)
class SerializerRegistry(object):
    """Keep track of the available serialization methods.

    Encoders are stored per serializer name and decoders per content
    type; ``type_to_name`` and ``name_to_type`` translate between the
    two kinds of key.
    """

    def __init__(self):
        # name -> codec(content_type, content_encoding, encoder)
        self._encoders = {}
        # content type -> decoder callable
        self._decoders = {}
        # Filled in by ``_set_default_serializer``.
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None
        # Content types that ``loads`` refuses unless forced.
        self._disabled_content_types = set()
        self.type_to_name = {}
        self.name_to_type = {}

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        """Add an encoder and/or decoder to the registry.

        Arguments:
            name (str): Convenience name of the serialization method.

            encoder (callable): Receives a python data structure and
                returns its serialized form.  When falsy (for example
                :const:`None`) no encoder is stored, so encoding with
                this method is not possible.

            decoder (Callable): Receives serialized data and returns a
                python data structure.  When falsy (for example
                :const:`None`) no decoder is stored, so decoding this
                content type is not possible.

            content_type (str): The mime-type describing the serialized
                structure.

            content_encoding (str): The content encoding (character set)
                recorded for the method.  Will usually be `utf-8`,
                `us-ascii`, or `binary`.
        """
        if encoder:
            self._encoders[name] = codec(
                content_type=content_type,
                content_encoding=content_encoding,
                encoder=encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder
        # The name <-> content type mapping is recorded in both directions
        # regardless of which callables were supplied.
        self.type_to_name[content_type] = name
        self.name_to_type[name] = content_type

    def enable(self, name):
        """Remove a serializer from the disabled set.

        Arguments:
            name (str): A content type (anything containing ``/``) or a
                registered serializer name.

        Raises:
            KeyError: If `name` is not a content type and is not a
                registered serializer name.
        """
        content_type = name if '/' in name else self.name_to_type[name]
        self._disabled_content_types.discard(content_type)

    def disable(self, name):
        """Add a serializer to the disabled set.

        Arguments:
            name (str): A content type (anything containing ``/``) or a
                registered serializer name.

        Raises:
            KeyError: If `name` is not a content type and is not a
                registered serializer name.
        """
        content_type = name if '/' in name else self.name_to_type[name]
        self._disabled_content_types.add(content_type)

    def unregister(self, name):
        """Remove a registered encoder/decoder.

        Arguments:
            name (str): Registered serialization method name.

        Raises:
            SerializerNotInstalled: If a serializer by that name
                cannot be found.
        """
        try:
            content_type = self.name_to_type[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder/decoder installed for {0}'.format(name))
        # Every table is cleaned with a default so partially registered
        # methods (encoder-only or decoder-only) are handled too.
        self._decoders.pop(content_type, None)
        self._encoders.pop(name, None)
        self.type_to_name.pop(content_type, None)
        self.name_to_type.pop(name, None)

    def _set_default_serializer(self, name):
        """Choose the serialization method ``dumps`` uses by default.

        Arguments:
            name (str): The name of the registered serialization method.
                For example, `json` (default), `pickle`, `yaml`, `msgpack`,
                or any custom methods registered using :meth:`register`.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
        """
        try:
            default = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))
        (self._default_content_type, self._default_content_encoding,
         self._default_encode) = default

    def dumps(self, data, serializer=None):
        """Encode data.

        Turn a data structure into a payload suitable for sending
        as an AMQP message body.

        Arguments:
            data (List, Dict, str): The message data to send.

            serializer (str): Optional name of the serialization method
                to marshal the data with (for example `json`, `raw`,
                or `pickle`).

                When not given, the default serializer is used, unless
                `data` is a byte string or a text string.  Byte strings
                are passed through unchanged and text is only encoded
                as UTF-8.

                When `serializer` is given, that method is used even
                for byte strings and text.

        Returns:
            Tuple[str, str, str]: A three-item tuple containing the
            content type (e.g., `application/json`), content encoding, (e.g.,
            `utf-8`) and the serialized data.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
        """
        if serializer == 'raw':
            return raw_encode(data)

        if serializer:
            selected = self._encoders.get(serializer)
            if not selected:
                raise SerializerNotInstalled(
                    'No encoder installed for {0}'.format(serializer))
            content_type, content_encoding, encoder = selected
        else:
            # If a raw string was sent, assume binary encoding
            # (it's likely either ASCII or a raw binary file, and a
            # character set of 'binary' will encompass both, even if not
            # ideal).  In Python 3+, this would be "bytes"; allow binary
            # data to be sent as a message without getting encoder errors.
            if isinstance(data, bytes_t):
                return 'application/data', 'binary', data

            # For Unicode objects, force it into a string
            if isinstance(data, text_t):
                with _reraise_errors(EncodeError, exclude=()):
                    encoded = data.encode('utf-8')
                return 'text/plain', 'utf-8', encoded

            encoder = self._default_encode
            content_type = self._default_content_type
            content_encoding = self._default_content_encoding

        with _reraise_errors(EncodeError):
            payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding,
              accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
        """Decode serialized data.

        Reverse of `dumps`: the decoder is selected from `content_type`.

        Arguments:
            data (bytes, buffer, str): The message data to deserialize.

            content_type (str): The content-type of the data.
                (e.g., `application/json`).  A falsy value is treated
                as `application/data`.

            content_encoding (str): The content-encoding of the data.
                (e.g., `utf-8`, `binary`, or `us-ascii`).  A falsy
                value is treated as `utf-8`.

            accept (Set): Content-types to accept.  When given, the
                content type must be in `accept` or in
                `_trusted_content`.  When :const:`None`, the registry's
                disabled content types are consulted instead.

            force (bool): Decode even if the content type is disabled.
                Only consulted when `accept` is :const:`None`.

        Raises:
            ContentDisallowed: If the content-type is not accepted.

        Returns:
            Any: The unserialized data.
        """
        if content_type:
            content_type = bytes_to_str(content_type)
        else:
            content_type = 'application/data'

        if accept is not None:
            allowed = (content_type in _trusted_content or
                       content_type in accept)
            if not allowed:
                raise self._for_untrusted_content(content_type, 'untrusted')
        elif content_type in self._disabled_content_types and not force:
            raise self._for_untrusted_content(content_type, 'disabled')

        content_encoding = (content_encoding or 'utf-8').lower()

        # Empty payloads are handed back untouched.
        if not data:
            return data

        decoder = self._decoders.get(content_type)
        if decoder:
            with _reraise_errors(DecodeError):
                return decoder(data)

        # No decoder registered: fall back to a plain codec decode unless
        # the encoding is marked as skippable or the data is already text.
        needs_decode = (content_encoding not in SKIP_DECODE and
                        not isinstance(data, text_t))
        if needs_decode:
            with _reraise_errors(DecodeError):
                return _decode(data, content_encoding)
        return data

    def _for_untrusted_content(self, ctype, why):
        """Build (not raise) the error for a refused content type.

        Arguments:
            ctype (str): The refused content type.
            why (str): Reason word inserted into the message.

        Returns:
            ContentDisallowed: The exception instance for the caller
            to raise.
        """
        alias = self.type_to_name.get(ctype, ctype)
        described = parenthesize_alias(alias, ctype)
        message = 'Refusing to deserialize {0} content of type {1}'.format(
            why, described,
        )
        return ContentDisallowed(message)
#: Global registry of serializers/deserializers.
registry = SerializerRegistry()
# Module-level shortcuts bound to the global registry's methods.
dumps = registry.dumps
loads = registry.loads
register = registry.register
unregister = registry.unregister
def raw_encode(data):
    """Special case serializer: pass data through as ``application/data``.

    Text is encoded as UTF-8 and any encoding error is re-raised as
    ``EncodeError``; every other value is returned unchanged and
    labelled ``binary``.

    Returns:
        Tuple: ``(content_type, content_encoding, payload)``.
    """
    if not isinstance(data, text_t):
        return 'application/data', 'binary', data

    with _reraise_errors(EncodeError, exclude=()):
        encoded = data.encode('utf-8')
    return 'application/data', 'utf-8', encoded
def register_json():
    """Register the encoder/decoder pair for JSON serialization."""
    # Imported here rather than at module import time.
    from kombu.utils import json as kombu_json

    registry.register(
        'json',
        kombu_json.dumps,
        kombu_json.loads,
        content_type='application/json',
        content_encoding='utf-8',
    )
def register_yaml():
    """Register the encoder/decoder pair for YAML serialization.

    It is slower than JSON, but allows for more data types
    to be serialized.  Useful if you need to send data such as dates.

    When the ``yaml`` module cannot be imported, only a decoder is
    registered, and that decoder raises ``SerializerNotInstalled``.
    """
    try:
        import yaml
    except ImportError:

        def not_available(*args, **kwargs):
            """Raise SerializerNotInstalled.

            Used in case a client receives a yaml message, but yaml
            isn't installed.
            """
            raise SerializerNotInstalled(
                'No decoder installed for YAML. Install the PyYAML library')

        registry.register('yaml', None, not_available, 'application/x-yaml')
    else:
        registry.register(
            'yaml',
            yaml.safe_dump,
            yaml.safe_load,
            content_type='application/x-yaml',
            content_encoding='utf-8',
        )
# On Python 3 the payload is passed through ``str_to_bytes`` before it is
# unpickled; on other versions ``pickle_loads`` is used directly.
if sys.version_info[0] == 3:  # pragma: no cover

    def unpickle(s):
        """Unpickle `s` after converting it with ``str_to_bytes``."""
        return pickle_loads(str_to_bytes(s))

else:
    unpickle = pickle_loads  # noqa
def register_pickle():
    """Register the pickle serializer.

    The fastest serialization method, but restricts
    you to python clients.

    NOTE(review): unpickling data from an untrusted source can execute
    arbitrary code; accept this content type only from trusted producers.
    """
    def pickle_dumps(obj, dumper=pickle.dumps):
        # Always dump with the module-wide ``pickle_protocol``.
        return dumper(obj, protocol=pickle_protocol)

    registry.register(
        'pickle',
        pickle_dumps,
        unpickle,
        content_type='application/x-python-serialize',
        content_encoding='binary',
    )
def register_msgpack():
    """Register the msgpack serializer.

    If ``msgpack`` is missing or older than 0.4, stand-in callables that
    raise ``SerializerNotInstalled`` are registered as both encoder and
    decoder.

    See Also:
        https://msgpack.org/.
    """
    pack = unpack = None
    try:
        import msgpack
        if msgpack.version < (0, 4):
            def version_mismatch(*args, **kwargs):
                raise SerializerNotInstalled(
                    'msgpack requires msgpack-python >= 0.4.0')
            pack = unpack = version_mismatch
        else:
            from msgpack import packb, unpackb

            def pack(s):
                return packb(s, use_bin_type=True)

            def unpack(s):
                return unpackb(s, raw=False)
    except (ImportError, ValueError):
        # ImportError: the library is not installed.
        # NOTE(review): the reason ValueError is caught is not visible
        # here - confirm before narrowing.
        def not_available(*args, **kwargs):
            raise SerializerNotInstalled(
                'No decoder installed for msgpack. '
                'Please install the msgpack-python library')
        pack = unpack = not_available

    registry.register(
        name='msgpack',
        encoder=pack,
        decoder=unpack,
        content_type='application/x-msgpack',
        content_encoding='binary',
    )
# Register the base serialization methods.
register_json()
register_pickle()
register_yaml()
register_msgpack()

# Default serializer is 'json'
registry._set_default_serializer('json')

#: Registration function of each built-in serializer, keyed both by its
#: short name and by its content type.
_setupfuns = {
    'json': register_json,
    'pickle': register_pickle,
    'yaml': register_yaml,
    'msgpack': register_msgpack,
    'application/json': register_json,
    'application/x-yaml': register_yaml,
    'application/x-python-serialize': register_pickle,
    'application/x-msgpack': register_msgpack,
}

#: Sentinel default, so that an explicit ``None`` argument can be told
#: apart from "argument not given".
NOTSET = object()
def enable_insecure_serializers(choices=NOTSET):
    """Enable serializers that are considered to be unsafe.

    Arguments:
        choices: Serializers to enable, by name or content type.
            Passing :const:`None` enables nothing.

    Note:
        Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you
        can also specify a list of serializers (by name or content type)
        to enable.
    """
    if choices is NOTSET:
        choices = ['pickle', 'yaml', 'msgpack']
    if choices is None:
        return

    for choice in choices:
        try:
            registry.enable(choice)
        except KeyError:
            # Best effort: names the registry does not know are skipped.
            pass
def disable_insecure_serializers(allowed=NOTSET):
    """Disable untrusted serializers.

    Every content type with a registered decoder is disabled first; the
    entries of `allowed` are then enabled again.

    Arguments:
        allowed: Serializers (by name or content type) to keep enabled.
            Defaults to ``json`` only; :const:`None` leaves everything
            disabled.

    Note:
        Producers will still be able to serialize data
        in these formats, but consumers will not accept
        incoming data using the untrusted content types.
    """
    if allowed is NOTSET:
        allowed = ['json']

    for content_type in registry._decoders:
        registry.disable(content_type)

    if allowed is None:
        return
    for name in allowed:
        registry.enable(name)
# Insecure serializers are disabled by default since v3.0
disable_insecure_serializers()

# Load entrypoints from installed extensions
for ep, args in entrypoints('kombu.serializers'):  # pragma: no cover
    # The entry point's name becomes the serializer name; ``args``
    # supplies the remaining ``register`` arguments.
    register(ep.name, *args)
def prepare_accept_content(content_types, name_to_type=None):
    """Replace serializer names in `content_types` with content types.

    Arguments:
        content_types: Iterable of serializer names and/or content types,
            or :const:`None`.
        name_to_type (Dict): Mapping from serializer name to content type.
            When falsy, the global registry's mapping is used.

    Returns:
        Set[str]: The content types, or :const:`None` when
        `content_types` is :const:`None`.

    Raises:
        KeyError: If an entry without ``/`` is not a known name.
    """
    if not name_to_type:
        name_to_type = registry.name_to_type
    if content_types is None:
        return content_types

    resolved = set()
    for entry in content_types:
        # Anything containing a slash is already a content type.
        resolved.add(entry if '/' in entry else name_to_type[entry])
    return resolved