hat.syslog.server.backend

Backend implementation

  1"""Backend implementation"""
  2
  3from pathlib import Path
  4import asyncio
  5import contextlib
  6import itertools
  7import logging
  8import typing
  9
 10from hat import aio
 11from hat import util
 12
 13from hat.syslog.server import common
 14from hat.syslog.server import database
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20register_delay: float = 0.1
 21"""Registration delay in seconds"""
 22
 23register_queue_size: int = 50
 24"""Registration queue size"""
 25
 26register_queue_treshold: int = 10
 27"""Registration queue threshold"""
 28
 29
 30async def create_backend(path: Path,
 31                         low_size: int,
 32                         high_size: int,
 33                         enable_archive: bool,
 34                         disable_journal: bool
 35                         ) -> 'Backend':
 36    """Create backend"""
 37    db = await database.create_database(path, disable_journal)
 38    try:
 39        first_id = await db.get_first_id()
 40        last_id = await db.get_last_id()
 41    except BaseException:
 42        await aio.uncancellable(db.async_close())
 43        raise
 44
 45    backend = Backend()
 46    backend._path = path
 47    backend._low_size = low_size
 48    backend._high_size = high_size
 49    backend._enable_archive = enable_archive
 50    backend._disable_journal = disable_journal
 51    backend._db = db
 52    backend._first_id = first_id
 53    backend._last_id = last_id
 54    backend._async_group = aio.Group()
 55    backend._change_cbs = util.CallbackRegistry()
 56    backend._msg_queue = aio.Queue(register_queue_size)
 57    backend._executor = aio.create_executor()
 58
 59    backend._async_group.spawn(aio.call_on_cancel, db.async_close)
 60    backend._async_group.spawn(backend._loop)
 61
 62    mlog.debug('created backend with database %s', path)
 63    return backend
 64
 65
 66class Backend(aio.Resource):
 67
 68    @property
 69    def async_group(self) -> aio.Group:
 70        """Async group"""
 71        return self._async_group
 72
 73    @property
 74    def first_id(self) -> typing.Optional[int]:
 75        """First entry id"""
 76        return self._first_id
 77
 78    @property
 79    def last_id(self) -> typing.Optional[int]:
 80        """Last entry id"""
 81        return self._last_id
 82
 83    def register_change_cb(self,
 84                           cb: typing.Callable[[typing.List[common.Entry]],
 85                                               None]
 86                           ) -> util.RegisterCallbackHandle:
 87        """Register change callback
 88
 89        Callback is called if `first_id` changes and/or `last_id` changes
 90        and/or new entries are available (passed as argument to registered
 91        callback).
 92
 93        """
 94        return self._change_cbs.register(cb)
 95
 96    async def register(self,
 97                       timestamp: float,
 98                       msg: common.Msg):
 99        """Register message
100
101        Registration adds msg to registration queue. If queue is full, wait
102        until message can be successfully added.
103
104        When message is added to empty queue, registration delay timer is
105        started. Once delay timer expires or if number of messages in queue
106        is greater than threshold, all messages are removed from queue and
107        inserted into sqlite database.
108
109        """
110        await self._msg_queue.put((timestamp, msg))
111
112    async def query(self,
113                    filter: common.Filter
114                    ) -> typing.List[common.Entry]:
115        """Query entries"""
116        return await self._db.query(filter)
117
118    async def _loop(self):
119        try:
120            while True:
121                msgs = await self._get_msgs()
122                await self._process_msgs(msgs)
123
124        except Exception as e:
125            mlog.warn("backend loop error: %s", e, exc_info=e)
126
127        finally:
128            self.close()
129            self._msg_queue.close()
130            mlog.debug('backend loop closed')
131
132    async def _get_msgs(self):
133        loop = asyncio.get_running_loop()
134        msgs = []
135
136        msg = await self._msg_queue.get()
137        msgs.append(msg)
138
139        start = loop.time()
140        while True:
141            while not self._msg_queue.empty():
142                msgs.append(self._msg_queue.get_nowait())
143            timeout = register_delay - (loop.time() - start)
144            if timeout <= 0:
145                break
146            if len(msgs) >= register_queue_treshold:
147                break
148            async_group = aio.Group()
149            try:
150                f = async_group.spawn(self._msg_queue.get)
151                await aio.wait_for(asyncio.shield(f), timeout)
152            except asyncio.TimeoutError:
153                break
154            finally:
155                await aio.uncancellable(async_group.async_close())
156                if not f.cancelled():
157                    msgs.append(f.result())
158
159        while not self._msg_queue.empty():
160            msgs.append(self._msg_queue.get_nowait())
161        return msgs
162
163    async def _process_msgs(self, msgs):
164        mlog.debug("registering new messages (message count: %s)...",
165                   len(msgs))
166        entries = await self._db.add_msgs(msgs)
167        if not entries:
168            return
169        entries = list(reversed(entries))
170
171        self._last_id = entries[0].id
172        if self._first_id is None:
173            self._first_id = entries[-1].id
174
175        mlog.debug("backend state changed (first_id: %s; last_id: %s)",
176                   self._first_id, self._last_id)
177        self._change_cbs.notify(entries)
178
179        if self._high_size <= 0:
180            return
181        if self._last_id - self._first_id + 1 <= self._high_size:
182            return
183
184        mlog.debug("database cleanup starting...")
185        await self._db_cleanup()
186
187    async def _db_cleanup(self):
188        first_id = self._last_id - self._low_size + 1
189        if first_id > self._last_id:
190            first_id = None
191        if first_id <= self._first_id:
192            return
193
194        if self._enable_archive:
195            mlog.debug("archiving database entries...")
196            await self._archive_db(first_id)
197
198        await self._db.delete(first_id)
199        self._first_id = first_id
200        if self._first_id is None:
201            self._last_id = None
202
203        mlog.debug("backend state changed (first_id: %s; last_id: %s)",
204                   self._first_id, self._last_id)
205        self._change_cbs.notify([])
206
207    async def _archive_db(self, first_id):
208        archive_path = await self._async_group.spawn(
209            self._executor, _ext_get_new_archive_path, self._path)
210        archive = await database.create_database(
211            archive_path, self._disable_journal)
212        try:
213            entries = await self._db.query(common.Filter(
214                last_id=first_id - 1 if first_id is not None else None))
215            await archive.add_entries(entries)
216        finally:
217            await aio.uncancellable(archive.async_close())
218
219
220def _ext_get_new_archive_path(db_path):
221    last_index = 0
222
223    for i in db_path.parent.glob(db_path.name + '.*'):
224        with contextlib.suppress(ValueError):
225            index = int(i.name.split('.')[-1])
226            if index > last_index:
227                last_index = index
228
229    for i in itertools.count(last_index + 1):
230        new_path = db_path.parent / f"{db_path.name}.{i}"
231        if new_path.exists():
232            continue
233        return new_path
mlog: logging.Logger = logging.getLogger(__name__)

Module logger

register_delay: float = 0.1

Registration delay in seconds

register_queue_size: int = 50

Registration queue size

register_queue_treshold: int = 10

Registration queue threshold
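
The three constants above control how registered messages are batched before they are written to the database. A minimal tuning sketch, assuming the module globals are overridden before create_backend is called (register_queue_size is read when the backend is created, the other two on every batch); the values are illustrative, not recommendations:

from hat.syslog.server import backend

# flush queued messages after at most 50 ms ...
backend.register_delay = 0.05
# ... or as soon as 5 messages are waiting
backend.register_queue_treshold = 5
# allow up to 100 registrations to be pending before register() blocks
backend.register_queue_size = 100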

async def create_backend(path: pathlib.Path, low_size: int, high_size: int, enable_archive: bool, disable_journal: bool) -> hat.syslog.server.backend.Backend:

Create backend
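
A minimal usage sketch; the database path and size limits are illustrative:

import asyncio
from pathlib import Path

from hat.syslog.server import backend


async def main():
    # keep at most 10000 entries; on cleanup trim down to the newest 1000,
    # archiving trimmed entries into '<path>.<n>' databases
    b = await backend.create_backend(path=Path('syslog.db'),
                                     low_size=1000,
                                     high_size=10000,
                                     enable_archive=True,
                                     disable_journal=False)
    try:
        ...  # register messages and query entries
    finally:
        await b.async_close()


asyncio.run(main())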

class Backend(hat.aio.Resource):

Resource with lifetime control based on Group.

Backend()
async_group: hat.aio.Group

Async group

first_id: Optional[int]

First entry id

last_id: Optional[int]

Last entry id

def register_change_cb(self, cb: Callable[[List[hat.syslog.server.common.Entry]], None]) -> hat.util.RegisterCallbackHandle:

Register change callback

Callback is called if first_id changes and/or last_id changes and/or new entries are available (passed as argument to registered callback).
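
A sketch of registering and later cancelling a change callback, assuming b is an open Backend instance (the handle's cancel() comes from hat.util.RegisterCallbackHandle):

def on_change(entries):
    # entries contains the newly inserted common.Entry objects;
    # it is empty when only first_id/last_id changed (e.g. after cleanup)
    for entry in entries:
        print('new entry', entry.id)


handle = b.register_change_cb(on_change)
...
handle.cancel()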

async def register(self, timestamp: float, msg: hat.syslog.common.Msg):

Register message

Registration adds msg to registration queue. If queue is full, wait until message can be successfully added.

When message is added to empty queue, registration delay timer is started. Once delay timer expires or if number of messages in queue is greater than threshold, all messages are removed from queue and inserted into sqlite database.
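
A registration sketch, to be run inside a coroutine with b an open Backend instance; msg is assumed to be an already parsed common.Msg (its construction is outside this module):

import time

# queue one message; the backend flushes it to the database once the
# registration delay expires or the queue threshold is reached
await b.register(time.time(), msg)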

async def query(self, filter: hat.syslog.server.common.Filter) -> List[hat.syslog.server.common.Entry]:

Query entries
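
A query sketch, inside a coroutine with b an open Backend instance, using only the last_id filter field that this module itself relies on; other common.Filter fields are omitted here:

from hat.syslog.server import common

# fetch entries whose id is less than or equal to the newest entry id
if b.last_id is not None:
    entries = await b.query(common.Filter(last_id=b.last_id))
    for entry in entries:
        print(entry.id)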

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
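
These inherited members provide the resource lifetime control; a closing sketch, with b an open Backend instance:

# request shutdown and wait until the backend and its database are closed
b.close()
await b.wait_closed()

# or, as a single awaitable step
await b.async_close()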