Source code for gnomish_army_knife.runtime.task

"""
A module implementing a common runtime task base.
"""

# built-in
import argparse

# third-party
from runtimepy.net.arbiter import AppInfo
from runtimepy.net.arbiter.task import ArbiterTask
from runtimepy.primitives import Uint32

# internal
from gnomish_army_knife.runtime import GakRuntime


[docs] class GakRuntimeTask(ArbiterTask): """A class implementing a runtime environment for package tasks.""" runtime: GakRuntime # Metrics. event_count: Uint32 # No stateful elements are currently allocated during (runtime) init. auto_finalize = True def _init_state(self) -> None: """Add channels to this instance's channel environment.""" self.event_count = Uint32() self.env.channel("event_count", self.event_count) self.ignore_count = Uint32() self.env.channel("ignore_count", self.ignore_count)
[docs] async def init(self, app: AppInfo) -> None: """Initialize this task with application information.""" await super().init(app) self._init_runtime(app)
def _init_runtime(self, app: AppInfo) -> None: """Initialize package runtime.""" # Parse command-line options and create the runtime instance. parser = argparse.ArgumentParser() GakRuntime.cli_args(parser) self.runtime = app.stack.enter_context( GakRuntime.create( parser.parse_args( app.config.get( # type: ignore "gak_cli_args", [], ) ) ) )