hat.syslog.server.ui

Web server implementation

  1"""Web server implementation"""
  2
  3import asyncio
  4import contextlib
  5import importlib
  6import itertools
  7import logging
  8import urllib
  9
 10from hat import aio
 11from hat import juggler
 12
 13from hat.syslog.server import common
 14from hat.syslog.server import encoder
 15import hat.syslog.server.backend
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19"""Module logger"""
 20
 21max_results_limit: int = 200
 22"""Max results limit"""
 23
 24autoflush_delay: float = 0.2
 25"""Juggler autoflush delay"""
 26
 27default_filter = common.Filter(max_results=max_results_limit)
 28"""Default filter"""
 29
 30
 31async def create_web_server(addr: str,
 32                            backend: hat.syslog.server.backend.Backend
 33                            ) -> 'WebServer':
 34    """Create web server"""
 35    srv = WebServer()
 36    srv._backend = backend
 37    srv._locks = {}
 38    srv._filters = {}
 39
 40    exit_stack = contextlib.ExitStack()
 41    try:
 42        ui_path = exit_stack.enter_context(
 43            importlib.resources.path(__package__, 'ui'))
 44
 45        url = urllib.parse.urlparse(addr)
 46        srv._srv = await juggler.listen(host=url.hostname,
 47                                        port=url.port,
 48                                        connection_cb=srv._on_connection,
 49                                        request_cb=srv._on_request,
 50                                        static_dir=ui_path,
 51                                        autoflush_delay=autoflush_delay)
 52
 53        try:
 54            srv.async_group.spawn(aio.call_on_cancel, exit_stack.close)
 55
 56        except BaseException:
 57            await aio.uncancellable(srv.async_close())
 58            raise
 59
 60    except BaseException:
 61        exit_stack.close()
 62        raise
 63
 64    mlog.debug("web server listening on %s", addr)
 65    return srv
 66
 67
 68class WebServer(aio.Resource):
 69
 70    @property
 71    def async_group(self) -> aio.Group:
 72        """Async group"""
 73        return self._srv.async_group
 74
 75    async def _on_connection(self, conn):
 76        try:
 77            self._locks[conn] = asyncio.Lock()
 78            self._filters[conn] = default_filter
 79
 80            change_queue = aio.Queue()
 81            with self._backend.register_change_cb(change_queue.put_nowait):
 82                async with self._locks[conn]:
 83                    prev_filter = self._filters[conn]
 84                    prev_filter_json = encoder.filter_to_json(prev_filter)
 85
 86                    entries = await self._backend.query(prev_filter)
 87                    entries_json = [encoder.entry_to_json(entry)
 88                                    for entry in entries]
 89
 90                    conn.state.set([], {'filter': prev_filter_json,
 91                                        'entries': entries_json,
 92                                        'first_id': self._backend.first_id,
 93                                        'last_id': self._backend.last_id})
 94
 95                while True:
 96                    entries = await change_queue.get()
 97
 98                    async with self._locks[conn]:
 99                        prev_filter = self._filters[conn]
100                        prev_filter_json = conn.state.get('filter')
101                        prev_entries_json = conn.state.get('entries')
102
103                        previous_id = (prev_entries_json[0]['id']
104                                       if prev_entries_json else 0)
105                        entries = (entry for entry in entries
106                                   if entry.id > previous_id)
107                        entries = _filter_entries(prev_filter, entries)
108                        entries_json = [encoder.entry_to_json(entry)
109                                        for entry in entries]
110
111                        if entries_json:
112                            new_entries_json = itertools.chain(
113                                entries_json, prev_entries_json)
114                            new_entries_json = itertools.islice(
115                                new_entries_json, prev_filter.max_results)
116                            new_entries_json = list(new_entries_json)
117
118                        else:
119                            new_entries_json = prev_entries_json
120
121                        conn.state.set([], {'filter': prev_filter_json,
122                                            'entries': new_entries_json,
123                                            'first_id': self._backend.first_id,
124                                            'last_id': self._backend.last_id})
125
126        except Exception as e:
127            mlog.error("connection error: %s", e, exc_info=e)
128
129        finally:
130            conn.close()
131            self._locks.pop(conn)
132            self._filters.pop(conn)
133
134    async def _on_request(self, conn, name, data):
135        if name != 'filter':
136            raise Exception('invalid request name')
137
138        new_filter = encoder.filter_from_json(data)
139        new_filter = _sanitize_filter(new_filter)
140
141        async with self._locks[conn]:
142            prev_filter = self._filters[conn]
143            if new_filter == prev_filter:
144                return
145
146            mlog.debug('setting new filter: %s', new_filter)
147            new_filter_json = encoder.filter_to_json(new_filter)
148
149            entries = await self._backend.query(new_filter)
150            entries_json = [encoder.entry_to_json(entry) for entry in entries]
151
152            self._filters[conn] = new_filter
153            conn.state.set([], {'filter': new_filter_json,
154                                'entries': entries_json,
155                                'first_id': self._backend.first_id,
156                                'last_id': self._backend.last_id})
157
158
159def _sanitize_filter(f):
160    if f.max_results is None or f.max_results > max_results_limit:
161        f = f._replace(max_results=max_results_limit)
162
163    return f
164
165
166def _filter_entries(f, entries):
167    for i in entries:
168        if f.last_id is not None and i.id > f.last_id:
169            continue
170
171        if (f.entry_timestamp_from is not None
172                and i.timestamp < f.entry_timestamp_from):
173            continue
174
175        if (f.entry_timestamp_to is not None
176                and i.timestamp > f.entry_timestamp_to):
177            continue
178
179        if f.facility is not None and i.msg.facility != f.facility:
180            continue
181
182        if f.severity is not None and i.msg.severity != f.severity:
183            continue
184
185        if not _match_str_filter(f.hostname, i.msg.hostname):
186            continue
187
188        if not _match_str_filter(f.app_name, i.msg.app_name):
189            continue
190
191        if not _match_str_filter(f.procid, i.msg.procid):
192            continue
193
194        if not _match_str_filter(f.msgid, i.msg.msgid):
195            continue
196
197        if not _match_str_filter(f.msg, i.msg.msg):
198            continue
199
200        yield i
201
202
203def _match_str_filter(f, value):
204    return not f or f in value
mlog: logging.Logger = <Logger hat.syslog.server.ui (WARNING)>

Module logger

max_results_limit: int = 200

Max results limit

autoflush_delay: float = 0.2

Juggler autoflush delay

default_filter = Filter(max_results=200, last_id=None, entry_timestamp_from=None, entry_timestamp_to=None, facility=None, severity=None, hostname=None, app_name=None, procid=None, msgid=None, msg=None)

Default filter

async def create_web_server( addr: str, backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.ui.WebServer:
32async def create_web_server(addr: str,
33                            backend: hat.syslog.server.backend.Backend
34                            ) -> 'WebServer':
35    """Create web server"""
36    srv = WebServer()
37    srv._backend = backend
38    srv._locks = {}
39    srv._filters = {}
40
41    exit_stack = contextlib.ExitStack()
42    try:
43        ui_path = exit_stack.enter_context(
44            importlib.resources.path(__package__, 'ui'))
45
46        url = urllib.parse.urlparse(addr)
47        srv._srv = await juggler.listen(host=url.hostname,
48                                        port=url.port,
49                                        connection_cb=srv._on_connection,
50                                        request_cb=srv._on_request,
51                                        static_dir=ui_path,
52                                        autoflush_delay=autoflush_delay)
53
54        try:
55            srv.async_group.spawn(aio.call_on_cancel, exit_stack.close)
56
57        except BaseException:
58            await aio.uncancellable(srv.async_close())
59            raise
60
61    except BaseException:
62        exit_stack.close()
63        raise
64
65    mlog.debug("web server listening on %s", addr)
66    return srv

Create web server

class WebServer(hat.aio.Resource):
 69class WebServer(aio.Resource):
 70
 71    @property
 72    def async_group(self) -> aio.Group:
 73        """Async group"""
 74        return self._srv.async_group
 75
 76    async def _on_connection(self, conn):
 77        try:
 78            self._locks[conn] = asyncio.Lock()
 79            self._filters[conn] = default_filter
 80
 81            change_queue = aio.Queue()
 82            with self._backend.register_change_cb(change_queue.put_nowait):
 83                async with self._locks[conn]:
 84                    prev_filter = self._filters[conn]
 85                    prev_filter_json = encoder.filter_to_json(prev_filter)
 86
 87                    entries = await self._backend.query(prev_filter)
 88                    entries_json = [encoder.entry_to_json(entry)
 89                                    for entry in entries]
 90
 91                    conn.state.set([], {'filter': prev_filter_json,
 92                                        'entries': entries_json,
 93                                        'first_id': self._backend.first_id,
 94                                        'last_id': self._backend.last_id})
 95
 96                while True:
 97                    entries = await change_queue.get()
 98
 99                    async with self._locks[conn]:
100                        prev_filter = self._filters[conn]
101                        prev_filter_json = conn.state.get('filter')
102                        prev_entries_json = conn.state.get('entries')
103
104                        previous_id = (prev_entries_json[0]['id']
105                                       if prev_entries_json else 0)
106                        entries = (entry for entry in entries
107                                   if entry.id > previous_id)
108                        entries = _filter_entries(prev_filter, entries)
109                        entries_json = [encoder.entry_to_json(entry)
110                                        for entry in entries]
111
112                        if entries_json:
113                            new_entries_json = itertools.chain(
114                                entries_json, prev_entries_json)
115                            new_entries_json = itertools.islice(
116                                new_entries_json, prev_filter.max_results)
117                            new_entries_json = list(new_entries_json)
118
119                        else:
120                            new_entries_json = prev_entries_json
121
122                        conn.state.set([], {'filter': prev_filter_json,
123                                            'entries': new_entries_json,
124                                            'first_id': self._backend.first_id,
125                                            'last_id': self._backend.last_id})
126
127        except Exception as e:
128            mlog.error("connection error: %s", e, exc_info=e)
129
130        finally:
131            conn.close()
132            self._locks.pop(conn)
133            self._filters.pop(conn)
134
135    async def _on_request(self, conn, name, data):
136        if name != 'filter':
137            raise Exception('invalid request name')
138
139        new_filter = encoder.filter_from_json(data)
140        new_filter = _sanitize_filter(new_filter)
141
142        async with self._locks[conn]:
143            prev_filter = self._filters[conn]
144            if new_filter == prev_filter:
145                return
146
147            mlog.debug('setting new filter: %s', new_filter)
148            new_filter_json = encoder.filter_to_json(new_filter)
149
150            entries = await self._backend.query(new_filter)
151            entries_json = [encoder.entry_to_json(entry) for entry in entries]
152
153            self._filters[conn] = new_filter
154            conn.state.set([], {'filter': new_filter_json,
155                                'entries': entries_json,
156                                'first_id': self._backend.first_id,
157                                'last_id': self._backend.last_id})

Resource with lifetime control based on Group.

WebServer()
async_group: hat.aio.Group

Async group

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close