Source code for gnomish_army_knife.net.task

"""
A module implementing a simple event log server task.
"""

# built-in
import asyncio
from queue import Queue
from threading import Event, Thread
from time import sleep

# third-party
from runtimepy.net.arbiter import AppInfo

# internal
from gnomish_army_knife.database.event import CombatLogEvent
from gnomish_army_knife.net.connection import CombatLogEventConnection
from gnomish_army_knife.runtime.task import GakRuntimeTask


class LogServerTask(GakRuntimeTask):
    """
    A class implementing a runtime environment for an event log server.

    A background thread processes combat-log files through the runtime's log
    database, while each :meth:`dispatch` iteration drains ``queue`` and
    forwards every event to all connected ``CombatLogEventConnection``
    instances.
    """

    # Events waiting to be forwarded to clients (registered with the log
    # database's event stream in 'init').
    queue: Queue[CombatLogEvent]

    # Set by 'stop_extra' to ask the log-reading thread to exit.
    stop_reading_log: Event

    # The thread running 'log_file_reader_main'.
    log_file_reader_thread: Thread

    # Set (on the event loop) once the reader thread has completed a pass
    # over the latest log; 'init' waits on it.
    queue_saturated: asyncio.Event

    # When true, 'dispatch' returns False after a single iteration.
    once: bool

    # The event loop 'init' ran on; used by the reader thread to schedule
    # callbacks in a thread-safe way.
    eloop: asyncio.AbstractEventLoop

    def log_file_reader_main(self) -> None:
        """
        A thread entry for processing the latest/active combat log and
        servicing any queue or handler consumers.

        All known combat logs are processed once up front and every log other
        than the latest is then deleted. Afterwards the latest log (if there
        is one) is re-processed roughly every 100 ms until
        ``stop_reading_log`` is set.
        """

        self.logger.info("Combat-log reading thread started.")

        stop = self.stop_reading_log
        latest = self.runtime.latest_combat_log()

        # Always attempt to re-process prior logs that may have been
        # written to.
        for log in self.runtime.combat_logs:
            # Don't start on (and later prune) further logs once a stop has
            # been requested.
            if stop.is_set():
                break

            self.runtime.database.logs.process_log(log, stop=stop)

            # Prune old log files by default, but never after a stop request:
            # 'process_log' is handed the stop event, so it has presumably
            # returned early and the file may not be fully processed yet.
            if latest is not None and log != latest and not stop.is_set():
                log.unlink()

        while not stop.is_set():
            if latest is not None:
                self.runtime.database.logs.process_log(latest, stop=stop)

            # asyncio.Event is not thread-safe, so 'set' must run on the
            # event loop. Only schedule it while the event is still unset to
            # avoid waking the loop on every pass.
            if not self.queue_saturated.is_set():
                self.eloop.call_soon_threadsafe(self.queue_saturated.set)

            # Ensure we don't starve due to no active log file. Waiting on
            # the stop event (rather than sleeping) lets the thread exit as
            # soon as a stop is requested.
            stop.wait(0.1)

    async def init(self, app: AppInfo) -> None:
        """
        Initialize this task with application information.

        Creates the queue and synchronization primitives, starts the
        log-reading thread and then waits until that thread signals that the
        event queue is saturated.
        """

        await super().init(app)

        # At least one event connection is expected to exist already.
        assert list(self.app.conn_manager.by_type(CombatLogEventConnection))

        self.queue = Queue()
        self.stop_reading_log = Event()
        self.queue_saturated = asyncio.Event()

        # This coroutine runs on the loop the reader thread must call back
        # into, so capture the running loop explicitly.
        self.eloop = asyncio.get_running_loop()

        # get this from config
        self.once = app.config_param("log_server_once", False)

        # Connect queue to event stream.
        app.stack.enter_context(
            self.runtime.database.logs.queue.registered(self.queue)
        )

        # Create and start log-file-reading thread.
        self.log_file_reader_thread = Thread(target=self.log_file_reader_main)
        self.log_file_reader_thread.start()

        # Allow the queue to fully saturate.
        with self.log_time(
            "Waiting for event queue to saturate", reminder=True
        ):
            await self.queue_saturated.wait()

    async def stop_extra(self) -> None:
        """Extra actions to perform when this task is stopping."""

        # Signal log-reading thread to stop and wait for it to be stopped.
        # NOTE: 'join' blocks the event loop until the thread exits.
        self.stop_reading_log.set()
        self.log_file_reader_thread.join()
        self.logger.info("Combat-log reading thread stopped.")

    async def dispatch(self) -> bool:
        """
        Dispatch an iteration of this task.

        Drains the event queue, forwarding each event to every
        ``CombatLogEventConnection``.

        Returns ``False`` when ``once`` is set and ``True`` otherwise
        (presumably telling the runtime whether to keep dispatching this
        task - confirm against ``GakRuntimeTask``).
        """

        while not self.queue.empty():
            item = self.queue.get_nowait()
            self.event_count.increment()

            # Forward events to connected clients.
            for conn in self.app.conn_manager.by_type(
                CombatLogEventConnection
            ):
                conn.forward_handler(item)

            # Yield to the event loop between events so a long backlog
            # doesn't monopolize it.
            await asyncio.sleep(0)

        await asyncio.sleep(0)

        return not self.once