Metadata-Version: 2.4
Name: jobserver
Version: 3.0
Summary: A nestable Jobserver with thread-safe futures, callbacks, and type-hinting
Author-email: Rhys Ulerich <rhys.ulerich@gmail.com>
License-Expression: MPL-2.0
Project-URL: Homepage, https://github.com/RhysU/jobserver
Project-URL: Repository, https://github.com/RhysU/jobserver
Project-URL: Issues, https://github.com/RhysU/jobserver/issues
Project-URL: Changelog, https://github.com/RhysU/jobserver/blob/master/CHANGELOG.md
Keywords: concurrency,executor,futures,jobserver,multiprocessing,parallel,process-pool,worker-pool
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: license-file

<!-- Generated by draft/render_readme_pypi.py from README.md; do not edit. -->

Jobserver
=========

A nestable Jobserver with thread-safe futures, callbacks, and type-hinting

Purpose
-------

Jobserver is similar in spirit to
`multiprocessing.Pool` or `concurrent.futures.Executor` with a few differences:

 * First, the implementation choices are based upon the [GNU Make
   Jobserver](https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html).
 * Second, as a result, the Jobserver is "nestable", meaning that resource
   constraints are shared with work submitted by other work.
 * Third, no background threads are spun up to handle any backing
   queues, which lets the implementation play well with more
   third-party libraries.
 * Fourth, Futures are eagerly scanned to quickly reclaim resources.
 * Fifth, Futures can detect when a result pipe closes without a result.
 * Sixth, the user can specify additional work acceptance criteria.
   For example, not launching work unless some amount of RAM is available.
 * Lastly, the API communicates when Exceptions occur within a callback.

In particular, `Jobserver` does not inherit from `concurrent.futures.Executor`
because that `Executor` API fundamentally requires a background thread for
asynchronously issuing `concurrent.futures.Future` callbacks.  `Jobserver`,
eschewing threads, is consequently both less than and more than a
standard `Executor`.

In contrast, `JobserverExecutor` combines
a `Jobserver` with a background thread to provide full
`concurrent.futures.Executor` compatibility.  `JobserverExecutor` is a drop-in
replacement for `concurrent.futures.ProcessPoolExecutor` that aims to provide
more robustness at the expense of slower process launching.
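
The background-thread point above can be observed with the standard library alone. The sketch below is not part of `jobserver`; it uses `ThreadPoolExecutor` only to keep the demonstration inside one process, and the names `gate`, `fired`, and `on_done` are illustrative. It shows an `add_done_callback` callback firing on a thread other than the caller's, without the caller ever waiting on the future.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # holds the task until the callback is registered
fired = threading.Event()  # set by the callback once it has run
callback_threads = []  # names of the threads that ran the callback


def on_done(future):
    """Record which thread the standard library used for the callback."""
    callback_threads.append(threading.current_thread().name)
    fired.set()


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    future = pool.submit(gate.wait)
    future.add_done_callback(on_done)  # registered while the task is unfinished
    gate.set()  # let the task finish
    # The caller never calls result() on the future; it only waits to
    # learn that some other thread has already issued the callback.
    fired.wait(timeout=5)

main_thread = threading.current_thread().name
```

As ex07_callbacks.py notes, `Jobserver` callbacks instead fire when a call such as `wait()` observes completion.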

Dependencies
------------

None aside from the Python Standard Library.

Comparison with the Python Standard Library
-------------------------------------------

How `Jobserver` and `JobserverExecutor` compare to the standard library's
[`Pool`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool)
and
[`ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor):

| Feature                           | `Jobserver` | `JobserverExecutor` | `ProcessPoolExecutor` | `Pool` |
|-----------------------------------|:-----------:|:-------------------:|:---------------------:|:------:|
| Nested work shares the slots      |     yes     |         yes         |          no           |   no   |
| Background thread                 |     no      |         yes         |          yes          |  yes   |
| Cancel pending work               |     no      |         yes         |          yes          |   no   |
| Cancel running work               |     yes     |         no          |          no           |   no   |
| Detects individual lost results   |     yes     |         yes         |       partial\*       |   no   |
| User-defined launch criteria      |     yes     |         yes         |          no           |   no   |
| Lambdas/closures via `fork`       |     yes     |         no          |          no           |   no   |
| Lambdas/closures via `spawn`      |     no      |         no          |          no           |   no   |
| Lambdas/closures via `forkserver` |     no      |         no          |          no           |   no   |

\* `ProcessPoolExecutor` cannot tell which submission lost its worker, so it
fails all outstanding futures at once.  With `Jobserver`, by contrast, each
submission owns its own result pipe, so a pipe that closes without a result
is reported as `LostResult` against exactly that one `Future`.

Testing
-------

Tested with CPython 3.9, 3.10, 3.11, 3.12, 3.13, and 3.14 via the CI script.<br>
The implementation passes both PEP 8 checks (per `ruff`) and type checks (per `mypy`).

License
-------

Copyright (C) 2019-2026 Rhys Ulerich

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.

Examples
--------

 * ex01_basic.py - Submitting jobs and collecting results
   via shorthand, keyword args, and `submit()`.
 * ex02_lifecycle.py - Polling with `done()`,
   `wait()`, and `result()`, plus `reclaim_resources()` and cleanup.
 * ex03_nested.py - Nesting submissions so child work
   shares slot constraints with its parent.
 * ex04_cancel.py - Cancelling running work by sending
   `SIGTERM` to a worker via `Future.wait(signal=...)`.
 * ex05_death.py - Detecting a submission whose result
   pipe closes without a result (e.g. a killed worker) via `LostResult`.
 * ex06_sleep.py - Gating work acceptance on an
   external condition using `replace_sleep()`.
 * ex07_callbacks.py - Registering `when_done`
   callbacks and draining errors via `CallbackRaised`.
 * ex08_environment.py - Setting and unsetting
   environment variables in child processes via `revise_env()`.
 * ex09_preexec.py - Using `replace_preexec()`
   with a plain callable or context manager factory for entry/exit semantics.
 * ex10_timeouts.py - Using non-blocking polling,
   finite deadlines, and `Blocked` from `result()` and `submit()`.
 * ex11_pdeathsig.py - On Linux, using
   `replace_preexec()` to call `prctl(PR_SET_PDEATHSIG)` so a child dies when
   its parent does.
 * ex12_executor.py - Using `JobserverExecutor` as
   a context manager supporting `map()` and `concurrent.futures.Future`
   cancellation.

### Example ex01_basic.py

```python
"""Example 1 shows submitting jobs and collecting results."""

from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Jobserver


def main() -> None:
    """Shows submitting jobs and collecting results."""
    # Instance will use the default multiprocessing context for this platform
    with Jobserver(slots=2) as jobserver:
        # Calling Jobserver.submit(...) submits work and returns a Future
        future_a = jobserver.submit(fn=pow, args=(2, 10), kwargs={"mod": 1000})

        # Simpler shorthand via Jobserver.__call__(...) with positional args
        future_b = jobserver(len, (1, 2, 3))

        # Shorthand also permits kwargs or mixed args/kwargs (mixed not shown)
        future_c = jobserver(str, object=42)

        # Results retrieved in arbitrary order
        info("str(object=42) = %s", future_c.result())
        info("len((1, 2, 3)) = %s", future_b.result())
        info("pow(2, 10, mod=1000) = %s", future_a.result())

        # Map over multiple inputs yielding results in order
        # (Argument names argses and kwargses are plurals for args and kwargs)
        lengths = list(jobserver.map(fn=len, argses=[("ab",), ("cde",)]))
        info("lengths via map: %s", lengths)

        # A worker raising an ordinary Exception has it re-raised by result().
        future_err = jobserver.submit(fn=int, args=("not a number",))
        try:
            future_err.result()
            raise RuntimeError("Expected ValueError was not raised")
        except ValueError as e:
            info("Worker raised and result() re-raised: %r", e)
            # __cause__ shows the worker's traceback for the original failure.
            info("Child traceback preserved via __cause__: %s", e.__cause__)


# The spawn and forkserver start methods re-import this module in every
# child, so the if __name__ == "__main__" guard below is required.
if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```

### Example ex02_lifecycle.py

```python
"""Example 2 shows the Future lifecycle, polling, and resource cleanup."""

from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Jobserver


def main() -> None:
    """Shows the Future lifecycle, polling, and resource cleanup."""
    # A "with" block is optional but convenient: on exit it reclaims any
    # outstanding Futures, avoiding a ResourceWarning at finalization.
    with Jobserver(slots=2) as jobserver:
        # jobserver(fn, *args, **kwargs) is shorthand for jobserver.submit(...)
        future1 = jobserver(len, "lifecycle")

        # done(): a cheap progress check that never blocks the caller.
        info("done() poll: %s", future1.done())

        # wait(): block for completion when the value itself is not needed.
        info("wait() blocks then returns: %s", future1.wait())

        # result(): block for and return the value, re-raising on failure.
        info("result(): %r", future1.result())

        # reclaim_resources() frees resources and issues callbacks for finished
        # Futures you never waited on.  Like invoking the garbage collector,
        # the method is generally unnecessary but occasionally useful.
        reclaimed: list = []
        future2 = jobserver(len, "second")
        future2.when_done(reclaimed.append, "callback fired")

        # Exiting the "with" block would reclaim this implicitly.
        while not reclaimed:
            jobserver.reclaim_resources()

        info("reclaim_resources() handled future2: %s", reclaimed)


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```

### Example ex03_nested.py

```python
"""Example 3 shows nested submissions sharing slot constraints."""

from logging import INFO, basicConfig, captureWarnings, info
from multiprocessing import get_all_start_methods

from jobserver import Blocked, Jobserver


def main() -> None:
    """Shows nested submissions for all available multiprocessing contexts."""
    for method in get_all_start_methods():
        example(method)


def example(context: str) -> None:
    """Shows nested submissions sharing slot constraints."""
    # Using slots=2 here; slots=None would use os.sched_getaffinity(0)
    # to match the number of usable CPUs for the current process.
    # Jobserver may be used as a context manager but isn't required.
    jobserver_a = Jobserver(context=context, slots=2)

    # slots=2: parent(1) + child(1) + grandchild(Blocked) -> depth 1
    future_a = jobserver_a.submit(
        fn=task_recurse, args=(jobserver_a, 10), timeout=5
    )
    depth_a = future_a.result()
    info("context=%s: Reached depth %d with 2 slots", context, depth_a)
    assert depth_a == 1, depth_a

    # slots=4: depth 3 (N slots -> depth N-1)
    # Unlike jobserver_a above, this one is used as a context manager.
    with Jobserver(context=context, slots=4) as jobserver_b:
        future_b = jobserver_b.submit(
            fn=task_recurse, args=(jobserver_b, 10), timeout=5
        )
        depth_b = future_b.result()
        info("context=%s: Reached depth %d with 4 slots", context, depth_b)
        assert depth_b == 3, depth_b


def task_recurse(jobserver: Jobserver, max_depth: int) -> int:
    """Submit nested work until either Blocked or max_depth reached."""
    if max_depth < 1:
        return 0
    try:
        future = jobserver.submit(
            fn=task_recurse, args=(jobserver, max_depth - 1), timeout=0
        )
    except Blocked:
        return 0
    return 1 + future.result(timeout=None)


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```
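
The slot arithmetic above (N slots permit nesting depth N-1) can be mimicked in a single process with a pipe holding one byte per free slot, which is the token idea behind the GNU Make jobserver. This is a toy sketch and not how `Jobserver` is necessarily implemented; `TokenPool`, `nest`, and `depth_with` are hypothetical names, and a POSIX platform is assumed for the non-blocking pipe read.

```python
import os


class TokenPool:
    """Toy slot pool: one byte in a pipe per free slot."""

    def __init__(self, slots: int) -> None:
        self._read_fd, self._write_fd = os.pipe()
        os.set_blocking(self._read_fd, False)
        os.write(self._write_fd, b"+" * slots)

    def try_acquire(self) -> bool:
        """Take one token without blocking; False when none are free."""
        try:
            return os.read(self._read_fd, 1) != b""
        except BlockingIOError:
            return False

    def release(self) -> None:
        """Return one token to the pool."""
        os.write(self._write_fd, b"+")

    def close(self) -> None:
        """Close both ends of the pipe."""
        os.close(self._read_fd)
        os.close(self._write_fd)


def nest(pool: TokenPool, max_depth: int) -> int:
    """Recurse while tokens remain, mimicking work that submits more work."""
    if max_depth < 1 or not pool.try_acquire():
        return 0
    try:
        return 1 + nest(pool, max_depth - 1)
    finally:
        pool.release()


def depth_with(slots: int) -> int:
    """Occupy one slot for the parent task, then nest as deep as possible."""
    pool = TokenPool(slots)
    try:
        if not pool.try_acquire():  # the parent itself consumes a slot
            return 0
        return nest(pool, max_depth=10)
    finally:
        pool.close()
```

Because every level draws from the same pipe, nested work cannot exceed the shared limit: `depth_with(2)` and `depth_with(4)` mirror the two depths asserted in the example.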

### Example ex04_cancel.py

```python
"""Example 4 shows cancelling running work by signalling the worker."""

import signal
import time
from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Jobserver, LostResult


def main() -> None:
    """Shows cancelling running work via SIGTERM through Future.wait()."""
    with Jobserver(slots=2) as jobserver:
        # Submit long-running work that would otherwise never finish here.
        future_cancel = jobserver.submit(fn=time.sleep, args=(3600,))

        # Submit normal work alongside the doomed submission.
        future_ok = jobserver.submit(fn=len, args=("hello",))

        # wait(signal=...) sends a signal to the running worker, then waits.
        # wait() returns True once completion (here, death) is confirmed.
        # Any signal can be sent (SIGTERM, SIGKILL, SIGUSR1, etc.)
        # and the wait timeout can be as short or as long as desired.
        info(
            "Cancelled worker wait: %s",
            future_cancel.wait(signal=signal.SIGTERM),
        )
        info("Normal result: %s", future_ok.result())

        # Here, result() raises LostResult for the SIGTERM-ed worker
        try:
            future_cancel.result()
            raise RuntimeError("Expected LostResult was not raised")
        except LostResult:
            info("Caught expected LostResult from cancelled worker")


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```

### Example ex05_death.py

```python
"""Example 5 shows detecting a submission that ends without a result."""

import signal
from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Jobserver, LostResult


def main() -> None:
    """Shows detecting a submission that ends without a result."""
    with Jobserver(context="spawn", slots=2) as jobserver:
        # Submit work that SIGKILLs itself, closing the result pipe
        # without sending a result.
        future_killed = jobserver(signal.raise_signal, signal.SIGKILL)

        # Submit normal work alongside the doomed submission
        future_ok = jobserver.submit(fn=len, args=("hello",))

        # wait() returns True once settled, even when the submission died
        info("Killed worker wait: %s", future_killed.wait())
        info("Normal result: %s", future_ok.result())

        # result() raises LostResult for the killed worker
        try:
            future_killed.result()
            raise RuntimeError("Expected LostResult was not raised")
        except LostResult:
            info("Caught expected LostResult from SIGKILL worker")


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```
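
One way to see how a pipe that closes empty can be told apart from a delivered result is the standard library's `multiprocessing.Pipe`: receiving from a connection whose other end closed without sending raises `EOFError`. The sketch below is a single-process illustration of that behavior, not the `Jobserver` implementation; `receive_or_lost` is a hypothetical helper.

```python
from multiprocessing import Pipe


def receive_or_lost(reader):
    """Return ("ok", value), or ("lost", None) if the pipe closed empty."""
    try:
        return ("ok", reader.recv())
    except EOFError:
        return ("lost", None)


# A sender that delivers a result before closing its end of the pipe.
reader_a, writer_a = Pipe(duplex=False)
writer_a.send(5)
writer_a.close()
outcome_a = receive_or_lost(reader_a)

# A sender that closes its end without sending, as a killed worker would.
reader_b, writer_b = Pipe(duplex=False)
writer_b.close()
outcome_b = receive_or_lost(reader_b)
```

With one pipe per submission, the "lost" outcome identifies exactly which piece of work ended without a result.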

### Example ex06_sleep.py

```python
"""
Example 6 shows gating work acceptance on external conditions.

Availability of RAM is a more common use case but ill-suited for an example.
"""

import os
import tempfile
from logging import INFO, basicConfig, captureWarnings, info
from typing import Optional

from jobserver import Blocked, Jobserver


def main() -> None:
    """Shows gating work acceptance on external conditions."""
    with Jobserver(context="spawn", slots=2) as jobserver:
        with tempfile.NamedTemporaryFile() as tmp:
            gate_path = tmp.name
            info("Gate file: %s", gate_path)

            # sleep_gate returns None (proceed) when gate exists, 0.1 otherwise
            def sleep_gate() -> Optional[float]:
                if os.path.exists(gate_path):
                    return None
                return 0.1

            # Submission proceeds because the gate file exists
            gated = jobserver.replace_sleep(sleep_gate)
            future = gated.submit(fn=sorted, args=([3, 1, 2],))
            info("With gate file: %s", future.result())

        # Gate file is now removed; sleep_gate returns 0.1 until the timeout
        try:
            gated.submit(
                fn=sorted,
                args=([3, 1, 2],),
                timeout=0.35,
            )
            raise RuntimeError("Expected Blocked was not raised")
        except Blocked:
            info("Caught expected Blocked: gate file absent")


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```
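
The RAM-based criterion mentioned in the docstring can be sketched by following the same contract as `sleep_gate` above: return `None` to proceed, or otherwise a number of seconds, presumably the delay before the gate is consulted again. The helper names below (`available_ram_bytes`, `make_ram_gate`) are hypothetical, and the default probe assumes a platform where `os.sysconf` knows `SC_AVPHYS_PAGES` and `SC_PAGE_SIZE`, such as Linux with glibc.

```python
import os
from typing import Callable, Optional


def available_ram_bytes() -> int:
    """Probe free physical memory; assumes Linux-style sysconf names."""
    return os.sysconf("SC_AVPHYS_PAGES") * os.sysconf("SC_PAGE_SIZE")


def make_ram_gate(
    min_bytes: int,
    probe: Callable[[], int] = available_ram_bytes,
    retry_seconds: float = 0.1,
) -> Callable[[], Optional[float]]:
    """Build a gate that admits work only when enough RAM is available."""

    def sleep_gate() -> Optional[float]:
        # None means "proceed"; a float asks the caller to sleep and retry.
        if probe() >= min_bytes:
            return None
        return retry_seconds

    return sleep_gate


# Fake probes stand in for real memory readings in this illustration.
gate_roomy = make_ram_gate(min_bytes=100, probe=lambda: 200)
gate_tight = make_ram_gate(min_bytes=100, probe=lambda: 50)
```

The returned callable could then be handed to `replace_sleep()` as `sleep_gate` is in the example. Injecting `probe` keeps the gate testable without actually exhausting memory.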

### Example ex07_callbacks.py

```python
"""Example 7 shows callbacks, exception handling, and CallbackRaised."""

from logging import INFO, basicConfig, captureWarnings, info

from jobserver import CallbackRaised, Jobserver


def main() -> None:
    """Shows callbacks, exception handling, and CallbackRaised."""
    with Jobserver(context="spawn", slots=2) as jobserver:
        future1 = jobserver.submit(fn=len, args=("hello",))

        # Register callbacks to fire after observing the future
        # completes.  Callbacks receive exactly and only the
        # arguments given to when_done(...).
        accumulator: list = []
        future1.when_done(accumulator.append, "first")
        future1.when_done(accumulator.append, "second")
        info("Result: %s", future1.result())
        info("Callbacks fired: %s", accumulator)

        # Unlike concurrent.futures.Future.add_done_callback(...), you must
        # pass the future as an argument if wanting the callback to receive it.
        future1.when_done(type, future1)

        # Registering a callback after completion causes it to immediately fire
        future1.when_done(accumulator.append, "after-completion")
        info("Callbacks after completion: %s", accumulator)

        # when_done(...) returns a Future-specific seqno per registration.
        future2 = jobserver.submit(fn=len, args=("world",))
        assert 0 == future2.when_done(raise_exception, klass=ValueError)
        assert 1 == future2.when_done(raise_exception, klass=TypeError)
        assert 2 == future2.when_done(accumulator.append, "survivor")

        # Each wait() fires pending callbacks; raising ones surface as
        # CallbackRaised wrapping the original exception, others run silently.
        # CallbackRaised.seqno reports which registration raised.  Loop until
        # no error is raised to drain all callbacks.
        for i in range(3):
            try:
                future2.wait()
                info("wait() call %d: no more errors", i)
                break
            except CallbackRaised as e:
                info(
                    "wait() call %d: caught %s from registration seqno=%d",
                    i,
                    type(e.__cause__).__name__,
                    e.seqno,
                )

        # The result is still available after all callbacks drain
        info("Result after errors: %s", future2.result())

        # Callbacks may register additional callbacks or be provided any future
        future1.when_done(future1.when_done, tuple)
        future1.when_done(future2.when_done, list)


def raise_exception(klass: type) -> None:
    """Raise the requested exception."""
    raise klass()


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```
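
For contrast with the `when_done(...)` comment above, this standard-library snippet shows `concurrent.futures.Future.add_done_callback(...)` passing the future itself to the callback. It does not use `jobserver` at all, and the name `received` is illustrative.

```python
from concurrent.futures import Future

received = []  # collects whatever the callback is handed

future = Future()
future.add_done_callback(received.append)  # no arguments can be supplied here
future.set_result(5)  # completing the future fires the callback

# The standard library handed the callback the future itself.
callback_got_future = received[0] is future
```

Under `Jobserver`, per the example, the comparable registration names the future explicitly, as in `future1.when_done(type, future1)`.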

### Example ex08_environment.py

```python
"""Example 8 shows environment variable injection for child processes."""

import os
from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Jobserver


def main() -> None:
    """Shows environment variable injection for child processes."""
    with Jobserver(context="spawn", slots=2) as jobserver:
        # Set an environment variable in the child process
        future_set = jobserver.revise_env({"DEMO_KEY": "hello"}).submit(
            fn=task_getenv_missing,
            args=("DEMO_KEY",),
        )
        info("env set: %s", future_set.result())

        # Unset an environment variable by passing None
        future_unset = jobserver.revise_env({"DEMO_KEY": None}).submit(
            fn=task_getenv_missing,
            args=("DEMO_KEY",),
        )
        info("env unset: %s", future_unset.result())


def task_getenv_missing(key: str) -> str:
    """Return os.getenv(key, 'MISSING')."""
    return os.getenv(key, "MISSING")


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```

### Example ex09_preexec.py

```python
"""Example 9 shows replace_preexec with a callable and a context manager.

The replace_preexec callable runs fn(*args, **kwargs) in the worker before the
task function.  A non-None return is assumed to be a context manager wrapping
the task function's execution.
"""

import logging
from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Jobserver


def main() -> None:
    """Shows replace_preexec with a callable and a context manager factory."""
    # replace_preexec(...) configures logging in all workers
    with Jobserver(context="spawn", slots=2).replace_preexec(
        logging.basicConfig, level=logging.DEBUG
    ) as jobserver:
        future_plain = jobserver.submit(fn=task_logger_level)
        info("plain preexec: level=%s", future_plain.result())

        # Context manager factory: basicConfig on enter, shutdown on exit
        future_cm = jobserver.replace_preexec(LoggingContext).submit(
            fn=task_logger_level,
        )
        info("context manager preexec: level=%s", future_cm.result())


def task_logger_level() -> int:
    """Return the root logger's effective level."""
    return logging.getLogger().getEffectiveLevel()


class LoggingContext:
    """Context manager: basicConfig on enter, shutdown on exit."""

    def __enter__(self):
        logging.basicConfig(level=logging.WARNING)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.shutdown()
        return False


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```

### Example ex10_timeouts.py

```python
"""Example 10 shows timeout handling for submit(), done(), wait(), result()."""

import time
from logging import INFO, basicConfig, captureWarnings, info

from jobserver import Blocked, Jobserver


def main() -> None:
    """Shows the timeout argument on submit(), done(), wait(), result()."""
    # Timeouts are in seconds: None blocks (default for wait()/result()), 0
    # polls, and a positive value waits at most that long.  On expiry submit()
    # and result() raise Blocked while done() and wait() return False.
    with Jobserver(context="spawn", slots=1) as jobserver:
        # Submit a slow task that occupies the only slot
        future = jobserver.submit(fn=task_slow, args=(0.5,))

        # timeout=0 polls without blocking; done() is exactly wait(timeout=0)
        info("done() before: %s", future.done())

        # done(timeout=0.1) waits up to 0.1s, returning False if not ready
        info("done(timeout=0.1): %s", future.done(timeout=0.1))

        # A positive timeout waits at most that long.  wait() then returns
        # False rather than raising when the result is not yet ready.
        info("wait(timeout=0.1) not ready: %s", future.wait(timeout=0.1))

        # result() with the same finite timeout instead raises Blocked
        try:
            future.result(timeout=0.1)
            raise RuntimeError("Expected Blocked was not raised")
        except Blocked:
            info("Caught expected Blocked from result(timeout=0.1)")

        # submit(timeout=0) is non-blocking and raises Blocked at once when no
        # slot is available; timeout=None would instead block until one frees
        try:
            jobserver.submit(fn=len, args=("rejected",), timeout=0)
            raise RuntimeError("Expected Blocked was not raised")
        except Blocked:
            info("Caught expected Blocked: no slots for new work")

        # timeout=None (the default) blocks indefinitely until the result is in
        info("wait() after: %s", future.wait())
        # done() on a completed future also returns True (non-blocking)
        info("done() after: %s", future.done())
        info("Slow task result: %s", future.result())


def task_slow(seconds: float) -> str:
    """Sleep then return a confirmation."""
    time.sleep(seconds)
    return f"done after {seconds:.2f}s"


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    main()
```

### Example ex11_pdeathsig.py

```python
"""
Example 11 shows replace_preexec setting PR_SET_PDEATHSIG via prctl.

This example will not work on all operating systems.
"""

import ctypes
import ctypes.util
import os
import signal
import sys
from logging import INFO, basicConfig, captureWarnings, info, warning

from jobserver import Jobserver


def main() -> None:
    """Shows replace_preexec setting PR_SET_PDEATHSIG via prctl."""
    with Jobserver(context="spawn", slots=2) as jobserver:
        # preexec runs before the task function, here establishing
        # PR_SET_PDEATHSIG so the child receives SIGTERM if the parent dies
        future = jobserver.replace_preexec(set_pdeathsig).submit(
            fn=task_check_pdeathsig,
        )
        info("pdeathsig active: %s", future.result())


def task_check_pdeathsig() -> bool:
    """Return True if PR_SET_PDEATHSIG has been set to a nonzero signal."""
    PR_GET_PDEATHSIG = 2
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
    sig = ctypes.c_int(0)
    result = libc.prctl(PR_GET_PDEATHSIG, ctypes.byref(sig))
    if result != 0:
        return False
    return sig.value != 0


def set_pdeathsig() -> None:
    """Set PR_SET_PDEATHSIG so child receives SIGTERM when parent dies."""
    PR_SET_PDEATHSIG = 1
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
    result = libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)
    if result != 0:
        errno = ctypes.get_errno()
        raise OSError(errno, os.strerror(errno))


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    if sys.platform == "linux":
        main()
    else:
        warning("PR_SET_PDEATHSIG unavailable on %s; skipping", sys.platform)
```

### Example ex12_executor.py

```python
"""Example 12 shows JobserverExecutor with an owned or shared Jobserver.

Prefer JobserverExecutor for drop-in concurrent.futures compatibility and
cancellable PENDING futures; use Jobserver directly for thread-free nesting.
"""

import os
import time
from concurrent.futures import CancelledError
from logging import DEBUG, INFO, basicConfig, captureWarnings, getLogger, info

from jobserver import Jobserver, JobserverExecutor


def process_start() -> None:
    """Log message from processes at startup."""
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    captureWarnings(True)
    info("started (pid=%d)", os.getpid())


def main() -> None:
    """Shows JobserverExecutor: context manager, map, submit, and cancel."""
    # Pattern A: executor owns its Jobserver -- no explicit Jobserver needed.
    with JobserverExecutor() as executor:
        lengths = list(executor.map(len, ["a", "bb", "ccc", "dddd", "eeeee"]))
        info("lengths via map (owned jobserver): %s", lengths)

    # Pattern B: caller owns the Jobserver and passes it to the executor.
    js = Jobserver(context="spawn", slots=1).replace_preexec(process_start)
    with js, JobserverExecutor(js) as executor:
        # map() applies fn to every item, yielding results in order
        lengths = list(executor.map(len, ["a", "bb", "ccc", "dddd", "eeeee"]))
        info("lengths via map (shared jobserver): %s", lengths)

        # Submit a slow task that holds the only available slot
        slow = executor.submit(time.sleep, 0.5)

        # Because of slow, this future queues as PENDING and is cancellable
        pending = executor.submit(len, "pending")

        # Cancel PENDING future before it is dispatched to a worker
        cancelled = pending.cancel()
        info("pending cancelled: %s", cancelled)
        try:
            pending.result()
            raise AssertionError("Unexpected")
        except CancelledError:
            pass

        # Collect the slow task's result; executor exits cleanly
        assert slow.result() is None


if __name__ == "__main__":
    # Configure logging, showing lifecycle messages, and announce startup
    process_start()
    getLogger("jobserver").setLevel(DEBUG)
    main()
```
