Metadata-Version: 2.4
Name: asyncio-extensions
Version: 0.2.0
Summary: Some (maybe) useful extensions for asyncio
Project-URL: Changelog, https://github.com/hartungstenio/asyncio-extensions/blob/main/CHANGELOG.md
Project-URL: Documentation, https://github.com/hartungstenio/asyncio-extensions#readme
Project-URL: Issues, https://github.com/hartungstenio/asyncio-extensions/issues
Project-URL: ReleaseNotes, https://github.com/hartungstenio/asyncio-extensions/releases
Project-URL: Source, https://github.com/hartungstenio/asyncio-extensions
Author-email: Christian Hartung <6785871+hartungstenio@users.noreply.github.com>
License-Expression: MIT
License-File: LICENSE.txt
Keywords: asyncio,tasks
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: typing-extensions; python_version < '3.13'
Description-Content-Type: text/markdown

# asyncio-extensions

[![PyPI - Version](https://img.shields.io/pypi/v/asyncio-extensions.svg)](https://pypi.org/project/asyncio-extensions)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/asyncio-extensions.svg)](https://pypi.org/project/asyncio-extensions)
[![codecov](https://codecov.io/github/hartungstenio/asyncio-extensions/graph/badge.svg?token=1MEZ4NBUJH)](https://codecov.io/github/hartungstenio/asyncio-extensions)
[![pre-commit.ci status](https://results.pre-commit.ci/badge/github/hartungstenio/asyncio-extensions/main.svg)](https://results.pre-commit.ci/latest/github/hartungstenio/asyncio-extensions/main)

-----

## Installation

```console
pip install asyncio-extensions
```

## Usage

### TaskGroup

`asyncio-extensions` provides a cancellable version of AsyncIO's `TaskGroup`.

```python
import asyncio

from asyncio_extensions import TaskGroup

queue = asyncio.Queue()
async with TaskGroup() as tg:
    for _ in range(10):
        tg.create_task(consume_from_queue(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()
```

#### LimitedTaskGroup

A version of `TaskGroup` that limits the number of concurrently running tasks.

```python
import asyncio

from asyncio_extensions import LimitedTaskGroup

queue = asyncio.Queue()
async with LimitedTaskGroup(3) as tg:
    for _ in range(50):
        tg.create_task(some_expensive_operation(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()
```

#### TerminateTaskGroup and force_terminate_task_group

`TerminateTaskGroup` and `force_terminate_task_group` implement the [terminating a task group](https://docs.python.org/3/library/asyncio-task.html#terminating-a-task-group) pattern from the Python docs. Schedule `force_terminate_task_group()` as a task to stop the entire group early; catch `TerminateTaskGroup` with `except*` to suppress it.

When using `asyncio_extensions.TaskGroup`, suppression is automatic — no `except*` block needed.

```python
import asyncio

from asyncio_extensions import TerminateTaskGroup, force_terminate_task_group

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(do_work())
            tg.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass
```

### checkpoint

The `checkpoint` function yields control to the event loop. It is a more elegant approach to do-nothing tasks, giving other tasks a chance to run.

```python
from asyncio_extensions import checkpoint

class DummyChannel:
    async def send_message(self, message):
        await checkpoint()
```

### sleep_forever

The `sleep_forever` function never returns. It simply keeps yielding control to the event loop.

```python
from asyncio_extensions import sleep_forever

class DummyChannel:
    async def receive_message(self):
        await sleep_forever()
```

### heartbeat

The `heartbeat` function runs a given callable at a regular interval.

```python
from asyncio_extensions import heartbeat

async def ping():
    pass

async with TaskGroup() as tg:
    tg.create_task(heartbeat(5, ping))

    await some_long_running_process()
```

### identity

The `identity` function yields control back to the event loop once, and then returns the value that was passed in. It is useful when you already have a cached result and want to create a task that behaves like any other asynchronous operation.

```python
from asyncio_extensions import TaskGroup, identity

async def get_product(product_id: int):
    cached_product = await product_cache.get(product_id)

    async with TaskGroup() as tg:
        if cached_product is not None:
            task = tg.create_task(identity(cached_product))
        else:
            task = tg.create_task(fetch_product_from_api(product_id))

        tg.create_task(update_search_metrics(product_id))

    product = task.result()
    return product
```

### asyncify

The `asyncify` function ensures a callable can be awaited. If the callable is already a coroutine function, it is returned as-is. Otherwise, it is wrapped so that calls run in a separate thread.

```python
from asyncio_extensions import asyncify

def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await asyncify(blocking_read)("data.txt")
```

It can also be used as a decorator:

```python
from asyncio_extensions import asyncify

@asyncify
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("data.txt")
```

### asyncify_iterable

The `asyncify_iterable` function converts any sync or async iterable into an `AsyncIterable`. If the input is already async, it is returned unchanged. Otherwise it is wrapped in an async generator that yields each item and calls `checkpoint()` between items to avoid monopolising the event loop on large inputs.

```python
from asyncio_extensions import asyncify_iterable

async def process(items):
    async for item in asyncify_iterable(items):
        await handle(item)
```

### iterate_queue

The `iterate_queue` async generator wraps an `asyncio.Queue` so it can be consumed with a plain `async for` loop. It calls `task_done()` automatically after each item and stops when the queue is shut down (Python 3.13+) or when a sentinel value is dequeued.

```python
import asyncio

from asyncio_extensions import iterate_queue

async def process_items(queue: asyncio.Queue[str]) -> None:
    async for item in iterate_queue(queue):
        print(item)
```

To signal the end of the stream, call `queue.shutdown()` from the producer (Python 3.13+):

```python
async def producer(queue: asyncio.Queue[str]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    queue.shutdown()
```

On older Python versions, put the `STOP` sentinel in the queue when done:

```python
from asyncio_extensions import iterate_queue, STOP

async def producer(queue: asyncio.Queue[object]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    await queue.put(STOP)

async def consumer(queue: asyncio.Queue[object]) -> None:
    async for item in iterate_queue(queue):
        print(item)
```

> **Note:** `STOP` is deprecated on Python 3.13+. Prefer `queue.shutdown()` instead.

### fill_queue

The `fill_queue` coroutine fills an `asyncio.Queue` from any sync or async iterable. It accepts both `Iterable` and `AsyncIterable` sources and blocks if the queue is full until space becomes available.

```python
import asyncio

from asyncio_extensions import fill_queue

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue()
    await fill_queue(range(10), queue)
```

It also works with async iterables:

```python
async def source():
    for item in fetch_from_api():
        yield item

await fill_queue(source(), queue)
```

### merge_iterables

The `merge_iterables` async context manager merges multiple sync or async iterables into a single interleaved stream, feeding all sources into a shared queue concurrently.

```python
import asyncio

from asyncio_extensions import merge_iterables

async def main() -> None:
    async with merge_iterables([1, 2, 3], [4, 5, 6]) as stream:
        async for item in stream:
            print(item)
```

It also accepts async iterables, which can produce items in parallel:

```python
async def fetch_page(page: int):
    ...  # yields items from a remote page

async with merge_iterables(fetch_page(1), fetch_page(2)) as stream:
    async for item in stream:
        process(item)
```

### ManagedStream

`ManagedStream[T]` is the type alias for an async context manager that yields an `AsyncIterator[T]`. It is the return type of `safe_gen` and `merge_iterables`, and the accepted parameter type of `flatten_stream`. Use it to annotate your own functions that expose a context-managed async stream:

```python
from asyncio_extensions import ManagedStream

def my_stream() -> ManagedStream[int]:
    ...
```

### safe_gen

The `safe_gen` decorator converts an async generator function into a context manager, enforcing correct handling of early exits. A plain async generator abandoned before exhaustion leaks resources and keeps running indefinitely — the caller has no way to know it needs to call `aclose()`. By returning a context manager instead, `safe_gen` makes cleanup syntactically mandatory: callers must use `async with`, which guarantees `aclose()` is called on exit regardless of how iteration ends. It also suppresses `GeneratorExit` raised inside an exception group, so it composes safely with `TaskGroup`.

```python
from asyncio_extensions import safe_gen

@safe_gen
async def paginate(url: str) -> AsyncGenerator[dict]:
    while url:
        response = await fetch(url)
        for item in response["results"]:
            yield item
        url = response.get("next")

async with paginate("https://api.example.com/items") as stream:
    async for item in stream:
        if should_stop(item):
            break  # generator is closed automatically
```

### flatten_stream

The `flatten_stream` async generator enters a `ManagedStream` context manager and yields its items directly, without requiring an explicit `async with` block. This is particularly useful when a plain `async for` loop is the only interface available — such as Django's `StreamingHttpResponse`.

The recommended pattern is to keep the `async with` block until the last possible moment, and only apply `flatten_stream` at the interface boundary:

```python
from contextlib import asynccontextmanager
from django.http import StreamingHttpResponse
from asyncio_extensions import flatten_stream, merge_iterables

async def export_view(request):
    async def serialized(stream):
        async for row in stream:
            yield serialize(row)

    @asynccontextmanager
    async def response_body():
        async with merge_iterables(fetch_orders(), fetch_invoices()) as stream:
            yield serialized(stream)

    return StreamingHttpResponse(flatten_stream(response_body()), content_type="text/csv")
```

For early-exit scenarios, wrap the result in `aclosing` to ensure proper cleanup:

```python
from contextlib import aclosing

async with aclosing(flatten_stream(merge_iterables(source_a, source_b))) as stream:
    async for item in stream:
        if done(item):
            break
```

### iscoroutinefunction, markcoroutinefunction, and is_awaitable

The `iscoroutinefunction` helper checks whether a callable is already a coroutine function. It is re-exported from `inspect` on newer Python versions and from `asyncio` on older versions, depending on the runtime.

The `markcoroutinefunction` helper marks a normal sync callable as a coroutine function. On Python 3.12+ this is `inspect.markcoroutinefunction`, but with the return type annotated so the function can be treated as a coroutine function in type-checked code.

`is_awaitable` is a typed variant of `iscoroutinefunction` that returns a `TypeIs` guard, allowing type checkers to narrow the callable's return type to `Awaitable` in the `True` branch.

```python
from asyncio_extensions import iscoroutinefunction, is_awaitable, markcoroutinefunction

async def main():
    def sync_task() -> int:
        return 42

    assert iscoroutinefunction(sync_task) is False

    marked = markcoroutinefunction(sync_task)
    assert iscoroutinefunction(marked) is True

async def call(fn: Callable[[], int] | Callable[[], Awaitable[int]]) -> int:
    if is_awaitable(fn):
        return await fn()  # type checker knows fn() returns Awaitable[int] here
    return fn()
```

## Contributing

Pull requests are welcome. For major changes, please open an issue first
to discuss what you would like to change.

Please make sure to update tests as appropriate.

## License

`asyncio-extensions` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.
