Source code for scitex_etc.wait_key

#!/usr/bin/env python3
# File: src/scitex_etc/wait_key.py

import multiprocessing
import time

import readchar


def wait_key(p, *, read_key=None, printer=print):
    """Block until the user presses ``q``, then terminate process ``p``.

    Every key returned by ``read_key`` is echoed through ``printer``. Once
    the key equals ``"q"`` the notice ``"q was pressed."`` is printed and the
    loop ends. ``p.terminate()`` is called on every exit path, including when
    ``read_key`` or ``printer`` raises, so the process is never left running
    because of a failure in the key loop.

    Parameters
    ----------
    p
        A process-like object exposing ``terminate()``.
    read_key : callable, optional
        Zero-arg callable returning the next key as a string. Defaults to
        ``readchar.readchar``. Injectable as a test seam (no mocks).
    printer : callable, optional
        Output sink, defaults to the builtin ``print``. Injectable so tests
        can record output without patching ``builtins.print``.

    Raises
    ------
    Exception
        Anything raised by ``read_key`` or ``printer`` propagates unchanged,
        after ``p.terminate()`` has been called.
    """
    if read_key is None:
        # Looked up at call time, so the real key reader is only touched
        # when the caller did not inject one.
        read_key = readchar.readchar

    try:
        key = "x"  # any value other than "q" so the loop body runs at least once
        while key != "q":
            key = read_key()
            printer(key)
        printer("q was pressed.")
    finally:
        # Guarantee the process is stopped even if reading or printing failed.
        p.terminate()
def count(*, printer=print, sleeper=time.sleep, interval=1):
    """Print an incrementing counter forever, sleeping between values.

    The counter starts at ``0`` and increases by one per iteration. The
    function never returns on its own; it ends only when ``printer`` or
    ``sleeper`` raises, or when the hosting process is stopped.

    Parameters
    ----------
    printer : callable, optional
        Output sink, defaults to the builtin ``print``. Injectable test seam.
    sleeper : callable, optional
        One-arg sleep callable, defaults to ``time.sleep``. Injectable so
        tests can run without real delay and bound the loop.
    interval : int or float, optional
        Value passed to ``sleeper`` after each printed counter. Defaults to
        ``1``, i.e. one second with the default ``time.sleep``.
    """
    counter = 0
    while True:
        printer(counter)
        sleeper(interval)
        counter += 1
if __name__ == "__main__":
    # Demo: run the counter in a child process and stop it when the user
    # presses "q" in the parent terminal.
    p1 = multiprocessing.Process(target=count)
    p1.start()
    wait_key(p1)
    # terminate() only requests the stop; join() waits for the child to exit
    # so it is reaped explicitly before the script continues.
    p1.join()
    print("aaa")

# EOF