hat.syslog.handler

Syslog handler

Implementation of logging.Handler for syslog logging protocol.

  1"""Syslog handler
  2
  3Implementation of `logging.Handler` for syslog logging protocol.
  4
  5"""
  6
  7import collections
  8import contextlib
  9import datetime
 10import logging
 11import os
 12import socket
 13import ssl
 14import sys
 15import threading
 16import time
 17import traceback
 18import typing
 19
 20from hat import json
 21
 22from hat.syslog import common
 23from hat.syslog import encoder
 24
 25
 26class SysLogHandler(logging.Handler):
 27    """Syslog handler
 28
 29    Args:
 30        host: remote host name
 31        port: remote TCP/UDP port
 32        comm_type: communication type
 33        queue_size: message queue size
 34        reconnect_delay: delay in seconds before retrying connection with
 35            remote syslog server
 36
 37    """
 38
 39    def __init__(self,
 40                 host: str,
 41                 port: int,
 42                 comm_type: typing.Union[common.CommType, str],
 43                 queue_size: int = 1024,
 44                 reconnect_delay: float = 5):
 45        super().__init__()
 46        self.__state = _ThreadState(
 47            host=host,
 48            port=port,
 49            comm_type=(common.CommType[comm_type]
 50                       if not isinstance(comm_type, common.CommType)
 51                       else comm_type),
 52            queue=collections.deque(),
 53            queue_size=queue_size,
 54            reconnect_delay=reconnect_delay,
 55            cv=threading.Condition(),
 56            closed=threading.Event(),
 57            dropped=[0])
 58        self.__thread = threading.Thread(
 59            target=_logging_handler_thread,
 60            args=(self.__state, ),
 61            daemon=True)
 62        self.__thread.start()
 63
 64    def close(self):
 65        """"See `logging.Handler.close`"""
 66        state = self.__state
 67        with state.cv:
 68            if state.closed.is_set():
 69                return
 70            state.closed.set()
 71            with contextlib.suppress(Exception):
 72                # workaround for errors/0001.txt
 73                state.cv.notify_all()
 74
 75    def emit(self, record):
 76        """"See `logging.Handler.emit`"""
 77        msg = _record_to_msg(record)
 78        state = self.__state
 79        with state.cv:
 80            if state.closed.is_set():
 81                return
 82            state.queue.append(msg)
 83            while len(state.queue) > state.queue_size:
 84                state.queue.popleft()
 85                state.dropped[0] += 1
 86            with contextlib.suppress(Exception):
 87                # workaround for errors/0001.txt
 88                state.cv.notify_all()
 89
 90
class _ThreadState(typing.NamedTuple):
    """Handler thread state

    Bundle of values shared between `SysLogHandler` and the background
    thread running `_logging_handler_thread`. `queue` and `dropped` are
    read and modified while holding `cv`.

    """
    host: str
    """Hostname"""
    port: int
    """TCP port"""
    # the same port is used when connecting a UDP socket
    comm_type: typing.Union[common.CommType, str]
    """Communication type"""
    # compared against `common.CommType` members by the background thread
    queue: collections.deque
    """Message queue"""
    queue_size: int
    """Message queue size"""
    # limit enforced by `SysLogHandler.emit`, which discards oldest entries
    reconnect_delay: float
    """Reconnect delay"""
    # seconds to wait after a failed connection attempt
    cv: threading.Condition
    """Conditional variable"""
    # notified when a message is queued or the handler is closed
    closed: threading.Event
    """Closed flag"""
    dropped: typing.List[int]
    """Dropped message counter"""
    # single-element list so the count can be updated in place
111
112
def _logging_handler_thread(state):
    """Deliver queued messages to the remote syslog server until closed.

    The outer loop (re)establishes the connection according to
    `state.comm_type`; the inner loop waits for work on `state.cv` and sends
    one message at a time, each prefixed with its byte length and a space.
    A message that could not be sent is kept in `msg` and retried on the
    next connection. If messages were dropped from the queue, a synthetic
    "dropped" message is sent before the remaining queued messages.

    Args:
        state: shared `_ThreadState` instance

    """
    msg = None
    if state.comm_type == common.CommType.SSL:
        # NOTE(review): certificate and hostname verification are disabled,
        # so the server identity is not authenticated - confirm this is
        # intended before using over untrusted networks
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.VerifyMode.CERT_NONE
    while not state.closed.is_set():
        try:
            if state.comm_type == common.CommType.UDP:
                s = socket.socket(type=socket.SOCK_DGRAM)
                s.connect((state.host, state.port))
            elif state.comm_type == common.CommType.TCP:
                s = socket.create_connection((state.host, state.port))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            elif state.comm_type == common.CommType.SSL:
                s = ctx.wrap_socket(socket.create_connection(
                    (state.host, state.port)))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            else:
                raise NotImplementedError()
        except Exception:
            # wait on the closed event instead of sleeping so that close()
            # ends the delay immediately
            state.closed.wait(state.reconnect_delay)
            continue
        try:
            while True:
                if msg is None:
                    with state.cv:
                        state.cv.wait_for(lambda: (state.closed.is_set() or
                                                   len(state.queue) or
                                                   state.dropped[0]))
                        if state.closed.is_set():
                            return
                        if state.dropped[0]:
                            # report lost messages before sending newer ones
                            msg = _create_dropped_msg(
                                state.dropped[0], '_logging_handler_thread', 0)
                            state.dropped[0] = 0
                        else:
                            msg = state.queue.popleft()
                msg_bytes = encoder.msg_to_str(msg).encode()
                # sendall: a plain send() may transmit only part of the
                # buffer on stream sockets, which would break the
                # length-prefixed framing
                s.sendall(f'{len(msg_bytes)} '.encode() + msg_bytes)
                msg = None
        except Exception:
            # connection failure: fall through, close the socket and
            # reconnect; the unsent message stays in `msg` for retry
            pass
        finally:
            with contextlib.suppress(Exception):
                s.close()
159
160
def _level_to_severity(levelno):
    """Map a `logging` level number to a syslog severity.

    Standard levels map exactly as before; any other level number maps to
    the severity of the closest standard level at or below it.

    """
    if levelno >= logging.CRITICAL:
        return common.Severity.CRITICAL
    if levelno >= logging.ERROR:
        return common.Severity.ERROR
    if levelno >= logging.WARNING:
        return common.Severity.WARNING
    if levelno >= logging.INFO:
        return common.Severity.INFORMATIONAL
    if levelno > logging.NOTSET:
        return common.Severity.DEBUG
    # logging.NOTSET
    return common.Severity.INFORMATIONAL


def _record_to_msg(record):
    """Convert a `logging.LogRecord` to a syslog message.

    Args:
        record: log record passed to `SysLogHandler.emit`

    Returns:
        `common.Msg` with record details (logger name, thread, function
        name, line number and formatted exception) encoded as JSON in the
        `data` field

    """
    exc_info = ''
    # formatting the traceback is best effort; on failure send no traceback
    with contextlib.suppress(Exception):
        if record.exc_info:
            exc_info = ''.join(
                traceback.TracebackException(*record.exc_info).format())
    return common.Msg(
        facility=common.Facility.USER,
        # threshold mapping avoids KeyError for custom level numbers
        severity=_level_to_severity(record.levelno),
        version=1,
        timestamp=record.created,
        hostname=socket.gethostname(),
        app_name=sys.argv[0],  # record.processName
        procid=str(record.process) if record.process else None,
        msgid=record.name[:32],
        data=json.encode({
            'hat@1': {
                'name': str(record.name),
                'thread': str(record.thread),
                'funcName': str(record.funcName),
                'lineno': str(record.lineno),
                'exc_info': exc_info}}),
        msg=record.getMessage())
190
191
def _create_dropped_msg(dropped, func_name, lineno):
    """Build the syslog message reporting discarded log messages.

    Args:
        dropped: number of messages that were discarded
        func_name: function name reported as the message origin
        lineno: line number reported as the message origin

    Returns:
        `common.Msg` with ERROR severity, stamped with the current time

    """
    now = datetime.datetime.now(datetime.timezone.utc)
    origin = {
        'name': __name__,
        'thread': str(threading.get_ident()),
        'funcName': str(func_name),
        'lineno': str(lineno),
        'exc_info': ''}
    fields = {
        'facility': common.Facility.USER,
        'severity': common.Severity.ERROR,
        'version': 1,
        'timestamp': now.timestamp(),
        'hostname': socket.gethostname(),
        'app_name': sys.argv[0],  # record.processName
        'procid': str(os.getpid()),
        'msgid': __name__,
        'data': json.encode({'hat@1': origin}),
        'msg': f'dropped {dropped} log messages'}
    return common.Msg(**fields)
class SysLogHandler(logging.Handler):
27class SysLogHandler(logging.Handler):
28    """Syslog handler
29
30    Args:
31        host: remote host name
32        port: remote TCP/UDP port
33        comm_type: communication type
34        queue_size: message queue size
35        reconnect_delay: delay in seconds before retrying connection with
36            remote syslog server
37
38    """
39
40    def __init__(self,
41                 host: str,
42                 port: int,
43                 comm_type: typing.Union[common.CommType, str],
44                 queue_size: int = 1024,
45                 reconnect_delay: float = 5):
46        super().__init__()
47        self.__state = _ThreadState(
48            host=host,
49            port=port,
50            comm_type=(common.CommType[comm_type]
51                       if not isinstance(comm_type, common.CommType)
52                       else comm_type),
53            queue=collections.deque(),
54            queue_size=queue_size,
55            reconnect_delay=reconnect_delay,
56            cv=threading.Condition(),
57            closed=threading.Event(),
58            dropped=[0])
59        self.__thread = threading.Thread(
60            target=_logging_handler_thread,
61            args=(self.__state, ),
62            daemon=True)
63        self.__thread.start()
64
65    def close(self):
66        """"See `logging.Handler.close`"""
67        state = self.__state
68        with state.cv:
69            if state.closed.is_set():
70                return
71            state.closed.set()
72            with contextlib.suppress(Exception):
73                # workaround for errors/0001.txt
74                state.cv.notify_all()
75
76    def emit(self, record):
77        """"See `logging.Handler.emit`"""
78        msg = _record_to_msg(record)
79        state = self.__state
80        with state.cv:
81            if state.closed.is_set():
82                return
83            state.queue.append(msg)
84            while len(state.queue) > state.queue_size:
85                state.queue.popleft()
86                state.dropped[0] += 1
87            with contextlib.suppress(Exception):
88                # workaround for errors/0001.txt
89                state.cv.notify_all()

Syslog handler

Arguments:
  • host: remote host name
  • port: remote TCP/UDP port
  • comm_type: communication type
  • queue_size: message queue size
  • reconnect_delay: delay in seconds before retrying connection with remote syslog server
SysLogHandler( host: str, port: int, comm_type: Union[hat.syslog.common.CommType, str], queue_size: int = 1024, reconnect_delay: float = 5)
40    def __init__(self,
41                 host: str,
42                 port: int,
43                 comm_type: typing.Union[common.CommType, str],
44                 queue_size: int = 1024,
45                 reconnect_delay: float = 5):
46        super().__init__()
47        self.__state = _ThreadState(
48            host=host,
49            port=port,
50            comm_type=(common.CommType[comm_type]
51                       if not isinstance(comm_type, common.CommType)
52                       else comm_type),
53            queue=collections.deque(),
54            queue_size=queue_size,
55            reconnect_delay=reconnect_delay,
56            cv=threading.Condition(),
57            closed=threading.Event(),
58            dropped=[0])
59        self.__thread = threading.Thread(
60            target=_logging_handler_thread,
61            args=(self.__state, ),
62            daemon=True)
63        self.__thread.start()

Initializes the instance - basically setting the formatter to None and the filter list to empty.

def close(self):
65    def close(self):
66        """"See `logging.Handler.close`"""
67        state = self.__state
68        with state.cv:
69            if state.closed.is_set():
70                return
71            state.closed.set()
72            with contextlib.suppress(Exception):
73                # workaround for errors/0001.txt
74                state.cv.notify_all()

See logging.Handler.close

def emit(self, record):
76    def emit(self, record):
77        """"See `logging.Handler.emit`"""
78        msg = _record_to_msg(record)
79        state = self.__state
80        with state.cv:
81            if state.closed.is_set():
82                return
83            state.queue.append(msg)
84            while len(state.queue) > state.queue_size:
85                state.queue.popleft()
86                state.dropped[0] += 1
87            with contextlib.suppress(Exception):
88                # workaround for errors/0001.txt
89                state.cv.notify_all()

See logging.Handler.emit

Inherited Members
logging.Handler
get_name
set_name
createLock
acquire
release
setLevel
format
handle
setFormatter
flush
handleError
logging.Filterer
addFilter
removeFilter
filter