Metadata-Version: 2.4
Name: atqo
Version: 2.0.0
Summary: Async task queue orchestrator with resource-aware scheduling
Project-URL: Homepage, https://github.com/endremborza/atqo
Author-email: Endre Márk Borza <endremborza@gmail.com>
License: Copyright 2022 Endre Márk Borza
        
        Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
License-File: LICENSE
Requires-Python: >=3.12
Requires-Dist: tblib
Provides-Extra: fancy
Requires-Dist: tqdm; extra == 'fancy'
Description-Content-Type: text/markdown

# atqo

[![pypi](https://img.shields.io/pypi/v/atqo.svg)](https://pypi.org/project/atqo/)

Async task queue orchestrator with resource-aware scheduling.

Each actor class declares what it consumes (`requirements`); the scheduler holds the pool (`resources`) and decides how many actors of each class to run. Optional per-task rate limits gate dispatch against time-windowed budgets.

## Install

```bash
uv add atqo
```

## Usage

```python
from atqo import ActorBase, Scheduler, SchedulerTask, SingleCPUActor

# fetch, process, urls, and jobs stand in for your own functions and inputs.

class Scraper(SingleCPUActor):
    def consume(self, url):
        return fetch(url)

class HeavyActor(ActorBase):
    requirements = {"cpu": 2, "mem": 4}
    def consume(self, arg):
        return process(arg)

scheduler = Scheduler(
    actors=[Scraper, HeavyActor],
    resources={"cpu": 8, "mem": 16},
)

scheduler.refill_task_queue(
    [SchedulerTask(u, actor=Scraper) for u in urls]
    + [SchedulerTask(j, actor=HeavyActor) for j in jobs]
)
results = scheduler.join()
```
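
To get a feel for how `requirements` and `resources` interact, the arithmetic below gives an upper bound on how many actors of one class fit in the pool at once. This is a rough sketch, not atqo's scheduling logic: `max_actors` is a hypothetical helper, and the `{"cpu": 1}` requirement for `SingleCPUActor` is an assumption based on its name.

```python
def max_actors(requirements: dict[str, int], resources: dict[str, int]) -> int:
    """Upper bound on concurrent actors of one class, ignoring other classes."""
    if not requirements or any(need <= 0 for need in requirements.values()):
        raise ValueError("requirements must map resource names to positive amounts")
    # The scarcest resource decides; a resource missing from the pool counts as 0.
    return min(resources.get(name, 0) // need for name, need in requirements.items())


pool = {"cpu": 8, "mem": 16}
heavy = {"cpu": 2, "mem": 4}  # HeavyActor.requirements from the example above
single_cpu = {"cpu": 1}       # assumed requirement of SingleCPUActor

heavy_fit = max_actors(heavy, pool)        # min(8 // 2, 16 // 4)
single_fit = max_actors(single_cpu, pool)  # 8 // 1
```

When both classes have pending tasks they compete for the same `cpu` units, so the number actually running of each class will usually be lower than these per-class bounds.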

## Rate limits

Per-task budgets recover over time (token bucket), independent of the static resource pool:

```python
from atqo import RateLimit, Scheduler, SchedulerTask

scheduler = Scheduler(
    actors=[Scraper],
    resources={"cpu": 4},
    rate_limits={"site_a": RateLimit(10, per_seconds=60)},
)
SchedulerTask(url, actor=Scraper, rate_costs={"site_a": 1})
```

A task whose `rate_costs` exceeds a bucket's `capacity` raises `ImpossibleRateCost` at ingress, because it could never run.
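
For readers unfamiliar with token buckets, here is a minimal standalone sketch of the idea. It is not atqo's implementation: the `TokenBucket` class, its `try_take` method, and the injectable `clock` are hypothetical, and a plain `ValueError` stands in for `ImpossibleRateCost`.

```python
import time


class TokenBucket:
    """Budget of `capacity` tokens that refills linearly over `per_seconds`."""

    def __init__(self, capacity: float, per_seconds: float, clock=time.monotonic):
        self.capacity = capacity
        self.per_seconds = per_seconds
        self._clock = clock
        self._tokens = float(capacity)  # start with a full bucket
        self._last = clock()

    def try_take(self, cost: float) -> bool:
        if cost > self.capacity:
            # Even a full bucket could never cover this cost.
            raise ValueError("cost exceeds bucket capacity")
        now = self._clock()
        refill = (now - self._last) * self.capacity / self.per_seconds
        self._tokens = min(self.capacity, self._tokens + refill)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False  # caller should hold the task and retry later


# Drive the bucket with a fake clock so the behaviour is deterministic.
now = [0.0]
bucket = TokenBucket(10, per_seconds=60, clock=lambda: now[0])
taken = [bucket.try_take(1) for _ in range(11)]  # the 11th request is refused
now[0] += 6.0  # 6 s at 10 tokens per 60 s restores one token
after_wait = bucket.try_take(1)
```

The budget depends only on elapsed time, which is why it is tracked separately from the static resource pool.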

## Simple parallel API

```python
from atqo import parallel_map, parallel_consume

results = parallel_map(expensive_fn, items, workers=4)
parallel_consume(MyActor, items, workers=4)
```

## Patterns

### Stateful actors (logged-in browser, warm cache, etc.)

Register a separate actor class. Its `__init__` performs the setup; the scheduler routes tasks needing that state via `actor=`.

```python
class Browser(ActorBase):
    requirements = {"browser_slot": 1}
    def __init__(self):
        self.driver = open_browser()
    def consume(self, url):
        return self.driver.fetch(url)

class LoggedInBrowser(Browser):
    def __init__(self):
        super().__init__()
        self.driver.login(USER, PW)

scheduler = Scheduler(
    actors=[Browser, LoggedInBrowser],
    resources={"browser_slot": 4},
)
SchedulerTask(url, actor=LoggedInBrowser)
```

## Hang protection

Every blocking wait in the scheduler is bounded. Optional knobs on `Scheduler(...)`:

- `task_timeout`: per-task wall-clock cap. An attempt that exceeds it fails like any other exception and counts against `allowed_fail_count`.
- `stall_timeout`: if no progress for this long, raise `SchedulerStalled`.
- `poison_timeout`: how long graceful actor drain waits before force-cancel (default 5s).

`cleanup()` and `join()` always terminate; they cancel listeners and stop the event loop unconditionally.
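
The `task_timeout` behaviour can be pictured with plain `asyncio`. The sketch below is an illustration under assumptions, not atqo's code: `run_with_retries` is a hypothetical helper, and it assumes a task may fail up to `allowed_fail_count` times before it is abandoned.

```python
import asyncio


async def run_with_retries(make_attempt, task_timeout: float, allowed_fail_count: int):
    """Retry make_attempt() until it succeeds or fails too many times."""
    failures = 0
    while True:
        try:
            # Bound each attempt; a hang surfaces as a timeout error.
            return await asyncio.wait_for(make_attempt(), timeout=task_timeout)
        except Exception as exc:  # timeouts are counted like any other failure
            failures += 1
            if failures > allowed_fail_count:
                raise RuntimeError(f"gave up after {failures} failed attempts") from exc


attempts = []


async def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        await asyncio.sleep(1)  # the first two attempts run past the cap
    return "ok"


result = asyncio.run(run_with_retries(flaky, task_timeout=0.01, allowed_fail_count=2))
```

Here the first two attempts time out, both are counted as failures, and the third attempt succeeds within the allowed count.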
