# Source code for runtimepy.subprocess.protocol

"""
A module implementing a subprocess protocol.
"""

# built-in
from asyncio import (
    Event,
    Queue,
    ReadTransport,
    SubprocessProtocol,
    SubprocessTransport,
    WriteTransport,
)
from subprocess import Popen
from typing import Optional, cast

# third-party
from vcorelib.math import metrics_time_ns


class RuntimepySubprocessProtocol(SubprocessProtocol):
    """
    A simple subprocess protocol implementation.

    Chunks received on the child's standard output and standard error pipes
    are forwarded to optional queues. Both queue attributes are reset to
    ``None`` by :meth:`connection_made`, so a queue must be assigned after the
    connection is made for any data to be captured.
    """

    # Timestamp taken from metrics_time_ns() when the connection is made.
    start_time: int

    # -1 until the process exits, then metrics_time_ns() minus start_time
    # (presumably nanoseconds, going by the helper's name).
    elapsed_time: int

    # The subprocess transport handed to connection_made().
    transport: SubprocessTransport

    # The transport's "subprocess" extra-info entry.
    subproc: Popen[bytes]

    # Pipe transports for file descriptors 0, 1 and 2 respectively.
    stdin: WriteTransport
    stdout_transport: ReadTransport
    stderr_transport: ReadTransport

    # Destination queues for pipe data; None means data is not captured.
    stdout_queue: Optional[Queue[bytes]]
    stderr_queue: Optional[Queue[bytes]]

    # Set once process_exited() has run.
    exited: Event

    @property
    def stdout(self) -> Queue[bytes]:
        """Get this instance's standard output queue (must be assigned)."""

        assert self.stdout_queue is not None
        return self.stdout_queue

    @property
    def stderr(self) -> Queue[bytes]:
        """Get this instance's standard error queue (must be assigned)."""

        assert self.stderr_queue is not None
        return self.stderr_queue

    @property
    def pid(self) -> int:
        """Get this subprocess's process identifier."""

        return self.transport.get_pid()

    def connection_made(self, transport) -> None:
        """
        Initialize this protocol.

        Records the start time, resets the elapsed time and both output
        queues, creates the exit event and stores the transport along with
        its per-pipe transports.
        """

        self.start_time = metrics_time_ns()
        self.elapsed_time = -1

        self.stdout_queue = None
        self.stderr_queue = None

        self.exited = Event()

        self.transport = transport
        self.subproc = self.transport.get_extra_info("subprocess")

        # The casts only narrow the static type; nothing is checked at
        # runtime, so these hold whatever get_pipe_transport() returned.
        self.stdin = cast(WriteTransport, self.transport.get_pipe_transport(0))
        self.stdout_transport = cast(
            ReadTransport, self.transport.get_pipe_transport(1)
        )
        self.stderr_transport = cast(
            ReadTransport, self.transport.get_pipe_transport(2)
        )

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        """
        Handle incoming pipe data.

        Data for descriptor 1 goes to the standard output queue and data for
        descriptor 2 goes to the standard error queue. Data is dropped when
        the matching queue has not been assigned.
        """

        # Route on the descriptor first, so that standard output data can
        # never reach the standard error branch when only the standard error
        # queue is assigned.
        if fd == 1:
            if self.stdout_queue is not None:
                self.stdout_queue.put_nowait(data)
            return

        if self.stderr_queue is not None:
            assert fd == 2, fd
            self.stderr_queue.put_nowait(data)

    def pipe_connection_lost(self, fd: int, exc) -> None:
        """
        Handle a pipe connection closing.

        Flushes the queue that corresponds to the closed descriptor; ``exc``
        is not used.
        """

        if fd == 1:
            self._flush_stdout()
        elif fd == 2:
            self._flush_stderr()

    def _flush_stdout(self) -> None:
        """Flush standard output by enqueueing an empty bytes object."""

        # Presumably consumers treat the empty chunk as an end-of-stream
        # marker; confirm against the code reading this queue.
        if self.stdout_queue is not None:
            self.stdout_queue.put_nowait(bytes())

    def _flush_stderr(self) -> None:
        """Flush standard error by enqueueing an empty bytes object."""

        if self.stderr_queue is not None:
            self.stderr_queue.put_nowait(bytes())

    def process_exited(self) -> None:
        """
        Handle process exit.

        Records the elapsed time, flushes both output queues and sets the
        exit event.
        """

        self.elapsed_time = metrics_time_ns() - self.start_time

        # Both queues are flushed here as well as in pipe_connection_lost(),
        # so a queue can receive more than one empty chunk.
        self._flush_stdout()
        self._flush_stderr()

        self.exited.set()