Source code for gnomish_army_knife.net.writer
"""
A module implementing an arena-match database writer task.
"""
# built-in
from logging import DEBUG
from queue import Queue
# third-party
from runtimepy.net.arbiter import AppInfo
# internal
from gnomish_army_knife.database.event import CombatLogEvent
from gnomish_army_knife.database.writer import ArenaMatchWriter
from gnomish_army_knife.net.connection import CombatLogEventConnection
from gnomish_army_knife.runtime.task import GakRuntimeTask
[docs]
class LogWriterTask(GakRuntimeTask):
"""A class that writes incoming arena match data to a file database."""
writers: dict[
CombatLogEventConnection,
tuple[ArenaMatchWriter, Queue[CombatLogEvent], int],
]
[docs]
async def init(self, app: AppInfo) -> None:
"""Initialize this task with application information."""
await super().init(app)
self.writers = {}
[docs]
async def dispatch(self) -> bool:
"""Dispatch an iteration of this task."""
inactive = set()
# log some info when connection state changed / reconcile with
# UiTask implementation
# Determine which connections are still active or need writing
# interfaces.
active = set()
for conn in self.app.conn_manager.by_type(CombatLogEventConnection):
active.add(conn)
# should move this to some kind of "connection instance created"
# callback (need runtimepy to support this), maybe just "connected"
# callback? (that can fire multiple times i.e. on re-connect?)
# should be a connect/disconnect callback thing?
if conn not in self.writers:
writer = self.runtime.database.create_writer()
queue: Queue[CombatLogEvent] = Queue()
self.writers[conn] = (
writer,
queue,
conn.queue.register(queue),
)
conn.logger.info("Registered as match database stream.")
# Clean up inactive writers (could move this to disconnect callback
# system described above).
for conn in self.writers:
if conn not in active:
inactive.add(conn)
for conn in inactive:
conn.queue.remove(self.writers[conn][2])
del self.writers[conn]
# Handle connection events.
for conn, (writer, queue, _) in self.writers.items():
while not queue.empty():
event = queue.get_nowait()
# time since last event seen
# Could count ignored events at some point.
if await writer.handle(event):
self.event_count.increment()
else:
self.ignore_count.increment()
event.log(self.logger, level=DEBUG)
# update time since last event channel
return True