Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Serialization utilities.""" 

2from __future__ import absolute_import, unicode_literals 

3 

4import codecs 

5import os 

6import sys 

7 

8import pickle as pypickle 

9try: 

10 import cPickle as cpickle 

11except ImportError: # pragma: no cover 

12 cpickle = None # noqa 

13 

14from collections import namedtuple 

15from contextlib import contextmanager 

16from io import BytesIO 

17 

18from .exceptions import ( 

19 ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled 

20) 

21from .five import reraise, text_t 

22from .utils.compat import entrypoints 

23from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t 

24 

#: Public names exported on ``from <this module> import *``.
__all__ = ('pickle', 'loads', 'dumps', 'register', 'unregister')

#: Content encodings for which ``SerializerRegistry.loads`` skips the
#: generic ``_decode`` fallback and hands the payload back unchanged.
SKIP_DECODE = frozenset({'binary', 'ascii-8bit'})

#: Content types ``SerializerRegistry.loads`` lets through even when they
#: are missing from the caller's ``accept`` collection.
TRUSTED_CONTENT = frozenset({'application/data', 'application/text'})

28 

# Helper used by ``loads`` to turn bytes into text when no decoder is
# registered for the content type.  On platforms whose ``sys.platform``
# starts with "java" it is assembled from ``codecs.getdecoder``; on all
# other platforms ``codecs.decode`` is used directly.
if not sys.platform.startswith('java'):
    _decode = codecs.decode
else:  # pragma: no cover

    def _decode(t, coding):
        """Decode ``t`` using the codec named ``coding`` and return the text."""
        decoder = codecs.getdecoder(coding)
        text, _consumed = decoder(t)
        return text

35 

#: The pickle implementation used by the pickle serializer: the C
#: ``cPickle`` module when it could be imported, else the stdlib ``pickle``.
pickle = cpickle if cpickle else pypickle
pickle_load = pickle.load

#: Protocol used when pickling.  Read once at import time from the
#: ``PICKLE_PROTOCOL`` environment variable (a non-integer value makes
#: the import fail with ValueError).  Defaults to 2 because protocol 3
#: and later cannot be read by Python 2 consumers.
pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))

#: An encoder registration: ``(content_type, content_encoding, encoder)``.
codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))

44 

45 

@contextmanager
def _reraise_errors(wrapper,
                    include=(Exception,), exclude=(SerializerNotInstalled,)):
    """Translate exceptions raised in the ``with`` body into ``wrapper``.

    Arguments:
        wrapper (type): Exception class the error is converted to
            (e.g. :exc:`EncodeError` or :exc:`DecodeError`).
        include (Tuple[type]): Exception types that get translated.
        exclude (Tuple[type]): Exception types that always propagate
            unchanged, even if they also match ``include``.

    The translated exception is raised through ``reraise`` with the
    traceback of the original error.
    """
    try:
        yield
    except include as exc:
        # Excluded types win over included ones: re-raise untouched.
        if isinstance(exc, exclude):
            raise
        reraise(wrapper, wrapper(exc), sys.exc_info()[2])

55 

56 

def pickle_loads(s, load=pickle_load):
    """Unpickle ``s`` by reading it from an in-memory byte stream.

    Feeding ``load`` a :class:`~io.BytesIO` wrapper (instead of calling
    ``loads`` on ``s``) allows buffer objects to be passed as well.
    """
    stream = BytesIO(s)
    return load(stream)

60 

61 

def parenthesize_alias(first, second):
    """Render ``second`` preceded by an optional alias.

    Returns ``'first (second)'`` when ``first`` is truthy and plain
    ``second`` otherwise.
    """
    if not first:
        return second
    return '%s (%s)' % (first, second)

64 

65 

class SerializerRegistry(object):
    """The registry keeps track of serialization methods.

    Encoders are stored per serializer *name* and decoders per *content
    type*; :attr:`name_to_type` and :attr:`type_to_name` link the two.
    Content types can be disabled, in which case :meth:`loads` refuses
    them unless the caller passes ``accept`` or ``force``.
    """

    def __init__(self):
        # name -> codec(content_type, content_encoding, encoder)
        self._encoders = {}
        # content type -> decoder callable
        self._decoders = {}
        # Filled in by _set_default_serializer(); dumps() uses these when
        # no serializer is requested.
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None
        # Content types loads() refuses when called without ``accept``.
        self._disabled_content_types = set()
        self.type_to_name = {}
        self.name_to_type = {}

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        """Register a new encoder/decoder.

        Arguments:
            name (str): A convenience name for the serialization method.

            encoder (callable): A method that will be passed a python data
                structure and should return a string representing the
                serialized data.  If :const:`None`, then only a decoder
                will be registered.  Encoding will not be possible.

            decoder (callable): A method that will be passed a string
                representing serialized data and should return a python
                data structure.  If :const:`None`, then only an encoder
                will be registered.  Decoding will not be possible.

            content_type (str): The mime-type describing the serialized
                structure.

            content_encoding (str): The content encoding (character set)
                that :meth:`dumps` reports for payloads produced by
                ``encoder``.  Will usually be `utf-8`, `us-ascii`, or
                `binary`.
        """
        if encoder:
            self._encoders[name] = codec(
                content_type, content_encoding, encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder
        # The name <-> content type link is recorded even for encode-only
        # or decode-only registrations.
        self.type_to_name[content_type] = name
        self.name_to_type[name] = content_type

    def enable(self, name):
        """Allow :meth:`loads` to decode a previously disabled content type.

        Arguments:
            name (str): Serializer name (e.g. ``pickle``) or a content
                type (any value containing ``/``).

        Raises:
            KeyError: If ``name`` is not a content type and no serializer
                is registered under that name.
        """
        if '/' not in name:
            name = self.name_to_type[name]
        self._disabled_content_types.discard(name)

    def disable(self, name):
        """Make :meth:`loads` refuse a content type unless forced.

        The disabled set is only consulted when :meth:`loads` is called
        with ``accept=None``.

        Arguments:
            name (str): Serializer name (e.g. ``pickle``) or a content
                type (any value containing ``/``).

        Raises:
            KeyError: If ``name`` is not a content type and no serializer
                is registered under that name.
        """
        if '/' not in name:
            name = self.name_to_type[name]
        self._disabled_content_types.add(name)

    def unregister(self, name):
        """Unregister registered encoder/decoder.

        Arguments:
            name (str): Registered serialization method name.

        Raises:
            SerializerNotInstalled: If a serializer by that name
                cannot be found.
        """
        try:
            content_type = self.name_to_type[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder/decoder installed for {0}'.format(name))
        self._decoders.pop(content_type, None)
        self._encoders.pop(name, None)
        self.type_to_name.pop(content_type, None)
        self.name_to_type.pop(name, None)

    def _set_default_serializer(self, name):
        """Set the default serialization method used by this library.

        Arguments:
            name (str): The name of the registered serialization method.
                For example, `json` (default), `pickle`, `yaml`, `msgpack`,
                or any custom methods registered using :meth:`register`.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
        """
        try:
            entry = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))
        (self._default_content_type, self._default_content_encoding,
         self._default_encode) = entry

    def dumps(self, data, serializer=None):
        """Encode data.

        Serialize a data structure into a string suitable for sending
        as an AMQP message body.

        Arguments:
            data (List, Dict, str): The message data to send.

            serializer (str): An optional string representing
                the serialization method you want the data marshalled
                into. (For example, `json`, `raw`, or `pickle`).

                If :const:`None` (default), then the default serializer
                (json unless changed) will be used, unless `data` is a
                :class:`bytes` or text object.  In this latter case, no
                serialization occurs as it would be unnecessary.

                Note that if `serializer` is specified, then that
                serialization method will be used even if a :class:`str`
                or :class:`unicode` object is passed in.

        Returns:
            Tuple[str, str, str]: A three-item tuple containing the
            content type (e.g., `application/json`), content encoding, (e.g.,
            `utf-8`) and a string containing the serialized data.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
            EncodeError: If encoding the data fails.
        """
        if serializer == 'raw':
            return raw_encode(data)
        if serializer and not self._encoders.get(serializer):
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer))

        # If a raw string was sent, assume binary encoding
        # (it's likely either ASCII or a raw binary file, and a character
        # set of 'binary' will encompass both, even if not ideal).
        if not serializer and isinstance(data, bytes_t):
            # Allow binary data to be sent as a message without
            # getting encoder errors.
            return 'application/data', 'binary', data

        # For Unicode objects, force it into a string
        if not serializer and isinstance(data, text_t):
            with _reraise_errors(EncodeError, exclude=()):
                payload = data.encode('utf-8')
            return 'text/plain', 'utf-8', payload

        if serializer:
            content_type, content_encoding, encoder = \
                self._encoders[serializer]
        else:
            encoder = self._default_encode
            content_type = self._default_content_type
            content_encoding = self._default_content_encoding

        with _reraise_errors(EncodeError):
            payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding,
              accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
        """Decode serialized data.

        Deserialize a data stream as serialized using `dumps`
        based on `content_type`.

        Arguments:
            data (bytes, buffer, str): The message data to deserialize.

            content_type (str): The content-type of the data.
                (e.g., `application/json`).  A falsy value is treated as
                ``application/data``; bytes are converted to text.

            content_encoding (str): The content-encoding of the data.
                (e.g., `utf-8`, `binary`, or `us-ascii`).  A falsy value
                means ``utf-8``; bytes are converted to text and the value
                is lower-cased.

            accept (Set): Content types to accept.  When given, anything
                not in it (or in ``_trusted_content``) is refused, and the
                registry's disabled content types are ignored.

            force (bool): Only used when ``accept`` is :const:`None`:
                decode even if the content type has been disabled.

        Raises:
            ContentDisallowed: If the content-type is not accepted.
            DecodeError: If the decoder, or the fallback text decoding,
                fails (a :exc:`SerializerNotInstalled` raised by a
                decoder propagates unwrapped).

        Returns:
            Any: The unserialized data.  Empty ``data`` is returned as-is,
            as is data without a registered decoder that is either
            already text or has a ``binary``/``ascii-8bit`` encoding.
        """
        content_type = (bytes_to_str(content_type) if content_type
                        else 'application/data')
        if accept is not None:
            if content_type not in _trusted_content \
                    and content_type not in accept:
                raise self._for_untrusted_content(content_type, 'untrusted')
        elif content_type in self._disabled_content_types and not force:
            raise self._for_untrusted_content(content_type, 'disabled')
        # Normalize exactly like content_type above: a bytes encoding
        # would never match SKIP_DECODE and cannot be passed to _decode.
        content_encoding = bytes_to_str(content_encoding or 'utf-8').lower()

        if data:
            decode = self._decoders.get(content_type)
            if decode:
                with _reraise_errors(DecodeError):
                    return decode(data)
            if content_encoding not in SKIP_DECODE and \
                    not isinstance(data, text_t):
                with _reraise_errors(DecodeError):
                    return _decode(data, content_encoding)
        return data

    def _for_untrusted_content(self, ctype, why):
        """Build (not raise) the :exc:`ContentDisallowed` for ``ctype``.

        ``why`` is interpolated into the message; :meth:`loads` passes
        ``'untrusted'`` or ``'disabled'``.
        """
        return ContentDisallowed(
            'Refusing to deserialize {0} content of type {1}'.format(
                why,
                parenthesize_alias(self.type_to_name.get(ctype, ctype), ctype),
            ),
        )

277 

278 

#: Global registry of serializers/deserializers.
registry = SerializerRegistry()

# Module-level shortcuts: bound methods of the global registry.
dumps, loads = registry.dumps, registry.loads
register, unregister = registry.register, registry.unregister

285 

286 

def raw_encode(data):
    """Special case serializer (``serializer='raw'`` in ``dumps``).

    Text is encoded to UTF-8; any other value is passed through unchanged
    and labelled ``binary``.

    Returns:
        Tuple[str, str, Any]: ``(content_type, content_encoding, payload)``;
        the content type is always ``application/data``.

    Raises:
        EncodeError: If the text cannot be encoded as UTF-8.
    """
    if not isinstance(data, text_t):
        return 'application/data', 'binary', data
    with _reraise_errors(EncodeError, exclude=()):
        payload = data.encode('utf-8')
    return 'application/data', 'utf-8', payload

298 

299 

def register_json():
    """Register an encoder/decoder for JSON serialization.

    Uses ``dumps``/``loads`` from ``kombu.utils.json`` under the name
    ``json`` with content type ``application/json`` and UTF-8 encoding.
    """
    from kombu.utils import json as _json

    registry.register(
        'json', _json.dumps, _json.loads,
        content_type='application/json',
        content_encoding='utf-8',
    )

307 

308 

def register_yaml():
    """Register an encoder/decoder for YAML serialization.

    It is slower than JSON, but allows for more data types
    to be serialized. Useful if you need to send data such as dates.

    Only PyYAML's ``safe_dump``/``safe_load`` are used.  Without PyYAML a
    decode-only stub is registered, so a received YAML message raises
    :exc:`SerializerNotInstalled` and encoding to YAML is impossible.
    """
    try:
        import yaml
    except ImportError:

        def not_available(*args, **kwargs):
            """Raise SerializerNotInstalled.

            Used in case a client receives a yaml message, but yaml
            isn't installed.
            """
            raise SerializerNotInstalled(
                'No decoder installed for YAML. Install the PyYAML library')

        registry.register('yaml', None, not_available, 'application/x-yaml')
    else:
        registry.register(
            'yaml', yaml.safe_dump, yaml.safe_load,
            content_type='application/x-yaml',
            content_encoding='utf-8',
        )

332 

333 

if sys.version_info[0] != 3:
    # Non-Python-3 interpreters (Python 2): the payload is handed to
    # pickle_loads unchanged.
    unpickle = pickle_loads  # noqa
else:  # pragma: no cover

    def unpickle(s):
        """Unpickle ``s`` after coercing it to bytes with ``str_to_bytes``."""
        payload = str_to_bytes(s)
        return pickle_loads(payload)

341 

342 

def register_pickle():
    """Register pickle serializer.

    The fastest serialization method, but restricts
    you to python clients.

    Encodes with ``pickle.dumps`` using the module-level
    ``pickle_protocol`` (looked up on every call) and decodes with
    ``unpickle``.  Content type ``application/x-python-serialize``,
    encoding ``binary``.

    Warning:
        Unpickling can execute arbitrary code embedded in the payload;
        only accept this content type from trusted producers.
    """
    def pickle_dumps(obj, dumper=pickle.dumps):
        # ``dumper`` is bound once, when register_pickle() runs.
        return dumper(obj, protocol=pickle_protocol)

    registry.register(
        'pickle', pickle_dumps, unpickle,
        content_type='application/x-python-serialize',
        content_encoding='binary',
    )

355 

356 

def register_msgpack():
    """Register msgpack serializer.

    Packs with ``use_bin_type=True`` and unpacks msgpack strings as
    UTF-8 text.  ``unpackb(raw=False)`` only exists from msgpack 0.5.2;
    on the older releases still accepted here (0.4.x - 0.5.1) the
    equivalent ``encoding='utf-8'`` argument is used instead, so decoding
    does not fail with ``TypeError``.  When msgpack is missing (or older
    than 0.4), stubs raising :exc:`SerializerNotInstalled` are registered.

    See Also:
        https://msgpack.org/.
    """
    try:
        import msgpack
        if msgpack.version >= (0, 4):
            from msgpack import packb, unpackb

            def pack(s):
                return packb(s, use_bin_type=True)

            if msgpack.version >= (0, 5, 2):
                def unpack(s):
                    return unpackb(s, raw=False)
            else:
                # ``raw`` was added in msgpack 0.5.2; earlier releases
                # decode str payloads through the ``encoding`` argument.
                def unpack(s):
                    return unpackb(s, encoding='utf-8')
        else:
            def version_mismatch(*args, **kwargs):
                raise SerializerNotInstalled(
                    'msgpack requires msgpack-python >= 0.4.0')
            pack = unpack = version_mismatch
    except (ImportError, ValueError):
        def not_available(*args, **kwargs):
            raise SerializerNotInstalled(
                'No decoder installed for msgpack. '
                'Please install the msgpack-python library')
        pack = unpack = not_available
    registry.register(
        'msgpack', pack, unpack,
        content_type='application/x-msgpack',
        content_encoding='binary',
    )

390 

391 

# Register the base serialization methods (json, pickle, yaml, msgpack,
# in that order).
for _setup in (register_json, register_pickle, register_yaml,
               register_msgpack):
    _setup()
del _setup

# dumps() falls back to 'json' when no serializer is named.
registry._set_default_serializer('json')

#: Registration function for each built-in serializer, reachable both by
#: serializer name and by content type.
_setupfuns = {
    'json': register_json,
    'pickle': register_pickle,
    'yaml': register_yaml,
    'msgpack': register_msgpack,
    'application/json': register_json,
    'application/x-yaml': register_yaml,
    'application/x-python-serialize': register_pickle,
    'application/x-msgpack': register_msgpack,
}

#: Sentinel telling "argument omitted" apart from an explicit ``None``.
NOTSET = object()

414 

415 

def enable_insecure_serializers(choices=NOTSET):
    """Enable serializers that are considered to be unsafe.

    Note:
        Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you
        can also specify a list of serializers (by name or content type)
        to enable.

    Arguments:
        choices (Optional[Iterable[str]]): Serializer names or content
            types to enable.  Omit it for the default three; pass
            :const:`None` to enable nothing.  Unregistered names are
            silently skipped.
    """
    if choices is NOTSET:
        choices = ['pickle', 'yaml', 'msgpack']
    if choices is None:
        return
    for choice in choices:
        try:
            registry.enable(choice)
        except KeyError:
            # Unknown serializer name: enabling is best-effort.
            pass

431 

432 

def disable_insecure_serializers(allowed=NOTSET):
    """Disable untrusted serializers.

    Will disable all serializers except ``json``
    or you can specify a list of deserializers to allow.

    Note:
        Producers will still be able to serialize data
        in these formats, but consumers will not accept
        incoming data using the untrusted content types.

    Arguments:
        allowed (Optional[Iterable[str]]): Serializer names or content
            types to keep enabled.  Omit it to allow only ``json``; pass
            :const:`None` to disable everything.

    Raises:
        KeyError: If an ``allowed`` serializer name is not registered.
    """
    if allowed is NOTSET:
        allowed = ['json']
    # First disable every content type that has a decoder...
    for content_type in registry._decoders:
        registry.disable(content_type)
    # ...then re-enable the ones explicitly allowed.
    if allowed is None:
        return
    for name in allowed:
        registry.enable(name)

450 

451 

# Since v3.0 insecure serializers start out disabled: consumers only accept
# ``json`` until others are re-enabled explicitly.
disable_insecure_serializers()

# Serializers contributed by installed packages via the
# ``kombu.serializers`` entry-point group; ``args`` are passed to
# ``register`` after the entry point's name.
for ep, args in entrypoints('kombu.serializers'):  # pragma: no cover
    register(ep.name, *args)

458 

459 

def prepare_accept_content(content_types, name_to_type=None):
    """Normalize an ``accept`` collection into a set of content types.

    Entries containing ``/`` are kept as content types.  Anything else is
    treated as a serializer name and translated through ``name_to_type``,
    which falls back to the global registry's mapping when it is empty or
    not given.

    Returns:
        Optional[Set[str]]: The resolved content types, or :const:`None`
        when ``content_types`` is :const:`None`.

    Raises:
        KeyError: If a serializer name has no registered content type.
    """
    aliases = name_to_type if name_to_type else registry.name_to_type
    if content_types is None:
        return content_types
    resolved = set()
    for item in content_types:
        resolved.add(item if '/' in item else aliases[item])
    return resolved