Source code for runtimepy.subprocess.interface

"""
A module implementing a runtimepy peer interface.
"""

# built-in
from argparse import Namespace
import asyncio
from io import BytesIO
from json import dumps
import logging
from logging import INFO, getLogger
from typing import Optional

# third-party
from vcorelib.io.markdown import MarkdownMixin
from vcorelib.io.types import JsonObject
from vcorelib.math import RateLimiter

# internal
from runtimepy import METRICS_NAME, PKG_NAME
from runtimepy.channel.environment import ChannelEnvironment
from runtimepy.channel.environment.base import FieldOrChannel
from runtimepy.channel.environment.command import register_env
from runtimepy.channel.environment.command.processor import (
    RemoteCommandProcessor,
)
from runtimepy.message import JsonMessage, MessageProcessor
from runtimepy.message.interface import JsonMessageInterface
from runtimepy.metrics.channel import ChannelMetrics
from runtimepy.mixins.async_command import AsyncCommandProcessingMixin
from runtimepy.net.arbiter.info import RuntimeStruct, SampleStruct

HOST_SUFFIX = ".host"
PEER_SUFFIX = ".peer"


[docs] class RuntimepyPeerInterface( JsonMessageInterface, AsyncCommandProcessingMixin, MarkdownMixin ): """A class implementing an interface for messaging peer subprocesses.""" poll_period_s: float = 0.01 struct_type: type[RuntimeStruct] = SampleStruct # Unclear why this is/was necessary (mypy bug?) markdown: str def __init__( self, name: str, config: JsonObject, markdown: str = None ) -> None: """Initialize this instance.""" self.set_markdown(markdown=markdown, package=PKG_NAME) self.processor = MessageProcessor() self.basename = name self.struct = self.struct_type( self.basename + HOST_SUFFIX, config, markdown=config.get("config", {}).get("markdown"), # type: ignore ) self.peer: Optional[RemoteCommandProcessor] = None self.peer_config: Optional[JsonMessage] = None self.peer_config_event = asyncio.Event() self._peer_env_event = asyncio.Event() # Set these for JsonMessageInterface. AsyncCommandProcessingMixin.__init__(self, logger=self.struct.logger) self.log_limiter = RateLimiter.from_s(1.0) self.command = self.struct.command self._setup_async_commands() JsonMessageInterface.__init__(self) self.struct.init_env() self.struct_pre_finalize() self._finalize_struct()
[docs] def struct_pre_finalize(self) -> None: """Configure struct before finalization."""
[docs] def handle_log_message(self, message: JsonMessage) -> None: """Handle a log message.""" logger = self.logger msg = "remote: " + message["msg"] if self.peer is not None: logger = self.peer.logger msg = message["msg"] logger.log(message.get("level", INFO), msg, *message.get("args", []))
@property def peer_name(self) -> str: """Get the name of the peer's environment.""" return self.basename + PEER_SUFFIX def _set_peer_env(self, data: JsonMessage) -> bool: """Set the peer's environment.""" result = False if self.peer is None: self.peer = RemoteCommandProcessor( ChannelEnvironment.load_json(data), getLogger(self.peer_name) ) def send_cmd_hook( args: Namespace, channel: Optional[FieldOrChannel] ) -> None: """Command hook for sending JSON commands.""" self._handle_command(args, channel) self.peer.hooks.append(send_cmd_hook) self.peer.logger.info("Loaded.") # Register both environments. register_env(self.struct.name, self.struct.command) register_env(self.peer_name, self.peer) self._peer_env_event.set() result = True return result
[docs] async def handle_command( self, args: Namespace, channel: Optional[FieldOrChannel] ) -> None: """Handle a command.""" if args.remote: cli_args = [args.command] if args.force: cli_args.append("-f") cli_args.append(args.channel) self.logger.info( "Remote command: %s", str( await self.channel_command( " ".join(cli_args + args.extra), environment=args.env ) ), )
def _register_handlers(self) -> None: """Register connection-specific command handlers.""" super()._register_handlers() async def env_handler(outbox: JsonMessage, inbox: JsonMessage) -> None: """A simple channel environment sharing handler.""" # Exchange environments. if self._set_peer_env(inbox): outbox.update(self.struct.env.export_json()) self.basic_handler("env", env_handler) async def config_handler( outbox: JsonMessage, inbox: JsonMessage ) -> None: """Store peer's configuration.""" if self.peer_config is None: self.peer_config = inbox["config"] outbox["config"] = self.struct.config self.logger.info( "Peer's configuration: '%s'.", dumps(self.peer_config, indent=4), ) self.peer_config_event.set() self.basic_handler("config", config_handler)
[docs] async def share_config(self, data: JsonMessage) -> None: """Exchange configuration data.""" await self.wait_json({"config": data})
[docs] async def share_environment(self) -> None: """Exchange channel environments.""" result = await self.wait_json({"env": self.struct.env.export_json()}) self._set_peer_env(result["env"])
def _finalize_struct(self) -> None: """Initialize struct members.""" with self.struct.env.names_pushed(METRICS_NAME): # stderr channel metrics. self.stderr_metrics = ChannelMetrics() self.struct.register_channel_metrics( "stderr", self.stderr_metrics, "transmitted" ) # stdout channel metrics. self.stdout_metrics = ChannelMetrics() self.struct.register_channel_metrics( "stdout", self.stdout_metrics, "transmitted" ) # stdin channel metrics. self.stdin_metrics = ChannelMetrics() self.struct.register_channel_metrics( "stdin", self.stdin_metrics, "transmitted" ) self.struct.env.finalize()
[docs] def poll_metrics(self, time_ns: int = None) -> None: """Poll channels.""" self.stderr_metrics.poll(time_ns=time_ns) self.stdout_metrics.poll(time_ns=time_ns) self.stdin_metrics.poll(time_ns=time_ns)
[docs] def handle_stderr(self, data: bytes) -> None: """Forward stderr.""" if data: count = len(data) self.stderr_metrics.increment(count) # Parse channel events. if self.peer is not None: with BytesIO(data) as stream: for event in self.peer.env.parse_event_stream(stream): self.peer.env.ingest(event) else: self.governed_log( self.log_limiter, "Dropped %d bytes of telemetry.", count, level=logging.WARNING, )
[docs] async def handle_stdout(self, data: bytes) -> None: """Handle messages from stdout.""" if data: self.stdout_metrics.increment(len(data)) for msg in self.processor.messages(data): await self.process_json(msg)
[docs] async def poll_handler(self) -> None: """Handle a 'poll' message.""" self.struct.poll()