Metadata-Version: 2.4
Name: cs-naysync
Version: 20260415
Summary: An attempt at comingling async-code and nonasync-code in an ergonomic way.
Keywords: python3
Author-email: Cameron Simpson <cs@cskk.id.au>
Description-Content-Type: text/markdown
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Requires-Dist: cs.deco>=20251230
Requires-Dist: cs.semantics>=20250103
Project-URL: MonoRepo Commits, https://bitbucket.org/cameron_simpson/css/commits/branch/main
Project-URL: Monorepo Git Mirror, https://github.com/cameron-simpson/css
Project-URL: Monorepo Hg/Mercurial Mirror, https://hg.sr.ht/~cameron-simpson/css
Project-URL: Source, https://github.com/cameron-simpson/css/blob/main/lib/python/cs/naysync.py

An attempt at comingling async-code and nonasync-code in an ergonomic way.

*Latest release 20260415*:
* New to_threadpool() function like to_thread() but utilising a ThreadPoolExecutor.
* aqget: new optional tpe:ThreadPoolExecutor parameter to use a thread pool for the to_thread call, if used.
* New Lock context manager class which can be used from sync and async contexts.

One of the difficulties in adapting non-async code for use in
an async world is that anything asynchronous needs to be turtles
all the way down: a single blocking synchronous call anywhere
in the call stack blocks the async event loop.
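
To make that concrete, here is a minimal sketch using only the standard library; none of the names below come from this module. A ticker task counts how often the event loop gets control while some work runs: a direct `time.sleep` starves it completely, while the same sleep dispatched via `asyncio.to_thread` does not.

```python
import asyncio
import time

async def count_ticks(work):
    """Run `work()` while a ticker task counts how often the loop runs it."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await work()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return ticks

async def blocking():
    time.sleep(0.2)  # blocks the event loop's own thread

async def threaded():
    await asyncio.to_thread(time.sleep, 0.2)  # blocks a worker thread instead

async def main():
    return await count_ticks(blocking), await count_ticks(threaded)

blocked_ticks, threaded_ticks = asyncio.run(main())
print(blocked_ticks, threaded_ticks)
```

In the blocking case the ticker never gets a single turn, which is the "turtles all the way down" problem in miniature.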

This module presently provides:
- `@afunc`: a decorator to make a synchronous function asynchronous
- `@agen`: a decorator to make a synchronous generator asynchronous
- `amap(func,iterable)`: asynchronous mapping of `func` over an iterable
- `aqget(q)`: asynchronous function to get an item from a `queue.Queue` or similar
- `aqiter(q)`: asynchronous generator to yield items from a `queue.Queue` or similar
- `async_iter(iterable)`: return an asynchronous iterator of an iterable
- `IterableAsyncQueue`: an iterable flavour of `asyncio.Queue` with no `get` methods
- `AsyncPipeLine`: a pipeline of functions connected together with `IterableAsyncQueue`s

Short summary:
* `afunc`: A decorator for a synchronous function which turns it into an asynchronous function. If `func` is already an asynchronous function it is returned unchanged. If `fast` is true (default `False`) then `func` is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in `asyncio.to_thread`.
* `agen`: A decorator for a synchronous generator which turns it into an asynchronous generator. If `genfunc` is already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.
* `amap`: An asynchronous generator yielding the results of `func(item)` for each `item` in the iterable `it`.
* `aqget`: An asynchronous function to get an item from a `queue.Queue`-like object `q`. It must support the `.get()` and `.get_nowait()` methods; if required, the `.get()` is dispatched using `to_thread()`.
* `aqiter`: An asynchronous generator to yield items from a `queue.Queue`-like object `q`. It must support the `.get()` and `.get_nowait()` methods.
* `async_iter`: Return an asynchronous iterator yielding items from the iterable `it`. If `it` is already an asynchronous iterable, `aiter(it)` is returned directly.
* `AsyncPipeLine`: An `AsyncPipeLine` is an asynchronous iterable with a `put` method to provide input for processing.
* `IterableAsyncQueue`: An iterable subclass of `asyncio.Queue`.
* `Lock`: A lock object which can be used in synchronous and asynchronous contexts.
* `StageMode`: Special modes for `AsyncPipeLine` pipeline stages.
* `to_threadpool`: A variant on `asyncio.to_thread` which dispatches the thread via a `concurrent.futures.ThreadPoolExecutor`.
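
The behaviour described for `afunc` (return async functions unchanged, call `fast` functions inline, otherwise go through `asyncio.to_thread`) can be sketched with the standard library alone. This is an illustration of those three rules, not the module's implementation; `afunc_sketch` and the demo functions are hypothetical names.

```python
import asyncio
import functools
import inspect
import math
import time

def afunc_sketch(func=None, *, fast=False):
    """Hypothetical stand-in for the behaviour described for `@afunc`."""
    if func is None:
        # Used with parameters, e.g. @afunc_sketch(fast=True)
        return functools.partial(afunc_sketch, fast=fast)
    if inspect.iscoroutinefunction(func):
        return func  # already asynchronous: returned unchanged

    @functools.wraps(func)
    async def wrapper(*a, **kw):
        if fast:
            return func(*a, **kw)  # presumed negligible: call inline
        return await asyncio.to_thread(func, *a, **kw)

    return wrapper

@afunc_sketch
def slow_echo(value):
    time.sleep(0.05)
    return value

@afunc_sketch(fast=True)
def asqrt(n):
    return math.sqrt(n)

async def main():
    # The two slow calls overlap: each sleeps in its own worker thread.
    return await asyncio.gather(slow_echo(1), slow_echo(2), asqrt(9.0))

results = asyncio.run(main())
print(results)
```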

Module contents:
- <a name="afunc"></a>`afunc(*da, **dkw)`: A decorator for a synchronous function which turns it into
  an asynchronous function.
  If `func` is already an asynchronous function it is returned unchanged.
  If `fast` is true (default `False`) then `func` is presumed to consume
  negligible time and it is simply wrapped in an asynchronous function.
  Otherwise it is wrapped in `asyncio.to_thread`.

  Example:

      import math
      import time

      from cs.naysync import afunc

      @afunc
      def func(count):
          time.sleep(count)
          return count

      # inside a coroutine:
      slept = await func(5)

      @afunc(fast=True)
      def asqrt(n):
          return math.sqrt(n)
- <a name="agen"></a>`agen(*da, **dkw)`: A decorator for a synchronous generator which turns it into
  an asynchronous generator.
  If `genfunc` is already an asynchronous generator it is returned unchanged.
  Exceptions in the synchronous generator are reraised in the asynchronous
  generator.

  The optional parameter `fast` is passed through to `async_iter`.

  Example:

      import time

      from cs.naysync import agen

      @agen
      def gen(count):
          for i in range(count):
              yield i
              time.sleep(1.0)

      # inside a coroutine:
      async for item in gen(5):
          print(item)
- <a name="amap"></a>`amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], *, concurrent=False, unordered=False, indexed=False, fast=False)`: An asynchronous generator yielding the results of `func(item)`
  for each `item` in the iterable `it`.

  `it` may be a synchronous or asynchronous iterable.

  `func` may be a synchronous or asynchronous callable.

  If `concurrent` is `False` (the default), run each `func(item)`
  call in series.
  If `concurrent` is true run the function calls as `asyncio`
  tasks concurrently.

  If `unordered` is true (default `False`) yield results as
  they arrive, otherwise yield results in the order of the items
  in `it`, but as they arrive - tasks still evaluate concurrently
  if `concurrent` is true.

  If `indexed` is true (default `False`) yield 2-tuples of
  `(i,result)` instead of just `result`, where `i` is the index
  of each item from `it` counting from `0`.

  If `fast` is true (default `False`) assume that `func` does
  not block or otherwise take a long time.

  Example of an async function to fetch URLs in parallel:

      from typing import List

      import requests

      from cs.naysync import amap

      async def get_urls(urls: List[str]):
          """ Fetch `urls` in parallel.
              Yield `(url,response)` 2-tuples.
          """
          async for i, response in amap(
              requests.get, urls,
              concurrent=True, unordered=True, indexed=True,
          ):
              yield urls[i], response
- <a name="aqget"></a>`aqget(q: queue.Queue, tpe: concurrent.futures.thread.ThreadPoolExecutor = None)`: An asynchronous function to get an item from a `queue.Queue`-like object `q`.
  It must support the `.get()` and `.get_nowait()` methods; if
  required, the `.get()` is dispatched using `to_thread()`.

  The optional `tpe` parameter may specify a
  `concurrent.futures.ThreadPoolExecutor` to use instead of
  `to_thread`. For example, the `Lock` class uses a thread pool
  to avoid deadlocking with the general purpose thread pool
  used by `to_thread`.
- <a name="aqiter"></a>`aqiter(q: queue.Queue, *, sentinel=<object object at 0x104dca3b0>)`: An asynchronous generator to yield items from a `queue.Queue`-like object `q`.
  It must support the `.get()` and `.get_nowait()` methods.

  An optional `sentinel` object may be supplied, which ends iteration
  if encountered. If a sentinel is specified then this must be the only
  consumer of the queue because the sentinel is consumed.
- <a name="async_iter"></a>`async_iter(it: Union[Iterable, AsyncIterable], *, fast=None)`: Return an asynchronous iterator yielding items from the iterable `it`.
  If `it` is already an asynchronous iterable, `aiter(it)` is returned directly.

  If `fast` is true then `it` is iterated directly instead of
  via a distinct async generator. If not specified, `fast` is
  set to `True` if `it` is a `list` or `tuple` or `set`. A true
  value for this parameter indicates that fetching the next
  item from `it` is always effectively instant and never blocks.
- <a name="AsyncPipeLine"></a>`class AsyncPipeLine`: An `AsyncPipeLine` is an asynchronous iterable with a `put` method
  to provide input for processing.

  A new pipeline is usually constructed via the factory method
  `AsyncPipeLine.from_stages(stage_func,...)`.

  It has the same methods as an `IterableAsyncQueue`:
  - `async put(item)` to queue an item for processing
  - `async close()` to close the input, indicating end of the input items
  - iteration to consume the processed results

  It also has the following methods:
  - `async submit(AnyIterable)` to submit multiple items for processing
  - `async __call__(AnyIterable)` to submit the iterable for
    processing and consume the results by iteration


  Example:

      from cs.naysync import AsyncPipeLine

      def double(item):
          yield item
          yield item

      pipeline = AsyncPipeLine.from_stages(
          double,
          double,
      )

      # inside a coroutine:
      async for result in pipeline([1,2,3,4]):
          print(result)

*`AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None)`*:
Call the pipeline with an iterable.
Close `self.inq` after submitting items from `it`.
Return an asynchronous iterable (`self.outq`)
yielding results.

*`AsyncPipeLine.close(self)`*:
Close the input queue.

*`AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]`*:
Prepare an `AsyncPipeLine` from stage specifications.
Return an `(inq,tasks,outq)` 3-tuple, being an input `IterableAsyncQueue`
to receive items to process, a list of `asyncio.Task`s per
stage specification, and an output `IterableAsyncQueue` to
produce results. If there are no `stage_specs` the two queues
are the same queue.

Each stage specification is either:
- a stage function, implying a `batchsize` of `None`
- a 2-tuple of `(stage_func,batchsize)`

The `stage_func` and `batchsize` are as for the `run_stage` method.
In particular the `batchsize` should be:
- `None`, for a `stage_func` accepting a single item
- an `int` >=0, for a `stage_func` accepting a list of items
- `StageMode.STREAM`, for a `stage_func` accepting an `IterableAsyncQueue`

*`AsyncPipeLine.put(self, item)`*:
Put `item` onto the input queue.

*`AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None)`*:
Run a pipeline stage, copying items from `inq` to the `stage_func`
and putting results onto `outq`. After processing, `outq` is
closed.

`stage_func` is a callable which may be:
- a sync or async generator which yields results to place onto `outq`
- a sync or async function which returns a single result

If `batchsize` is `None`, the default, each input item is
passed to `stage_func(item)`, which yields the results from the
single item.

If `batchsize is StageMode.STREAM` then `stage_func`
should be a synchronous or asynchronous generator function
which receives `inq` as its sole parameter and yields
results.

If `batchsize` is an `int`, items from `inq` are collected
into batches up to a limit of `batchsize` (no limit if
`batchsize` is `0`) and passed to `stage_func(batch)`, which
yields the results from the batch of items.
If the `batchsize` is `0` the `stage_func` is called exactly
once with all the input items, even if there are no input
items.
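
The `int` batching rule can be sketched as a plain generator. This only illustrates the rule as stated; it is not the `run_stage` implementation. `batches` is a hypothetical helper, it reads an ordinary iterable rather than an `IterableAsyncQueue`, and yielding nothing for empty input when `batchsize` is greater than `0` is an assumption.

```python
def batches(items, batchsize):
    """Yield lists of up to `batchsize` items; `0` means one unbounded batch."""
    batch = []
    for item in items:
        batch.append(item)
        if batchsize > 0 and len(batch) >= batchsize:
            yield batch
            batch = []
    # A trailing partial batch is always yielded. With `batchsize == 0`
    # the single batch is yielded even when it is empty.
    if batch or batchsize == 0:
        yield batch

print(list(batches([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
print(list(batches([1, 2, 3], 0)))        # [[1, 2, 3]]
print(list(batches([], 0)))               # [[]]
```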

*`AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None)`*:
Submit the items from `it` to the pipeline.
- <a name="IterableAsyncQueue"></a>`class IterableAsyncQueue(asyncio.queues.Queue)`: An iterable subclass of `asyncio.Queue`.

  This modifies `asyncio.Queue` by:
  - adding a `.close()` async method
  - making the queue iterable, with each iteration consuming an item via `.get()`

*`IterableAsyncQueue.__anext__(self)`*:
Fetch the next item from the queue.

*`IterableAsyncQueue.close(self)`*:
Close the queue.
It is not an error to close the queue more than once.

*`IterableAsyncQueue.get(self)`*:
We do not allow `.get()`.

*`IterableAsyncQueue.get_nowat(self)`*:
We do not allow `.get_nowait()`.

*`IterableAsyncQueue.put(self, item)`*:
Put `item` onto the queue.

*`IterableAsyncQueue.put_nowait(self, item)`*:
Put an item onto the queue without blocking.
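
For readers wondering how an `asyncio.Queue` can be made closable and iterable, here is a minimal sketch using a private end marker. It is an assumption about one workable approach, not the `IterableAsyncQueue` source; `ClosableQueue` and `_CLOSED` are hypothetical names, and the sketch does not disable `.get()` as the real class does.

```python
import asyncio

class ClosableQueue(asyncio.Queue):
    """Hypothetical sketch: an `asyncio.Queue` consumed with `async for`."""

    _CLOSED = object()  # private end-of-queue marker (an assumption)

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._is_closed = False

    async def close(self):
        # Closing twice is harmless: only the first call queues the marker.
        if not self._is_closed:
            self._is_closed = True
            await super().put(self._CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await super().get()
        if item is self._CLOSED:
            # Put the marker back so any further iteration also stops.
            await super().put(self._CLOSED)
            raise StopAsyncIteration
        return item

async def main():
    q = ClosableQueue()
    for n in (1, 2, 3):
        await q.put(n)
    await q.close()
    await q.close()  # second close is a no-op
    return [item async for item in q]

collected = asyncio.run(main())
print(collected)
```
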
- <a name="Lock"></a>`class Lock`: A lock object which can be used in synchronous and asynchronous contexts.

  The async enter step is built on the `aqget()` function.

  Example use:

      from cs.naysync import Lock

      shared_lock = Lock("my shared lock")

  Synchronous use from some thread:

      with shared_lock:
          ...  # critical section

  Asynchronous use from some event loop:

      async with shared_lock:
          ...  # critical section

*`Lock.__init__(self, name=None, *, max_workers=64)`*:
Initialise the `Lock`.

Parameters:
* `name`: optional name for the lock
* `max_workers`: optional number of threads in the per-lock thread pool,
  arbitrarily set to 64 since the default for `ThreadPoolExecutor`
  seems insanely low and `run_in_executor` is synchronous!
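
To show why a per-lock thread pool is useful, here is a sketch of a lock that supports both `with` and `async with`. Note that it swaps in a different technique from the one documented above: it acquires a plain `threading.Lock` inside a private `ThreadPoolExecutor` rather than building on `aqget()`. `DualLock` and every other name here is hypothetical.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

class DualLock:
    """Hypothetical sketch of a lock usable with `with` and `async with`."""

    def __init__(self, max_workers=4):
        self._lock = threading.Lock()
        # A private pool, so waiting acquirers cannot use up the
        # event loop's default executor.
        self._tpe = ThreadPoolExecutor(max_workers=max_workers)

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self._lock.release()
        return False

    async def __aenter__(self):
        loop = asyncio.get_running_loop()
        # Block in a pool thread rather than in the event loop.
        await loop.run_in_executor(self._tpe, self._lock.acquire)
        return self

    async def __aexit__(self, *exc_info):
        # A `threading.Lock` may be released from any thread.
        self._lock.release()
        return False

shared = DualLock()
state = {"count": 0}

async def bump():
    async with shared:
        seen = state["count"]
        await asyncio.sleep(0.01)  # yield to the loop while holding the lock
        state["count"] = seen + 1

async def main():
    await asyncio.gather(*(bump() for _ in range(3)))

with shared:  # synchronous use
    state["count"] += 1
asyncio.run(main())  # asynchronous use
print(state["count"])
```

Without the lock the three tasks would all read the same value before any of them wrote it back.
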
- <a name="StageMode"></a>`class StageMode(enum.StrEnum)`: Special modes for `AsyncPipeLine` pipeline stages.

*`StageMode.__format__`*

*`StageMode.__str__`*
- <a name="to_threadpool"></a>`to_threadpool(tpe: concurrent.futures.thread.ThreadPoolExecutor, func, *a, **kw)`: A variant on `asyncio.to_thread` which dispatches the thread
  via a `concurrent.futures.ThreadPoolExecutor`.

  You can just use the `run_in_executor` event loop method
  directly but this is handy if you've already got a thread
  pool for a specific purpose and code which fits `to_thread`.
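
A minimal sketch of the idea, assuming it amounts to `run_in_executor` with the arguments bound up front. `to_threadpool_sketch` and `describe` are hypothetical names, and unlike `asyncio.to_thread` this sketch does not propagate context variables.

```python
import asyncio
import functools
import threading
from concurrent.futures import ThreadPoolExecutor

async def to_threadpool_sketch(tpe, func, *a, **kw):
    """Hypothetical stand-in: like `asyncio.to_thread`, but using `tpe`."""
    loop = asyncio.get_running_loop()
    # `run_in_executor` passes positional arguments only, so bind
    # keyword arguments with `functools.partial`.
    return await loop.run_in_executor(tpe, functools.partial(func, *a, **kw))

def describe(value, *, suffix=""):
    # Report which thread actually ran the call.
    return f"{value}{suffix}", threading.current_thread().name

async def main():
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="mypool") as tpe:
        return await to_threadpool_sketch(tpe, describe, 42, suffix="!")

text, thread_name = asyncio.run(main())
print(text, thread_name)
```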

# Release Log



*Release 20260415*:
* New to_threadpool() function like to_thread() but utilising a ThreadPoolExecutor.
* aqget: new optional tpe:ThreadPoolExecutor parameter to use a thread pool for the to_thread call, if used.
* New Lock context manager class which can be used from sync and async contexts.

*Release 20251119*:
Make most optional parameters keyword only.

*Release 20250306*:
* AsyncPipeLine.__call__: just return self.outq, use try/finally in submit.
* @agen: new optional `fast` parameter, plumbed to async_iter().
* amap: progressive async consume and dispatch, allowing yield of results as items come in - no longer waits for all items to be dispatched before yielding results.
* amap: new fast=False parameter to indicate that func does not block.
* Some small fixes.

*Release 20250103*:
* @afunc now accepts an async function and returns it unchanged.
* @agen now accepts an async generator and returns it unchanged.
* async_iter: now accepts an async iterable, returning aiter(it) directly.
* New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
* async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
* async_iter: make missing `fast=` be True for list/tuple/set and False otherwise.
* @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
* New AsyncPipeLine, an asynchronous iterable with a `put` method to provide input for processing.
* New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
* New aqget(Queue), an async interface to queue.Queue.get.
* New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.

*Release 20241221.1*:
Doc fix for amap().

*Release 20241221*:
* Simpler implementation of @afunc.
* Simplify implementation of @agen by using async_iter.
* Docstring improvements.

*Release 20241220*:
* New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
* New amap(func,iterable) asynchronously mapping a function over an iterable.

*Release 20241215*:
* @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
* @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.

*Release 20241214.1*:
Doc update.

*Release 20241214*:
Initial release with @agen and @afunc decorators.
