hat.syslog.server.syslog
Syslog server implementation
"""Syslog server implementation"""

from pathlib import Path
import asyncio.sslproto
import contextlib
import datetime
import functools
import logging
import ssl
import typing
import urllib.parse

from hat import aio

from hat.syslog.server import encoder
import hat.syslog.server.backend


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_syslog_server(addr: str,
                               pem: typing.Optional[Path],
                               backend: hat.syslog.server.backend.Backend
                               ) -> 'SysLogServer':
    """Create syslog server

    Args:
        addr: listening address as URL; host and port are taken from it and
            the ``ssl`` scheme enables TLS
        pem: certificate chain file loaded into the TLS context (read only
            when the ``ssl`` scheme is used)
        backend: backend passed to every client connection handler

    """
    url = urllib.parse.urlparse(addr)

    # plain connections unless the address explicitly asks for `ssl`
    ssl_ctx = None
    if url.scheme == 'ssl':
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_ctx.load_cert_chain(pem)

    async_group = aio.Group()

    # each accepted connection is served by `_client_loop` spawned in the
    # server's async group
    on_client = functools.partial(async_group.spawn, _client_loop, backend)
    listener = await asyncio.start_server(on_client, url.hostname, url.port,
                                          ssl=ssl_ctx)

    # closing of the listening socket is registered with `call_on_cancel`
    async_group.spawn(aio.call_on_cancel, _asyncio_async_close, listener)

    mlog.debug('listening for syslog clients on %s:%s',
               url.hostname, url.port)

    server = SysLogServer()
    server._async_group = async_group
    return server


class SysLogServer(aio.Resource):
    """Syslog server

    Instances are not constructed directly - use
    :func:`create_syslog_server`, which also sets the async group.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group


async def _client_loop(backend, reader, writer):
    """Serve single client connection

    Repeatedly reads a decimal length terminated by a space, then exactly
    that many bytes, decodes them into a message and registers the message
    (together with the UTC reception timestamp) with `backend`. The loop
    ends when the stream ends or when any other error occurs; the connection
    is closed in both cases.

    """
    try:
        while True:
            header = await reader.readuntil(b' ')
            length = int(header[:-1])  # drop the trailing separator
            payload = await reader.readexactly(length)
            now = datetime.datetime.now(tz=datetime.timezone.utc)
            msg = encoder.msg_from_str(payload.decode())
            mlog.debug("received new syslog message")
            await backend.register(now.timestamp(), msg)

    except asyncio.IncompleteReadError:
        # stream ended (client disconnected) - not reported as an error
        pass

    except Exception as exc:
        mlog.warning('syslog client error: %s', exc, exc_info=exc)

    finally:
        # BUGFIX
        transport_is_ssl = isinstance(
            writer.transport, asyncio.sslproto._SSLProtocolTransport)
        if not transport_is_ssl:
            await aio.uncancellable(_asyncio_async_close(writer))
        else:
            # TODO for SSL connection Protocol.connection_lost is never
            #      called, so `wait_closed` is not awaited here - close and
            #      give the event loop a short moment instead
            writer.close()
            await aio.uncancellable(asyncio.sleep(0.001))
        mlog.debug('syslog client connection closed')


async def _asyncio_async_close(resource):
    """Close `resource` and wait until it is closed (errors are ignored)"""
    with contextlib.suppress(Exception):
        resource.close()
        await resource.wait_closed()
Module logger
async def
create_syslog_server( addr: str, pem: Optional[pathlib.Path], backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.syslog.SysLogServer:
async def create_syslog_server(addr: str,
                               pem: typing.Optional[Path],
                               backend: hat.syslog.server.backend.Backend
                               ) -> 'SysLogServer':
    """Create syslog server

    Args:
        addr: listening address as URL; host and port are taken from it and
            the ``ssl`` scheme enables TLS
        pem: certificate chain file loaded into the TLS context (read only
            when the ``ssl`` scheme is used)
        backend: backend passed to every client connection handler

    """
    url = urllib.parse.urlparse(addr)

    # plain connections unless the address explicitly asks for `ssl`
    ssl_ctx = None
    if url.scheme == 'ssl':
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_ctx.load_cert_chain(pem)

    async_group = aio.Group()

    # each accepted connection is served by `_client_loop` spawned in the
    # server's async group
    on_client = functools.partial(async_group.spawn, _client_loop, backend)
    listener = await asyncio.start_server(on_client, url.hostname, url.port,
                                          ssl=ssl_ctx)

    # closing of the listening socket is registered with `call_on_cancel`
    async_group.spawn(aio.call_on_cancel, _asyncio_async_close, listener)

    mlog.debug('listening for syslog clients on %s:%s',
               url.hostname, url.port)

    server = SysLogServer()
    server._async_group = async_group
    return server
Create syslog server
class
SysLogServer(hat.aio.Resource):
class SysLogServer(aio.Resource):
    """Syslog server

    Instances are not constructed directly - use
    :func:`create_syslog_server`, which also sets the async group.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group
Syslog server
To create a new instance, see create_syslog_server().
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close