Metadata-Version: 2.4
Name: lexigram-tasks
Version: 0.1.1
Summary: Background task processing for Lexigram Framework - Scheduling, workers, and job queues
Project-URL: Homepage, https://github.com/lexigram-dev/lexigram
Project-URL: Repository, https://github.com/lexigram-dev/lexigram
Project-URL: Documentation, https://docs.lexigram.dev/tasks
Project-URL: Issues, https://github.com/lexigram-dev/lexigram/issues
Project-URL: Changelog, https://github.com/lexigram-dev/lexigram/blob/main/CHANGELOG.md
Author-email: Lexigram Framework Team <team@lexigram.dev>
Maintainer-email: Lexigram Framework Team <team@lexigram.dev>
License: MIT
License-File: LICENSE
Keywords: async,background,cron,framework,jobs,queue,scheduling,tasks,workers
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: croniter>=1.4.0
Requires-Dist: lexigram-contracts>=0.1.0
Requires-Dist: lexigram>=0.1.1
Requires-Dist: psutil>=5.9.0
Requires-Dist: typing-extensions>=4.0.0
Provides-Extra: admin
Requires-Dist: lexigram-admin>=0.1.1; extra == 'admin'
Provides-Extra: all
Requires-Dist: aio-pika>=9.3.0; extra == 'all'
Requires-Dist: lexigram-testing>=0.1.1; extra == 'all'
Requires-Dist: pika>=1.3.0; extra == 'all'
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'all'
Requires-Dist: pytest-cov>=4.0.0; extra == 'all'
Requires-Dist: pytest-mock>=3.10.0; extra == 'all'
Requires-Dist: pytest>=8.0.0; extra == 'all'
Requires-Dist: redis>=5.0.0; extra == 'all'
Requires-Dist: rq>=1.15.0; extra == 'all'
Provides-Extra: dev
Requires-Dist: black>=23.0.0; extra == 'dev'
Requires-Dist: mypy>=1.0.0; extra == 'dev'
Requires-Dist: ruff>=0.1.0; extra == 'dev'
Requires-Dist: types-croniter>=2.0.0; extra == 'dev'
Provides-Extra: rabbitmq
Requires-Dist: aio-pika>=9.3.0; extra == 'rabbitmq'
Requires-Dist: pika>=1.3.0; extra == 'rabbitmq'
Provides-Extra: redis
Requires-Dist: redis>=5.0.0; extra == 'redis'
Requires-Dist: rq>=1.15.0; extra == 'redis'
Provides-Extra: test
Requires-Dist: lexigram-testing>=0.1.1; extra == 'test'
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'test'
Requires-Dist: pytest-cov>=4.0.0; extra == 'test'
Requires-Dist: pytest-mock>=3.10.0; extra == 'test'
Requires-Dist: pytest>=8.0.0; extra == 'test'
Description-Content-Type: text/markdown

# lexigram-tasks

Background task processing for Lexigram Framework — Scheduling, workers, and job queues

---

## Overview

lexigram-tasks provides background task execution, scheduling, and async job queues with support for Redis, RabbitMQ, PostgreSQL, and in-memory backends. It features configurable retry policies, dead-letter queues, priority queues, rate limiting, cron scheduling, and Prometheus metrics. All services are wired via `TasksProvider`, which registers task protocols with the DI container.

---

## Install

```bash
uv add lexigram-tasks
# Optional extras
uv add "lexigram-tasks[redis]"     # Redis backend
uv add "lexigram-tasks[rabbitmq]"  # RabbitMQ backend
```

## Quick Start

```python
from lexigram import Application
from lexigram.di.module import Module, module

# Import the module from the package
from lexigram.tasks import TasksModule

@module(imports=[TasksModule.configure(...)])
class AppModule(Module):
    pass

app = Application(modules=[AppModule])
if __name__ == "__main__":
    app.run()
```

## Configuration

> **Zero-config usage:** Call `TasksModule.configure()` with no arguments to use defaults.

### Option 1 — YAML file

```yaml
# application.yaml
tasks:
  backend:
    type: redis
    redis_url: redis://localhost:6379/0
  worker:
    worker_count: 4
    max_concurrent_tasks: 10
  scheduler:
    enabled: true
    timezone: UTC
```

### Option 2 — Profiles + Environment Variables *(recommended)*

```bash
export LEX_TASKS__ENABLED=true
# Environment variables for each field
```

### Option 3 — Python

```python
from lexigram.tasks.config import TaskConfig
from lexigram.tasks import TasksModule

config = TaskConfig(
    backend=TaskBackendConfig(type="redis", redis_url="redis://localhost:6379/0"),
    worker=TaskWorkerConfig(worker_count=4, max_concurrent_tasks=10),
)
TasksModule.configure(queue=queue, worker_count=4, enable_scheduler=True)
```

### Config reference

| Field | Default | Env var | Description |
|-------|---------|---------|-------------|
| `backend.type` | `memory` | `LEX_TASKS__BACKEND__TYPE` | Queue backend (`redis`, `rabbitmq`, `postgres`, `memory`) |
| `backend.redis_url` | `redis://localhost:6379/0` | `LEX_TASKS__BACKEND__REDIS_URL` | Redis connection URL |
| `backend.amqp_url` | `amqp://localhost` | `LEX_TASKS__BACKEND__AMQP_URL` | AMQP connection URL |
| `worker.worker_count` | `4` | `LEX_TASKS__WORKER__WORKER_COUNT` | Number of worker processes |
| `worker.max_concurrent_tasks` | `10` | `LEX_TASKS__WORKER__MAX_CONCURRENT_TASKS` | Max tasks executed in parallel per worker |
| `worker.default_timeout` | `300` | `LEX_TASKS__WORKER__DEFAULT_TIMEOUT` | Task execution timeout in seconds |
| `worker.max_retries` | `3` | `LEX_TASKS__WORKER__MAX_RETRIES` | Maximum retry attempts per task |
| `scheduler.enabled` | `true` | `LEX_TASKS__SCHEDULER__ENABLED` | Enable cron-based task scheduling |
| `scheduler.timezone` | `UTC` | `LEX_TASKS__SCHEDULER__TIMEZONE` | Timezone for cron schedule evaluation |

## Module Factory Methods

| Method | Description |
|--------|-------------|
| `TasksModule.configure(queue, worker_count, enable_scheduler)` | Configure with explicit queue and worker settings |
| `TasksModule.stub()` | Minimal config for testing |

## Key Features

- **Multiple backends** — Redis, RabbitMQ, PostgreSQL (transactional), in-memory
- **Retry policies** — Configurable `retries`, `backoff`, `max_delay` per task
- **Dead-letter queue** — Failed jobs routed to DLQ for inspection / replay
- **Priority queues** — `urgent`, `default`, `bulk` built-in; custom queues supported
- **Rate limiting** — Per-queue and per-task throughput caps
- **Concurrency** — Bounded worker pool with backpressure
- **Cron scheduling** — Cron-expression task scheduling via `@scheduled` decorator
- **Observability** — Prometheus metrics for queue depth, latency, error rate
- **Health checks** — `/health/tasks` endpoint via `lexigram-monitor`

## Testing

```python
async with Application.boot(modules=[TasksModule.stub()]) as app:
    # your test code
    ...
```

## BackgroundTaskManager

A container-injectable service for fire-and-go tasks that ensures no task handle is lost and all pending work is cancelled on framework shutdown.

```python
from lexigram.tasks import BackgroundTaskManager

class MyService:
    def __init__(self, task_manager: BackgroundTaskManager) -> None:
        self._tasks = task_manager

    async def kick_off_work(self) -> None:
        self._tasks.track(self._do_something())

    async def kick_off_named_work(self) -> None:
        self._tasks.track_named("my-named-job", self._do_something())

    async def check_pending(self) -> int:
        return self._tasks.pending_count

# In your Provider.shutdown():
await task_manager.shutdown(timeout=30.0)
```

Register as a singleton in your provider:

```python
from lexigram.tasks import BackgroundTaskManager
from lexigram.di.provider import Provider

class MyProvider(Provider):
    async def register(self, container):
        container.singleton(BackgroundTaskManager, BackgroundTaskManager())

    async def shutdown(self):
        mgr = await self._container.resolve(BackgroundTaskManager)
        await mgr.shutdown(timeout=30.0)
```

## ScheduledWorker

A base class for services that run a cycle of work on a fixed interval — replacing hand-rolled `while not stop_event` loops.

```python
from lexigram.tasks import BackgroundTaskManager, OnErrorPolicy, ScheduledWorker

class RetentionWorker(ScheduledWorker):
    interval_seconds = 3600.0          # run every hour
    initial_delay_seconds = 5.0        # wait 5 s before the first cycle
    on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE  # (default)

    async def run_cycle(self) -> None:
        await self._repo.delete_expired_records()

# In your provider.boot():
task_manager = await container.resolve(BackgroundTaskManager)
self._worker = RetentionWorker(task_manager=task_manager)
await self._worker.start()

# In your provider.shutdown():
await self._worker.stop()
```

Override at construction time to tune per-instance without subclassing:

```python
worker = RetentionWorker(
    task_manager=task_manager,
    interval_seconds=300.0,
    max_jitter_seconds=30.0,
    on_error_policy=OnErrorPolicy.BACKOFF,
)
```

`OnErrorPolicy` values:

| Value | Behaviour on `run_cycle` error |
|---|---|
| `LOG_AND_CONTINUE` | Log the exception and resume on the next interval (default). |
| `BACKOFF` | Log and double the sleep time (up to 10× interval). |
| `STOP` | Log and stop the worker permanently. |

## Key Source Files

| File | What it contains |
|------|----------------|
| `src/lexigram/tasks/module.py` | `TasksModule` class with factory methods |
| `src/lexigram/tasks/di/provider.py` | `TasksProvider` — wires task protocols into DI container |
| `src/lexigram/tasks/config.py` | `TaskConfig` and sub-config classes |
| `src/lexigram/tasks/backends/` | Backend implementations (memory, redis, rabbitmq, postgres) |
| `src/lexigram/tasks/scheduling/` | Cron scheduler and scheduled task decorators |
| `src/lexigram/tasks/background_task_manager.py` | `BackgroundTaskManager` — lifecycle-aware task tracking (LEX-006) |
| `src/lexigram/tasks/scheduled_worker.py` | `ScheduledWorker` + `OnErrorPolicy` — periodic worker base class (LEX-005) |