hat.syslog.server.encoder
Data structures encoder/decoder
1"""Data structures encoder/decoder""" 2 3from hat import json 4 5from hat.syslog.encoder import (msg_from_json, 6 msg_to_json) 7from hat.syslog.encoder import * # NOQA 8 9# common imported after base encoder to replace base encoder's common' 10from hat.syslog.server import common 11 12 13def filter_to_json(filter: common.Filter) -> json.Data: 14 """Convert filter to json data""" 15 return dict(filter._asdict(), 16 facility=filter.facility.name if filter.facility else None, 17 severity=filter.severity.name if filter.severity else None) 18 19 20def filter_from_json(json_filter: json.Data) -> common.Filter: 21 """Create filter from json data""" 22 return common.Filter(**dict( 23 json_filter, 24 facility=(common.Facility[json_filter['facility']] 25 if json_filter['facility'] else None), 26 severity=(common.Severity[json_filter['severity']] 27 if json_filter['severity'] else None))) 28 29 30def entry_to_json(entry: common.Entry) -> json.Data: 31 """Convert entry to json data""" 32 return dict(entry._asdict(), 33 msg=msg_to_json(entry.msg)) 34 35 36def entry_from_json(json_entry: json.Data) -> common.Entry: 37 """Create entry from json data""" 38 return common.Entry(**dict( 39 json_entry, 40 msg=msg_from_json(json_entry['msg'])))
14def filter_to_json(filter: common.Filter) -> json.Data: 15 """Convert filter to json data""" 16 return dict(filter._asdict(), 17 facility=filter.facility.name if filter.facility else None, 18 severity=filter.severity.name if filter.severity else None)
Convert filter to json data
21def filter_from_json(json_filter: json.Data) -> common.Filter: 22 """Create filter from json data""" 23 return common.Filter(**dict( 24 json_filter, 25 facility=(common.Facility[json_filter['facility']] 26 if json_filter['facility'] else None), 27 severity=(common.Severity[json_filter['severity']] 28 if json_filter['severity'] else None)))
Create filter from json data
31def entry_to_json(entry: common.Entry) -> json.Data: 32 """Convert entry to json data""" 33 return dict(entry._asdict(), 34 msg=msg_to_json(entry.msg))
Convert entry to json data
37def entry_from_json(json_entry: json.Data) -> common.Entry: 38 """Create entry from json data""" 39 return common.Entry(**dict( 40 json_entry, 41 msg=msg_from_json(json_entry['msg'])))
Create entry from json data