hat.syslog.server.syslog

Syslog server implementation

 1"""Syslog server implementation"""
 2
 3from pathlib import Path
 4import asyncio.sslproto
 5import contextlib
 6import datetime
 7import functools
 8import logging
 9import ssl
10import typing
11import urllib.parse
12
13from hat import aio
14
15from hat.syslog.server import encoder
16import hat.syslog.server.backend
17
18
19mlog: logging.Logger = logging.getLogger(__name__)
20"""Module logger"""
21
22
23async def create_syslog_server(addr: str,
24                               pem: typing.Optional[Path],
25                               backend: hat.syslog.server.backend.Backend
26                               ) -> 'SysLogServer':
27    """Create syslog server"""
28    addr = urllib.parse.urlparse(addr)
29    if addr.scheme == 'ssl':
30        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
31        ssl_ctx.load_cert_chain(pem)
32    else:
33        ssl_ctx = None
34
35    async_group = aio.Group()
36    srv = await asyncio.start_server(
37        functools.partial(async_group.spawn, _client_loop, backend),
38        addr.hostname, addr.port, ssl=ssl_ctx)
39    async_group.spawn(aio.call_on_cancel, _asyncio_async_close, srv)
40
41    mlog.debug('listening for syslog clients on %s:%s',
42               addr.hostname, addr.port)
43
44    srv = SysLogServer()
45    srv._async_group = async_group
46    return srv
47
48
49class SysLogServer(aio.Resource):
50    """Syslog server
51
52    For creating new instance see :func:`create_syslog_server`.
53
54    """
55
56    @property
57    def async_group(self) -> aio.Group:
58        """Async group"""
59        return self._async_group
60
61
62async def _client_loop(backend, reader, writer):
63    try:
64        while True:
65            size = await reader.readuntil(b' ')
66            buff = await reader.readexactly(int(size[:-1]))
67            t = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
68            msg = encoder.msg_from_str(buff.decode())
69            mlog.debug("received new syslog message")
70            await backend.register(t, msg)
71    except asyncio.IncompleteReadError:
72        pass
73    except Exception as e:
74        mlog.warning('syslog client error: %s', e, exc_info=e)
75    finally:
76        # BUGFIX
77        if isinstance(writer.transport,
78                      asyncio.sslproto._SSLProtocolTransport):
79            # TODO for SSL connection Protocol.connection_lost is never called
80            writer.close()
81            await aio.uncancellable(asyncio.sleep(0.001))
82        else:
83            await aio.uncancellable(_asyncio_async_close(writer))
84        mlog.debug('syslog client connection closed')
85
86
87async def _asyncio_async_close(x):
88    with contextlib.suppress(Exception):
89        x.close()
90    await x.wait_closed()
mlog: logging.Logger = <Logger hat.syslog.server.syslog (WARNING)>

Module logger

async def create_syslog_server( addr: str, pem: Optional[pathlib.Path], backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.syslog.SysLogServer:
24async def create_syslog_server(addr: str,
25                               pem: typing.Optional[Path],
26                               backend: hat.syslog.server.backend.Backend
27                               ) -> 'SysLogServer':
28    """Create syslog server"""
29    addr = urllib.parse.urlparse(addr)
30    if addr.scheme == 'ssl':
31        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
32        ssl_ctx.load_cert_chain(pem)
33    else:
34        ssl_ctx = None
35
36    async_group = aio.Group()
37    srv = await asyncio.start_server(
38        functools.partial(async_group.spawn, _client_loop, backend),
39        addr.hostname, addr.port, ssl=ssl_ctx)
40    async_group.spawn(aio.call_on_cancel, _asyncio_async_close, srv)
41
42    mlog.debug('listening for syslog clients on %s:%s',
43               addr.hostname, addr.port)
44
45    srv = SysLogServer()
46    srv._async_group = async_group
47    return srv

Create syslog server

class SysLogServer(hat.aio.Resource):
50class SysLogServer(aio.Resource):
51    """Syslog server
52
53    For creating new instance see :func:`create_syslog_server`.
54
55    """
56
57    @property
58    def async_group(self) -> aio.Group:
59        """Async group"""
60        return self._async_group

Syslog server

For creating new instance see create_syslog_server().

SysLogServer()
async_group: hat.aio.Group

Async group

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close