hat.syslog.server.database

Interface to SQLite database

  1"""Interface to SQLite database"""
  2
  3from pathlib import Path
  4import logging
  5import sqlite3
  6import typing
  7
  8from hat import aio
  9
 10from hat.syslog.server import common
 11
 12
 13mlog: logging.Logger = logging.getLogger(__name__)
 14"""Module logger"""
 15
 16
 17async def create_database(path: Path,
 18                          disable_journal: bool
 19                          ) -> 'Database':
 20    """Create database"""
 21
 22    async def close():
 23        await executor(_ext_close, conn)
 24        mlog.debug('database %s closed', path)
 25
 26    executor = aio.create_executor(1)
 27    conn = await executor(_ext_connect, path, disable_journal)
 28    async_group = aio.Group()
 29    async_group.spawn(aio.call_on_cancel, close)
 30
 31    db = Database()
 32    db._path = path
 33    db._conn = conn
 34    db._async_group = async_group
 35    db._executor = executor
 36
 37    mlog.debug('opened database %s', path)
 38    return db
 39
 40
 41class Database(aio.Resource):
 42
 43    @property
 44    def async_group(self) -> aio.Group:
 45        """Async group"""
 46        return self._async_group
 47
 48    async def get_first_id(self) -> typing.Optional[int]:
 49        """Get first entry id"""
 50        return await self._async_group.spawn(self._executor, _ext_fist_id,
 51                                             self._conn)
 52
 53    async def get_last_id(self) -> typing.Optional[int]:
 54        """Get last entry id"""
 55        return await self._async_group.spawn(self._executor, _ext_last_id,
 56                                             self._conn)
 57
 58    async def add_msgs(self,
 59                       msgs: typing.List[typing.Tuple[float, common.Msg]]
 60                       ) -> typing.List[common.Entry]:
 61        """Add timestamped messages"""
 62        columns = ['entry_timestamp', 'facility', 'severity', 'version',
 63                   'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
 64                   'data', 'msg']
 65        values = [(entry_timestamp, msg.facility.value, msg.severity.value,
 66                   msg.version, msg.timestamp, msg.hostname, msg.app_name,
 67                   msg.procid, msg.msgid, msg.data, msg.msg)
 68                  for entry_timestamp, msg in msgs]
 69        entry_ids = await self._async_group.spawn(
 70            self._executor, _ext_insert, self._conn, columns, values)
 71
 72        entries = [
 73            common.Entry(id=entry_id,
 74                         timestamp=entry_timestamp,
 75                         msg=msg)
 76            for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)]
 77
 78        mlog.debug("messages added to database (message count: %s)",
 79                   len(entries))
 80        return entries
 81
 82    async def add_entries(self, entries: typing.List[common.Entry]):
 83        """Add entries"""
 84        columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
 85                   'version', 'msg_timestamp', 'hostname', 'app_name',
 86                   'procid', 'msgid', 'data', 'msg']
 87        values = [(entry.id, entry.timestamp, entry.msg.facility.value,
 88                   entry.msg.severity.value, entry.msg.version,
 89                   entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name,
 90                   entry.msg.procid, entry.msg.msgid, entry.msg.data,
 91                   entry.msg.msg)
 92                  for entry in entries]
 93        entry_ids = await self._async_group.spawn(
 94            self._executor, _ext_insert, self._conn, columns, values)
 95        mlog.debug("entries added to database (entry count: %s)",
 96                   len(entry_ids))
 97
 98    async def query(self,
 99                    filter: common.Filter
100                    ) -> typing.List[common.Entry]:
101        """Query entries that satisfy filter"""
102        conditions = []
103        args = {}
104        if filter.last_id is not None:
105            conditions.append('rowid <= :last_id')
106            args['last_id'] = filter.last_id
107        if filter.entry_timestamp_from is not None:
108            conditions.append('entry_timestamp >= :entry_timestamp_from')
109            args['entry_timestamp_from'] = filter.entry_timestamp_from
110        if filter.entry_timestamp_to is not None:
111            conditions.append('entry_timestamp <= :entry_timestamp_to')
112            args['entry_timestamp_to'] = filter.entry_timestamp_to
113        if filter.facility:
114            conditions.append('facility = :facility')
115            args['facility'] = filter.facility.value
116        if filter.severity:
117            conditions.append('severity = :severity')
118            args['severity'] = filter.severity.value
119        if filter.hostname:
120            conditions.append('hostname LIKE :hostname')
121            args['hostname'] = f'%{filter.hostname}%'
122        if filter.app_name:
123            conditions.append('app_name LIKE :app_name')
124            args['app_name'] = f'%{filter.app_name}%'
125        if filter.procid:
126            conditions.append('procid LIKE :procid')
127            args['procid'] = f'%{filter.procid}%'
128        if filter.msgid:
129            conditions.append('msgid LIKE :msgid')
130            args['msgid'] = f'%{filter.msgid}%'
131        if filter.msg:
132            conditions.append('msg LIKE :msg')
133            args['msg'] = f'%{filter.msg}%'
134
135        result = await self._async_group.spawn(
136            self._executor, _ext_query, self._conn, conditions, args,
137            filter.max_results)
138
139        entries = [common.Entry(
140                    id=row['rowid'],
141                    timestamp=row['entry_timestamp'],
142                    msg=common.Msg(facility=common.Facility(row['facility']),
143                                   severity=common.Severity(row['severity']),
144                                   version=row['version'],
145                                   timestamp=row['msg_timestamp'],
146                                   hostname=row['hostname'],
147                                   app_name=row['app_name'],
148                                   procid=row['procid'],
149                                   msgid=row['msgid'],
150                                   data=row['data'],
151                                   msg=row['msg']))
152                   for row in result]
153
154        mlog.debug("query resulted with %s entries", len(entries))
155        return entries
156
157    async def delete(self, first_id: int):
158        """Delete entries prior to first_id"""
159        entry_count = await self._async_group.spawn(
160            self._executor, _ext_delete, self._conn, first_id)
161        mlog.debug("deleted %s entries", entry_count)
162
163
164_db_columns = [['entry_timestamp', 'REAL'],
165               ['facility', 'INTEGER'],
166               ['severity', 'INTEGER'],
167               ['version', 'INTEGER'],
168               ['msg_timestamp', 'REAL'],
169               ['hostname', 'TEXT'],
170               ['app_name', 'TEXT'],
171               ['procid', 'TEXT'],
172               ['msgid', 'TEXT'],
173               ['data', 'TEXT'],
174               ['msg', 'TEXT']]
175
176_db_query_columns = ['rowid'] + [name for name, _ in _db_columns]
177
178_db_structure = f"""
179    CREATE TABLE IF NOT EXISTS log (
180        {', '.join(col_name + ' ' + col_type
181                   for col_name, col_type in _db_columns)}
182    );
183    CREATE INDEX IF NOT EXISTS log_entry_timestamp_index ON log (
184        entry_timestamp DESC)
185    """
186
187
188def _ext_connect(path, disable_journal):
189    path.parent.mkdir(exist_ok=True)
190    conn = sqlite3.connect(f'file:{path}?nolock=1',
191                           uri=True,
192                           isolation_level=None,
193                           detect_types=sqlite3.PARSE_DECLTYPES)
194    try:
195        conn.executescript(
196            ('PRAGMA journal_mode = OFF;\n' if disable_journal else '') +
197            _db_structure)
198    except Exception:
199        conn.close()
200        raise
201    return conn
202
203
204def _ext_close(conn):
205    conn.close()
206
207
208def _ext_fist_id(conn):
209    c = conn.execute("SELECT MIN(rowid) FROM log")
210    result = c.fetchall()
211    return result[0][0] if result else None
212
213
214def _ext_last_id(conn):
215    c = conn.execute("SELECT MAX(rowid) FROM log")
216    result = c.fetchall()
217    return result[0][0] if result else None
218
219
220def _ext_delete(conn, first_id):
221    cmd = "DELETE FROM log"
222    if first_id is not None:
223        cmd += " WHERE rowid < :first_id"
224    c = conn.execute(cmd, {'first_id': first_id})
225    return c.rowcount
226
227
228def _ext_insert(conn, columns, values):
229    c = conn.executemany(f"INSERT INTO log ({', '.join(columns)}) "
230                         f"VALUES ({', '.join('?' * len(columns))})", values)
231    rowcount = c.rowcount
232    if rowcount < 1:
233        return []
234    last_id = _ext_last_id(conn)
235    return range(last_id - rowcount + 1, last_id + 1)
236
237
238def _ext_query(conn, conditions, args, max_results):
239    c = conn.execute(
240        ' '.join([
241            "SELECT rowid, *",
242            "FROM log",
243            ('WHERE ' + ' AND '.join(conditions) if conditions else ''),
244            "ORDER BY rowid DESC",
245            ("LIMIT :max_results" if max_results is not None else '')]),
246        dict(args, max_results=max_results))
247    result = c.fetchall()
248    return [{k: v for k, v in zip(_db_query_columns, i)}
249            for i in result]
mlog: logging.Logger = <Logger hat.syslog.server.database (WARNING)>

Module logger

async def create_database(path: pathlib.Path, disable_journal: bool) -> hat.syslog.server.database.Database:

Create database
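
A minimal usage sketch; the file name, the journal flag value and the asyncio boilerplate are illustrative, not part of the API:

import asyncio
from pathlib import Path

from hat.syslog.server.database import create_database

async def main():
    # open (and create, if missing) the SQLite file with journaling enabled
    db = await create_database(Path('syslog.db'), disable_journal=False)
    try:
        print('last entry id:', await db.get_last_id())
    finally:
        # Database is a hat.aio.Resource, so it is released with async_close
        await db.async_close()

asyncio.run(main())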

class Database(hat.aio.Resource):

Resource with lifetime control based on Group.

Database()
async_group: hat.aio.Group

Async group
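
Anything spawned on this group is tied to the database lifetime and is cancelled when the database closes, mirroring how the module itself registers the close callback. A sketch; periodic_cleanup is a hypothetical coroutine function, not part of the module:

db.async_group.spawn(periodic_cleanup, db)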

async def get_first_id(self) -> Optional[int]:

Get first entry id
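
Both get_first_id and get_last_id (documented below) return None while the log table is empty, so callers usually guard on that. A small sketch:

async def print_id_range(db):
    first_id = await db.get_first_id()
    last_id = await db.get_last_id()
    if first_id is None:
        print('database is empty')
    else:
        print('entries span rowids', first_id, 'to', last_id)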

async def get_last_id(self) -> Optional[int]:

Get last entry id

async def add_msgs(self, msgs: List[Tuple[float, hat.syslog.common.Msg]]) -> List[hat.syslog.server.common.Entry]:

Add timestamped messages
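
The caller supplies the entry timestamp (typically the reception time) next to each message. A hedged sketch; the field names mirror the implementation above, while Facility.USER and Severity.INFORMATIONAL are assumed member names of the enums in hat.syslog.server.common:

import time

from hat.syslog.server import common

async def add_example_msg(db):
    msg = common.Msg(facility=common.Facility.USER,           # assumed member name
                     severity=common.Severity.INFORMATIONAL,  # assumed member name
                     version=1,
                     timestamp=time.time(),
                     hostname='localhost',
                     app_name='example',
                     procid='1234',
                     msgid='test',
                     data='',
                     msg='hello syslog')
    entries = await db.add_msgs([(time.time(), msg)])
    return entries[0].id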

async def add_entries(self, entries: List[hat.syslog.server.common.Entry]):

Add entries
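
Unlike add_msgs, this method writes each entry.id into rowid, so it suits copying already-identified entries between databases (for example into an archive). A sketch; src_db, dst_db and flt are hypothetical arguments:

async def copy_entries(src_db, dst_db, flt):
    # flt is a common.Filter selecting what to copy (see query below)
    entries = await src_db.query(flt)
    await dst_db.add_entries(entries)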

async def query(self, filter: hat.syslog.server.common.Filter) -> List[hat.syslog.server.common.Entry]:

Query entries that satisfy filter
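
Results come back newest first (ORDER BY rowid DESC) and are capped by max_results. A sketch, assuming common.Filter accepts exactly the field names read in the implementation above (all passed explicitly since their defaults are not shown here) and assuming Severity.ERROR as a member name:

import time

from hat.syslog.server import common

async def recent_errors(db):
    flt = common.Filter(last_id=None,
                        entry_timestamp_from=time.time() - 3600,  # last hour
                        entry_timestamp_to=None,
                        facility=None,
                        severity=common.Severity.ERROR,  # assumed member name
                        hostname=None,
                        app_name='example',  # matched with LIKE '%example%'
                        procid=None,
                        msgid=None,
                        msg=None,
                        max_results=100)
    return await db.query(flt)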

async def delete(self, first_id: int):

Delete entries prior to first_id
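
The underlying statement is DELETE ... WHERE rowid < :first_id, so the entry whose id equals first_id is kept. A retention sketch built only on the methods above; keep_count is a hypothetical parameter and the size estimate assumes contiguous rowids:

async def enforce_retention(db, keep_count):
    first_id = await db.get_first_id()
    last_id = await db.get_last_id()
    if first_id is None or last_id is None:
        return
    if last_id - first_id + 1 > keep_count:
        # keep the newest keep_count entries, drop everything older
        await db.delete(last_id - keep_count + 1)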

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close