hat.syslog.handler
Syslog handler
Implementation of logging.Handler
for syslog logging protocol.
"""Syslog handler

Implementation of `logging.Handler` for syslog logging protocol.

"""

import collections
import contextlib
import datetime
import logging
import os
import socket
import ssl
import sys
import threading
import time
import traceback
import typing

from hat import json

from hat.syslog import common
from hat.syslog import encoder


class SysLogHandler(logging.Handler):
    """Syslog handler

    Log records are converted to syslog messages and appended to an
    in-memory queue which is drained by a background daemon thread. Once
    the queue holds more than `queue_size` messages, the oldest ones are
    discarded and counted so that a single "dropped" message can be sent
    in their place.

    Args:
        host: remote host name
        port: remote TCP/UDP port
        comm_type: communication type (`common.CommType` member or its name)
        queue_size: message queue size
        reconnect_delay: delay in seconds before retrying connection with
            remote syslog server

    """

    def __init__(self,
                 host: str,
                 port: int,
                 comm_type: typing.Union[common.CommType, str],
                 queue_size: int = 1024,
                 reconnect_delay: float = 5):
        super().__init__()
        self.__state = _ThreadState(
            host=host,
            port=port,
            comm_type=(common.CommType[comm_type]
                       if not isinstance(comm_type, common.CommType)
                       else comm_type),
            queue=collections.deque(),
            queue_size=queue_size,
            reconnect_delay=reconnect_delay,
            cv=threading.Condition(),
            closed=threading.Event(),
            dropped=[0])
        self.__thread = threading.Thread(
            target=_logging_handler_thread,
            args=(self.__state, ),
            daemon=True)
        self.__thread.start()

    def close(self):
        """See `logging.Handler.close`"""
        state = self.__state
        with state.cv:
            if not state.closed.is_set():
                state.closed.set()
                with contextlib.suppress(Exception):
                    # workaround for errors/0001.txt
                    state.cv.notify_all()
        # base implementation unregisters the handler from the logging
        # module's bookkeeping; overriding classes are expected to call it
        super().close()

    def emit(self, record):
        """See `logging.Handler.emit`"""
        try:
            msg = _record_to_msg(record)
        except Exception:
            # a handler must not propagate record conversion errors into
            # the code that is doing the logging
            self.handleError(record)
            return
        state = self.__state
        with state.cv:
            if state.closed.is_set():
                return
            state.queue.append(msg)
            # keep the queue bounded - discard oldest messages and remember
            # how many were lost
            while len(state.queue) > state.queue_size:
                state.queue.popleft()
                state.dropped[0] += 1
            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()


class _ThreadState(typing.NamedTuple):
    """Handler thread state"""
    host: str
    """Hostname"""
    port: int
    """TCP/UDP port"""
    comm_type: common.CommType
    """Communication type"""
    queue: collections.deque
    """Message queue"""
    queue_size: int
    """Message queue size"""
    reconnect_delay: float
    """Reconnect delay"""
    cv: threading.Condition
    """Conditional variable"""
    closed: threading.Event
    """Closed flag"""
    dropped: typing.List[int]
    """Dropped message counter (single element list used as mutable cell)"""


def _connect(state, ssl_ctx):
    """Create socket connected to remote syslog server

    The socket is closed before an exception is propagated, so a failed
    connection attempt does not leave an open descriptor behind.

    """
    address = (state.host, state.port)

    if state.comm_type == common.CommType.UDP:
        s = socket.socket(type=socket.SOCK_DGRAM)
        try:
            s.connect(address)
        except Exception:
            s.close()
            raise
        return s

    if state.comm_type == common.CommType.TCP:
        s = socket.create_connection(address)

    elif state.comm_type == common.CommType.SSL:
        plain = socket.create_connection(address)
        try:
            s = ssl_ctx.wrap_socket(plain)
        except Exception:
            plain.close()
            raise

    else:
        raise NotImplementedError()

    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    except Exception:
        s.close()
        raise
    return s


def _logging_handler_thread(state):
    """Background thread sending queued messages to remote syslog server

    Runs until `state.closed` is set. A message that could not be sent is
    kept and sent again once a new connection is established.

    """
    msg = None
    ssl_ctx = None
    if state.comm_type == common.CommType.SSL:
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # NOTE: server certificate and host name are not verified
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.VerifyMode.CERT_NONE

    while not state.closed.is_set():
        try:
            s = _connect(state, ssl_ctx)
        except Exception:
            time.sleep(state.reconnect_delay)
            continue

        try:
            while True:
                if not msg:
                    with state.cv:
                        state.cv.wait_for(lambda: (state.closed.is_set() or
                                                   len(state.queue) or
                                                   state.dropped[0]))
                        if state.closed.is_set():
                            return
                        if state.dropped[0]:
                            # report lost messages before regular ones
                            msg = _create_dropped_msg(
                                state.dropped[0], '_logging_handler_thread', 0)
                            state.dropped[0] = 0
                        else:
                            msg = state.queue.popleft()
                msg_bytes = encoder.msg_to_str(msg).encode()
                # message is prefixed with its length in bytes; sendall
                # guarantees that the whole frame is written, a partial
                # write would desynchronize the receiver
                s.sendall(f'{len(msg_bytes)} '.encode() + msg_bytes)
                msg = None

        except Exception:
            # connection is considered broken - `msg` is kept and resent
            # after reconnect
            pass

        finally:
            with contextlib.suppress(Exception):
                s.close()


def _level_to_severity(levelno):
    """Map logging level number to syslog severity

    Standard levels map exactly as before; custom levels are mapped to the
    severity of the closest standard level below them.

    """
    if levelno >= logging.CRITICAL:
        return common.Severity.CRITICAL
    if levelno >= logging.ERROR:
        return common.Severity.ERROR
    if levelno >= logging.WARNING:
        return common.Severity.WARNING
    if levelno >= logging.INFO:
        return common.Severity.INFORMATIONAL
    if levelno > logging.NOTSET:
        return common.Severity.DEBUG
    return common.Severity.INFORMATIONAL


def _record_to_msg(record):
    """Convert `logging.LogRecord` to `common.Msg`"""
    exc_info = ''
    with contextlib.suppress(Exception):
        if record.exc_info:
            exc_info = ''.join(
                traceback.TracebackException(*record.exc_info).format())
    return common.Msg(
        facility=common.Facility.USER,
        severity=_level_to_severity(record.levelno),
        version=1,
        timestamp=record.created,
        hostname=socket.gethostname(),
        app_name=sys.argv[0],  # record.processName
        procid=str(record.process) if record.process else None,
        msgid=record.name[:32],
        data=json.encode({
            'hat@1': {
                'name': str(record.name),
                'thread': str(record.thread),
                'funcName': str(record.funcName),
                'lineno': str(record.lineno),
                'exc_info': exc_info}}),
        msg=record.getMessage())


def _create_dropped_msg(dropped, func_name, lineno):
    """Create message reporting number of dropped log messages"""
    return common.Msg(
        facility=common.Facility.USER,
        severity=common.Severity.ERROR,
        version=1,
        timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
        hostname=socket.gethostname(),
        app_name=sys.argv[0],  # record.processName
        procid=str(os.getpid()),
        msgid=__name__,
        data=json.encode({
            'hat@1': {
                'name': __name__,
                'thread': str(threading.get_ident()),
                'funcName': str(func_name),
                'lineno': str(lineno),
                'exc_info': ''}}),
        msg=f'dropped {dropped} log messages')
class
SysLogHandler(logging.Handler):
class SysLogHandler(logging.Handler):
    """Syslog handler

    Log records are converted to syslog messages and appended to an
    in-memory queue which is drained by a background daemon thread. Once
    the queue holds more than `queue_size` messages, the oldest ones are
    discarded and counted.

    Args:
        host: remote host name
        port: remote TCP/UDP port
        comm_type: communication type (`common.CommType` member or its name)
        queue_size: message queue size
        reconnect_delay: delay in seconds before retrying connection with
            remote syslog server

    """

    def __init__(self,
                 host: str,
                 port: int,
                 comm_type: typing.Union[common.CommType, str],
                 queue_size: int = 1024,
                 reconnect_delay: float = 5):
        super().__init__()
        self.__state = _ThreadState(
            host=host,
            port=port,
            comm_type=(common.CommType[comm_type]
                       if not isinstance(comm_type, common.CommType)
                       else comm_type),
            queue=collections.deque(),
            queue_size=queue_size,
            reconnect_delay=reconnect_delay,
            cv=threading.Condition(),
            closed=threading.Event(),
            dropped=[0])
        self.__thread = threading.Thread(
            target=_logging_handler_thread,
            args=(self.__state, ),
            daemon=True)
        self.__thread.start()

    def close(self):
        """See `logging.Handler.close`"""
        state = self.__state
        with state.cv:
            if not state.closed.is_set():
                state.closed.set()
                with contextlib.suppress(Exception):
                    # workaround for errors/0001.txt
                    state.cv.notify_all()
        # base implementation unregisters the handler from the logging
        # module's bookkeeping; overriding classes are expected to call it
        super().close()

    def emit(self, record):
        """See `logging.Handler.emit`"""
        try:
            msg = _record_to_msg(record)
        except Exception:
            # a handler must not propagate record conversion errors into
            # the code that is doing the logging
            self.handleError(record)
            return
        state = self.__state
        with state.cv:
            if state.closed.is_set():
                return
            state.queue.append(msg)
            # keep the queue bounded - discard oldest messages and remember
            # how many were lost
            while len(state.queue) > state.queue_size:
                state.queue.popleft()
                state.dropped[0] += 1
            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()
Syslog handler
Arguments:
- host: remote host name
- port: remote TCP/UDP port
- comm_type: communication type
- queue_size: message queue size
- reconnect_delay: delay in seconds before retrying connection with remote syslog server
SysLogHandler( host: str, port: int, comm_type: Union[hat.syslog.common.CommType, str], queue_size: int = 1024, reconnect_delay: float = 5)
def __init__(self,
             host: str,
             port: int,
             comm_type: typing.Union[common.CommType, str],
             queue_size: int = 1024,
             reconnect_delay: float = 5):
    """Create shared thread state and start the background sender thread"""
    super().__init__()

    # communication type may be given as enum member or as member name
    if isinstance(comm_type, common.CommType):
        resolved_comm_type = comm_type
    else:
        resolved_comm_type = common.CommType[comm_type]

    thread_state = _ThreadState(host=host,
                                port=port,
                                comm_type=resolved_comm_type,
                                queue=collections.deque(),
                                queue_size=queue_size,
                                reconnect_delay=reconnect_delay,
                                cv=threading.Condition(),
                                closed=threading.Event(),
                                dropped=[0])
    self.__state = thread_state

    # daemon thread - it does not keep the interpreter alive on exit
    worker = threading.Thread(target=_logging_handler_thread,
                              args=(thread_state, ),
                              daemon=True)
    self.__thread = worker
    worker.start()
Initializes the instance - basically setting the formatter to None and the filter list to empty.
def
close(self):
def close(self):
    """See `logging.Handler.close`"""
    state = self.__state
    with state.cv:
        if not state.closed.is_set():
            state.closed.set()
            with contextlib.suppress(Exception):
                # workaround for errors/0001.txt
                state.cv.notify_all()
    # base implementation unregisters the handler from the logging
    # module's bookkeeping; overriding classes are expected to call it
    super().close()
See logging.Handler.close
def
emit(self, record):
def emit(self, record):
    """See `logging.Handler.emit`"""
    try:
        msg = _record_to_msg(record)
    except Exception:
        # a handler must not propagate record conversion errors into
        # the code that is doing the logging
        self.handleError(record)
        return
    state = self.__state
    with state.cv:
        if state.closed.is_set():
            return
        state.queue.append(msg)
        # keep the queue bounded - discard oldest messages and remember
        # how many were lost
        while len(state.queue) > state.queue_size:
            state.queue.popleft()
            state.dropped[0] += 1
        with contextlib.suppress(Exception):
            # workaround for errors/0001.txt
            state.cv.notify_all()
See logging.Handler.emit
Inherited Members
- logging.Handler
- get_name
- set_name
- createLock
- acquire
- release
- setLevel
- format
- handle
- setFormatter
- flush
- handleError
- logging.Filterer
- addFilter
- removeFilter
- filter