Source code for runtimepy.subprocess.peer

"""
A module implementing a runtimepy peer interface.
"""

# built-in
import asyncio
from contextlib import (
    AsyncExitStack,
    ExitStack,
    asynccontextmanager,
    contextmanager,
    suppress,
)
import os
from pathlib import Path
from sys import executable
from typing import AsyncIterator, Iterator, Type, TypeVar

# third-party
from vcorelib.io import ARBITER, DEFAULT_INCLUDES_KEY
from vcorelib.io.file_writer import IndentedFileWriter
from vcorelib.io.types import JsonObject
from vcorelib.names import import_str_and_item
from vcorelib.paths.context import tempfile

# internal
from runtimepy.subprocess import spawn_exec, spawn_shell
from runtimepy.subprocess.interface import RuntimepyPeerInterface
from runtimepy.subprocess.protocol import RuntimepySubprocessProtocol

T = TypeVar("T", bound="RuntimepyPeer")


class RuntimepyPeer(RuntimepyPeerInterface):
    """A class implementing an interface for messaging peer subprocesses."""

    def __init__(
        self,
        protocol: RuntimepySubprocessProtocol,
        name: str,
        config: JsonObject,
        markdown: str = None,
    ) -> None:
        """
        Initialize this instance.

        :param protocol: The subprocess protocol used to exchange data with
                         the peer (stdin for writes, stdout/stderr queues for
                         reads).
        :param name: Forwarded to the parent-class initializer.
        :param config: Forwarded to the parent-class initializer.
        :param markdown: Forwarded to the parent-class initializer.
        """

        super().__init__(name, config, markdown=markdown)
        self.protocol = protocol

    async def _poll(self) -> None:
        """
        Poll input queues.

        Runs until 'service_queues' reports that there is nothing left to
        service or until this coroutine is cancelled.
        """

        keep_going = True
        command_task = None

        while keep_going:
            try:
                # Keep one command-queue processing task alive at a time: a
                # finished task is dropped here and replaced on the next
                # iteration.
                if command_task is None:
                    command_task = asyncio.create_task(
                        self.process_command_queue()
                    )
                elif command_task.done():
                    command_task = None

                keep_going = await self.service_queues()
                if keep_going:
                    await asyncio.sleep(self.poll_period_s)

            # Cancellation is the normal shutdown path (see '_context'), so
            # end the loop instead of propagating.
            except asyncio.CancelledError:
                keep_going = False

    @asynccontextmanager
    async def _context(self: T) -> AsyncIterator[T]:
        """
        A managed context for the peer.

        Starts the polling task, attempts an initial handshake with the peer
        and cancels (then awaits) the polling task on exit.
        """

        # Register task that will poll queues.
        poll_task = asyncio.create_task(self._poll())

        try:
            # The handshake is best-effort: assertion failures raised while
            # performing it are ignored.
            with suppress(AssertionError):
                if await self.loopback():
                    await self.wait_json({"meta": self.meta})
                    await self.share_environment()

            # Yield outside of the 'suppress' block so that assertion errors
            # raised by the caller's body are not swallowed.
            yield self

        finally:
            poll_task.cancel()
            await poll_task

    @classmethod
    @asynccontextmanager
    async def shell(
        cls: Type[T],
        name: str,
        config: JsonObject,
        cmd: str,
        markdown: str = None,
    ) -> AsyncIterator[T]:
        """
        Create an instance from a shell command.

        :param name: Passed to the class initializer.
        :param config: Passed to the class initializer.
        :param cmd: The shell command handed to 'spawn_shell'.
        :param markdown: Passed to the class initializer.
        """

        async with spawn_shell(
            cmd, stdout=asyncio.Queue(), stderr=asyncio.Queue()
        ) as proto:
            async with cls(
                proto, name, config, markdown=markdown
            )._context() as inst:
                yield inst

    async def main(self) -> None:
        """Program entry."""

    @classmethod
    @asynccontextmanager
    async def exec(
        cls: Type[T],
        name: str,
        config: JsonObject,
        *args,
        markdown: str = None,
        **kwargs,
    ) -> AsyncIterator[T]:
        """
        Create an instance from command-line arguments.

        :param name: Passed to the class initializer.
        :param config: Passed to the class initializer.
        :param args: Positional arguments forwarded to 'spawn_exec'.
        :param markdown: Passed to the class initializer.
        :param kwargs: Keyword arguments forwarded to 'spawn_exec'.
        """

        async with spawn_exec(
            *args, stdout=asyncio.Queue(), stderr=asyncio.Queue(), **kwargs
        ) as proto:
            async with cls(
                proto, name, config, markdown=markdown
            )._context() as inst:
                yield inst

    @classmethod
    def _write_script(
        cls: Type[T],
        writer: IndentedFileWriter,
        name: str,
        config_path: Path,
        import_str: str,
    ) -> None:
        """
        Write a script file for running the desired peer program.

        :param writer: The writer that receives the script's lines.
        :param name: The name passed to the program's 'run' call.
        :param config_path: Path to the configuration file the script decodes.
        :param import_str: Import string that 'import_str_and_item' splits
                           into a module and the item to import from it.
        """

        writer.write("import sys")

        writer.write("from vcorelib.asyncio import run_handle_interrupt")
        writer.write("from vcorelib.io import ARBITER, DEFAULT_INCLUDES_KEY")

        from_str, to_import = import_str_and_item(import_str)
        writer.write(f"from {from_str} import {to_import}")

        # Emit string values with repr() so that backslashes (Windows paths),
        # quotes and other special characters always produce a valid Python
        # string literal in the generated script.
        writer.write(f"CONFIG_PATH = {str(config_path)!r}")

        writer.write('if __name__ == "__main__":')
        with writer.indented():
            writer.write(
                f"run_handle_interrupt({to_import}.run({name!r}, "
                "ARBITER.decode(CONFIG_PATH, "
                "require_success=True, includes_key=DEFAULT_INCLUDES_KEY"
                ").data, sys.argv))"
            )
            writer.write("sys.exit(0)")

    @classmethod
    def _write_script_harness(
        cls: Type[T], name: str, path: Path, config_path: Path, import_str: str
    ) -> None:
        """
        Handles creating file-writer to write script.

        :param name: Forwarded to '_write_script'.
        :param path: Destination path of the script file.
        :param config_path: Forwarded to '_write_script'.
        :param import_str: Forwarded to '_write_script'.
        """

        # Write file contents.
        with IndentedFileWriter.from_path(
            path, per_indent=4, linesep="\n"
        ) as writer:
            cls._write_script(writer, name, config_path, import_str)

    @classmethod
    @contextmanager
    def _prepare_jit_script(
        cls: Type[T], name: str, config: JsonObject, import_str: str
    ) -> Iterator[Path]:
        """
        Prepare a JIT script for this process to invoke.

        Yields the path of the generated script. Both the script and the
        configuration file it references are obtained from 'tempfile'
        contexts that stay entered until this context exits.

        :raises AssertionError: If encoding the configuration reports failure
                                (when assertions are enabled).
        """

        with ExitStack() as stack:
            config_path = stack.enter_context(tempfile(suffix=".json"))

            # Encode configuration data. The call is kept outside of the
            # 'assert' statement so it still runs when assertions are
            # stripped (python -O).
            encoded = ARBITER.encode(config_path, config)[0]
            assert encoded, f"Couldn't encode '{config_path}'!"

            # Decode to load includes.
            config = ARBITER.decode(
                config_path,
                require_success=True,
                includes_key=DEFAULT_INCLUDES_KEY,
            ).data

            # Re-encode the decoded data so the file on disk holds the
            # result of the decode above.
            encoded = ARBITER.encode(config_path, config)[0]
            assert encoded, f"Couldn't encode '{config_path}'!"

            path = stack.enter_context(tempfile(suffix=".py"))

            # Write file contents.
            cls._write_script_harness(name, path, config_path, import_str)

            yield path

    @classmethod
    @asynccontextmanager
    async def running_program(
        cls: Type[T],
        name: str,
        config: JsonObject,
        import_str: str,
        *args,
        **kwargs,
    ) -> AsyncIterator[T]:
        """
        Run a peer subprocess.

        A script is generated (see '_prepare_jit_script') and executed with
        the current Python interpreter.

        :param name: Name for the peer and the program it runs.
        :param config: Configuration data for the peer and its program.
        :param import_str: Import string locating the program to run.
        :param args: Extra command-line arguments placed after the script.
        :param kwargs: Keyword arguments forwarded to 'exec'.
        """

        async with AsyncExitStack() as stack:
            entry_args = [executable]

            # Run the child under 'coverage' when either of these environment
            # variables is present.
            if (
                "COVERAGE_RUN" in os.environ
                or "PYTEST_CURRENT_TEST" in os.environ
            ):
                entry_args.extend(["-m", "coverage", "run"])

            # The script context is entered before the peer's, so it is
            # exited (cleaned up) only after the peer context has exited.
            script = stack.enter_context(
                cls._prepare_jit_script(name, config, import_str)
            )

            # Run program.
            yield await stack.enter_async_context(
                cls.exec(
                    name, config, *entry_args, str(script), *args, **kwargs
                )
            )

    def write(self, data: bytes, addr: tuple[str, int] = None) -> None:
        """
        Write bytes via this interface.

        :param data: Bytes written to the peer's standard input.
        :param addr: Unused.
        """

        del addr

        self.protocol.stdin.write(data)
        self.stdin_metrics.increment(len(data))

    async def service_queues(self) -> bool:
        """
        Service data from peer.

        Drains everything currently queued on the peer's stderr and stdout.

        :return: Whether at least one of the protocol's output queues exists
                 (i.e. whether it's worth polling again).
        """

        keep_going = False

        # Forward stderr.
        if self.protocol.stderr_queue is not None:
            keep_going = True

            queue = self.protocol.stderr
            while not queue.empty():
                self.handle_stderr(queue.get_nowait())

        # Handle messages from stdout.
        if self.protocol.stdout_queue is not None:
            keep_going = True

            queue = self.protocol.stdout
            while not queue.empty():
                await self.handle_stdout(queue.get_nowait())

        return keep_going