Threaded Generator
This module provides generator wrappers that execute source iterators in separate threads or processes, allowing for parallel producer-consumer pipelines.
Key components:
- ThreadedGenerator: Runs the source iterator in a separate thread.
- ProcessGenerator: Runs the source iterator in a separate process.
- ParallelGenerator: Base class supporting multiple workers (threads or processes).
Examples
>>> import time
>>> def slow_source():
...     for i in range(5):
...         time.sleep(0.1)
...         yield i
...
>>> # Wrap the slow source in a threaded generator
>>> gen = ThreadedGenerator(slow_source(), maxsize=3)
>>> for item in gen:
...     print(item)  # Consumption happens in the main thread, production in the background
0
1
2
3
4
>>> # Using ProcessGenerator for CPU-bound tasks
>>> def cpu_bound_source():
...     for i in range(5):
...         yield sum(range(1000000 * i))
...
>>> gen = ProcessGenerator(cpu_bound_source(), maxsize=2)
>>> list(gen)
[0, 499999500000, 1999999000000, ...]
>>> # Using ParallelGenerator with multiple workers
>>> # Note: You must wrap the generator function with @partial_generator.
>>> # This is required because standard generators cannot be pickled or restarted.
>>> # The wrapper delays execution so each worker process can create its own
>>> # fresh iterator instance.
>>> @partial_generator
... def parallel_source(x):
...     yield x * x
...
>>> # This will spawn 4 worker processes
>>> gen = ParallelGenerator(parallel_source(2), num_workers=4)
>>> list(gen)
[4, 4, 4, 4]
Generators
- class ThreadedGenerator(it, maxsize=1, disable=False, name=None, queue_type=None, monitor=None)
Bases: ParallelGenerator, Generic
A generator that runs the source iterator in a separate thread.
Communication is done using a queue.Queue by default. If this is chained with a process-based generator, you need to use a multiprocessing.Queue for the queue_type.
Example
>>> gen = ThreadedGenerator(range(10), maxsize=5)
>>> list(gen)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
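The thread-and-queue handoff described for ThreadedGenerator can be illustrated with the standard library alone. The following is a minimal sketch, not this module's implementation: the names threaded_iter and _SENTINEL are hypothetical, and the end-of-stream marker is an assumption made for the example. Unlike a production wrapper, it does not re-raise producer exceptions in the consumer.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

# Hypothetical end-of-stream marker used only by this sketch.
_SENTINEL = object()


def threaded_iter(it: Iterable[T], maxsize: int = 1) -> Iterator[T]:
    """Consume `it` in a background thread and yield its items in the caller."""
    q: queue.Queue = queue.Queue(maxsize=maxsize)

    def produce() -> None:
        try:
            for item in it:
                q.put(item)  # blocks while the queue already holds `maxsize` items
        finally:
            q.put(_SENTINEL)  # always signal the end of the stream

    threading.Thread(target=produce, daemon=True).start()

    while True:
        item = q.get()  # blocks until the producer has put something
        if item is _SENTINEL:
            return
        yield item


print(list(threaded_iter(range(10), maxsize=5)))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The bounded queue is what gives maxsize its meaning here: the producer can run at most maxsize items ahead of the consumer before it blocks.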
- class ProcessGenerator(it, maxsize=1, disable=False, name=None, queue_type=None, monitor=None)
Bases: ParallelGenerator, Generic
A generator that runs the source iterator in a separate process.
Communication is done using a multiprocessing.Queue by default.
Example
>>> gen = ProcessGenerator(range(10), maxsize=5)
>>> list(gen)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- class ParallelGenerator(it, num_workers=1, maxsize=1, disable=False, name=None, method='process', queue_type=None, monitor=None)
Bases: Generic
A generator wrapper that executes the source iterator in separate workers (threads or processes).
- Parameters:
it (Iterable[T]) – The source iterable to consume.
num_workers (int) – Number of worker processes/threads to spawn. Defaults to 1.
maxsize (int) – Maximum size of the internal queue. Defaults to 1.
disable (bool) – If True, run synchronously in the main thread/process without workers.
name (str | None) – Name for the generator, used in monitoring/logging.
method (Method) – Execution method, either “process” or “thread”.
queue_type (Type[Q[T]] | None) – Custom queue class to use. Defaults to multiprocessing.Queue if method is “process”, or queue.Queue if method is “thread”.
monitor (Monitor | None) – Monitor instance for tracking performance statistics.
Warning
When using multiple workers (num_workers > 1), the input iterable it must be a factory or a function wrapped with partial_generator(). Standard Python generators cannot be pickled or restarted, so each worker needs a fresh iterator instance.
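The two limitations named in the warning can be checked directly in plain Python. This short sketch uses only the standard library; squares and factory are hypothetical names chosen for the illustration.

```python
import pickle


def squares(n):
    for i in range(n):
        yield i * i


gen = squares(3)

# A generator object cannot be pickled, so it cannot be shipped to a worker process.
try:
    pickle.dumps(gen)
    picklable = True
except TypeError:
    picklable = False

# A generator object is also single-use: a second pass yields nothing.
first_pass = list(gen)
second_pass = list(gen)


# A factory, by contrast, can be called once per worker to get independent iterators.
def factory():
    return squares(3)


per_worker = [list(factory()) for _ in range(2)]

print(picklable)    # False
print(first_pass)   # [0, 1, 4]
print(second_pass)  # []
print(per_worker)   # [[0, 1, 4], [0, 1, 4]]
```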
Utilities
- partial_generator(f)
A decorator that transforms a generator function into one that returns a PartialGenerator. Decorate all generators that are passed to ParallelGenerator with this.
- Parameters:
f (Callable[P, Iterator[T]]) – The generator function to wrap.
- Returns:
A wrapper function that, when called, returns a PartialGenerator instance instead of immediately executing the generator logic.
- Return type:
Callable[P, PartialGenerator[T]]
- class PartialGenerator(f)
Bases: Generic
A wrapper class that delays the execution of a generator function.
- Parameters:
f (Callable[[], Iterator[T]]) – A callable that takes no arguments and returns an Iterator[T]. This is typically a functools.partial object.
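The deferred-execution idea behind partial_generator and PartialGenerator can be sketched as follows. This is an assumption-laden illustration rather than the module's source: LazyGenerator and lazy_generator are hypothetical stand-ins, and the sketch covers only the delayed call via functools.partial, not how the real classes cross process boundaries.

```python
import functools
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class LazyGenerator(Generic[T]):
    """Hypothetical stand-in for PartialGenerator: holds a zero-argument factory."""

    def __init__(self, f: Callable[[], Iterator[T]]) -> None:
        self.f = f

    def __iter__(self) -> Iterator[T]:
        # Every call to iter() starts a fresh run of the generator function.
        return self.f()


def lazy_generator(f: Callable[..., Iterator[T]]) -> Callable[..., LazyGenerator[T]]:
    """Hypothetical stand-in for partial_generator."""

    @functools.wraps(f)
    def wrapper(*args, **kwargs) -> LazyGenerator[T]:
        # Bind the arguments now, but do not call `f` yet.
        return LazyGenerator(functools.partial(f, *args, **kwargs))

    return wrapper


@lazy_generator
def parallel_source(x):
    yield x * x


source = parallel_source(2)
print(list(source))  # [4]
print(list(source))  # [4] again, because iteration restarts from the factory
```

Because the wrapper stores a factory instead of a running generator, each consumer that iterates over it gets its own iterator, which is the property the multi-worker case relies on.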
- iter_queue(queue)
Generator that yields items from a ShutdownQueue until it is shut down.
This serves as the consumer side of the pipeline. It continuously pulls items from the queue. When the producer worker finishes and the queue is shut down (raising a ShutDown exception internally or receiving a Sentinel), this generator catches the signal and stops yielding, effectively closing the stream.
- Parameters:
queue (SQ[T]) – The queue to consume items from.
- Yields:
Items retrieved from the queue.
- Return type:
Generator[T, None, None]