Source code for runtimepy.subprocess.program

"""
A module implementing a peer program communication interface.
"""

# built-in
import asyncio
from contextlib import asynccontextmanager, contextmanager, suppress
import logging
import os
import signal
import sys
from typing import AsyncIterator, BinaryIO, Iterator, Optional, Type, TypeVar

# third-party
from vcorelib.io import ARBITER
from vcorelib.io.types import JsonObject
from vcorelib.paths.context import tempfile

# internal
from runtimepy.metrics.channel import ChannelMetrics
from runtimepy.mixins.psutil import PsutilMixin
from runtimepy.net.arbiter import ConnectionArbiter
from runtimepy.net.arbiter.info import RuntimeStruct
from runtimepy.subprocess.interface import RuntimepyPeerInterface

T = TypeVar("T", bound="PeerProgram")
PROGRAM: Optional["PeerProgram"] = None


[docs] class PeerProgram(RuntimepyPeerInterface, PsutilMixin): """A communication interface for peer programs.""" json_output: BinaryIO stream_output: BinaryIO stream_metrics: ChannelMetrics struct_type: Type[RuntimeStruct] = RuntimeStruct got_eof: asyncio.Event _singleton: Optional["PeerProgram"] = None
[docs] @classmethod def singleton(cls: type[T]) -> T: """Get a shared single instance of a protocol for this class.""" assert cls._singleton is not None return cls._singleton # type: ignore
[docs] @contextmanager def streaming_events(self) -> Iterator[None]: """Stream events to the stream output.""" with self.struct.env.registered( self.stream_output, flush=True, channel=self.stream_metrics ): yield
[docs] def write(self, data: bytes, addr: tuple[str, int] = None) -> None: """Write data.""" del addr self.json_output.write(data) self.json_output.flush() self.stdout_metrics.increment(len(data))
[docs] async def heartbeat_task(self) -> None: """Send a message heartbeat back and forth.""" loop = asyncio.get_running_loop() prev_poll_time = loop.time() with suppress(asyncio.CancelledError): while not self.got_eof.is_set(): # Perform heartbeat. if self._peer_env_event.is_set(): self.stderr_metrics.update(self.stream_metrics) # Poll metrics. curr = loop.time() self.poll_psutil(curr - prev_poll_time) prev_poll_time = curr with suppress(AssertionError): await self.process_command_queue() await self.wait_json() await asyncio.sleep(self.poll_period_s * 10)
[docs] async def io_task(self, buffer: BinaryIO) -> None: """Run this peer program's main loop.""" self.got_eof.clear() # Allow polling stdin. if hasattr(os, "set_blocking"): getattr(os, "set_blocking")(buffer.fileno(), False) accumulator = 0 with suppress(asyncio.CancelledError): while not self.got_eof.is_set(): data: bytes = buffer.read(1) if data is None: await asyncio.sleep(self.poll_period_s) continue if not data: break accumulator += 1 # Process incoming messages. saw_msg = False for msg in self.processor.messages(data): await self.process_json(msg) saw_msg = True if saw_msg: self.stdin_metrics.increment(accumulator) accumulator = 0 # Signal the end of input processing. self.got_eof.set()
[docs] @classmethod def run_standard( cls: Type[T], name: str, config: JsonObject ) -> tuple[asyncio.Task[None], T]: """Run this program using standard input and output.""" peer = cls(name, config) peer.json_output = sys.stdout.buffer peer.stream_output = sys.stderr.buffer peer.got_eof = asyncio.Event() peer.stream_metrics = ChannelMetrics() global PROGRAM # pylint: disable=global-statement PROGRAM = peer assert cls._singleton is None cls._singleton = peer return asyncio.create_task(peer.io_task(sys.stdin.buffer)), peer
[docs] async def main(self, argv: list[str]) -> None: """Program entry.""" del argv with tempfile(suffix=".json") as config_path: arbiter = ConnectionArbiter(stop_sig=self.got_eof) ARBITER.encode(config_path, self.struct.config) await arbiter.load_configs([config_path]) await arbiter.app()
[docs] async def cleanup(self) -> None: """Runs when program 'running' context exits."""
[docs] def pre_environment_exchange(self) -> None: """Perform early initialization tasks."""
[docs] def struct_pre_finalize(self) -> None: """Configure struct before finalization.""" self.init_psutil(self.struct.env)
[docs] @classmethod @asynccontextmanager async def running( cls: Type[T], name: str, config: JsonObject, argv: list[str] ) -> AsyncIterator[tuple[asyncio.Task[None], asyncio.Task[None], T]]: """ Provide an interface for managed-context cleanup of the peer process. """ io_task, peer = cls.run_standard(name, config) # Set up logging. logger = logging.getLogger() logger.addHandler(peer.list_handler) peer.logger.info("Logging initialized.") # Wait for environment exchange. peer.pre_environment_exchange() await peer._peer_env_event.wait() peer.logger.info("Environments exchanged.") # Start main loop. main_task = asyncio.create_task(peer.main(argv)) peer.logger.info("Main started.") await peer.wait_json() heartbeat = asyncio.create_task(peer.heartbeat_task()) try: with peer.streaming_events(): yield io_task, main_task, peer finally: logger.removeHandler(peer.list_handler) for cancel in (io_task, main_task, heartbeat): cancel.cancel() await cancel await peer.cleanup()
[docs] @classmethod async def run( cls: Type[T], name: str, config: JsonObject, argv: list[str] ) -> None: """Run the program.""" # Don't respond to keyboard interrupt. signal.signal(signal.SIGINT, signal.SIG_IGN) async with cls.running(name, config, argv) as (_, main_task, peer): with peer.log_time("Main task context", reminder=True): await main_task
[docs] async def poll_handler(self) -> None: """Handle a 'poll' message.""" self.struct.poll() self.poll_metrics()