hat.syslog.server.ui

Web server implementation

  1"""Web server implementation"""
  2
  3import asyncio
  4import contextlib
  5import importlib
  6import itertools
  7import logging
  8import urllib
  9
 10from hat import aio
 11from hat import juggler
 12
 13from hat.syslog.server import common
 14from hat.syslog.server import encoder
 15import hat.syslog.server.backend
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19"""Module logger"""
 20
 21max_results_limit: int = 200
 22"""Max results limit"""
 23
 24autoflush_delay: float = 0.2
 25"""Juggler autoflush delay"""
 26
 27default_filter = common.Filter(max_results=max_results_limit)
 28"""Default filter"""
 29
 30
 31async def create_web_server(addr: str,
 32                            backend: hat.syslog.server.backend.Backend
 33                            ) -> 'WebServer':
 34    """Create web server"""
 35    srv = WebServer()
 36    srv._backend = backend
 37    srv._locks = {}
 38    srv._filters = {}
 39
 40    exit_stack = contextlib.ExitStack()
 41    try:
 42        ui_path = exit_stack.enter_context(
 43            importlib.resources.path(__package__, 'ui'))
 44
 45        url = urllib.parse.urlparse(addr)
 46        srv._srv = await juggler.listen(host=url.hostname,
 47                                        port=url.port,
 48                                        connection_cb=srv._on_connection,
 49                                        request_cb=srv._on_request,
 50                                        static_dir=ui_path,
 51                                        autoflush_delay=autoflush_delay)
 52
 53        try:
 54            srv.async_group.spawn(aio.call_on_cancel, exit_stack.close)
 55
 56        except BaseException:
 57            await aio.uncancellable(srv.async_close())
 58            raise
 59
 60    except BaseException:
 61        exit_stack.close()
 62        raise
 63
 64    mlog.debug("web server listening on %s", addr)
 65    return srv
 66
 67
 68class WebServer(aio.Resource):
 69
 70    @property
 71    def async_group(self) -> aio.Group:
 72        """Async group"""
 73        return self._srv.async_group
 74
 75    async def _on_connection(self, conn):
 76        try:
 77            mlog.debug("new connection")
 78
 79            self._locks[conn] = asyncio.Lock()
 80            self._filters[conn] = default_filter
 81
 82            change_queue = aio.Queue()
 83            with self._backend.register_change_cb(change_queue.put_nowait):
 84                async with self._locks[conn]:
 85                    prev_filter = self._filters[conn]
 86                    prev_filter_json = encoder.filter_to_json(prev_filter)
 87
 88                    entries = await self._backend.query(prev_filter)
 89                    entries_json = [encoder.entry_to_json(entry)
 90                                    for entry in entries]
 91
 92                    conn.state.set([], {'filter': prev_filter_json,
 93                                        'entries': entries_json,
 94                                        'first_id': self._backend.first_id,
 95                                        'last_id': self._backend.last_id})
 96
 97                while True:
 98                    entries = await change_queue.get()
 99
100                    async with self._locks[conn]:
101                        prev_filter = self._filters[conn]
102                        prev_filter_json = conn.state.get('filter')
103                        prev_entries_json = conn.state.get('entries')
104
105                        previous_id = (prev_entries_json[0]['id']
106                                       if prev_entries_json else 0)
107                        entries = (entry for entry in entries
108                                   if entry.id > previous_id)
109                        entries = _filter_entries(prev_filter, entries)
110                        entries_json = [encoder.entry_to_json(entry)
111                                        for entry in entries]
112
113                        if entries_json:
114                            new_entries_json = itertools.chain(
115                                entries_json, prev_entries_json)
116                            new_entries_json = itertools.islice(
117                                new_entries_json, prev_filter.max_results)
118                            new_entries_json = list(new_entries_json)
119
120                        else:
121                            new_entries_json = prev_entries_json
122
123                        conn.state.set([], {'filter': prev_filter_json,
124                                            'entries': new_entries_json,
125                                            'first_id': self._backend.first_id,
126                                            'last_id': self._backend.last_id})
127
128        except Exception as e:
129            mlog.error("connection error: %s", e, exc_info=e)
130
131        finally:
132            mlog.debug("closing connection")
133            conn.close()
134            self._locks.pop(conn)
135            self._filters.pop(conn)
136
137    async def _on_request(self, conn, name, data):
138        if name != 'filter':
139            raise Exception('invalid request name')
140
141        new_filter = encoder.filter_from_json(data)
142        new_filter = _sanitize_filter(new_filter)
143
144        async with self._locks[conn]:
145            prev_filter = self._filters[conn]
146            if new_filter == prev_filter:
147                return
148
149            mlog.debug('setting new filter: %s', new_filter)
150            new_filter_json = encoder.filter_to_json(new_filter)
151
152            entries = await self._backend.query(new_filter)
153            entries_json = [encoder.entry_to_json(entry) for entry in entries]
154
155            self._filters[conn] = new_filter
156            conn.state.set([], {'filter': new_filter_json,
157                                'entries': entries_json,
158                                'first_id': self._backend.first_id,
159                                'last_id': self._backend.last_id})
160
161
162def _sanitize_filter(f):
163    if f.max_results is None or f.max_results > max_results_limit:
164        f = f._replace(max_results=max_results_limit)
165
166    return f
167
168
169def _filter_entries(f, entries):
170    for i in entries:
171        if f.last_id is not None and i.id > f.last_id:
172            continue
173
174        if (f.entry_timestamp_from is not None
175                and i.timestamp < f.entry_timestamp_from):
176            continue
177
178        if (f.entry_timestamp_to is not None
179                and i.timestamp > f.entry_timestamp_to):
180            continue
181
182        if f.facility is not None and i.msg.facility != f.facility:
183            continue
184
185        if f.severity is not None and i.msg.severity != f.severity:
186            continue
187
188        if not _match_str_filter(f.hostname, i.msg.hostname):
189            continue
190
191        if not _match_str_filter(f.app_name, i.msg.app_name):
192            continue
193
194        if not _match_str_filter(f.procid, i.msg.procid):
195            continue
196
197        if not _match_str_filter(f.msgid, i.msg.msgid):
198            continue
199
200        if not _match_str_filter(f.msg, i.msg.msg):
201            continue
202
203        yield i
204
205
206def _match_str_filter(f, value):
207    return not f or f in value
mlog: logging.Logger = <Logger hat.syslog.server.ui (WARNING)>

Module logger

max_results_limit: int = 200

Max results limit

autoflush_delay: float = 0.2

Juggler autoflush delay

default_filter = Filter(max_results=200, last_id=None, entry_timestamp_from=None, entry_timestamp_to=None, facility=None, severity=None, hostname=None, app_name=None, procid=None, msgid=None, msg=None)

Default filter

async def create_web_server( addr: str, backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.ui.WebServer:
32async def create_web_server(addr: str,
33                            backend: hat.syslog.server.backend.Backend
34                            ) -> 'WebServer':
35    """Create web server"""
36    srv = WebServer()
37    srv._backend = backend
38    srv._locks = {}
39    srv._filters = {}
40
41    exit_stack = contextlib.ExitStack()
42    try:
43        ui_path = exit_stack.enter_context(
44            importlib.resources.path(__package__, 'ui'))
45
46        url = urllib.parse.urlparse(addr)
47        srv._srv = await juggler.listen(host=url.hostname,
48                                        port=url.port,
49                                        connection_cb=srv._on_connection,
50                                        request_cb=srv._on_request,
51                                        static_dir=ui_path,
52                                        autoflush_delay=autoflush_delay)
53
54        try:
55            srv.async_group.spawn(aio.call_on_cancel, exit_stack.close)
56
57        except BaseException:
58            await aio.uncancellable(srv.async_close())
59            raise
60
61    except BaseException:
62        exit_stack.close()
63        raise
64
65    mlog.debug("web server listening on %s", addr)
66    return srv

Create web server

class WebServer(hat.aio.Resource):
 69class WebServer(aio.Resource):
 70
 71    @property
 72    def async_group(self) -> aio.Group:
 73        """Async group"""
 74        return self._srv.async_group
 75
 76    async def _on_connection(self, conn):
 77        try:
 78            mlog.debug("new connection")
 79
 80            self._locks[conn] = asyncio.Lock()
 81            self._filters[conn] = default_filter
 82
 83            change_queue = aio.Queue()
 84            with self._backend.register_change_cb(change_queue.put_nowait):
 85                async with self._locks[conn]:
 86                    prev_filter = self._filters[conn]
 87                    prev_filter_json = encoder.filter_to_json(prev_filter)
 88
 89                    entries = await self._backend.query(prev_filter)
 90                    entries_json = [encoder.entry_to_json(entry)
 91                                    for entry in entries]
 92
 93                    conn.state.set([], {'filter': prev_filter_json,
 94                                        'entries': entries_json,
 95                                        'first_id': self._backend.first_id,
 96                                        'last_id': self._backend.last_id})
 97
 98                while True:
 99                    entries = await change_queue.get()
100
101                    async with self._locks[conn]:
102                        prev_filter = self._filters[conn]
103                        prev_filter_json = conn.state.get('filter')
104                        prev_entries_json = conn.state.get('entries')
105
106                        previous_id = (prev_entries_json[0]['id']
107                                       if prev_entries_json else 0)
108                        entries = (entry for entry in entries
109                                   if entry.id > previous_id)
110                        entries = _filter_entries(prev_filter, entries)
111                        entries_json = [encoder.entry_to_json(entry)
112                                        for entry in entries]
113
114                        if entries_json:
115                            new_entries_json = itertools.chain(
116                                entries_json, prev_entries_json)
117                            new_entries_json = itertools.islice(
118                                new_entries_json, prev_filter.max_results)
119                            new_entries_json = list(new_entries_json)
120
121                        else:
122                            new_entries_json = prev_entries_json
123
124                        conn.state.set([], {'filter': prev_filter_json,
125                                            'entries': new_entries_json,
126                                            'first_id': self._backend.first_id,
127                                            'last_id': self._backend.last_id})
128
129        except Exception as e:
130            mlog.error("connection error: %s", e, exc_info=e)
131
132        finally:
133            mlog.debug("closing connection")
134            conn.close()
135            self._locks.pop(conn)
136            self._filters.pop(conn)
137
138    async def _on_request(self, conn, name, data):
139        if name != 'filter':
140            raise Exception('invalid request name')
141
142        new_filter = encoder.filter_from_json(data)
143        new_filter = _sanitize_filter(new_filter)
144
145        async with self._locks[conn]:
146            prev_filter = self._filters[conn]
147            if new_filter == prev_filter:
148                return
149
150            mlog.debug('setting new filter: %s', new_filter)
151            new_filter_json = encoder.filter_to_json(new_filter)
152
153            entries = await self._backend.query(new_filter)
154            entries_json = [encoder.entry_to_json(entry) for entry in entries]
155
156            self._filters[conn] = new_filter
157            conn.state.set([], {'filter': new_filter_json,
158                                'entries': entries_json,
159                                'first_id': self._backend.first_id,
160                                'last_id': self._backend.last_id})

Resource with lifetime control based on Group.

async_group: hat.aio.Group

Async group

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close