hat.syslog.server.backend
Backend implementation
1"""Backend implementation""" 2 3from pathlib import Path 4import asyncio 5import contextlib 6import itertools 7import logging 8import typing 9 10from hat import aio 11from hat import util 12 13from hat.syslog.server import common 14from hat.syslog.server import database 15 16 17mlog: logging.Logger = logging.getLogger(__name__) 18"""Module logger""" 19 20register_delay: float = 0.1 21"""Registration delay in seconds""" 22 23register_queue_size: int = 50 24"""Registration queue size""" 25 26register_queue_treshold: int = 10 27"""Registration queue threshold""" 28 29 30async def create_backend(path: Path, 31 low_size: int, 32 high_size: int, 33 enable_archive: bool, 34 disable_journal: bool 35 ) -> 'Backend': 36 """Create backend""" 37 db = await database.create_database(path, disable_journal) 38 try: 39 first_id = await db.get_first_id() 40 last_id = await db.get_last_id() 41 except BaseException: 42 await aio.uncancellable(db.async_close()) 43 raise 44 45 backend = Backend() 46 backend._path = path 47 backend._low_size = low_size 48 backend._high_size = high_size 49 backend._enable_archive = enable_archive 50 backend._disable_journal = disable_journal 51 backend._db = db 52 backend._first_id = first_id 53 backend._last_id = last_id 54 backend._async_group = aio.Group() 55 backend._change_cbs = util.CallbackRegistry() 56 backend._msg_queue = aio.Queue(register_queue_size) 57 backend._executor = aio.create_executor() 58 59 backend._async_group.spawn(aio.call_on_cancel, db.async_close) 60 backend._async_group.spawn(backend._loop) 61 62 mlog.debug('created backend with database %s', path) 63 return backend 64 65 66class Backend(aio.Resource): 67 68 @property 69 def async_group(self) -> aio.Group: 70 """Async group""" 71 return self._async_group 72 73 @property 74 def first_id(self) -> typing.Optional[int]: 75 """First entry id""" 76 return self._first_id 77 78 @property 79 def last_id(self) -> typing.Optional[int]: 80 """Last entry id""" 81 return self._last_id 82 83 def register_change_cb(self, 84 cb: typing.Callable[[typing.List[common.Entry]], 85 None] 86 ) -> util.RegisterCallbackHandle: 87 """Register change callback 88 89 Callback is called if `first_id` changes and/or `last_id` changes 90 and/or new entries are available (passed as argument to registered 91 callback). 92 93 """ 94 return self._change_cbs.register(cb) 95 96 async def register(self, 97 timestamp: float, 98 msg: common.Msg): 99 """Register message 100 101 Registration adds msg to registration queue. If queue is full, wait 102 until message can be successfully added. 103 104 When message is added to empty queue, registration delay timer is 105 started. Once delay timer expires or if number of messages in queue 106 is greater than threshold, all messages are removed from queue and 107 inserted into sqlite database. 108 109 """ 110 await self._msg_queue.put((timestamp, msg)) 111 112 async def query(self, 113 filter: common.Filter 114 ) -> typing.List[common.Entry]: 115 """Query entries""" 116 return await self._db.query(filter) 117 118 async def _loop(self): 119 try: 120 while True: 121 msgs = await self._get_msgs() 122 await self._process_msgs(msgs) 123 124 except Exception as e: 125 mlog.warn("backend loop error: %s", e, exc_info=e) 126 127 finally: 128 self.close() 129 self._msg_queue.close() 130 mlog.debug('backend loop closed') 131 132 async def _get_msgs(self): 133 loop = asyncio.get_running_loop() 134 msgs = [] 135 136 msg = await self._msg_queue.get() 137 msgs.append(msg) 138 139 start = loop.time() 140 while True: 141 while not self._msg_queue.empty(): 142 msgs.append(self._msg_queue.get_nowait()) 143 timeout = register_delay - (loop.time() - start) 144 if timeout <= 0: 145 break 146 if len(msgs) >= register_queue_treshold: 147 break 148 async_group = aio.Group() 149 try: 150 f = async_group.spawn(self._msg_queue.get) 151 await aio.wait_for(asyncio.shield(f), timeout) 152 except asyncio.TimeoutError: 153 break 154 finally: 155 await aio.uncancellable(async_group.async_close()) 156 if not f.cancelled(): 157 msgs.append(f.result()) 158 159 while not self._msg_queue.empty(): 160 msgs.append(self._msg_queue.get_nowait()) 161 return msgs 162 163 async def _process_msgs(self, msgs): 164 mlog.debug("registering new messages (message count: %s)...", 165 len(msgs)) 166 entries = await self._db.add_msgs(msgs) 167 if not entries: 168 return 169 entries = list(reversed(entries)) 170 171 self._last_id = entries[0].id 172 if self._first_id is None: 173 self._first_id = entries[-1].id 174 175 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 176 self._first_id, self._last_id) 177 self._change_cbs.notify(entries) 178 179 if self._high_size <= 0: 180 return 181 if self._last_id - self._first_id + 1 <= self._high_size: 182 return 183 184 mlog.debug("database cleanup starting...") 185 await self._db_cleanup() 186 187 async def _db_cleanup(self): 188 first_id = self._last_id - self._low_size + 1 189 if first_id > self._last_id: 190 first_id = None 191 if first_id <= self._first_id: 192 return 193 194 if self._enable_archive: 195 mlog.debug("archiving database entries...") 196 await self._archive_db(first_id) 197 198 await self._db.delete(first_id) 199 self._first_id = first_id 200 if self._first_id is None: 201 self._last_id = None 202 203 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 204 self._first_id, self._last_id) 205 self._change_cbs.notify([]) 206 207 async def _archive_db(self, first_id): 208 archive_path = await self._async_group.spawn( 209 self._executor, _ext_get_new_archive_path, self._path) 210 archive = await database.create_database( 211 archive_path, self._disable_journal) 212 try: 213 entries = await self._db.query(common.Filter( 214 last_id=first_id - 1 if first_id is not None else None)) 215 await archive.add_entries(entries) 216 finally: 217 await aio.uncancellable(archive.async_close()) 218 219 220def _ext_get_new_archive_path(db_path): 221 last_index = 0 222 223 for i in db_path.parent.glob(db_path.name + '.*'): 224 with contextlib.suppress(ValueError): 225 index = int(i.name.split('.')[-1]) 226 if index > last_index: 227 last_index = index 228 229 for i in itertools.count(last_index + 1): 230 new_path = db_path.parent / f"{db_path.name}.{i}" 231 if new_path.exists(): 232 continue 233 return new_path
Module logger
register_delay: float =
0.1
Registration delay in seconds
register_queue_size: int =
50
Registration queue size
register_queue_treshold: int =
10
Registration queue threshold
async def
create_backend( path: pathlib.Path, low_size: int, high_size: int, enable_archive: bool, disable_journal: bool) -> hat.syslog.server.backend.Backend:
31async def create_backend(path: Path, 32 low_size: int, 33 high_size: int, 34 enable_archive: bool, 35 disable_journal: bool 36 ) -> 'Backend': 37 """Create backend""" 38 db = await database.create_database(path, disable_journal) 39 try: 40 first_id = await db.get_first_id() 41 last_id = await db.get_last_id() 42 except BaseException: 43 await aio.uncancellable(db.async_close()) 44 raise 45 46 backend = Backend() 47 backend._path = path 48 backend._low_size = low_size 49 backend._high_size = high_size 50 backend._enable_archive = enable_archive 51 backend._disable_journal = disable_journal 52 backend._db = db 53 backend._first_id = first_id 54 backend._last_id = last_id 55 backend._async_group = aio.Group() 56 backend._change_cbs = util.CallbackRegistry() 57 backend._msg_queue = aio.Queue(register_queue_size) 58 backend._executor = aio.create_executor() 59 60 backend._async_group.spawn(aio.call_on_cancel, db.async_close) 61 backend._async_group.spawn(backend._loop) 62 63 mlog.debug('created backend with database %s', path) 64 return backend
Create backend
class
Backend(hat.aio.Resource):
67class Backend(aio.Resource): 68 69 @property 70 def async_group(self) -> aio.Group: 71 """Async group""" 72 return self._async_group 73 74 @property 75 def first_id(self) -> typing.Optional[int]: 76 """First entry id""" 77 return self._first_id 78 79 @property 80 def last_id(self) -> typing.Optional[int]: 81 """Last entry id""" 82 return self._last_id 83 84 def register_change_cb(self, 85 cb: typing.Callable[[typing.List[common.Entry]], 86 None] 87 ) -> util.RegisterCallbackHandle: 88 """Register change callback 89 90 Callback is called if `first_id` changes and/or `last_id` changes 91 and/or new entries are available (passed as argument to registered 92 callback). 93 94 """ 95 return self._change_cbs.register(cb) 96 97 async def register(self, 98 timestamp: float, 99 msg: common.Msg): 100 """Register message 101 102 Registration adds msg to registration queue. If queue is full, wait 103 until message can be successfully added. 104 105 When message is added to empty queue, registration delay timer is 106 started. Once delay timer expires or if number of messages in queue 107 is greater than threshold, all messages are removed from queue and 108 inserted into sqlite database. 109 110 """ 111 await self._msg_queue.put((timestamp, msg)) 112 113 async def query(self, 114 filter: common.Filter 115 ) -> typing.List[common.Entry]: 116 """Query entries""" 117 return await self._db.query(filter) 118 119 async def _loop(self): 120 try: 121 while True: 122 msgs = await self._get_msgs() 123 await self._process_msgs(msgs) 124 125 except Exception as e: 126 mlog.warn("backend loop error: %s", e, exc_info=e) 127 128 finally: 129 self.close() 130 self._msg_queue.close() 131 mlog.debug('backend loop closed') 132 133 async def _get_msgs(self): 134 loop = asyncio.get_running_loop() 135 msgs = [] 136 137 msg = await self._msg_queue.get() 138 msgs.append(msg) 139 140 start = loop.time() 141 while True: 142 while not self._msg_queue.empty(): 143 msgs.append(self._msg_queue.get_nowait()) 144 timeout = register_delay - (loop.time() - start) 145 if timeout <= 0: 146 break 147 if len(msgs) >= register_queue_treshold: 148 break 149 async_group = aio.Group() 150 try: 151 f = async_group.spawn(self._msg_queue.get) 152 await aio.wait_for(asyncio.shield(f), timeout) 153 except asyncio.TimeoutError: 154 break 155 finally: 156 await aio.uncancellable(async_group.async_close()) 157 if not f.cancelled(): 158 msgs.append(f.result()) 159 160 while not self._msg_queue.empty(): 161 msgs.append(self._msg_queue.get_nowait()) 162 return msgs 163 164 async def _process_msgs(self, msgs): 165 mlog.debug("registering new messages (message count: %s)...", 166 len(msgs)) 167 entries = await self._db.add_msgs(msgs) 168 if not entries: 169 return 170 entries = list(reversed(entries)) 171 172 self._last_id = entries[0].id 173 if self._first_id is None: 174 self._first_id = entries[-1].id 175 176 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 177 self._first_id, self._last_id) 178 self._change_cbs.notify(entries) 179 180 if self._high_size <= 0: 181 return 182 if self._last_id - self._first_id + 1 <= self._high_size: 183 return 184 185 mlog.debug("database cleanup starting...") 186 await self._db_cleanup() 187 188 async def _db_cleanup(self): 189 first_id = self._last_id - self._low_size + 1 190 if first_id > self._last_id: 191 first_id = None 192 if first_id <= self._first_id: 193 return 194 195 if self._enable_archive: 196 mlog.debug("archiving database entries...") 197 await self._archive_db(first_id) 198 199 await self._db.delete(first_id) 200 self._first_id = first_id 201 if self._first_id is None: 202 self._last_id = None 203 204 mlog.debug("backend state changed (first_id: %s; last_id: %s)", 205 self._first_id, self._last_id) 206 self._change_cbs.notify([]) 207 208 async def _archive_db(self, first_id): 209 archive_path = await self._async_group.spawn( 210 self._executor, _ext_get_new_archive_path, self._path) 211 archive = await database.create_database( 212 archive_path, self._disable_journal) 213 try: 214 entries = await self._db.query(common.Filter( 215 last_id=first_id - 1 if first_id is not None else None)) 216 await archive.add_entries(entries) 217 finally: 218 await aio.uncancellable(archive.async_close())
Resource with lifetime control based on Group
.
def
register_change_cb( self, cb: Callable[[List[hat.syslog.server.common.Entry]], NoneType]) -> hat.util.RegisterCallbackHandle:
84 def register_change_cb(self, 85 cb: typing.Callable[[typing.List[common.Entry]], 86 None] 87 ) -> util.RegisterCallbackHandle: 88 """Register change callback 89 90 Callback is called if `first_id` changes and/or `last_id` changes 91 and/or new entries are available (passed as argument to registered 92 callback). 93 94 """ 95 return self._change_cbs.register(cb)
97 async def register(self, 98 timestamp: float, 99 msg: common.Msg): 100 """Register message 101 102 Registration adds msg to registration queue. If queue is full, wait 103 until message can be successfully added. 104 105 When message is added to empty queue, registration delay timer is 106 started. Once delay timer expires or if number of messages in queue 107 is greater than threshold, all messages are removed from queue and 108 inserted into sqlite database. 109 110 """ 111 await self._msg_queue.put((timestamp, msg))
Register message
Registration adds msg to registration queue. If queue is full, wait until message can be successfully added.
When message is added to empty queue, registration delay timer is started. Once delay timer expires or if number of messages in queue is greater than threshold, all messages are removed from queue and inserted into sqlite database.
async def
query( self, filter: hat.syslog.server.common.Filter) -> List[hat.syslog.server.common.Entry]:
113 async def query(self, 114 filter: common.Filter 115 ) -> typing.List[common.Entry]: 116 """Query entries""" 117 return await self._db.query(filter)
Query entries
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close