Source code for runtimepy.mixins.async_command
"""
A module implementing an interface for classes with channel-command processors
that need to handle commands asynchronously.
"""
# built-in
from argparse import Namespace
import asyncio
from typing import Optional
# third-party
from vcorelib.logging import LoggerMixin
# internal
from runtimepy.channel.environment.command import FieldOrChannel
from runtimepy.channel.environment.command.parser import ChannelCommand
from runtimepy.channel.environment.command.processor import (
ChannelCommandProcessor,
CustomCommand,
)
ChannelCommandParams = tuple[Namespace, Optional[FieldOrChannel]]
[docs]
class AsyncCommandProcessingMixin(LoggerMixin):
"""A class mixin for handling asynchronous commands."""
command: ChannelCommandProcessor
outgoing_commands: asyncio.Queue[ChannelCommandParams]
def _setup_async_commands(self, *custom_commands: CustomCommand) -> None:
"""Setup asynchronous commands."""
if not hasattr(self, "outgoing_commands"):
self.outgoing_commands = asyncio.Queue()
self.command.hooks.append(self._handle_command)
self.command.register_custom_commands(*custom_commands)
[docs]
async def handle_command(
self, args: Namespace, channel: Optional[FieldOrChannel]
) -> None:
"""Handle a command."""
def _handle_command(
self, args: Namespace, channel: Optional[FieldOrChannel]
) -> None:
"""Determine if a remote command should be queued up."""
self.outgoing_commands.put_nowait((args, channel))
[docs]
async def process_command_queue(self) -> None:
"""Process any outgoing command requests."""
while not self.outgoing_commands.empty():
params = self.outgoing_commands.get_nowait()
if params[0].command == ChannelCommand.CUSTOM:
await self.command.handle_custom_command(*params)
else:
await self.handle_command(*params)
self.outgoing_commands.task_done()