Skip to content

bounded_subprocess

Why this library exists

Python's subprocess module is enough until the child process misbehaves. A buggy program can:

  • spawn grandchildren that outlive the parent's timeout;
  • block on stdin and never read what you wrote;
  • print gigabytes to stdout and exhaust the parent's memory before you ever call .communicate().

bounded_subprocess is a small wrapper around subprocess that adds three hard bounds:

  1. Process-group cleanup. The child runs in its own session, so on timeout we kill the entire process group — including anything it forked.
  2. Bounded output capture. We keep at most max_output_size bytes from each of stdout and stderr — the prefix by default, the suffix when tail=True — and discard the rest.
  3. Wall-clock timeout. A single deadline governs the run, regardless of what the child does on its pipes.

This is not isolation: the child can still touch the filesystem, the network, or escape into a new session of its own. If you need isolation, podman_run runs the same interface inside a container.

The library comes in four flavors, all built on the same primitives:

Function / class When to reach for it
run One-shot synchronous call.
run (async) The same, but await-able, and with an optional memory watchdog.
Interactive A long-lived child you talk to line by line.
podman_run Async execution inside a podman container.

Quickstart

Run a command synchronously

from bounded_subprocess import run

result = run(["echo", "hello"], timeout_seconds=5)
print(result.exit_code)        # 0
print(result.stdout.strip())   # 'hello'

Run a command asynchronously

import asyncio
from bounded_subprocess.bounded_subprocess_async import run

async def main():
    result = await run(
        ["bash", "-lc", "echo ok; echo err 1>&2"],
        timeout_seconds=5,
    )
    print(result.exit_code, result.stdout.strip(), result.stderr.strip())

asyncio.run(main())

Talk to a long-running child

from bounded_subprocess.interactive import Interactive

proc = Interactive(["python3", "-iu"], read_buffer_size=4096)
proc.write(b"print(1 + 2)\n", timeout_seconds=1)
print(proc.read_line(timeout_seconds=1))  # b'3'
proc.close(nice_timeout_seconds=1)

Run a command in a container

import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run

async def main():
    result = await podman_run(
        ["cat"],
        image="alpine:latest",
        timeout_seconds=5,
        max_output_size=1024,
        stdin_data="hello\n",
    )
    print(result.stdout)

asyncio.run(main())

Each entry point takes plenty of additional knobs (working directory, environment, stdin, memory limit, container volumes, …); see the reference below.

API reference

Synchronous execution

bounded_subprocess.bounded_subprocess.run

run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, cwd: Optional[str] = None) -> Result

Run a subprocess with a wall-clock timeout and bounded output capture.

The child runs in its own session, so on timeout we kill the entire process group, not just the child itself. We read stdout and stderr nonblockingly and keep at most max_output_size bytes from each — the prefix by default, or the suffix when tail=True.

On timeout, Result.timeout is True and Result.exit_code is -1. If you pass stdin_data and we cannot finish writing it within stdin_write_timeout seconds (default 15), we force exit_code to -1 even when the child exits cleanly.

from bounded_subprocess import run

result = run(
    ["bash", "-lc", "echo ok; echo err 1>&2"],
    timeout_seconds=5,
    max_output_size=1024,
)
print(result.exit_code, result.stdout.strip(), result.stderr.strip())

Asynchronous execution

bounded_subprocess.bounded_subprocess_async.run async

run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, memory_limit_mb: Optional[int] = None, memory_watchdog_interval_seconds: float = 1.0, cwd: Optional[str] = None) -> Result

Async counterpart of bounded_subprocess.run, with an optional memory limit.

The child runs in its own session, and we await it until it exits or the deadline elapses. We read stdout and stderr nonblockingly and keep at most max_output_size bytes from each (prefix by default; suffix when tail=True). On timeout, Result.timeout is True and Result.exit_code is -1. A failed stdin_data write within stdin_write_timeout also forces exit_code to -1, even when the child exits cleanly.

When you set memory_limit_mb, a watchdog polls aggregate peak RSS (VmHWM from /proc, summed across the process group) every memory_watchdog_interval_seconds and kills the whole group when usage exceeds the limit. We accept this non-cgroup approximation deliberately: it overcounts shared pages and can miss very short-lived children, but it works without elevated privileges on a typical cluster node.

import asyncio
from bounded_subprocess.bounded_subprocess_async import run

async def main():
    result = await run(
        ["bash", "-lc", "echo ok; echo err 1>&2"],
        timeout_seconds=5,
        max_output_size=1024,
    )
    print(result.exit_code, result.stdout.strip(), result.stderr.strip())

asyncio.run(main())

bounded_subprocess.bounded_subprocess_async.podman_run async

podman_run(args: List[str], *, image: str, timeout_seconds: int, max_output_size: int, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, volumes: List[str] = [], cwd: Optional[str] = None, memory_limit_mb: Optional[int] = None, entrypoint: Optional[str] = None) -> Result
Run a command inside a podman container, with the same bounds as `run`.

We launch the container with `--rm -i`, track its id through a
`--cidfile`, and force-remove it when the call returns (whether the
command succeeded, timed out, or errored). `volumes` flow through as
`-v` flags, `env` as `-e` flags, and `cwd` as `-w`.

`entrypoint=""` is meaningful: it clears the image's ENTRYPOINT, so
`args` becomes the full command. Passing `None` (the default) omits
the flag entirely and leaves the image's ENTRYPOINT in place.

```python
import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run

async def main():
    result = await podman_run(
        ["cat"],
        image="alpine:latest",
        timeout_seconds=5,
        max_output_size=1024,
        stdin_data="hello

", volumes=["/host/data:/container/data"], cwd="/container/data", ) print(result.exit_code, result.stdout)

asyncio.run(main())
```

Interactive execution

bounded_subprocess.interactive.Interactive

Interactive(args: List[str], read_buffer_size: int, cwd: Optional[str] = None)

A long-lived subprocess you can write to and read lines from.

The child runs with nonblocking stdin/stdout pipes. write honors a timeout; read_line returns one complete line at a time (without the trailing newline) or None on timeout / EOF.

read_buffer_size caps how many bytes of recent stdout we retain while waiting for a newline. Lines longer than this lose bytes from the front — useful when a child spews structured output without ever emitting \n, but lossy if you actually need to read very long lines.

from bounded_subprocess.interactive import Interactive

proc = Interactive(["python3", "-u", "-c", "print(input())"], read_buffer_size=4096)
proc.write(b"hello\n", timeout_seconds=1)
line = proc.read_line(timeout_seconds=1)   # b'hello'
rc = proc.close(nice_timeout_seconds=1)

Spawn the child process. See the class docstring for parameter semantics.

close

close(nice_timeout_seconds: int) -> int

Close the pipes, wait up to nice_timeout_seconds for a clean exit, then SIGKILL the child if it is still running. Returns the child's exit code, or -9 if we had to kill it.

read_line

read_line(timeout_seconds: int) -> Optional[bytes]

Read the next line of stdout (without the trailing newline), or return None on timeout / EOF.

write

write(stdin_data: bytes, timeout_seconds: int) -> bool

Write stdin_data to the child within the timeout. Returns False if the child already exited or the write failed (e.g. broken pipe).

bounded_subprocess.interactive_async.Interactive

Interactive(args: List[str], read_buffer_size: int, cwd: Optional[str] = None)

Async counterpart of bounded_subprocess.interactive.Interactive.

Same model — a long-lived child with nonblocking line-oriented I/O — but write, read_line, and close are coroutines. read_buffer_size caps retained stdout the same way; lines longer than the buffer lose bytes from the front.

import asyncio
from bounded_subprocess.interactive_async import Interactive

async def main():
    proc = Interactive(["python3", "-iu"], read_buffer_size=4096)
    await proc.write(b"print(1 + 2)\n", timeout_seconds=1)
    print(await proc.read_line(timeout_seconds=1))   # b'3'
    await proc.close(nice_timeout_seconds=1)

asyncio.run(main())

Spawn the child process. See the class docstring for parameter semantics.

close async

close(nice_timeout_seconds: int) -> int

Close the pipes, wait up to nice_timeout_seconds for a clean exit, then SIGKILL the child if it is still running. Returns the child's exit code, or -9 if we had to kill it.

read_line async

read_line(timeout_seconds: int) -> Optional[bytes]

Read the next line of stdout (without the trailing newline), or return None on timeout / EOF.

write async

write(stdin_data: bytes, timeout_seconds: int) -> bool

Write stdin_data to the child within the timeout. Returns False if the child already exited or the write failed.

Result type

bounded_subprocess.util.Result dataclass

Result(timeout, exit_code, stdout, stderr)

The result of a bounded subprocess run.

stdout and stderr each contain at most max_output_size bytes; we decode them with errors="ignore". timeout is True only when the wall-clock deadline elapsed. exit_code holds the child's exit status, or -1 when the run aborted because of a timeout, a failed stdin write, or the memory watchdog.