Source code for gnomish_army_knife.net.task
"""
A module implementing a simple event log server task.
"""
# built-in
import asyncio
from queue import Queue
from threading import Event, Thread
from time import sleep
# third-party
from runtimepy.net.arbiter import AppInfo
# internal
from gnomish_army_knife.database.event import CombatLogEvent
from gnomish_army_knife.net.connection import CombatLogEventConnection
from gnomish_army_knife.runtime.task import GakRuntimeTask
[docs]
class LogServerTask(GakRuntimeTask):
    """
    A class implementing a runtime environment for an event log server.

    A background thread processes combat-log files while the task's
    dispatch method drains a queue (registered with the database's log
    event stream) and forwards each queued event to every
    CombatLogEventConnection known to the connection manager.
    """

    # Events waiting to be forwarded to connected clients.
    queue: Queue[CombatLogEvent]

    # Set to request that the log-reading thread stops.
    stop_reading_log: Event

    # The thread running 'log_file_reader_main'.
    log_file_reader_thread: Thread

    # Set (on the event loop) after each pass over the latest log.
    queue_saturated: asyncio.Event

    # When set, 'dispatch' returns False after a single iteration.
    once: bool

    # The event loop that 'init' ran on; used by the reader thread to
    # signal 'queue_saturated' in a thread-safe manner.
    eloop: asyncio.AbstractEventLoop

    def log_file_reader_main(self) -> None:
        """
        A thread entry for processing the latest/active combat log and
        servicing any queue or handler consumers.

        Every known combat log is processed once up front and logs other
        than the latest one are deleted afterwards. The latest log is then
        processed again roughly every 100 ms until 'stop_reading_log' is
        set, and 'queue_saturated' is signalled after each of those passes.
        """

        self.logger.info("Combat-log reading thread started.")

        stop = self.stop_reading_log
        latest = self.runtime.latest_combat_log()

        # Always attempt to re-process prior logs that may have been
        # written to.
        for log in self.runtime.combat_logs:
            # Nothing more should be read once a stop has been requested.
            if stop.is_set():
                break

            self.runtime.database.logs.process_log(log, stop=stop)

            # Prune old log files by default. A log is kept if a stop was
            # requested while it was being handled: its processing was
            # presumably interrupted, so it must remain on disk to be
            # picked up again by a later run.
            if latest is not None and log != latest and not stop.is_set():
                log.unlink()

        while not stop.is_set():
            if latest is not None:
                self.runtime.database.logs.process_log(latest, stop=stop)

            # asyncio.Event is not thread-safe, so it is set via the loop.
            self.eloop.call_soon_threadsafe(self.queue_saturated.set)

            # Ensure we don't starve due to no active log file.
            sleep(0.1)

    async def init(self, app: AppInfo) -> None:
        """
        Initialize this task with application information.

        Creates the queue and synchronization primitives, registers the
        queue with the database's log event stream, starts the log-reading
        thread and waits until that thread signals 'queue_saturated'.
        """

        await super().init(app)

        # At least one event connection must exist to forward events to.
        assert list(self.app.conn_manager.by_type(CombatLogEventConnection))

        self.queue = Queue()
        self.stop_reading_log = Event()
        self.queue_saturated = asyncio.Event()

        # Inside a coroutine the running loop is the one to signal on.
        self.eloop = asyncio.get_running_loop()

        # Single-iteration mode is controlled by configuration.
        self.once = app.config_param("log_server_once", False)

        # Connect queue to event stream.
        app.stack.enter_context(
            self.runtime.database.logs.queue.registered(self.queue)
        )

        # Create and start log-file-reading thread.
        self.log_file_reader_thread = Thread(target=self.log_file_reader_main)
        self.log_file_reader_thread.start()

        # Allow the queue to fully saturate.
        with self.log_time(
            "Waiting for event queue to saturate", reminder=True
        ):
            await self.queue_saturated.wait()

    async def dispatch(self) -> bool:
        """
        Dispatch an iteration of this task.

        Drains the event queue, counting each event and forwarding it to
        every CombatLogEventConnection. Returns False when 'once' is set
        and True otherwise (presumably telling the caller whether to keep
        dispatching -- confirm against the task base class).
        """

        while not self.queue.empty():
            item = self.queue.get_nowait()
            self.event_count.increment()

            # Forward events to connected clients.
            for conn in self.app.conn_manager.by_type(
                CombatLogEventConnection
            ):
                conn.forward_handler(item)

            # Yield to the event loop between events.
            await asyncio.sleep(0)

        # Always yield at least once, even when the queue was empty.
        await asyncio.sleep(0)

        return not self.once