hat.syslog.server.main

Syslog Server main module

  1"""Syslog Server main module"""
  2
  3from pathlib import Path
  4import argparse
  5import asyncio
  6import contextlib
  7import logging.config
  8import sys
  9import time
 10import typing
 11
 12import appdirs
 13
 14from hat import aio
 15from hat import json
 16
 17from hat.syslog.server import common
 18from hat.syslog.server.backend import create_backend
 19from hat.syslog.server.syslog import create_syslog_server
 20from hat.syslog.server.ui import create_web_server
 21
 22
 23mlog: logging.Logger = logging.getLogger('hat.syslog.server.main')
 24"""Module logger"""
 25
 26user_data_dir: Path = Path(appdirs.user_data_dir('hat'))
 27"""User data directory path"""
 28
 29user_conf_dir: Path = Path(appdirs.user_config_dir('hat'))
 30"""User configuration directory path"""
 31
 32default_syslog_addr: str = 'tcp://0.0.0.0:6514'
 33"""Default syslog listening address"""
 34
 35default_ui_addr: str = 'http://0.0.0.0:23020'
 36"""Default UI listening address"""
 37
 38default_db_path: Path = user_data_dir / 'syslog.db'
 39"""Default DB file path"""
 40
 41default_db_low_size: int = int(1e6)
 42"""Default DB low size count"""
 43
 44default_db_high_size: int = int(1e7)
 45"""Default DB high size count"""
 46
 47
 48def create_argument_parser() -> argparse.ArgumentParser:
 49    """Create argument parser"""
 50    parser = argparse.ArgumentParser()
 51    parser.add_argument(
 52        '--conf', metavar='PATH', type=Path,
 53        help="configuration defined by hat-syslog://server.yaml# "
 54             "(default $XDG_CONFIG_HOME/hat/syslog.{yaml|yml|json})")
 55    parser.add_argument(
 56        '--log', metavar='LEVEL', type=str,
 57        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
 58        help="console log level")
 59    parser.add_argument(
 60        '--syslog-addr', metavar='ADDR', type=str,
 61        help=f"syslog listening address (default {default_syslog_addr})")
 62    parser.add_argument(
 63        '--syslog-pem', metavar='PATH', type=Path,
 64        help="pem file path - mandatory for ssl communication")
 65    parser.add_argument(
 66        '--ui-addr', metavar='ADDR', type=str,
 67        help=f"UI listening address (default {default_ui_addr})")
 68    parser.add_argument(
 69        '--db-path', metavar='PATH', type=Path,
 70        help="sqlite database file path "
 71             "(default $XDG_DATA_HOME/hat/syslog.db)")
 72    parser.add_argument(
 73        '--db-low-size', metavar='N', type=int,
 74        help=f"number of messages kept in database after database "
 75             f"cleanup (default {default_db_low_size})")
 76    parser.add_argument(
 77        '--db-high-size', metavar='N', type=int,
 78        help=f"number of messages that will trigger database cleanup "
 79             f"(default {default_db_high_size})")
 80    parser.add_argument(
 81        '--db-enable-archive', action='store_true',
 82        help="should messages, deleted during database cleanup, be kept "
 83             "in archive files")
 84    parser.add_argument(
 85        '--db-disable-journal', action='store_true',
 86        help="disable sqlite jurnaling")
 87    return parser
 88
 89
 90def main():
 91    """Syslog Server"""
 92    parser = create_argument_parser()
 93    args = parser.parse_args()
 94
 95    conf_path = args.conf
 96    if not conf_path:
 97        for suffix in ('.yaml', '.yml', '.json'):
 98            conf_path = (user_conf_dir / 'syslog').with_suffix(suffix)
 99            if conf_path.exists():
100                break
101        else:
102            conf_path = None
103
104    if conf_path == Path('-'):
105        conf = json.decode_stream(sys.stdin)
106    elif conf_path:
107        conf = json.decode_file(conf_path)
108    else:
109        conf = None
110
111    if conf:
112        common.json_schema_repo.validate('hat-syslog://server.yaml#', conf)
113
114    if args.log:
115        log_conf = _get_console_log_conf(args.log)
116    elif conf:
117        log_conf = conf['log']
118    else:
119        log_conf = {'version': 1}
120
121    logging.config.dictConfig(log_conf)
122
123    syslog_addr = (args.syslog_addr if args.syslog_addr else
124                   conf['syslog_addr'] if conf else
125                   default_syslog_addr)
126
127    syslog_pem = (
128        args.syslog_pem if args.syslog_pem else
129        Path(conf['syslog_pem']) if conf and 'syslog_pem' in conf else
130        None)
131
132    ui_addr = (args.ui_addr if args.ui_addr else
133               conf['ui_addr'] if conf else
134               default_ui_addr)
135
136    db_path = (args.db_path if args.db_path else
137               Path(conf['db_path']) if conf else
138               default_db_path)
139
140    db_low_size = (args.db_low_size if args.db_low_size is not None else
141                   conf['db_low_size'] if conf else
142                   default_db_low_size)
143
144    db_high_size = (args.db_high_size if args.db_high_size is not None else
145                    conf['db_high_size'] if conf else
146                    default_db_high_size)
147
148    db_enable_archive = (True if args.db_enable_archive else
149                         conf['db_enable_archive'] if conf else
150                         False)
151
152    db_disable_journal = (True if args.db_disable_journal else
153                          conf['db_disable_journal'] if conf else
154                          False)
155
156    aio.init_asyncio()
157    with contextlib.suppress(asyncio.CancelledError):
158        aio.run_asyncio(async_main(syslog_addr=syslog_addr,
159                                   syslog_pem=syslog_pem,
160                                   ui_addr=ui_addr,
161                                   db_path=db_path,
162                                   db_low_size=db_low_size,
163                                   db_high_size=db_high_size,
164                                   db_enable_archive=db_enable_archive,
165                                   db_disable_journal=db_disable_journal))
166
167
168async def async_main(syslog_addr: str,
169                     syslog_pem: typing.Optional[Path],
170                     ui_addr: str,
171                     db_path: Path,
172                     db_low_size: int,
173                     db_high_size: int,
174                     db_enable_archive: bool,
175                     db_disable_journal: bool):
176    """Syslog Server async main"""
177    async_group = aio.Group()
178
179    async def on_msg(msg):
180        await backend.register(time.time(), msg)
181
182    async def async_close():
183        await async_group.async_close()
184        await asyncio.sleep(0.1)
185
186    try:
187        mlog.debug("creating backend...")
188        backend = await _create_resource(async_group, create_backend,
189                                         db_path, db_low_size, db_high_size,
190                                         db_enable_archive, db_disable_journal)
191
192        mlog.debug("creating web server...")
193        await _create_resource(async_group, create_web_server, ui_addr,
194                               backend)
195
196        mlog.debug("creating syslog server...")
197        await _create_resource(async_group, create_syslog_server, syslog_addr,
198                               on_msg, syslog_pem)
199
200        mlog.debug("initialization done")
201        await async_group.wait_closing()
202
203    finally:
204        mlog.debug("closing...")
205        await aio.uncancellable(async_close())
206
207
208def _get_console_log_conf(level):
209    return {
210        'version': 1,
211        'formatters': {
212            'syslog_server_console': {
213                'format': '[%(asctime)s %(levelname)s %(name)s] %(message)s'}},
214        'handlers': {
215            'syslog_server_console': {
216                'class': 'logging.StreamHandler',
217                'formatter': 'syslog_server_console',
218                'level': level}},
219        'loggers': {
220            'hat.syslog': {
221                'level': level}},
222        'root': {
223            'level': 'INFO' if level == 'DEBUG' else level,
224            'handlers': ['syslog_server_console']},
225        'disable_existing_loggers': False}
226
227
228async def _create_resource(async_group, fn, *args):
229    resource = await async_group.spawn(fn, *args)
230    async_group.spawn(aio.call_on_cancel, resource.async_close)
231    async_group.spawn(aio.call_on_done, resource.wait_closing(),
232                      async_group.close)
233    return resource
234
235
236if __name__ == '__main__':
237    sys.argv[0] = 'hat-syslog-server'
238    sys.exit(main())
mlog: logging.Logger = <Logger hat.syslog.server.main (WARNING)>

Module logger

user_data_dir: pathlib.Path = PosixPath('/home/bozo/.local/share/hat')

User data directory path

user_conf_dir: pathlib.Path = PosixPath('/home/bozo/.config/hat')

User configuration directory path

default_syslog_addr: str = 'tcp://0.0.0.0:6514'

Default syslog listening address

default_ui_addr: str = 'http://0.0.0.0:23020'

Default UI listening address

default_db_path: pathlib.Path = PosixPath('/home/bozo/.local/share/hat/syslog.db')

Default DB file path

default_db_low_size: int = 1000000

Default DB low size count

default_db_high_size: int = 10000000

Default DB high size count

def create_argument_parser() -> argparse.ArgumentParser:
49def create_argument_parser() -> argparse.ArgumentParser:
50    """Create argument parser"""
51    parser = argparse.ArgumentParser()
52    parser.add_argument(
53        '--conf', metavar='PATH', type=Path,
54        help="configuration defined by hat-syslog://server.yaml# "
55             "(default $XDG_CONFIG_HOME/hat/syslog.{yaml|yml|json})")
56    parser.add_argument(
57        '--log', metavar='LEVEL', type=str,
58        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
59        help="console log level")
60    parser.add_argument(
61        '--syslog-addr', metavar='ADDR', type=str,
62        help=f"syslog listening address (default {default_syslog_addr})")
63    parser.add_argument(
64        '--syslog-pem', metavar='PATH', type=Path,
65        help="pem file path - mandatory for ssl communication")
66    parser.add_argument(
67        '--ui-addr', metavar='ADDR', type=str,
68        help=f"UI listening address (default {default_ui_addr})")
69    parser.add_argument(
70        '--db-path', metavar='PATH', type=Path,
71        help="sqlite database file path "
72             "(default $XDG_DATA_HOME/hat/syslog.db)")
73    parser.add_argument(
74        '--db-low-size', metavar='N', type=int,
75        help=f"number of messages kept in database after database "
76             f"cleanup (default {default_db_low_size})")
77    parser.add_argument(
78        '--db-high-size', metavar='N', type=int,
79        help=f"number of messages that will trigger database cleanup "
80             f"(default {default_db_high_size})")
81    parser.add_argument(
82        '--db-enable-archive', action='store_true',
83        help="should messages, deleted during database cleanup, be kept "
84             "in archive files")
85    parser.add_argument(
86        '--db-disable-journal', action='store_true',
87        help="disable sqlite jurnaling")
88    return parser

Create argument parser

def main():
 91def main():
 92    """Syslog Server"""
 93    parser = create_argument_parser()
 94    args = parser.parse_args()
 95
 96    conf_path = args.conf
 97    if not conf_path:
 98        for suffix in ('.yaml', '.yml', '.json'):
 99            conf_path = (user_conf_dir / 'syslog').with_suffix(suffix)
100            if conf_path.exists():
101                break
102        else:
103            conf_path = None
104
105    if conf_path == Path('-'):
106        conf = json.decode_stream(sys.stdin)
107    elif conf_path:
108        conf = json.decode_file(conf_path)
109    else:
110        conf = None
111
112    if conf:
113        common.json_schema_repo.validate('hat-syslog://server.yaml#', conf)
114
115    if args.log:
116        log_conf = _get_console_log_conf(args.log)
117    elif conf:
118        log_conf = conf['log']
119    else:
120        log_conf = {'version': 1}
121
122    logging.config.dictConfig(log_conf)
123
124    syslog_addr = (args.syslog_addr if args.syslog_addr else
125                   conf['syslog_addr'] if conf else
126                   default_syslog_addr)
127
128    syslog_pem = (
129        args.syslog_pem if args.syslog_pem else
130        Path(conf['syslog_pem']) if conf and 'syslog_pem' in conf else
131        None)
132
133    ui_addr = (args.ui_addr if args.ui_addr else
134               conf['ui_addr'] if conf else
135               default_ui_addr)
136
137    db_path = (args.db_path if args.db_path else
138               Path(conf['db_path']) if conf else
139               default_db_path)
140
141    db_low_size = (args.db_low_size if args.db_low_size is not None else
142                   conf['db_low_size'] if conf else
143                   default_db_low_size)
144
145    db_high_size = (args.db_high_size if args.db_high_size is not None else
146                    conf['db_high_size'] if conf else
147                    default_db_high_size)
148
149    db_enable_archive = (True if args.db_enable_archive else
150                         conf['db_enable_archive'] if conf else
151                         False)
152
153    db_disable_journal = (True if args.db_disable_journal else
154                          conf['db_disable_journal'] if conf else
155                          False)
156
157    aio.init_asyncio()
158    with contextlib.suppress(asyncio.CancelledError):
159        aio.run_asyncio(async_main(syslog_addr=syslog_addr,
160                                   syslog_pem=syslog_pem,
161                                   ui_addr=ui_addr,
162                                   db_path=db_path,
163                                   db_low_size=db_low_size,
164                                   db_high_size=db_high_size,
165                                   db_enable_archive=db_enable_archive,
166                                   db_disable_journal=db_disable_journal))

Syslog Server

async def async_main( syslog_addr: str, syslog_pem: Optional[pathlib.Path], ui_addr: str, db_path: pathlib.Path, db_low_size: int, db_high_size: int, db_enable_archive: bool, db_disable_journal: bool):
169async def async_main(syslog_addr: str,
170                     syslog_pem: typing.Optional[Path],
171                     ui_addr: str,
172                     db_path: Path,
173                     db_low_size: int,
174                     db_high_size: int,
175                     db_enable_archive: bool,
176                     db_disable_journal: bool):
177    """Syslog Server async main"""
178    async_group = aio.Group()
179
180    async def on_msg(msg):
181        await backend.register(time.time(), msg)
182
183    async def async_close():
184        await async_group.async_close()
185        await asyncio.sleep(0.1)
186
187    try:
188        mlog.debug("creating backend...")
189        backend = await _create_resource(async_group, create_backend,
190                                         db_path, db_low_size, db_high_size,
191                                         db_enable_archive, db_disable_journal)
192
193        mlog.debug("creating web server...")
194        await _create_resource(async_group, create_web_server, ui_addr,
195                               backend)
196
197        mlog.debug("creating syslog server...")
198        await _create_resource(async_group, create_syslog_server, syslog_addr,
199                               on_msg, syslog_pem)
200
201        mlog.debug("initialization done")
202        await async_group.wait_closing()
203
204    finally:
205        mlog.debug("closing...")
206        await aio.uncancellable(async_close())

Syslog Server async main