hat.syslog.server.ui
Web server implementation
1"""Web server implementation""" 2 3import asyncio 4import contextlib 5import importlib 6import itertools 7import logging 8import urllib 9 10from hat import aio 11from hat import juggler 12 13from hat.syslog.server import common 14from hat.syslog.server import encoder 15import hat.syslog.server.backend 16 17 18mlog: logging.Logger = logging.getLogger(__name__) 19"""Module logger""" 20 21max_results_limit: int = 200 22"""Max results limit""" 23 24autoflush_delay: float = 0.2 25"""Juggler autoflush delay""" 26 27default_filter = common.Filter(max_results=max_results_limit) 28"""Default filter""" 29 30 31async def create_web_server(addr: str, 32 backend: hat.syslog.server.backend.Backend 33 ) -> 'WebServer': 34 """Create web server""" 35 srv = WebServer() 36 srv._backend = backend 37 srv._locks = {} 38 srv._filters = {} 39 40 exit_stack = contextlib.ExitStack() 41 try: 42 ui_path = exit_stack.enter_context( 43 importlib.resources.path(__package__, 'ui')) 44 45 url = urllib.parse.urlparse(addr) 46 srv._srv = await juggler.listen(host=url.hostname, 47 port=url.port, 48 connection_cb=srv._on_connection, 49 request_cb=srv._on_request, 50 static_dir=ui_path, 51 autoflush_delay=autoflush_delay) 52 53 try: 54 srv.async_group.spawn(aio.call_on_cancel, exit_stack.close) 55 56 except BaseException: 57 await aio.uncancellable(srv.async_close()) 58 raise 59 60 except BaseException: 61 exit_stack.close() 62 raise 63 64 mlog.debug("web server listening on %s", addr) 65 return srv 66 67 68class WebServer(aio.Resource): 69 70 @property 71 def async_group(self) -> aio.Group: 72 """Async group""" 73 return self._srv.async_group 74 75 async def _on_connection(self, conn): 76 try: 77 self._locks[conn] = asyncio.Lock() 78 self._filters[conn] = default_filter 79 80 change_queue = aio.Queue() 81 with self._backend.register_change_cb(change_queue.put_nowait): 82 async with self._locks[conn]: 83 prev_filter = self._filters[conn] 84 prev_filter_json = encoder.filter_to_json(prev_filter) 85 86 entries = await self._backend.query(prev_filter) 87 entries_json = [encoder.entry_to_json(entry) 88 for entry in entries] 89 90 conn.state.set([], {'filter': prev_filter_json, 91 'entries': entries_json, 92 'first_id': self._backend.first_id, 93 'last_id': self._backend.last_id}) 94 95 while True: 96 entries = await change_queue.get() 97 98 async with self._locks[conn]: 99 prev_filter = self._filters[conn] 100 prev_filter_json = conn.state.get('filter') 101 prev_entries_json = conn.state.get('entries') 102 103 previous_id = (prev_entries_json[0]['id'] 104 if prev_entries_json else 0) 105 entries = (entry for entry in entries 106 if entry.id > previous_id) 107 entries = _filter_entries(prev_filter, entries) 108 entries_json = [encoder.entry_to_json(entry) 109 for entry in entries] 110 111 if entries_json: 112 new_entries_json = itertools.chain( 113 entries_json, prev_entries_json) 114 new_entries_json = itertools.islice( 115 new_entries_json, prev_filter.max_results) 116 new_entries_json = list(new_entries_json) 117 118 else: 119 new_entries_json = prev_entries_json 120 121 conn.state.set([], {'filter': prev_filter_json, 122 'entries': new_entries_json, 123 'first_id': self._backend.first_id, 124 'last_id': self._backend.last_id}) 125 126 except Exception as e: 127 mlog.error("connection error: %s", e, exc_info=e) 128 129 finally: 130 conn.close() 131 self._locks.pop(conn) 132 self._filters.pop(conn) 133 134 async def _on_request(self, conn, name, data): 135 if name != 'filter': 136 raise Exception('invalid request name') 137 138 
new_filter = encoder.filter_from_json(data) 139 new_filter = _sanitize_filter(new_filter) 140 141 async with self._locks[conn]: 142 prev_filter = self._filters[conn] 143 if new_filter == prev_filter: 144 return 145 146 mlog.debug('setting new filter: %s', new_filter) 147 new_filter_json = encoder.filter_to_json(new_filter) 148 149 entries = await self._backend.query(new_filter) 150 entries_json = [encoder.entry_to_json(entry) for entry in entries] 151 152 self._filters[conn] = new_filter 153 conn.state.set([], {'filter': new_filter_json, 154 'entries': entries_json, 155 'first_id': self._backend.first_id, 156 'last_id': self._backend.last_id}) 157 158 159def _sanitize_filter(f): 160 if f.max_results is None or f.max_results > max_results_limit: 161 f = f._replace(max_results=max_results_limit) 162 163 return f 164 165 166def _filter_entries(f, entries): 167 for i in entries: 168 if f.last_id is not None and i.id > f.last_id: 169 continue 170 171 if (f.entry_timestamp_from is not None 172 and i.timestamp < f.entry_timestamp_from): 173 continue 174 175 if (f.entry_timestamp_to is not None 176 and i.timestamp > f.entry_timestamp_to): 177 continue 178 179 if f.facility is not None and i.msg.facility != f.facility: 180 continue 181 182 if f.severity is not None and i.msg.severity != f.severity: 183 continue 184 185 if not _match_str_filter(f.hostname, i.msg.hostname): 186 continue 187 188 if not _match_str_filter(f.app_name, i.msg.app_name): 189 continue 190 191 if not _match_str_filter(f.procid, i.msg.procid): 192 continue 193 194 if not _match_str_filter(f.msgid, i.msg.msgid): 195 continue 196 197 if not _match_str_filter(f.msg, i.msg.msg): 198 continue 199 200 yield i 201 202 203def _match_str_filter(f, value): 204 return not f or f in value
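The protocol this source implements is small: a client issues a single request type, 'filter', carrying a JSON-encoded filter, and observes the server-maintained state keys 'filter', 'entries', 'first_id' and 'last_id'. A minimal client sketch follows, hedged on several assumptions: hat.juggler's client-side API (juggler.connect, Connection.send, a mirrored state with register_change_cb) may differ between hat-juggler versions, the '/ws' endpoint path and port 23020 are taken as defaults, and the filter JSON keys are assumed to mirror common.Filter (the authoritative shape is whatever encoder.filter_from_json accepts):

```python
import asyncio

from hat import juggler


async def main():
    # Hypothetical address: hat-syslog's default UI port plus the '/ws'
    # WebSocket path assumed for hat.juggler's endpoint.
    conn = await juggler.connect('ws://127.0.0.1:23020/ws')

    # Server state ('filter', 'entries', 'first_id', 'last_id') is
    # mirrored asynchronously; print entry ids on every sync.
    conn.state.register_change_cb(
        lambda state: print([i['id'] for i in state['entries']]))

    # Narrow the server-side view; keys mirror common.Filter fields
    # and are an assumption about the JSON encoding.
    await conn.send('filter', {'max_results': 50,
                               'last_id': None,
                               'entry_timestamp_from': None,
                               'entry_timestamp_to': None,
                               'facility': None,
                               'severity': 'ERROR',
                               'hostname': None,
                               'app_name': None,
                               'procid': None,
                               'msgid': None,
                               'msg': None})

    # Give the autoflushed state a moment to arrive, then close.
    await asyncio.sleep(1)
    await conn.async_close()


asyncio.run(main())
```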
mlog: logging.Logger = logging.getLogger(__name__)
Module logger
max_results_limit: int = 200
Max results limit
autoflush_delay: float = 0.2
Juggler autoflush delay
default_filter = Filter(max_results=200, last_id=None, entry_timestamp_from=None, entry_timestamp_to=None, facility=None, severity=None, hostname=None, app_name=None, procid=None, msgid=None, msg=None)
Default filter
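For illustration, a custom filter can be built the same way default_filter is: unspecified fields keep their None defaults, and the module-private _sanitize_filter (see the source above) clamps max_results to max_results_limit before the backend is queried. A small sketch with hypothetical field values, assuming common re-exports the syslog Severity enum:

```python
from hat.syslog.server import common

# Hypothetical filter: only fields that deviate from None are supplied,
# mirroring how default_filter is constructed above.
custom_filter = common.Filter(max_results=500,
                              severity=common.Severity.ERROR,
                              hostname='db01')

# _sanitize_filter would clamp max_results to max_results_limit (200),
# i.e. the server effectively uses:
# custom_filter._replace(max_results=200)
```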
async def
create_web_server(addr: str, backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.ui.WebServer:
Create web server
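A minimal usage sketch, assuming an already initialized backend instance (constructing one is the job of hat.syslog.server.backend, not this module):

```python
from hat import aio

import hat.syslog.server.backend
import hat.syslog.server.ui


async def run(backend: hat.syslog.server.backend.Backend):
    # The address is parsed with urllib.parse.urlparse, so a plain
    # http URL with host and port is enough.
    srv = await hat.syslog.server.ui.create_web_server(
        'http://0.0.0.0:23020', backend)

    try:
        # Serve until the server (or its juggler listener) starts closing.
        await srv.wait_closing()

    finally:
        await aio.uncancellable(srv.async_close())
```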
class
WebServer(hat.aio.Resource):
Resource with lifetime control based on Group.
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
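Since WebServer only adds async_group on top of aio.Resource, its lifetime is managed entirely through the inherited members above. A short sketch of the typical shutdown pattern, where srv stands for a WebServer returned by create_web_server:

```python
from hat import aio


async def shutdown(srv: aio.Resource):
    # `srv` stands for a WebServer obtained from create_web_server.
    if srv.is_open:
        # Closing the async group closes the underlying juggler server,
        # cancels per-connection tasks and runs the exit-stack cleanup
        # registered in create_web_server.
        await srv.async_close()

    assert srv.is_closed
```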