Source code for gnomish_army_knife.database.queue

"""
A module implementing a simple queue-handler event interface.
"""

# built-in
from contextlib import contextmanager
from queue import Queue
from typing import Iterator

# internal
from gnomish_army_knife.database.event import CombatLogEvent


[docs] class CombatLogQueueHandler: """ A simple mixin class for handling combat log events via per-event handlers and queue handlers. """ def __init__(self) -> None: """Initialize this instance.""" self.queues: dict[int, Queue[CombatLogEvent]] = {} self.queue_idx = 1
[docs] def register(self, queue: Queue[CombatLogEvent]) -> int: """Register a queue.""" result = self.queue_idx self.queues[result] = queue self.queue_idx += 1 return result
[docs] def remove(self, index: int) -> bool: """Remove a previously registered queue.""" remove = index in self.queues if remove: del self.queues[index] return remove
[docs] @contextmanager def registered(self, queue: Queue[CombatLogEvent]) -> Iterator[None]: """Register a queue as a managed context.""" idx = self.register(queue) try: yield finally: assert self.remove(idx)
[docs] def handle(self, event: CombatLogEvent) -> None: """Handle a combat-log event.""" for queue in self.queues.values(): queue.put_nowait(event)