Metadata-Version: 2.4
Name: lsyzwmworkcore
Version: 0.0.4
Summary: lsyzwm-work-core - 简化work任务分发与执行的核心库
Author: 9kl
License-Expression: MIT
Keywords: lsyzwm-work-core
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: Twisted==24.11.0
Requires-Dist: lsyzwm-master-sdk==0.0.9
Provides-Extra: dev
Provides-Extra: test
Dynamic: license-file

# lsyzwmworkcore

简化 Worker 任务分发与执行的核心库，基于 ZooKeeper 实现分布式任务调度。

## 特性

- 🚀 **简单易用** - 继承 `WorkerBase`，实现 `process_task` 方法即可
- 🔄 **生命周期钩子** - 提供 `on_task_started`、`on_task_completed`、`on_task_failed` 钩子方法
- 📡 **自动监听** - 基于 ZooKeeper Watch 机制自动监听任务变化
- 🔧 **增量处理** - 内置 `NodeChangeTracker` 避免重复处理
- ⏰ **定时任务** - 支持延时任务、Cron 任务、间隔任务
- 💾 **缓存支持** - Worker 级别和实例级别的缓存节点管理

## 安装

```bash
pip install lsyzwmworkcore
```

## 快速开始

### 1. 定义 Worker

```python
from lsyzwmworkcore.base import WorkerBase
from lsyzwm_master_sdk import MasterZooClient


class MyWorker(WorkerBase):
    @property
    def worker_name(self) -> str:
        return "my_worker"

    def process_task(self, task_id: str) -> None:
        """处理单个任务（必须实现）"""
        # 获取任务数据
        data = self.get_task_value(task_id, as_json=True)
        
        # 处理业务逻辑
        print(f"处理任务: {task_id}, 数据: {data}")

    # 可选：重载钩子方法
    def on_task_started(self, task_id: str) -> None:
        super().on_task_started(task_id)
        print(f"任务开始: {task_id}")

    def on_task_completed(self, task_id: str) -> None:
        super().on_task_completed(task_id)
        print(f"任务完成: {task_id}")

    def on_task_failed(self, task_id: str, exception: Exception = None) -> None:
        super().on_task_failed(task_id, exception)
        print(f"任务失败: {task_id}, 错误: {exception}")
```

### 2. 启动 Worker

```python
from lsyzwm_master_sdk import MasterZooClient

# 初始化 ZooKeeper 客户端
zk_client = MasterZooClient(hosts="localhost:2181")
zk_client.start()

# 创建并启动 Worker
worker = MyWorker(sid=1, zk_client=zk_client)
worker.register()      # 注册 Worker 实例
worker.watch_tasks()   # 开始监听任务
```

### 3. 使用 WorkerManager（可选）

```python
from lsyzwmworkcore.worker_manager import worker_manager

# 注册 Worker 类
worker_manager.register_worker_class(MyWorker)

# 获取 Worker 实例
worker = worker_manager.get_worker("my_worker")

# 获取所有 Worker
all_workers = worker_manager.get_all_workers()
```

## 任务生命周期

```
on_task_started(task_id)      # 钩子：任务开始，默认标记为处理中
        ↓
process_task(task_id)         # 核心：子类实现业务逻辑
        ↓
on_task_completed(task_id)    # 钩子：任务成功，默认标记完成并删除节点
        或
on_task_failed(task_id, ex)   # 钩子：任务失败，默认标记失败并删除节点
```

## 核心组件

### WorkerBase

Worker 抽象基类，提供：

| 方法 | 说明 |
|------|------|
| `process_task(task_id)` | **抽象方法**，子类必须实现 |
| `on_task_started(task_id)` | 钩子：任务开始 |
| `on_task_completed(task_id)` | 钩子：任务完成 |
| `on_task_failed(task_id, exception)` | 钩子：任务失败 |
| `register()` | 注册 Worker 实例 |
| `watch_tasks()` | 监听任务节点变化 |
| `get_task_value(task_id, as_json)` | 获取任务数据 |
| `add_task_node(worker_name, payload, ...)` | 添加任务到其他 Worker |
| `add_self_task_node(payload, ...)` | 添加任务到当前实例 |

### NodeChangeTracker

节点变更跟踪器，用于增量检测：

```python
tracker = NodeChangeTracker(worker_name="my_worker")

# 获取新增节点
new_nodes = tracker.get_new_nodes(current_node_ids)

# 标记状态
tracker.mark_processing(node_id)
tracker.mark_completed(node_id)
tracker.mark_failed(node_id, remove_from_processing=True)
```

### WorkerManager

Worker 管理器（单例模式）：

```python
from lsyzwmworkcore.worker_manager import worker_manager

worker_manager.register_worker_class(MyWorker)
worker = worker_manager.get_worker("my_worker")
names = worker_manager.get_worker_names()
```

## 定时任务

```python
# 延时任务
worker.create_delay_job_node(
    job_id="job_001",
    worker_name="target_worker",
    payload={"action": "process"},
    delay_ts=int(time.time()) + 3600,  # 1小时后执行
    who=worker.worker_id
)

# Cron 任务
worker.create_cron_job_node(
    job_id="job_002",
    worker_name="target_worker",
    payload={"action": "daily_report"},
    cron="0 9 * * *",  # 每天9点执行
    who=worker.worker_id
)

# 间隔任务
worker.create_interval_job_node(
    job_id="job_003",
    worker_name="target_worker",
    payload={"action": "heartbeat"},
    who=worker.worker_id,
    minutes=5  # 每5分钟执行
)
```

## 缓存管理

```python
# Worker 级别缓存（所有实例共享）
worker.set_worker_cache_value("config", {"key": "value"})
config = worker.get_worker_cache_value("config", as_json=True)

# 实例级别缓存（仅当前实例）
worker.set_worker_instance_cache_value("state", {"status": "running"})
state = worker.get_worker_instance_cache_value("state", as_json=True)
```

## 依赖

- Python >= 3.8
- Twisted == 24.11.0
- lsyzwm-master-sdk == 0.0.9

## License

MIT
