hat.syslog.server.database
Interface to SQLite database
"""Interface to SQLite database"""

from pathlib import Path
import logging
import sqlite3
import typing

from hat import aio

from hat.syslog.server import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_database(path: Path,
                          disable_journal: bool
                          ) -> 'Database':
    """Create database

    Connects to the SQLite database file at `path` (see `_ext_connect`) and
    returns a `Database` wrapping that connection.

    Args:
        path: location of the SQLite database file
        disable_journal: if ``True``, ``PRAGMA journal_mode = OFF`` is
            executed when the connection is opened

    Returns:
        `Database` instance bound to the opened connection

    """
    # NOTE(review): the argument ``1`` is presumably the worker count, which
    # would serialise all access to the connection - confirm against hat.aio
    executor = aio.create_executor(1)
    conn = await executor(_ext_connect, path, disable_journal)

    async def close():
        await executor(_ext_close, conn)
        mlog.debug('database %s closed', path)

    async_group = aio.Group()
    # NOTE(review): presumably `close` is invoked once the group is
    # cancelled/closed - confirm against hat.aio.call_on_cancel
    async_group.spawn(aio.call_on_cancel, close)

    db = Database()
    db._path = path
    db._conn = conn
    db._async_group = async_group
    db._executor = executor

    mlog.debug('opened database %s', path)
    return db


class Database(aio.Resource):
    """Syslog entry storage backed by the SQLite ``log`` table

    Instances are not initialised through ``__init__``; `create_database`
    assigns the private attributes (`_path`, `_conn`, `_async_group`,
    `_executor`) used by the methods below.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def get_first_id(self) -> typing.Optional[int]:
        """Get first entry id

        Returns the smallest ``rowid`` in the ``log`` table or ``None`` if
        the table is empty.

        """
        return await self._async_group.spawn(self._executor, _ext_fist_id,
                                             self._conn)

    async def get_last_id(self) -> typing.Optional[int]:
        """Get last entry id

        Returns the largest ``rowid`` in the ``log`` table or ``None`` if
        the table is empty.

        """
        return await self._async_group.spawn(self._executor, _ext_last_id,
                                             self._conn)

    async def add_msgs(self,
                       msgs: typing.List[typing.Tuple[float, common.Msg]]
                       ) -> typing.List[common.Entry]:
        """Add timestamped messages

        Args:
            msgs: pairs of entry timestamp and message

        Returns:
            entries created for the inserted messages, in the order of
            `msgs`, with ids as reported by `_ext_insert`

        """
        columns = ['entry_timestamp', 'facility', 'severity', 'version',
                   'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
                   'data', 'msg']
        values = [(entry_timestamp, msg.facility.value, msg.severity.value,
                   msg.version, msg.timestamp, msg.hostname, msg.app_name,
                   msg.procid, msg.msgid, msg.data, msg.msg)
                  for entry_timestamp, msg in msgs]
        entry_ids = await self._async_group.spawn(
            self._executor, _ext_insert, self._conn, columns, values)

        entries = [
            common.Entry(id=entry_id,
                         timestamp=entry_timestamp,
                         msg=msg)
            for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)]

        mlog.debug("messages added to database (message count: %s)",
                   len(entries))
        return entries

    async def add_entries(self, entries: typing.List[common.Entry]):
        """Add entries

        Unlike `add_msgs`, each entry's own `id` is written as the row's
        ``rowid``.

        Args:
            entries: entries to insert

        """
        columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
                   'version', 'msg_timestamp', 'hostname', 'app_name',
                   'procid', 'msgid', 'data', 'msg']
        values = [(entry.id, entry.timestamp, entry.msg.facility.value,
                   entry.msg.severity.value, entry.msg.version,
                   entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name,
                   entry.msg.procid, entry.msg.msgid, entry.msg.data,
                   entry.msg.msg)
                  for entry in entries]
        # only the number of returned ids is used here (for logging)
        entry_ids = await self._async_group.spawn(
            self._executor, _ext_insert, self._conn, columns, values)
        mlog.debug("entries added to database (entry count: %s)",
                   len(entry_ids))

    async def query(self,
                    filter: common.Filter
                    ) -> typing.List[common.Entry]:
        """Query entries that satisfy filter

        All conditions that are set on `filter` are combined with ``AND``.
        Text properties (`hostname`, `app_name`, `procid`, `msgid`, `msg`)
        are matched as substrings with SQL ``LIKE``; wildcard characters in
        the filter text are not escaped. Empty text properties are ignored.

        Args:
            filter: conditions and the optional `max_results` limit

        Returns:
            matching entries ordered by descending id

        """
        conditions = []
        args = {}

        if filter.last_id is not None:
            conditions.append('rowid <= :last_id')
            args['last_id'] = filter.last_id
        if filter.entry_timestamp_from is not None:
            conditions.append('entry_timestamp >= :entry_timestamp_from')
            args['entry_timestamp_from'] = filter.entry_timestamp_from
        if filter.entry_timestamp_to is not None:
            conditions.append('entry_timestamp <= :entry_timestamp_to')
            args['entry_timestamp_to'] = filter.entry_timestamp_to

        # explicit None checks: a truthiness test would silently drop the
        # condition for any enum member that evaluates as false
        if filter.facility is not None:
            conditions.append('facility = :facility')
            args['facility'] = filter.facility.value
        if filter.severity is not None:
            conditions.append('severity = :severity')
            args['severity'] = filter.severity.value

        # substring filters; column name and parameter name are identical
        for name in ('hostname', 'app_name', 'procid', 'msgid', 'msg'):
            text = getattr(filter, name)
            if text:
                conditions.append(f'{name} LIKE :{name}')
                args[name] = f'%{text}%'

        result = await self._async_group.spawn(
            self._executor, _ext_query, self._conn, conditions, args,
            filter.max_results)

        entries = [common.Entry(
                    id=row['rowid'],
                    timestamp=row['entry_timestamp'],
                    msg=common.Msg(facility=common.Facility(row['facility']),
                                   severity=common.Severity(row['severity']),
                                   version=row['version'],
                                   timestamp=row['msg_timestamp'],
                                   hostname=row['hostname'],
                                   app_name=row['app_name'],
                                   procid=row['procid'],
                                   msgid=row['msgid'],
                                   data=row['data'],
                                   msg=row['msg']))
                   for row in result]

        mlog.debug("query resulted with %s entries", len(entries))
        return entries

    async def delete(self, first_id: typing.Optional[int]):
        """Delete entries prior to first_id

        Args:
            first_id: entries with id lower than this value are deleted;
                if ``None``, all entries are deleted

        """
        entry_count = await self._async_group.spawn(
            self._executor, _ext_delete, self._conn, first_id)
        mlog.debug("deleted %s entries", entry_count)


# columns of the `log` table as [name, declared SQLite type] pairs
_db_columns = [['entry_timestamp', 'REAL'],
               ['facility', 'INTEGER'],
               ['severity', 'INTEGER'],
               ['version', 'INTEGER'],
               ['msg_timestamp', 'REAL'],
               ['hostname', 'TEXT'],
               ['app_name', 'TEXT'],
               ['procid', 'TEXT'],
               ['msgid', 'TEXT'],
               ['data', 'TEXT'],
               ['msg', 'TEXT']]

# column order of rows produced by ``SELECT rowid, *``
_db_query_columns = ['rowid'] + [name for name, _ in _db_columns]

# schema script executed on every connect (all statements are idempotent)
_db_structure = f"""
    CREATE TABLE IF NOT EXISTS log (
        {', '.join(col_name + ' ' + col_type
                   for col_name, col_type in _db_columns)}
    );
    CREATE INDEX IF NOT EXISTS log_entry_timestamp_index ON log (
        entry_timestamp DESC)
    """


def _ext_connect(path, disable_journal):
    """Open connection and make sure the schema exists

    The connection is closed again if applying the schema fails.

    """
    # parents=True so that a database nested below several not yet existing
    # directories can be created as well
    path.parent.mkdir(parents=True, exist_ok=True)
    # isolation_level=None puts the sqlite3 module into autocommit mode
    conn = sqlite3.connect(f'file:{path}?nolock=1',
                           uri=True,
                           isolation_level=None,
                           detect_types=sqlite3.PARSE_DECLTYPES)
    try:
        script = _db_structure
        if disable_journal:
            script = 'PRAGMA journal_mode = OFF;\n' + script
        conn.executescript(script)
    except Exception:
        conn.close()
        raise
    return conn


def _ext_close(conn):
    """Close connection"""
    conn.close()


# NOTE: the misspelled name is kept because it is referenced by name
def _ext_fist_id(conn):
    """Return the smallest rowid of the `log` table (``None`` if empty)"""
    c = conn.execute("SELECT MIN(rowid) FROM log")
    result = c.fetchall()
    return result[0][0] if result else None


def _ext_last_id(conn):
    """Return the largest rowid of the `log` table (``None`` if empty)"""
    c = conn.execute("SELECT MAX(rowid) FROM log")
    result = c.fetchall()
    return result[0][0] if result else None


def _ext_delete(conn, first_id):
    """Delete rows with rowid lower than `first_id` (all rows if ``None``)

    Returns the number of deleted rows as reported by the cursor.

    """
    if first_id is None:
        c = conn.execute("DELETE FROM log")
    else:
        c = conn.execute("DELETE FROM log WHERE rowid < :first_id",
                         {'first_id': first_id})
    return c.rowcount


def _ext_insert(conn, columns, values):
    """Insert rows and return the ids attributed to them

    The ids are derived as the `rowcount` highest consecutive values ending
    at the current maximum rowid.

    NOTE(review): this matches the actual row ids only when the inserted
    rows received the highest rowids of the table; callers that supply the
    ``rowid`` column themselves should not rely on the returned values.

    """
    placeholders = ', '.join('?' for _ in columns)
    c = conn.executemany(
        f"INSERT INTO log ({', '.join(columns)}) VALUES ({placeholders})",
        values)
    rowcount = c.rowcount
    if rowcount < 1:
        return []
    last_id = _ext_last_id(conn)
    return list(range(last_id - rowcount + 1, last_id + 1))


def _ext_query(conn, conditions, args, max_results):
    """Select rows matching all `conditions`, newest (highest rowid) first

    Returns a list of dicts keyed by `_db_query_columns`.

    """
    parts = ["SELECT rowid, *", "FROM log"]
    params = dict(args)
    if conditions:
        parts.append('WHERE ' + ' AND '.join(conditions))
    parts.append("ORDER BY rowid DESC")
    if max_results is not None:
        parts.append("LIMIT :max_results")
        params['max_results'] = max_results

    c = conn.execute(' '.join(parts), params)
    return [dict(zip(_db_query_columns, row)) for row in c.fetchall()]
Module logger
async def
create_database( path: pathlib.Path, disable_journal: bool) -> hat.syslog.server.database.Database:
async def create_database(path: Path,
                          disable_journal: bool
                          ) -> 'Database':
    """Create database

    Connects to the database file at `path` through `_ext_connect`
    (forwarding `disable_journal`) and returns a `Database` populated with
    the connection, its executor and a new async group.

    """
    executor = aio.create_executor(1)
    conn = await executor(_ext_connect, path, disable_journal)

    async def _close_connection():
        # the connection is closed through the same executor that opened it
        await executor(_ext_close, conn)
        mlog.debug('database %s closed', path)

    async_group = aio.Group()
    # NOTE(review): presumably runs `_close_connection` once the group is
    # cancelled/closed - confirm against hat.aio.call_on_cancel
    async_group.spawn(aio.call_on_cancel, _close_connection)

    database = Database()
    database._path = path
    database._conn = conn
    database._async_group = async_group
    database._executor = executor

    mlog.debug('opened database %s', path)
    return database
Create database
class
Database(hat.aio.Resource):
class Database(aio.Resource):
    """Syslog entry storage backed by the SQLite ``log`` table

    The methods rely on the private attributes `_conn`, `_async_group` and
    `_executor`, which are assigned from outside of this class (there is no
    ``__init__``).

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def get_first_id(self) -> typing.Optional[int]:
        """Get first entry id"""
        return await self._async_group.spawn(self._executor, _ext_fist_id,
                                             self._conn)

    async def get_last_id(self) -> typing.Optional[int]:
        """Get last entry id"""
        return await self._async_group.spawn(self._executor, _ext_last_id,
                                             self._conn)

    async def add_msgs(self,
                       msgs: typing.List[typing.Tuple[float, common.Msg]]
                       ) -> typing.List[common.Entry]:
        """Add timestamped messages

        Args:
            msgs: pairs of entry timestamp and message

        Returns:
            entries created for the inserted messages, in the order of
            `msgs`, with ids as reported by `_ext_insert`

        """
        columns = ['entry_timestamp', 'facility', 'severity', 'version',
                   'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
                   'data', 'msg']
        values = [(entry_timestamp, msg.facility.value, msg.severity.value,
                   msg.version, msg.timestamp, msg.hostname, msg.app_name,
                   msg.procid, msg.msgid, msg.data, msg.msg)
                  for entry_timestamp, msg in msgs]
        entry_ids = await self._async_group.spawn(
            self._executor, _ext_insert, self._conn, columns, values)

        entries = [
            common.Entry(id=entry_id,
                         timestamp=entry_timestamp,
                         msg=msg)
            for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs)]

        mlog.debug("messages added to database (message count: %s)",
                   len(entries))
        return entries

    async def add_entries(self, entries: typing.List[common.Entry]):
        """Add entries

        Each entry's own `id` is written as the row's ``rowid``.

        Args:
            entries: entries to insert

        """
        columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
                   'version', 'msg_timestamp', 'hostname', 'app_name',
                   'procid', 'msgid', 'data', 'msg']
        values = [(entry.id, entry.timestamp, entry.msg.facility.value,
                   entry.msg.severity.value, entry.msg.version,
                   entry.msg.timestamp, entry.msg.hostname, entry.msg.app_name,
                   entry.msg.procid, entry.msg.msgid, entry.msg.data,
                   entry.msg.msg)
                  for entry in entries]
        # only the number of returned ids is used here (for logging)
        entry_ids = await self._async_group.spawn(
            self._executor, _ext_insert, self._conn, columns, values)
        mlog.debug("entries added to database (entry count: %s)",
                   len(entry_ids))

    async def query(self,
                    filter: common.Filter
                    ) -> typing.List[common.Entry]:
        """Query entries that satisfy filter

        All conditions that are set on `filter` are combined with ``AND``.
        Text properties (`hostname`, `app_name`, `procid`, `msgid`, `msg`)
        are matched as substrings with SQL ``LIKE``; wildcard characters in
        the filter text are not escaped. Empty text properties are ignored.

        Args:
            filter: conditions and the optional `max_results` limit

        Returns:
            entries built from the rows returned by `_ext_query`

        """
        conditions = []
        args = {}

        if filter.last_id is not None:
            conditions.append('rowid <= :last_id')
            args['last_id'] = filter.last_id
        if filter.entry_timestamp_from is not None:
            conditions.append('entry_timestamp >= :entry_timestamp_from')
            args['entry_timestamp_from'] = filter.entry_timestamp_from
        if filter.entry_timestamp_to is not None:
            conditions.append('entry_timestamp <= :entry_timestamp_to')
            args['entry_timestamp_to'] = filter.entry_timestamp_to

        # explicit None checks: a truthiness test would silently drop the
        # condition for any enum member that evaluates as false
        if filter.facility is not None:
            conditions.append('facility = :facility')
            args['facility'] = filter.facility.value
        if filter.severity is not None:
            conditions.append('severity = :severity')
            args['severity'] = filter.severity.value

        # substring filters; column name and parameter name are identical
        for name in ('hostname', 'app_name', 'procid', 'msgid', 'msg'):
            text = getattr(filter, name)
            if text:
                conditions.append(f'{name} LIKE :{name}')
                args[name] = f'%{text}%'

        result = await self._async_group.spawn(
            self._executor, _ext_query, self._conn, conditions, args,
            filter.max_results)

        entries = [common.Entry(
                    id=row['rowid'],
                    timestamp=row['entry_timestamp'],
                    msg=common.Msg(facility=common.Facility(row['facility']),
                                   severity=common.Severity(row['severity']),
                                   version=row['version'],
                                   timestamp=row['msg_timestamp'],
                                   hostname=row['hostname'],
                                   app_name=row['app_name'],
                                   procid=row['procid'],
                                   msgid=row['msgid'],
                                   data=row['data'],
                                   msg=row['msg']))
                   for row in result]

        mlog.debug("query resulted with %s entries", len(entries))
        return entries

    async def delete(self, first_id: typing.Optional[int]):
        """Delete entries prior to first_id

        Args:
            first_id: entries with id lower than this value are deleted;
                ``None`` is forwarded unchanged to `_ext_delete`

        """
        entry_count = await self._async_group.spawn(
            self._executor, _ext_delete, self._conn, first_id)
        mlog.debug("deleted %s entries", entry_count)
Resource with lifetime control based on `Group`.
async def
get_first_id(self) -> Optional[int]:
async def get_first_id(self) -> typing.Optional[int]:
    """Get first entry id

    Runs `_ext_fist_id` against the connection on the database executor,
    as a task of the database's async group, and returns its result.

    """
    first_id = await self._async_group.spawn(
        self._executor, _ext_fist_id, self._conn)
    return first_id
Get first entry id
async def
get_last_id(self) -> Optional[int]:
async def get_last_id(self) -> typing.Optional[int]:
    """Get last entry id

    Runs `_ext_last_id` against the connection on the database executor,
    as a task of the database's async group, and returns its result.

    """
    last_id = await self._async_group.spawn(
        self._executor, _ext_last_id, self._conn)
    return last_id
Get last entry id
async def
add_msgs( self, msgs: List[Tuple[float, hat.syslog.common.Msg]]) -> List[hat.syslog.server.common.Entry]:
async def add_msgs(self,
                   msgs: typing.List[typing.Tuple[float, common.Msg]]
                   ) -> typing.List[common.Entry]:
    """Add timestamped messages

    Args:
        msgs: pairs of entry timestamp and message

    Returns:
        one entry per inserted message, in the order of `msgs`, carrying
        the id reported by `_ext_insert`

    """
    columns = ['entry_timestamp', 'facility', 'severity', 'version',
               'msg_timestamp', 'hostname', 'app_name', 'procid', 'msgid',
               'data', 'msg']

    # one row per message, in the same order as `columns`
    rows = []
    for entry_timestamp, msg in msgs:
        rows.append((entry_timestamp,
                     msg.facility.value,
                     msg.severity.value,
                     msg.version,
                     msg.timestamp,
                     msg.hostname,
                     msg.app_name,
                     msg.procid,
                     msg.msgid,
                     msg.data,
                     msg.msg))

    entry_ids = await self._async_group.spawn(
        self._executor, _ext_insert, self._conn, columns, rows)

    # pair each reported id with the message it was generated for
    entries = []
    for entry_id, (entry_timestamp, msg) in zip(entry_ids, msgs):
        entries.append(common.Entry(id=entry_id,
                                    timestamp=entry_timestamp,
                                    msg=msg))

    mlog.debug("messages added to database (message count: %s)",
               len(entries))
    return entries
Add timestamped messages
async def add_entries(self, entries: typing.List[common.Entry]):
    """Add entries

    Each entry's own `id` is written as the row's ``rowid``.

    Args:
        entries: entries to insert

    """
    columns = ['rowid', 'entry_timestamp', 'facility', 'severity',
               'version', 'msg_timestamp', 'hostname', 'app_name',
               'procid', 'msgid', 'data', 'msg']

    # one row per entry, in the same order as `columns`
    rows = []
    for entry in entries:
        msg = entry.msg
        rows.append((entry.id,
                     entry.timestamp,
                     msg.facility.value,
                     msg.severity.value,
                     msg.version,
                     msg.timestamp,
                     msg.hostname,
                     msg.app_name,
                     msg.procid,
                     msg.msgid,
                     msg.data,
                     msg.msg))

    # only the number of returned ids is used (for logging)
    inserted_ids = await self._async_group.spawn(
        self._executor, _ext_insert, self._conn, columns, rows)
    mlog.debug("entries added to database (entry count: %s)",
               len(inserted_ids))
Add entries
async def
query( self, filter: hat.syslog.server.common.Filter) -> List[hat.syslog.server.common.Entry]:
async def query(self,
                filter: common.Filter
                ) -> typing.List[common.Entry]:
    """Query entries that satisfy filter

    All conditions that are set on `filter` are combined with ``AND``.
    Text properties (`hostname`, `app_name`, `procid`, `msgid`, `msg`) are
    matched as substrings with SQL ``LIKE``; wildcard characters in the
    filter text are not escaped. Empty text properties are ignored.

    Args:
        filter: conditions and the optional `max_results` limit

    Returns:
        entries built from the rows returned by `_ext_query`

    """
    conditions = []
    args = {}

    if filter.last_id is not None:
        conditions.append('rowid <= :last_id')
        args['last_id'] = filter.last_id
    if filter.entry_timestamp_from is not None:
        conditions.append('entry_timestamp >= :entry_timestamp_from')
        args['entry_timestamp_from'] = filter.entry_timestamp_from
    if filter.entry_timestamp_to is not None:
        conditions.append('entry_timestamp <= :entry_timestamp_to')
        args['entry_timestamp_to'] = filter.entry_timestamp_to

    # explicit None checks: a truthiness test would silently drop the
    # condition for any enum member that evaluates as false
    if filter.facility is not None:
        conditions.append('facility = :facility')
        args['facility'] = filter.facility.value
    if filter.severity is not None:
        conditions.append('severity = :severity')
        args['severity'] = filter.severity.value

    # substring filters; column name and parameter name are identical
    for name in ('hostname', 'app_name', 'procid', 'msgid', 'msg'):
        text = getattr(filter, name)
        if text:
            conditions.append(f'{name} LIKE :{name}')
            args[name] = f'%{text}%'

    result = await self._async_group.spawn(
        self._executor, _ext_query, self._conn, conditions, args,
        filter.max_results)

    entries = [common.Entry(
                id=row['rowid'],
                timestamp=row['entry_timestamp'],
                msg=common.Msg(facility=common.Facility(row['facility']),
                               severity=common.Severity(row['severity']),
                               version=row['version'],
                               timestamp=row['msg_timestamp'],
                               hostname=row['hostname'],
                               app_name=row['app_name'],
                               procid=row['procid'],
                               msgid=row['msgid'],
                               data=row['data'],
                               msg=row['msg']))
               for row in result]

    mlog.debug("query resulted with %s entries", len(entries))
    return entries
Query entries that satisfy filter
async def
delete(self, first_id: int):
async def delete(self, first_id: typing.Optional[int]):
    """Delete entries prior to first_id

    Args:
        first_id: entries with id lower than this value are deleted;
            ``None`` is forwarded unchanged to `_ext_delete`

    """
    entry_count = await self._async_group.spawn(
        self._executor, _ext_delete, self._conn, first_id)
    mlog.debug("deleted %s entries", entry_count)
Delete entries prior to first_id
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close