Metadata-Version: 2.4
Name: dramatiq-worker-starter
Version: 0.1.0
Summary: 开箱即用的 Dramatiq Worker 启动包
Author-email: seehar <seehar@qq.com>
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: dramatiq[redis,watch]>=2.0.1
Requires-Dist: pydantic>=2.0.0

# Dramatiq Worker Starter

开箱即用的 Dramatiq Worker 启动包，简化 Dramatiq Worker 的配置和启动流程。

## 特性

- **开箱即用** - 最少配置即可启动 Worker
- **可扩展** - 支持自定义中间件、队列、Actor
- **类型安全** - 完整的类型提示
- **配置灵活** - 通过参数传入配置
- **内置中间件**:
  - `TimingMiddleware` - 记录任务执行耗时
  - `WorkerInfoMiddleware` - Worker 心跳上报
  - `ReadableRedisBackend` - 可读的结果键名

## 安装

```bash
pip install dramatiq-worker-starter
```

或从源码安装：

```bash
git clone <repository-url>
cd dramatiq-worker-starter
pip install -e .
```

## 快速开始

### 定义 Actor

```python
from dramatiq_worker_starter import ActorBase

@ActorBase.actor(queue_name="default")
def hello_task(name: str) -> str:
    return f"Hello, {name}!"
```

### 启动 Worker

```python
from dramatiq_worker_starter import ActorBase, init_broker, run_worker
from dramatiq_worker_starter.utils import setup_logging

# 导入 Actor（必须在使用前导入）
from my_actors import hello_task

# 配置日志
setup_logging()

# 初始化 Broker
broker = init_broker(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_db_result=1,
)

# 启动 Worker
if __name__ == "__main__":
    run_worker(broker)
```

### 发送任务

```python
from dramatiq_worker_starter import ActorBase, init_broker

# 初始化 Broker（与 Worker 使用相同的配置）
broker = init_broker(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_db_result=1,
)

# 发送任务
message = ActorBase.send("hello_task", "Alice")
print(f"Task sent with message_id: {message.message_id}")

# 延迟发送任务（5秒后执行）
delayed_message = ActorBase.send_with_options("hello_task", "Bob", delay=5000)
```

## 配置

### init_broker 参数

```python
broker = init_broker(
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 0,
    redis_password: str = "",
    redis_db_result: int = 1,
    namespace: str = "dramatiq-result",
    heartbeat_interval: int = 30,
    worker_ttl: int = 120,
    custom_middleware: list | None = None,
)
```

## 模块说明

### ActorBase

Actor 基类，提供便捷的 Actor 定义方式：

```python
from dramatiq_worker_starter import ActorBase

@ActorBase.actor(
    queue_name="custom",
    max_retries=3,
    min_backoff=1000,
    max_backoff=30000,
)
def my_task(data: str) -> str:
    return data.upper()
```

### Worker

Worker 启动器，支持自定义配置：

```python
from dramatiq_worker_starter import Worker

worker = Worker(
    broker_instance=broker,
    queues=["default", "custom"],
    worker_threads=4,
    worker_timeout=3600000,
)
worker.start()
```

### Middleware

#### TimingMiddleware

记录任务执行耗时：

```python
from dramatiq_worker_starter import init_broker, TimingMiddleware

broker = init_broker(custom_middleware=[TimingMiddleware()])
```

#### WorkerInfoMiddleware

Worker 心跳上报：

```python
from dramatiq_worker_starter import init_broker, WorkerInfoMiddleware

broker = init_broker(
    custom_middleware=[WorkerInfoMiddleware(heartbeat_interval=15)]
)
```

#### 自定义中间件

```python
import dramatiq
from dramatiq.middleware import Middleware

class CustomMiddleware(Middleware):
    def before_process_message(self, broker, message):
        print(f"Task started: {message.actor_name}")

    def after_process_message(self, broker, message, *, result=None, exception=None):
        if exception:
            print(f"Task failed: {exception}")
        else:
            print(f"Task completed: {message.actor_name}")

broker = init_broker(custom_middleware=[CustomMiddleware()])
```

## 项目结构

```
dramatiq-worker-starter/
├── src/
│   └── dramatiq_worker_starter/
│       ├── __init__.py           # 包导出
│       ├── broker.py             # Broker 初始化
│       ├── middleware.py         # 内置中间件
│       ├── worker.py             # Worker 启动器
│       ├── actors/               # Actor 模块
│       │   ├── __init__.py
│       │   └── base.py           # Actor 基类
│       └── utils/                # 工具模块
│           ├── __init__.py
│           └── logger.py         # 日志工具
├── examples/                     # 使用示例
│   ├── simple/
│   │   ├── actors.py
│   │   ├── main.py
│   │   └── tasks.py
│   └── advanced/
│       ├── custom_middleware.py
│       ├── main.py
│       └── tasks.py
└── tests/                        # 测试用例
```

## 示例

### Simple Example

启动 Worker：

```bash
python -m examples.simple.main
```

发送任务：

```bash
python -m examples.simple.tasks
```

### Advanced Example

启动 Worker（自定义配置）：

```bash
python -m examples.advanced.main
```

发送任务并查询结果：

```bash
python -m examples.advanced.tasks
```

## Redis 数据结构

### Worker 信息

- `dramatiq:workers:active` (ZSET): 活跃 Worker 集合
- `dramatiq:worker:{worker_id}:actors` (SET): Worker 支持的 Actor 列表
- `dramatiq:worker:{worker_id}:info` (HASH): Worker 详细信息

### 结果存储

- `dramatiq-result:{queue_name}:{actor_name}:{message_id}` (LIST): 任务结果

结果格式：

```json
{
  "result": { /* 实际结果 */ },
  "_timing": {
    "start_datetime": "2026-02-12T12:00:00.000000",
    "end_datetime": "2026-02-12T12:00:05.000000",
    "duration_ms": 5000,
    "queue_name": "default",
    "actor_name": "hello_task",
    "worker_id": "abc12345",
    "exception": null
  }
}
```

## 依赖

- `dramatiq[redis,watch]>=2.0.1`
- `pydantic>=2.12.5`

## 许可证

MIT License

## 贡献

欢迎提交 Issue 和 Pull Request！
