Source code for projspec.artifact.process

import io
import logging
import os
from queue import Queue, Empty
import re
import subprocess
import sys
from threading import Thread
import time
import weakref

from projspec.artifact import BaseArtifact
from projspec.config import get_conf
from projspec.utils import run_subprocess


logger = logging.getLogger("projspec")

ON_POSIX = "posix" in sys.builtin_module_names


def _enqueue(out: io.IOBase, queue: Queue[bytes]):
    """Reads subprocess output in a separate thread to prevent deadlocks"""
    while True:
        if line := out.readline():
            logger.debug(line.decode("utf-8").rstrip())
            queue.put(line)
        else:
            break


[docs] class Process(BaseArtifact): """A simple process where we know nothing about what it does, only if it's running. Can include batch jobs and long-running services. While running the process, the output can be "enqueued", meaning that the .queue attribute will be populated with lines of output (from stdout and stderr, by default). This can be controlled by passing `enqueue=` to the .make() method; or disabled by setting the config value `capture_artifact_output` to False. """ icon = "⌨️" term: bool = False environ: dict[str, str] = {} queue: Queue[bytes] | None = None # lines of binary output by the subprocess def _make(self, enqueue=True, **kwargs): enq = enqueue and get_conf("capture_artifact_output") if self.environ and "environ" not in kwargs: env = os.environ.copy() env.update(self.environ) kwargs["env"] = env if self.proc is None: self.queue = Queue() logger.info(f"Running {self.cmd}") if enq: kwargs["stdout"] = subprocess.PIPE kwargs["stderr"] = subprocess.STDOUT kwargs["close_fds"] = ON_POSIX else: kwargs["output"] = False proc = run_subprocess( self.cmd, cwd=self.proj.url, popen=True, **kwargs, ) if enq: t = Thread(target=_enqueue, args=(proc.stdout, self.queue)) t.daemon = True # thread dies with the program t.start() if self.term: weakref.finalize(self, proc.terminate) # weakref.finalize(self, t.join) self.proc = proc def _is_done(self) -> bool: return self.proc is not None and self.proc.poll() is None def clean( self, ): if self.proc is not None: self.proc.terminate() self.proc.wait() self.proc = None self.queue = None
[docs] class Server(Process): """A process that is designed to stay running and serve requests, usually over HTTP When calling make(), some instances will accept port= and address= arguments to specify listening, but only if the instance was initially configured with port_arg= and address_arg=. After creating the process, is scan is True, the actual listening address and port will attempt to be inferred. """ icon = "🖧" _port: int = 0 _address: str = "0.0.0.0" _url_pattern: str = re.compile(r".*http[s]?://([^:]+):(\d+)") scan: bool = True port_arg: str | None = None address_arg: str | None = None in_env: bool = False def _make(self, port: int | None = None, address: str | None = None, **kwargs): cmd = self.cmd[:] if port is not None and self.port_arg is not None: self._port = port if self.in_env: self.environ[self.port_arg] = str(port) else: self.cmd.extend([self.port_arg, str(port)]) if address is not None and self.address_arg is not None: self._address = address if self.in_env: self.environ[self.address_arg] = address else: self.cmd.extend([self.address_arg, address]) super()._make() self.cmd = cmd if self.scan and (port is None or address is None): t0 = time.time() while True: if time.time() - t0 > 2: break try: line = self.queue.get_nowait().decode("utf-8") except Empty: time.sleep(0.02) continue if not line: break if match := self._url_pattern.match(line): self._address = address or match.group(1) self._port = int(match.group(2)) if port is None else port break