Source code for runtimepy.task.sample

"""
A sample task interface.
"""

# built-in
from argparse import Namespace
import asyncio
from logging import DEBUG
import math
from typing import Optional

# internal
from runtimepy.channel.environment.command import FieldOrChannel
from runtimepy.channel.environment.sample import poll_sample_env, sample_env
from runtimepy.mixins.async_command import AsyncCommandProcessingMixin
from runtimepy.mixins.trig import TrigMixin
from runtimepy.net.arbiter import AppInfo
from runtimepy.net.arbiter.task import ArbiterTask, TaskFactory
from runtimepy.net.stream.json import JsonMessageConnection


[docs] class SampleTask(ArbiterTask, TrigMixin, AsyncCommandProcessingMixin): """A sample application."""
[docs] async def init(self, app: AppInfo) -> None: """Initialize this task with application information.""" await super().init(app) sample_env(self.env) TrigMixin.__init__(self, self.env) async def test_command( args: Namespace, __: Optional[FieldOrChannel] ) -> None: """Handle a test command.""" self.logger.info("Test command %s.", args) self._setup_async_commands(test_command)
[docs] async def dispatch(self) -> bool: """Dispatch an iteration of this task.""" # Use this to implement / test rate-limited logging. with self.log_time("dispatch", level=DEBUG): poll_sample_env(self.env) # Trigger callbacks. self.sin_phase_angle.value += math.pi / 180 self.cos_phase_angle.value += -math.pi / 180 # Interact with connections. await asyncio.gather( *( x.loopback() for x in self.app.search( pattern="client", kind=JsonMessageConnection ) ) ) self.dispatch_trig(self.metrics.dispatches.value) return True
[docs] class Sample(TaskFactory[SampleTask]): """A sample-task application factory.""" kind = SampleTask
[docs] class SampleAppTask(ArbiterTask): """A base TUI application.""" app: AppInfo
[docs] async def init(self, app: AppInfo) -> None: """Initialize this task with application information.""" self.app = app
[docs] async def dispatch(self) -> bool: """Dispatch an iteration of this task.""" for name, struct in self.app.structs.items(): if "struct" in name: struct.poll() # Send poll message to peer process. for peer in self.app.peers.values(): peer.send_poll() return True
[docs] class SampleApp(TaskFactory[SampleAppTask]): """A TUI application factory.""" kind = SampleAppTask