Threaded Generator

This module provides generator wrappers that execute source iterators in separate threads or processes, allowing for parallel producer-consumer pipelines.

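Key components:

  • ThreadedGenerator – runs the source iterator in a separate thread.
  • ProcessGenerator – runs the source iterator in a separate process.
  • ParallelGenerator – runs the source iterator in one or more worker threads or processes.
  • partial_generator – decorator that defers execution of a generator function so each worker can create its own iterator.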

Examples

>>> import time
>>> def slow_source():
...     for i in range(5):
...         time.sleep(0.1)
...         yield i
...
>>> # Wrap the slow source in a threaded generator
>>> gen = ThreadedGenerator(slow_source(), maxsize=3)
>>> for item in gen:
...     print(item)  # Consumption happens in main thread, production in background
0
1
2
3
4

>>> # Using ProcessGenerator for CPU-bound tasks
>>> def cpu_bound_source():
...     for i in range(5):
...         yield sum(range(1000000 * i))
...
>>> gen = ProcessGenerator(cpu_bound_source(), maxsize=2)
>>> list(gen)
[0, 499999500000, 1999999000000, ...]

>>> # Using ParallelGenerator with multiple workers
>>> # Note: You must wrap the generator function with @partial_generator.
>>> # This is required because standard generators cannot be pickled or restarted.
>>> # The wrapper delays execution so each worker process can create its own
>>> # fresh iterator instance.
>>> @partial_generator
... def parallel_source(x):
...     yield x * x
...
>>> # This will spawn 4 worker processes
>>> gen = ParallelGenerator(parallel_source(2), num_workers=4)
>>> list(gen)
[4, 4, 4, 4]

Generators

class ThreadedGenerator(
it,
maxsize=1,
disable=False,
name=None,
queue_type=None,
monitor=None,
)

Bases: ParallelGenerator, Generic

A generator that runs the source iterator in a separate thread.

Communication uses a queue.Queue by default. If this generator is chained with a process-based generator, pass multiprocessing.Queue as the queue_type instead.

Parameters:
  • it (Iterable[T]) – The source iterable.

  • maxsize (int) – Max queue size.

  • disable (bool) – If True, run in main thread.

  • name (str | None) – Name for monitoring.

  • queue_type (Type[Q[T]] | None) – Custom queue type.

  • monitor (Monitor | None) – Monitor instance.
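The thread-plus-queue handoff described above can be illustrated with the standard library alone. This is a minimal sketch, not the ThreadedGenerator implementation: the names threaded_iter and _DONE are hypothetical, and the end-of-stream marker is an assumption made for the example.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

# Hypothetical end-of-stream marker, used only in this sketch.
_DONE = object()


def threaded_iter(source: Iterable[T], maxsize: int = 1) -> Iterator[T]:
    """Yield items from `source` while a background thread produces them."""
    q: queue.Queue = queue.Queue(maxsize=maxsize)

    def produce() -> None:
        try:
            for item in source:
                # put() blocks while the queue is full, so the producer
                # never runs more than `maxsize` items ahead.
                q.put(item)
        finally:
            # Always signal the end, even if the source raised.
            q.put(_DONE)

    threading.Thread(target=produce, daemon=True).start()

    # iter(callable, sentinel) calls q.get() until it returns _DONE.
    yield from iter(q.get, _DONE)


result = list(threaded_iter(range(5), maxsize=2))
```

In this sketch a larger maxsize lets the producer run further ahead of the consumer at the cost of more buffered items. Errors raised by the source are not forwarded to the consumer here.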

Example

>>> gen = ThreadedGenerator(range(10), maxsize=5)
>>> list(gen)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

class ProcessGenerator(
it,
maxsize=1,
disable=False,
name=None,
queue_type=None,
monitor=None,
)

Bases: ParallelGenerator, Generic

A generator that runs the source iterator in a separate process.

Communication is done using a multiprocessing.Queue by default.

Parameters:
  • it (Iterable[T]) – The source iterable.

  • maxsize (int) – Max queue size.

  • disable (bool) – If True, run in main process.

  • name (str | None) – Name for monitoring.

  • queue_type (Type[Q[T]] | None) – Custom queue type.

  • monitor (Monitor | None) – Monitor instance.

Example

>>> gen = ProcessGenerator(range(10), maxsize=5)
>>> list(gen)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

class ParallelGenerator(
it,
num_workers=1,
maxsize=1,
disable=False,
name=None,
method='process',
queue_type=None,
monitor=None,
)

Bases: Generic

A generator wrapper that executes the source iterator in separate workers (threads or processes).

Parameters:
  • it (Iterable[T]) – The source iterable to consume.

  • num_workers (int) – Number of worker processes/threads to spawn. Defaults to 1.

  • maxsize (int) – Maximum size of the internal queue. Defaults to 1.

  • disable (bool) – If True, run synchronously in the main thread/process without workers.

  • name (str | None) – Name for the generator, used in monitoring/logging.

  • method (Method) – Execution method, either “process” or “thread”. Defaults to “process”.

  • queue_type (Type[Q[T]] | None) – Custom queue class to use. Defaults to multiprocessing.Queue if method is “process”, or queue.Queue if method is “thread”.

  • monitor (Monitor | None) – Monitor instance for tracking performance statistics.

Warning

When using multiple workers (num_workers > 1), the input iterable it must be a factory or a generator function wrapped with partial_generator(). Standard Python generators cannot be pickled or restarted, so each worker needs a way to create its own fresh iterator instance.
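The two limitations named in this warning can be checked with plain Python. The sketch below uses only the standard library. The function squares and the factory make_squares are hypothetical names chosen for illustration.

```python
import pickle
from functools import partial


def squares(n):
    for i in range(n):
        yield i * i


gen = squares(3)

# A live generator object cannot be serialized for another process.
try:
    pickle.dumps(gen)
    picklable = True
except TypeError:
    picklable = False

# A generator is also single-pass: a second iteration yields nothing.
first_pass = list(gen)
second_pass = list(gen)

# A factory (any zero-argument callable) can be invoked once per worker,
# and each call builds an independent iterator.
make_squares = partial(squares, 3)
worker_a = list(make_squares())
worker_b = list(make_squares())
```

Here first_pass holds the three squares, second_pass is empty, and both factory calls produce the full sequence again.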

join()

Wait for all workers to finish and check for errors propagated from workers.

Raises:

RuntimeError – If an exception occurred in any worker.

Return type:

None
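One common way to get the behaviour join() describes is for each worker to record its exception and for the joining side to re-raise after all workers have stopped. The sketch below shows that pattern with threads. WorkerGroup is a hypothetical class, and nothing here is taken from the ParallelGenerator source.

```python
import threading
from typing import Callable, List


class WorkerGroup:
    """Hypothetical helper: runs callables in threads and collects errors."""

    def __init__(self, targets: List[Callable[[], None]]) -> None:
        self._errors: List[BaseException] = []
        self._lock = threading.Lock()
        self._threads = [
            threading.Thread(target=self._run, args=(t,)) for t in targets
        ]
        for thread in self._threads:
            thread.start()

    def _run(self, target: Callable[[], None]) -> None:
        try:
            target()
        except Exception as exc:
            # Store the error instead of letting it die with the thread.
            with self._lock:
                self._errors.append(exc)

    def join(self) -> None:
        # Wait for every worker first, then report failures.
        for thread in self._threads:
            thread.join()
        if self._errors:
            raise RuntimeError(
                f"{len(self._errors)} worker(s) failed"
            ) from self._errors[0]


def ok() -> None:
    pass


def bad() -> None:
    raise ValueError("boom")
```

Calling WorkerGroup([ok, bad]).join() in this sketch raises RuntimeError with the original ValueError attached as its cause, while WorkerGroup([ok, ok]).join() returns normally.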

terminate()

Signal the queue to shut down immediately and wait for workers to join.

Return type:

None

Utilities

partial_generator(f)

A decorator that transforms a generator function into one that returns a PartialGenerator. Decorate all generators that are passed to ParallelGenerator with this.

Parameters:

f (Callable[P, Iterator[T]]) – The generator function to wrap.

Returns:

A wrapper function that, when called, returns a PartialGenerator instance instead of immediately executing the generator logic.

Return type:

Callable[P, PartialGenerator[T]]

class PartialGenerator(f)

Bases: Generic

A wrapper class that delays the execution of a generator function.

Parameters:

f (Callable[[], Iterator[T]]) – A callable that takes no arguments and returns an Iterator[T]. This is typically a functools.partial object.
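The deferral idea behind partial_generator and PartialGenerator can be sketched as follows. The names LazyIterable and lazy_generator are hypothetical stand-ins, and the __iter__ method is an assumption about how such a wrapper might be consumed. The sketch shows only the delayed execution, not how the library moves the wrapper between processes.

```python
import functools
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class LazyIterable(Generic[T]):
    """Hypothetical stand-in: holds a zero-argument callable, runs it later."""

    def __init__(self, f: Callable[[], Iterator[T]]) -> None:
        self.f = f

    def __iter__(self) -> Iterator[T]:
        # Each iteration calls the stored callable and gets a new iterator.
        return self.f()


def lazy_generator(f: Callable[..., Iterator[T]]) -> Callable[..., LazyIterable[T]]:
    """Hypothetical decorator: bind the arguments now, run the generator later."""

    @functools.wraps(f)
    def wrapper(*args, **kwargs) -> LazyIterable[T]:
        return LazyIterable(functools.partial(f, *args, **kwargs))

    return wrapper


@lazy_generator
def countdown(n):
    while n > 0:
        yield n
        n -= 1


# No generator body has run yet; only the arguments are captured.
source = countdown(3)
first = list(source)
second = list(source)
```

Because the wrapper stores a callable rather than a running generator, both first and second contain the full sequence in this sketch.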

iter_queue(queue)

Generator that yields items from a ShutdownQueue until it is shut down.

This serves as the consumer side of the pipeline. It continuously pulls items from the queue. When the producer worker finishes and the queue is shut down, the queue either raises a ShutDown exception internally or delivers a Sentinel. This generator catches that signal and stops yielding, which closes the stream.

Parameters:

queue (SQ[T]) – The queue to consume items from.

Yields:

Items retrieved from the queue.

Return type:

Generator[T, None, None]
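The consumer loop described for iter_queue can be sketched with a small shutdown-aware queue. ClosableQueue, Closed, drain, and produce are all hypothetical names invented for this example. They are not the library's ShutdownQueue or ShutDown, and the sketch assumes a single producer that calls shutdown() after its last put().

```python
import queue
import threading
from typing import Any, Iterator


class Closed(Exception):
    """Hypothetical signal: the queue was shut down and is empty."""


class ClosableQueue:
    """Hypothetical stand-in for a shutdown-aware queue."""

    def __init__(self) -> None:
        self._items: queue.Queue = queue.Queue()
        self._closed = threading.Event()

    def put(self, item: Any) -> None:
        self._items.put(item)

    def shutdown(self) -> None:
        # Called by the producer after its last put().
        self._closed.set()

    def get(self) -> Any:
        while True:
            try:
                return self._items.get(timeout=0.05)
            except queue.Empty:
                if self._closed.is_set():
                    # All puts happened before shutdown(), so one final
                    # non-blocking read picks up any item that raced in.
                    try:
                        return self._items.get_nowait()
                    except queue.Empty:
                        raise Closed from None


def drain(q: ClosableQueue) -> Iterator[Any]:
    """Yield items until the queue signals shutdown, then stop."""
    while True:
        try:
            yield q.get()
        except Closed:
            return


def produce(q: ClosableQueue) -> None:
    for i in range(3):
        q.put(i)
    q.shutdown()


q = ClosableQueue()
threading.Thread(target=produce, args=(q,)).start()
received = list(drain(q))
```

The consumer never needs to know how many items to expect. In this sketch the shutdown signal alone ends the loop.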