Metadata-Version: 2.1
Name: nb_cron_nb
Version: 0.1.1
Summary: A powerful and simple cron job scheduler that dominates APScheduler
Author: ydf0509
License: MIT
Project-URL: Homepage, https://github.com/ydf0509/nb_cron
Project-URL: Repository, https://github.com/ydf0509/nb_cron
Keywords: cron,scheduler,distributed,job,task
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: redis
Provides-Extra: mongo
Provides-Extra: sqlalchemy
Provides-Extra: fastapi
Provides-Extra: flask
Provides-Extra: django
Provides-Extra: all
Provides-Extra: dev


# 🚀 nb_cron

**下一代 Python 分布式定时任务框架（全面超越 APScheduler）**

nb_cron 是一个强大、极简且**专为云原生架构设计**的定时任务调度库。它不仅彻底解决了 APScheduler 常年存在的序列化崩溃、多实例重复执行等痛点，更在架构理念上实现了**“业务逻辑与调度配置的物理隔离”**，让 Python 定时任务的管理进入真正的现代化阶段。

### 🥊 核心痛点对决：APScheduler vs nb_cron

| 痛点场景 | 😭 APScheduler 的历史包袱 | 🤩 nb_cron 的现代解决方案 |
| :--- | :--- | :--- |
| **云原生多副本部署**<br>*(K8s/多容器)* | 致命弱点：多实例会**重复执行任务**，需手动硬编码第三方 Redis 锁，极易死锁。 | **天生云原生**：内置极其可靠的分布式锁（Redis/Mongo/SQL），天然支持 K8s 多副本平滑扩容，保证 **exactly-once**（绝不重复执行）。 |
| **微服务跨项目调度** | 强耦合：任务代码和调度器必须在同一个项目中，无法集中化管理。 | **跨 Git 项目可视化编排**：首创业务与调度解耦机制。A 项目只写函数，B 项目（调度中心）通过 Web UI 动态下发定时配置。 |
| **代码重构与序列化** | Pickle 地狱：存入 DB 的是函数内存地址，一旦代码重构（改名/移动文件路径），反序列化直接崩溃，任务全线瘫痪。 | **彻底抛弃 Pickle**：首创 `@cron_register` 稳定名称注册表，存入 DB 的仅是纯字符串。代码随便重构，只要名字在，任务照样跑。 |
| **可视化管理后台** | 官方**没有 UI**，想要启停任务、看日志只能自己从头手搓前后端。 | **原生自带 Web UI**：内置开箱即用的 Vue3 + Element Plus 现代化后台。**前端已预编译到 `nb_cron/web`，Python 开发者无需安装 Node.js 即可一键启动。** |
| **精度与时区** | Cron 不支持秒级精度；时区配置与 Misfire 策略行为混乱。 | **强制 6 字段 Cron**（支持秒级）；默认本地时区（可传 `tz`），Misfire 容忍策略极简可控。 |
| **多项目共享存储** | 多项目共用一个 Redis 时极易发生 Key 冲突，互相踩踏任务。 | **强制物理隔离**：初始化必须传 `name` 参数，按项目名称进行 Redis Key 的绝对隔离。 |
| **选择困难症** | 提供 7 种 Scheduler 类（Blocking, Background, AsyncIO...），新手永远选错。 | **大道至简**：全局只有一个 `NbCron` 类，永远在后台非阻塞运行，同时兼容同步与 `async` 异步函数。 |
| **集群分布式消费** | 只能在本地调度并执行，面对海量重计算任务力不从心。 | **一键变身分布式 MQ**：支持无缝切换至 `FunboostExecutor`，瞬间获得**失败重试、指数退避、超时杀死**等工业级分布式消息队列消费能力。 |

### ✨ 为什么 nb_cron 是“下一代”框架？

传统的定时任务框架，往往把**“业务代码”**和**“定时规则（如每天凌晨两点）”**死死绑在一起。
nb_cron 带来了全新的架构理念：

1. **配置即数据，无需重启服务**：通过 Web UI 随时修改任务的 Cron 表达式或启停任务，配置实时写入 Redis 并生效，你的业务进程**全程无需重启**。
2. **函数定义与任务调度的物理分离**：让后端开发人员只管专心写业务函数并打上 `@cron_register` 标记；让运维或运营人员在独立的 Web 页面上，通过下拉菜单选择已注册的函数，在nb_cron_ui 的前端去创建定时任务。


## 安装

> **⚠️ 重要提示**：PyPI 上的包名是 `nb_cron_nb`（不是 `nb_cron`），因为 `nb_cron` 已被他人占用。**代码中的 import 仍然是 `from nb_cron import ...`**，只是安装时用 `pip install nb_cron_nb`。

```bash
# 基础安装（内存存储）
pip install nb_cron_nb

# Redis 存储 + FastAPI Web（推荐，生产环境一行搞定）
pip install nb_cron_nb[redis,fastapi]

# Redis + Flask
pip install nb_cron_nb[redis,flask]

# 全部功能
pip install nb_cron_nb[all]
```

各可选组件：

| 组件 | 安装命令 | 说明 |
|---|---|---|
| redis | `pip install nb_cron_nb[redis]` | Redis 存储 + 分布式锁 |
| mongo | `pip install nb_cron_nb[mongo]` | MongoDB 存储 + 分布式锁 |
| sqlalchemy | `pip install nb_cron_nb[sqlalchemy]` | SQLite/MySQL/PostgreSQL 存储 |
| fastapi | `pip install nb_cron_nb[fastapi]` | FastAPI Web 框架集成 |
| flask | `pip install nb_cron_nb[flask]` | Flask Web 框架集成 |
| django | `pip install nb_cron_nb[django]` | Django + Ninja 框架集成 |

## 快速开始

### 最简用法

```python
from nb_cron import NbCron, cron_register

cron = NbCron("my_project")  # name 必传，隔离不同项目

@cron.job("0 */5 * * * *")  # 每5分钟执行（6字段：秒 分 时 日 月 周）
@cron_register('my_task')   # 必须注册稳定名称
def my_task():
    print("Hello nb_cron!")

cron.start()  # 不阻塞，定时任务后台运行，进程不会退出
```

### 完整示例

```python
from datetime import timedelta, timezone
from nb_cron import NbCron, cron_register, add_cron_register, explain_cron

# 创建调度器（name 必传，选一个存储后端）
cron = NbCron("my_project")                                      # 内存存储
# cron = NbCron("my_project", "redis://localhost:6379/0")         # Redis（推荐）
# cron = NbCron("my_project", tz=timezone(timedelta(hours=8)))    # 指定东八区
# cron = NbCron("my_project", tz=timezone.utc)                    # UTC

# ── 装饰器方式（@cron_register 在下，@cron.job 在上） ──

# 无参数任务
@cron.job("0 */5 * * * *")
@cron_register('report')
def report_task():
    print("生成报告")

# 有参数任务（必须在装饰器中传 args/kwargs）
@cron.job("0 30 9 * * * *", args=("admin@example.com",), kwargs={"report_type": "daily"})
@cron_register('send_report_email')
def send_report_email(to_address: str, report_type: str = "daily"):
    print(f"发送{report_type}报表到 {to_address}")

# 有参数任务（装饰器中直接传参）
@cron.job("0 0 2 * * *", args=("backup_db",), kwargs={"compress": True})
@cron_register('backup')
def backup_database(db_name: str, compress: bool = False):
    print(f"备份数据库 {db_name}, 压缩={compress}")

# 异步任务也支持
@cron.job("30 0 9 * * 1-5", trigger="cron")
@cron_register('async_work')
async def async_task():
    print("异步任务也支持！")

# 间隔任务
@cron.job("@every 30s", trigger="interval")
@cron_register('heartbeat')
def heartbeat():
    print("心跳")

# 日期任务（一次性）
@cron.job("2026-10-01 09:00:00", trigger="date")
@cron_register('national_day')
def national_day_task():
    print("国庆节任务")

# ── 非装饰器写法（适合第三方函数或动态注册） ──

# 方式 1：add_cron_register + add_job
def send_sms(phone: str, message: str):
    """发送短信（第三方函数）"""
    print(f"发送短信到 {phone}: {message}")

add_cron_register('send_sms', send_sms)
cron.add_job(
    'send_sms',
    "0 0 8 * * *",
    trigger="cron",
    job_id="morning_sms",
    name="早安短信",
    args=("13800138000", "早上好！"),
)

# 方式 2：直接 add_job（自动注册）
from nb_cron import cron_register

def cleanup_logs(days: int = 7):
    """清理日志"""
    print(f"清理{days}天前的日志")

cron_register('cleanup_logs', cleanup_logs)
cron.add_job(
    cleanup_logs,  # 传函数对象
    "0 0 3 * * 0",
    trigger="cron",
    job_id="weekly_cleanup",
    name="每周清理",
    kwargs={"days": 30},  # 覆盖默认参数
)

# 方式 3：批量注册任务
def process_order(order_id: int):
    """处理订单"""
    print(f"处理订单 {order_id}")

add_cron_register('process_order', process_order)

# 动态添加多个不同参数的任务
cron.add_job('process_order', "@every 5m", job_id="process_order_batch1", args=(1001,))
cron.add_job('process_order', "@every 5m", job_id="process_order_batch2", args=(1002,))
cron.add_job('process_order', "@every 5m", job_id="process_order_batch3", args=(1003,))

# ── 启动 ──
cron.start()

# ── 管理 ──
cron.pause_job("daily_backup")
cron.resume_job("daily_backup")
cron.trigger_job("daily_backup")
cron.remove_job("daily_backup")
jobs = cron.get_jobs()

# ── Cron 翻译 ──
print(explain_cron("0 30 9 * * *", "zh"))    # "每天 09:30:00 执行"
print(explain_cron("0 30 9 * * *", "en"))    # "At 09:30:00, every day"
```

---

## 项目隔离（name 参数）

`NbCron` 的第一个参数 `name` 是**必传**的，用于隔离不同项目的数据：

- **Redis**: keys 格式为 `nb_cron:{name}:jobs`、`nb_cron:{name}:metrics`、`nb_cron:{name}:due`、`nb_cron:{name}:lock:*`
- **MongoDB**: collections 为 `nb_cron_{name}_jobs`、`nb_cron_{name}_metrics`、`nb_cron_{name}_locks`
- **SQLAlchemy**: 表名为 `nb_cron_{name}_jobs`、`nb_cron_{name}_metrics`、`nb_cron_{name}_locks`
- **Web UI**: 侧边栏标题显示 `name`，方便区分

```python
# 同一个 Redis，不同项目互不干扰
cron_a = NbCron("billing_service", "redis://localhost:6379/0")
cron_b = NbCron("user_service", "redis://localhost:6379/0")

# cron_a 只看到 billing_service:jobs 下的任务
# cron_b 只看到 user_service:jobs 下的任务
```

不传 `name` 或传空字符串会直接报错：

```python
NbCron("")   # ValueError: NbCron name 不能为空
NbCron()     # TypeError: missing required argument 'name'
```

---

## 时区支持

nb_cron 默认使用**本地时区**。所有时间（`next_run_time`、日期表达式解析等）都基于调度器的时区。

```python
from datetime import timedelta, timezone

# 默认：本地时区（推荐）
cron = NbCron("my_project")

# 显式指定时区
cron = NbCron("my_project", tz=timezone(timedelta(hours=8)))   # 东八区
cron = NbCron("my_project", tz=timezone.utc)                   # UTC

# Python 3.9+ 可用 zoneinfo
from zoneinfo import ZoneInfo
cron = NbCron("my_project", tz=ZoneInfo("Asia/Shanghai"))
```

`tz` 参数接受任何 `datetime.tzinfo` 对象。

---

## 三种触发器类型

nb_cron 支持三种触发器类型，通过 `trigger` 参数显式指定（也可以不传，自动推断）：

| trigger 值 | 含义 | expression 示例 |
|---|---|---|
| `"cron"` | 6 字段 cron 表达式 | `"0 */5 * * * *"` |
| `"interval"` | 固定间隔重复执行 | `"@every 30s"`, `"5m"`, `"2h"` |
| `"date"` | 指定时间执行一次 | `"2026-10-01 09:00:00"`, `"2026年10月01日"` |

### 自动推断 vs 显式指定

```python
# 自动推断（不传 trigger，nb_cron 自动判断）
@cron.job("0 */5 * * * *")              # → cron
@cron.job("@every 30s")                 # → interval
@cron.job("2026-10-01 09:00:00")        # → date

# 显式指定（推荐，语义更清晰）
@cron.job("0 */5 * * * *", trigger="cron")
@cron.job("@every 30s", trigger="interval")
@cron.job("2026-10-01 09:00:00", trigger="date")

# trigger="interval" 时支持简写（不需要 @every 前缀）
@cron.job("30s", trigger="interval")     # 等价于 @every 30s
@cron.job("5m", trigger="interval")      # 等价于 @every 5m
@cron.job("2h", trigger="interval")      # 等价于 @every 2h
```

> **注意：** 以上所有 `@cron.job` 下面都需要 `@cron_register('名称')` 装饰器。

---

## 函数注册（强制）

nb_cron **强制要求**所有定时函数必须通过 `@cron_register` 注册一个**稳定名称**（`cron_func_name`）。
这确保函数标识不依赖文件路径——重命名文件、移动函数不会影响调度。

```python
from nb_cron import cron_register, add_cron_register

# 装饰器注册
@cron_register('daily_backup')
def backup_db():
    print("备份")

backup_db.cron_func_name  # → 'daily_backup'  （IDE 可补全）

# 函数调用注册（适合第三方函数）
def send_email():
    print("发邮件")
add_cron_register('send_email', send_email)

# 也支持 cron_register 两参数形式
cron_register('send_email', send_email)
```

### 与 `@cron.job` 配合

`@cron_register` 放下面（靠近函数），`@cron.job` 放上面：

```python
@cron.job("0 0 2 * * *", trigger="cron")     # 第二步：读 .cron_func_name，注册调度
@cron_register('daily_backup')                # 第一步：设 .cron_func_name，注册函数
def backup_db():
    print("备份")
```

### `add_job` 三种传参方式

```python
# 1. 传函数对象（自动读 .cron_func_name）
cron.add_job(backup_db, "0 0 2 * * *", trigger="cron")

# 2. 传注册名字符串
cron.add_job('daily_backup', "0 0 2 * * *", trigger="cron")

# 3. 传 .cron_func_name（IDE 安全，等价于方式 2）
cron.add_job(backup_db.cron_func_name, "0 0 2 * * *", trigger="cron")

# 4. 带参数的任务（args 和 kwargs）
def send_email(to: str, subject: str, body: str = ""):
    print(f"发送邮件到 {to}: {subject}")

cron_register('send_email', send_email)

# 在装饰器中传参
@cron.job("0 9 * * * *", args=("admin@example.com", "日报"), kwargs={"body": "这是日报内容"})
@cron_register('daily_report_email')
def daily_report_email(to: str, subject: str, body: str = ""):
    print(f"发送日报到 {to}")

# 或在 add_job 中传参
cron.add_job(
    'send_email',
    "0 9 * * * *",
    trigger="cron",
    job_id="morning_email",
    args=("user@example.com", "晨报"),
    kwargs={"body": "这是晨报内容"},
)

# 5. 批量添加同函数不同参数的任务
def process_batch(batch_id: int):
    print(f"处理批次 {batch_id}")

cron_register('process_batch', process_batch)
cron.add_job('process_batch', "@every 10m", job_id="batch_1", args=(1,))
cron.add_job('process_batch', "@every 10m", job_id="batch_2", args=(2,))
cron.add_job('process_batch', "@every 10m", job_id="batch_3", args=(3,))
```

### 未注册直接报错

```python
@cron.job("0 */5 * * * *")
def simple_task():          # ❌ ValueError: 函数 'simple_task' 未注册 cron_func_name
    pass
```

---

## 函数找不到的处理

如果 Redis 中存储了某个 job 但对应的定时函数没有被导入或已被删除：

- **失败次数 +1**（计入 metrics）
- **job 状态变为 `error`**
- **前端显示红色"异常"标签**，不再显示"运行中"误导用户
- **日志打印 ERROR 级别信息**
- **继续调度**（下次触发时再次尝试，便于热修复后自动恢复）

---

## Web UI 管理后台（重点）

nb_cron 自带漂亮的管理后台，包含：
- **仪表盘**：任务总数、运行中/已暂停/异常卡片、24小时执行趋势图、成功率饼图
- **任务管理**：列表搜索/筛选、暂停/恢复/立即执行/删除操作、新建任务对话框
- **任务详情**：执行指标图表、最近10次执行记录、错误日志
- **Cron 工具**：Cron 表达式翻译器
- **中英文切换**

支持 **FastAPI、Flask、Django** 三种框架一键启动。

---

### 方式一：FastAPI 启动（推荐）

```bash
pip install nb_cron_nb[redis,fastapi]
```

创建 `app.py`：

```python
from nb_cron import NbCron, cron_register
from nb_cron.web import get_fastapi_app

cron = NbCron("my_project", "redis://localhost:6379/0")

# 无参数任务
@cron.job("*/10 * * * * *", trigger="cron", name="心跳检测")
@cron_register('heartbeat')
def heartbeat():
    print("heartbeat OK")

# 有参数任务（必须在装饰器中传 args/kwargs）
@cron.job("0 */5 * * * *", trigger="cron", name="数据同步", args=(), kwargs={"source": "mysql", "target": "redis"})
@cron_register('sync_data')
def sync_data(source: str = "mysql", target: str = "redis"):
    print(f"sync from {source} to {target}")

# 有参数任务（装饰器中传参）
@cron.job("0 30 2 * * *", trigger="cron", name="每日备份", args=("prod_db",), kwargs={"compress": True})
@cron_register('daily_backup')
def daily_backup(db_name: str, compress: bool = False):
    print(f"backup {db_name}, compress={compress}")

# 非装饰器写法：第三方函数
def send_email(to: str, subject: str, body: str):
    """发送邮件（第三方库函数）"""
    print(f"邮件已发送到 {to}: {subject}")

cron_register('send_email', send_email)
cron.add_job(
    'send_email',
    "0 0 9 * * 1-5",
    trigger="cron",
    job_id="morning_report_email",
    name="晨报邮件",
    args=("admin@example.com", "每日晨报", "这是晨报内容"),
)

# 批量添加同函数不同参数的任务
def process_queue(queue_name: str):
    """处理队列"""
    print(f"processing queue: {queue_name}")

cron_register('process_queue', process_queue)
cron.add_job('process_queue', "@every 1m", job_id="process_queue_1", args=("queue_1",))
cron.add_job('process_queue', "@every 1m", job_id="process_queue_2", args=("queue_2",))
cron.add_job('process_queue', "@every 1m", job_id="process_queue_3", args=("queue_3",))

app = get_fastapi_app(cron)

@app.on_event("startup")
def startup():
    cron.start()

@app.on_event("shutdown")
def shutdown():
    cron.stop()
```

启动：

```bash
uvicorn app:app --host 0.0.0.0 --port 8000 --reload
```

打开浏览器访问：

| 地址 | 说明 |
|---|---|
| http://localhost:8000/nb_cron/ui/ | 管理后台 UI 页面 |
| http://localhost:8000/nb_cron/api/jobs | REST API - 任务列表 |
| http://localhost:8000/nb_cron/api/health | 健康检查（含时区信息） |
| http://localhost:8000/nb_cron/api/dashboard/stats | 仪表盘统计数据 |
| http://localhost:8000/nb_cron/api/cron/explain?expression=0+*/5+*+*+*+* | Cron 翻译 |
| http://localhost:8000/docs | FastAPI 自动生成的 Swagger 文档 |

---

### 方式二：Flask 启动

```bash
pip install nb_cron_nb[redis,flask]
```

创建 `app.py`：

```python
from nb_cron import NbCron, cron_register
from nb_cron.web import get_flask_app

cron = NbCron("my_project", "redis://localhost:6379/0")

# 无参数任务
@cron.job("*/10 * * * * *", trigger="cron", name="心跳")
@cron_register('heartbeat')
def heartbeat():
    print("heartbeat OK")

# 有参数任务（必须在装饰器中传 args/kwargs）
@cron.job("0 */5 * * * *", trigger="cron", name="同步", kwargs={"source": "mysql", "target": "redis"})
@cron_register('sync_data')
def sync_data(source: str = "mysql", target: str = "redis"):
    print(f"sync from {source} to {target}")

# 非装饰器写法
def cleanup(days: int = 7):
    """清理日志"""
    print(f"cleanup logs older than {days} days")

cron_register('cleanup', cleanup)
cron.add_job(
    'cleanup',
    "0 0 3 * * 0",
    trigger="cron",
    job_id="weekly_cleanup",
    name="每周清理",
    kwargs={"days": 30},
)

app = get_flask_app(cron)

if __name__ == "__main__":
    cron.start()
    app.run(host="0.0.0.0", port=5000, debug=False)
```

启动：

```bash
# 开发模式
python app.py

# 生产模式（注意: 只用 1 个 worker，或用 Redis 存储自动防重复）
gunicorn app:app -w 1 -b 0.0.0.0:5000
```

访问 http://localhost:5000/nb_cron/ui/

---

### 方式三：Django 启动

```bash
pip install nb_cron_nb[redis,django]
```

**Step 1** — 创建调度器配置文件 `your_project/cron_config.py`：

```python
from nb_cron import NbCron, cron_register

cron = NbCron("my_project", "redis://localhost:6379/0")

# 无参数任务
@cron.job("*/10 * * * * *", trigger="cron", name="心跳")
@cron_register('heartbeat')
def heartbeat():
    print("heartbeat OK")

# 有参数任务（必须在装饰器中传 args/kwargs）
@cron.job("0 */5 * * * *", trigger="cron", name="同步", kwargs={"source": "mysql", "target": "redis"})
@cron_register('sync_data')
def sync_data(source: str = "mysql", target: str = "redis"):
    print(f"sync from {source} to {target}")

# 非装饰器写法：批量添加任务
def process_order(order_id: int):
    """处理订单"""
    print(f"processing order {order_id}")

cron_register('process_order', process_order)
cron.add_job('process_order', "@every 5m", job_id="process_order_1", args=(1001,))
cron.add_job('process_order', "@every 5m", job_id="process_order_2", args=(1002,))
cron.add_job('process_order', "@every 5m", job_id="process_order_3", args=(1003,))
```

**Step 2** — 在 `urls.py` 中挂载路由：

```python
from django.contrib import admin
from django.urls import path
from your_project.cron_config import cron
from nb_cron.web import get_django_urls

urlpatterns = [
    path('admin/', admin.site.urls),
] + get_django_urls(cron)
```

**Step 3** — 在 `apps.py` 中启动调度器（防止 reload 重复启动）：

```python
import os
from django.apps import AppConfig

class YourAppConfig(AppConfig):
    name = 'your_app'

    def ready(self):
        if os.environ.get('RUN_MAIN') == 'true':
            from your_project.cron_config import cron
            cron.start()
```

启动：

```bash
python manage.py runserver 0.0.0.0:8000
```

访问 http://localhost:8000/nb_cron/ui/

---

## 前端 UI 构建说明

nb_cron 的管理后台前端源码位于 `nb_cron_ui/` 目录，使用以下技术栈：

- **Vue 3** — 响应式前端框架
- **Element Plus** — UI 组件库
- **ECharts** — 图表库
- **Pinia** — 状态管理
- **Vue I18n** — 中英文国际化
- **Vue Router** — 路由管理
- **Vite** — 构建工具

### 构建前端（发布前必须执行）

```bash
cd nb_cron_ui
npm install          # 安装依赖
npm run build        # 编译，输出到 nb_cron/web/static/
```

构建完成后，`nb_cron/web/static/` 目录下会生成 `index.html` 和 `assets/` 目录，
Python 后端会自动读取并在 `/nb_cron/ui/` 路径下提供服务。

### 前端开发模式

如果你要修改前端代码：

```bash
# 终端 1：启动后端 API（以 FastAPI 为例）
uvicorn your_app:app --port 8000

# 终端 2：启动前端开发服务器（自动代理 API 到 8000 端口）
cd nb_cron_ui
npm run dev
```

Vite 开发服务器会自动将 `/nb_cron/api/` 请求代理到 `http://localhost:8000`，
实现前后端分离开发、热更新。

### 前端目录结构

```
nb_cron_ui/
├── package.json              # 依赖配置
├── vite.config.js            # Vite 配置（base路径、构建输出、API代理）
├── index.html                # 入口 HTML
├── src/
│   ├── main.js               # Vue 应用入口
│   ├── App.vue               # 根组件
│   ├── router/index.js       # 路由配置
│   ├── stores/app.js         # Pinia 状态管理（侧边栏、标签页、语言）
│   ├── i18n/                 # 国际化
│   │   ├── index.js          # i18n 配置
│   │   ├── zh.js             # 中文翻译
│   │   └── en.js             # 英文翻译
│   ├── api/index.js          # Axios API 封装
│   ├── views/
│   │   ├── Dashboard.vue     # 仪表盘（统计卡片 + ECharts 图表）
│   │   ├── JobList.vue       # 任务列表（搜索/操作/状态展示/新建任务）
│   │   ├── JobDetail.vue     # 任务详情（指标 + 执行记录 + 错误日志）
│   │   └── CronTool.vue      # Cron 表达式翻译工具
│   └── components/
│       ├── AppLayout.vue     # 整体布局（侧边栏 + 顶栏 + 内容区）
│       ├── Sidebar.vue       # 左侧导航栏
│       └── TabsBar.vue       # 右侧多标签栏
```

---

## Cron 表达式

nb_cron **强制 6 字段** cron 表达式，消除歧义：

```
┌──────────── 秒 (0-59)
│ ┌────────── 分 (0-59)
│ │ ┌──────── 时 (0-23)
│ │ │ ┌────── 日 (1-31)
│ │ │ │ ┌──── 月 (1-12)
│ │ │ │ │ ┌── 周 (0-6, 0=周日)
│ │ │ │ │ │
* * * * * *
```

**传入 5 字段会直接报错**，避免用户写错。

### 常用示例

| 表达式 | 含义 |
|---|---|
| `* * * * * *` | 每秒 |
| `0 * * * * *` | 每分钟整秒 |
| `*/10 * * * * *` | 每10秒 |
| `0 */5 * * * *` | 每5分钟 |
| `0 0 * * * *` | 每小时整点 |
| `0 30 9 * * *` | 每天 09:30:00 |
| `0 0 9 * * 1-5` | 工作日 09:00:00 |
| `0 0 0 1 * *` | 每月1号 00:00:00 |
| `0 0 2 * * 0` | 每周日 02:00:00 |

### 间隔表达式

除了 cron，也支持 `@every` 简写（trigger 自动推断为 `interval`）：

```python
@cron.job("@every 30s")                        # 每30秒（自动推断）
@cron.job("@every 5m", trigger="interval")     # 每5分钟（显式指定）
@cron.job("2h", trigger="interval")            # 每2小时（简写，显式指定时可省略 @every）
@cron.job("@every 1d")                         # 每天
@cron.job("@every 1w")                         # 每周
```

### 日期表达式（一次性任务）

支持多种日期格式（trigger 自动推断为 `date`），日期按调度器的时区解析：

```python
@cron.job("2026-10-01 09:00:00", trigger="date")     # ISO 格式
@cron.job("2026-10-01", trigger="date")               # 仅日期（当天 00:00:00）
@cron.job("2026/10/01 09:00:00", trigger="date")      # 斜线分隔
@cron.job("2026年10月01日", trigger="date")             # 中文日期
```

### Cron 翻译功能

```python
from nb_cron import explain_cron

print(explain_cron("0 30 9 * * *", "zh"))     # "每天09:30:00执行"
print(explain_cron("0 30 9 * * *", "en"))     # "At 09:30:00, every day"
print(explain_cron("0 0 9 * * 1-5", "zh"))    # "每周一至周五09:00:00执行"
print(explain_cron("*/5 * * * * *", "zh"))     # "每5秒执行"
```

REST API 也支持翻译：`GET /nb_cron/api/cron/explain?expression=0+*/5+*+*+*+*`

---

## 分布式部署

nb_cron 在 Redis/MongoDB/SQLAlchemy 存储模式下**自动支持分布式**：

```python
cron = NbCron("my_project", "redis://localhost:6379/0")
```

**原理：** 每次任务触发时，调度器先用 `SET NX PX` (Redis) 或 `findOneAndUpdate` (MongoDB) 获取分布式锁。锁的 key 包含任务 ID 和触发时间戳，确保同一次触发只有一个实例执行。

不需要额外配置，不需要 leader election，开箱即用。

---

## REST API

所有 API 前缀：`/nb_cron/api/`

| 方法 | 路径 | 说明 |
|---|---|---|
| GET | `/jobs` | 获取所有任务（含指标） |
| GET | `/jobs/{job_id}` | 获取单个任务详情 |
| POST | `/jobs` | 创建任务 |
| DELETE | `/jobs/{job_id}` | 删除任务 |
| POST | `/jobs/{job_id}/pause` | 暂停任务 |
| POST | `/jobs/{job_id}/resume` | 恢复任务 |
| POST | `/jobs/{job_id}/trigger` | 立即触发一次 |
| GET | `/jobs/{job_id}/metrics` | 获取任务指标 |
| GET | `/dashboard/stats` | 仪表盘统计（含 error_count） |
| GET | `/cron/explain?expression=...` | Cron 表达式翻译 |
| GET | `/functions` | 获取已注册函数列表 |
| GET | `/health` | 健康检查（含时区信息） |

### 创建任务 API

`POST /nb_cron/api/jobs`

```json
{
    "func_ref": "daily_backup",
    "expression": "0 0 2 * * *",
    "trigger": "cron",
    "job_id": "my_job",
    "name": "我的任务",
    "args": ["backup_db"],
    "kwargs": {"compress": true},
    "max_instances": 1
}
```

- `func_ref`: 函数的 `cron_func_name`（通过 `@cron_register` 注册的稳定名称）
- `trigger`: 可选 `"cron"` / `"interval"` / `"date"`，不传则自动推断
- `args`: 位置参数列表，会按顺序传给函数
- `kwargs`: 关键字参数字典，会作为命名参数传给函数

### 创建带参数的任务示例

```json
// 发送邮件任务
POST /nb_cron/api/jobs
{
    "func_ref": "send_email",
    "expression": "0 9 * * * *",
    "trigger": "cron",
    "job_id": "morning_email",
    "name": "晨报邮件",
    "args": ["admin@example.com", "每日晨报"],
    "kwargs": {"body": "这是晨报内容"}
}

// 批量处理任务（同函数不同参数）
POST /nb_cron/api/jobs
{
    "func_ref": "process_queue",
    "expression": "@every 1m",
    "trigger": "interval",
    "job_id": "process_queue_1",
    "name": "处理队列 1",
    "args": ["queue_1"]
}

POST /nb_cron/api/jobs
{
    "func_ref": "process_queue",
    "expression": "@every 1m",
    "trigger": "interval",
    "job_id": "process_queue_2",
    "name": "处理队列 2",
    "args": ["queue_2"]
}
```

### 获取已注册函数

`GET /nb_cron/api/functions`

从存储后端（Redis/MongoDB/SQLAlchemy）读取所有已注册的函数名称列表，支持跨 Git 项目共享。

**跨项目工作原理：**
- 项目 A 中用 `@cron_register` 标记的函数，会在 `cron.start()` 时自动同步到 Redis
- 项目 B 的 Web UI 通过此 API 读取 Redis 中的函数列表
- 即使项目 A 没有运行，函数列表依然可用（持久化在 Redis 中）

**示例：**
```bash
# 项目 A：只标记函数，不添加定时任务
@cron_register('send_email')
def send_email(to, subject):
    ...

@cron_register('generate_report')
def generate_report(type):
    ...

cron.start()  # 函数名自动同步到 Redis

# 项目 B：Web UI 调用 API
GET /nb_cron/api/functions
# 返回：{"functions": ["send_email", "generate_report"]}

# Web UI 下拉框显示这些函数，用户可以选择并创建定时任务
```

---

## 存储后端

| 后端 | URL 格式 | 分布式锁 | 适用场景 |
|---|---|---|---|
| Memory | `None`（默认） | 进程内锁 | 开发/单实例 |
| Redis | `redis://host:port/db` | SET NX PX | 生产/分布式（推荐） |
| MongoDB | `mongodb://host:port/db` | findOneAndUpdate | 生产/分布式 |
| SQLAlchemy | `sqlite:///path` / `mysql+pymysql://...` | INSERT conflict | 生产/分布式 |

---

## 任务指标

nb_cron 自动收集每个任务的执行指标（固定大小，不爆内存）：

- `total_runs` - 总执行次数
- `success_count` / `fail_count` - 成功/失败次数
- `last_run_at` - 最后执行时间
- `last_error` - 最后一次错误信息（截断500字符）
- `avg_duration_ms` / `max_duration_ms` / `min_duration_ms` - 执行耗时统计
- `recent_results` - 最近10次执行结果（环形缓冲区）
- `hourly_stats` - 24小时逐时统计（固定24个槽位）

Redis 中单个任务的指标占用不超过 2KB。

---

## 任务状态

| 状态 | 含义 | 前端显示 |
|---|---|---|
| `active` | 正常运行中 | 绿色 `运行中` |
| `paused` | 已暂停 | 黄色 `已暂停` |
| `error` | 定时函数未找到 | 红色 `异常` |

当函数未找到时（如函数被删除、模块未导入），job 自动标记为 `error` 状态并计入失败次数。修复函数后，下次触发会自动恢复。

---

## API 参考

### `NbCron(name, store_url=None, max_workers=20, tick_seconds=1.0, misfire_grace_seconds=60, tz=None)`

创建调度器实例。

- `name`: **必传**，调度器名称。用于隔离不同项目的 Redis keys / MongoDB collections / SQL 表。多个项目共享同一个 Redis 时互不干扰。UI 侧边栏会显示此名称
- `store_url`: 存储后端 URL，None 表示内存存储
- `max_workers`: 线程池大小
- `tick_seconds`: 调度循环间隔（秒）
- `misfire_grace_seconds`: misfire 容忍时间（超过此时间的错过任务会被跳过）
- `tz`: 时区，默认 `None` 使用本地时区。接受任何 `datetime.tzinfo` 对象

### `@cron.job(expression, *, trigger=None, job_id=None, name=None, args=(), kwargs=None, max_instances=1)`

装饰器，注册定时任务。同时支持同步函数和 async 函数。**函数必须先用 `@cron_register` 注册。**

- `expression`: 触发表达式（cron / 间隔 / 日期时间）
- `trigger`: 触发器类型，可选 `"cron"` / `"interval"` / `"date"`，不传则自动推断

### `cron.add_job(func, expression, *, trigger=None, job_id=None, name=None, ...)`

编程方式添加任务，返回 Job 对象。

- `func`: 函数对象 **或** `cron_func_name` 字符串（通过 `@cron_register` 注册的名称）
- `expression`: 触发表达式
- `trigger`: 触发器类型，可选 `"cron"` / `"interval"` / `"date"`，不传则自动推断

### `@cron_register(cron_func_name)` / `add_cron_register(cron_func_name, func)` / `cron_register(cron_func_name, func)`

给函数绑定路径无关的稳定名称。被装饰的函数会多出 `.cron_func_name` 属性。

```python
from nb_cron import cron_register, add_cron_register
```

### `cron.start()` / `cron.stop(wait=True)`

- `start()` **不阻塞**，立即返回，后面的代码继续执行。
- 主线程跑完后进程**不会退出**，定时任务持续运行。
- `Ctrl+C` 优雅停止。
- 不需要任何 `sleep` / `join` / `input` / `block` 参数。

### `@every` 间隔任务首次何时执行

`@every 5s` 等 **IntervalTrigger** 首次 `next_run` 为「注册时刻的 `now`」，会在**下一个调度 tick 内**执行第一次，**不会**先空等 5 秒；之后每隔 5 秒一次。

### `cron.pause_job(job_id)` / `cron.resume_job(job_id)`

暂停/恢复任务。

### `cron.trigger_job(job_id)`

立即执行一次（不等待下次触发时间）。

### `cron.get_jobs()` / `cron.get_job(job_id)` / `cron.remove_job(job_id)`

查询/删除任务。

### `explain_cron(expression, lang="en")`

翻译 cron 表达式为人类可读文本。`lang` 支持 `"zh"`（中文）和 `"en"`（英文）。

### `get_fastapi_app(cron)` / `get_flask_app(cron)` / `get_django_urls(cron)`

一行创建带 Web UI 的应用，分别返回 FastAPI app、Flask app、Django URL 列表。

```python
from nb_cron.web import get_fastapi_app, get_flask_app, get_django_urls
```

---

## 示例代码

完整可运行的示例代码在 `examples/` 目录下：

| 文件 | 说明 |
|---|---|
| `example_fastapi_redis.py` | FastAPI + Redis 完整示例，含多个任务 |
| `example_flask_redis.py` | Flask + Redis 完整示例 |
| `example_django_redis.py` | Django + Redis 集成指南（分步说明） |
| `example_memory_simple.py` | 最简示例，内存存储，无需 Redis |
| `demo_cross_git_project_manage_corn_tasks/proj1.py` | **跨项目示例 - 项目 1**：函数定义与定时任务 |
| `demo_cross_git_project_manage_corn_tasks/proj2_fastapi_cron.py` | **跨项目示例 - 项目 2**：Web UI 管理后台 |

---

## 跨 Git 项目示例（重点）

nb_cron 的核心特性：函数定义与任务调度分离，支持跨 Git 项目管理。

- **项目 1**（`proj1.py`）：业务项目，用 `@cron_register` 标记函数，函数名自动同步到 Redis
- **项目 2**（`proj2_fastapi_cron.py`）：FastAPI 管理后台，通过 Web UI 为项目 1 的函数添加定时任务

### 快速开始

```bash
# 终端 1：启动项目 1
cd examples/demo_cross_git_project_manage_corn_tasks
python proj1.py

# 终端 2：启动项目 2
uvicorn proj2_fastapi_cron:app --reload

# 访问 Web UI：http://localhost:8000/nb_cron/ui/
```

### 工作原理

```
项目 1 (业务项目)          Redis              项目 2 (管理后台)
@cron_register('func')   →  函数名列表  →   GET /api/functions
cron.start() 同步        →  job 配置    ←   POST /api/jobs 创建
执行函数 (本地进程)       ←  调度任务    ←   Web UI 操作
```

### 优势

- **职责分离**：项目 1 专注业务，项目 2 专注调度
- **安全**：只有 `@cron_register` 标记的函数才暴露
- **灵活**：项目 2 动态创建任务，无需修改项目 1 的代码
- **可维护**：项目 1 重构不影响项目 2 的调度

### 应用场景

微服务架构、多租户 SaaS、DevOps 自动化、数据平台等。

---

## Funboost 执行器（核弹级能力）

关于funboost的教程请参考：[https://funboost.readthedocs.io/zh-cn/latest/index.html](https://funboost.readthedocs.io/zh-cn/latest/index.html)


nb_cron 默认的执行器是在本地线程池中直接调用函数。但如果你将 `executor` 指定为 `FunboostExecutor`，nb_cron 的任务触发时**不在本地执行**，而是通过 funboost 的 `.push()` 把任务推送到消息队列（Redis / RabbitMQ / Kafka / MEMORY_QUEUE 等），由 funboost worker 进程消费执行。

这意味着你**瞬间获得了 funboost 的全部能力**：

### 为什么 FunboostExecutor 这么强？

因为 funboost 的 `BoosterParams` 提供了**工业级**的任务消费能力，一个参数类就覆盖了 99% 的调度和函数运行控制需求：

| 能力 | BoosterParams 参数 | 说明 |
|---|---|---|
| **30+ 种消息队列** | `broker_kind` | Redis / RabbitMQ / Kafka / RocketMQ / Celery / SQS ... 30+ 种中间件随意切换 |
| **精准控频** | `qps` | 指定每秒执行次数，支持小数（如 `0.01` = 每100秒1次），无需关心并发数 |
| **分布式控频** | `is_using_distributed_frequency_control` | 多个消费者实例共享同一 qps 限额，总频率不超 |
| **智能并发池** | `concurrent_num` + `concurrent_mode` | 线程/协程/协程+多进程/单线程，自适应扩缩容，任务少时自动缩减线程 |
| **自动重试** | `max_retry_times` | 函数出错自动重试，支持指数退避（`is_using_advanced_retry`） |
| **指数退避重试** | `advanced_retry_config` | `1s → 2s → 4s → 8s → 16s → 32s → 60s...`，支持 sleep 模式和 requeue 模式 |
| **死信队列** | `is_push_to_dlx_queue_when_retry_max_times` | 重试耗尽后自动进入死信队列，不丢消息 |
| **函数超时** | `function_timeout` | 运行超时自动杀死，防止任务卡死 |
| **消息过期** | `msg_expire_seconds` | 消息超过指定时间自动丢弃，不执行过期任务 |
| **任务去重** | `do_task_filtering` + `task_filtering_expire_seconds` | 相同参数的任务自动去重，防止重复执行 |
| **运行时间限制** | `allow_run_time_cron` | 只在指定 cron 时间段内消费执行，如 `* 9-17 * * 1-5` 仅工作日上班时间 |
| **执行结果持久化** | `function_result_status_persistance_conf` | 保存函数入参、运行结果、运行状态到 MongoDB，可追溯 |
| **RPC 模式** | `is_using_rpc_mode` | 发布端可同步获取消费端的执行结果 |
| **多进程消费** | `mp_consume(process_num=N)` | 协程/线程 + 多进程叠加，性能炸裂 |
| **消费者分组** | `booster_group` | 按业务分组启动消费者，灵活管理 |
| **自定义并发池** | `specify_concurrent_pool` | 多个消费者共享一个线程池，节约资源 |
| **async 支持** | `specify_async_loop` | 指定 event loop，支持 aiohttp 等要求同 loop 的异步库 |

### 安装

```bash
pip install nb_cron_nb[redis] funboost
```

### 基本用法

```python
from funboost import BoosterParams, BrokerEnum
from nb_cron import NbCron, cron_register
from nb_cron.executors.funboost_executor import FunboostExecutor

# 创建 funboost 执行器，BoosterParams 的所有参数都支持 IDE 自动补全
funboost_executor = FunboostExecutor(
    BoosterParams(
        queue_name="nb_cron_dispatch",       # 消息队列名
        broker_kind=BrokerEnum.REDIS,         # 中间件类型，30+ 种可选
        concurrent_num=50,                    # 并发数
        qps=20,                               # 精准控频：每秒最多执行 20 次
        max_retry_times=3,                    # 失败自动重试 3 次
        is_using_distributed_frequency_control=True,  # 分布式控频
    )
)

# NbCron 构造时自动调用 executor.bind_cron(self)
# worker 执行完后直接用 cron.metrics.record() 写指标，无需重建 store
cron = NbCron("my_project", "redis://localhost:6379/0", executor=funboost_executor)

@cron.job("0 */5 * * * *", kwargs={"user_id": 42})
@cron_register('my_task')
def my_task(user_id: int):
    print(f"processing user {user_id}")

# 启动 funboost 消费者 + nb_cron 调度器
funboost_executor.consume()   # 单进程消费（非阻塞）
# funboost_executor.mp_consume(process_num=4)  # 多进程消费，性能炸裂
cron.start()
```

### 工作原理

```
nb_cron 调度器               funboost 消息队列              funboost worker
cron tick 触发任务  ─push()→  Redis/RabbitMQ/Kafka  ─consume()→  解析 cron_func_name
计算 next_run_time           持久化存储，不丢消息            FunctionRegistry.resolve()
分布式锁防重复                                               执行函数 + 上报 metrics
```

**关键区别**：默认执行器是「调度 + 执行在同一进程」，FunboostExecutor 是「调度端 push，消费端执行」，天然解耦。

### 高级用法

`BoosterParams` 的 入参非常丰富，各种函数控制功能都有，所以`nb_cron`的执行器可以充分借助`funboost`的威力，所以作者没有给`nb_cron`默认的本地线程池executor加太多功能，例如重试功能/超时杀死功能等。因为你即使没有安装任何消息队列，也可以 `BoosterParams(...,broker_kind=BrokerEnum.MEMORY_QUEUE)` 来使用funboost的内存队列。

#### 1. 指数退避重试

```python
funboost_executor = FunboostExecutor(
    BoosterParams(
        queue_name="nb_cron_dispatch",
        broker_kind=BrokerEnum.REDIS,
        max_retry_times=5,
        is_using_advanced_retry=True,
        advanced_retry_config={
            'retry_mode': 'requeue',          # requeue 模式：消息发回队列延迟重试，不占线程
            'retry_base_interval': 1.0,        # 基础间隔 1s
            'retry_multiplier': 2.0,           # 指数退避倍数
            'retry_max_interval': 60.0,        # 最大间隔 60s
            'retry_jitter': True,              # 随机抖动，防止惊群
        },
    )
)
# 重试间隔：1s → 2s → 4s → 8s → 16s → 32s → 60s → 60s...
```
#### 2. Worker 独立进程部署（横向扩展）

调度端和消费端可以完全分离部署，消费端可以独立横向扩展：

```python
# scheduler.py — 调度端（只 push，不消费）
from nb_cron import NbCron, cron_register
from nb_cron.executors.funboost_executor import FunboostExecutor
from funboost import BoosterParams, BrokerEnum

funboost_executor = FunboostExecutor(
    BoosterParams(queue_name="nb_cron_dispatch", broker_kind=BrokerEnum.REDIS)
)
cron = NbCron("my_project", "redis://localhost:6379/0", executor=funboost_executor)

@cron.job("0 */5 * * * *")
@cron_register('my_task')
def my_task():
    print("执行任务")

cron.start()  # 只调度，不消费

# worker.py — 消费端（独立进程，可部署多台机器）
from nb_cron import NbCron
from nb_cron.executors.funboost_executor import FunboostExecutor
from funboost import BoosterParams, BrokerEnum
import my_tasks  # 触发 @cron_register，让注册表生效

funboost_executor = FunboostExecutor(
    BoosterParams(queue_name="nb_cron_dispatch", broker_kind=BrokerEnum.REDIS)
)
cron = NbCron("my_project", "redis://localhost:6379/0", executor=funboost_executor)

funboost_executor.mp_consume(process_num=4)  # 4 进程消费
```

#### 3. 自定义指标回调

```python
def my_recorder(job_id, success, duration_ms, error):
    # 同时上报 Prometheus / 自定义监控系统
    prometheus_metrics.labels(job_id=job_id).observe(duration_ms)

funboost_executor = FunboostExecutor(
    BoosterParams(queue_name="nb_cron_dispatch", broker_kind=BrokerEnum.REDIS),
    metrics_recorder=my_recorder,
)
```

### FunboostExecutor vs 默认执行器

| 特性 | 默认 Executor | FunboostExecutor |
|---|---|---|
| 执行方式 | 本地线程池直接调用 | push 到消息队列，worker 消费执行 |
| 消息持久化 | ❌ 进程崩溃任务丢失 | ✅ 消息队列持久化，不丢任务 |
| 横向扩展 | ❌ 只能单进程 | ✅ worker 独立部署，无限扩展 |
| 精准控频 | ❌ | ✅ qps 参数，支持分布式控频 |
| 自动重试 | ❌ | ✅ max_retry_times + 指数退避 |
| 任务去重 | ❌ | ✅ do_task_filtering |
| 消息过期 | ❌ | ✅ msg_expire_seconds |
| 死信队列 | ❌ | ✅ 重试耗尽自动进入死信队列 |
| 函数超时 | ❌ | ✅ function_timeout |
| 运行时间限制 | ❌ | ✅ allow_run_time_cron |
| 30+ 种消息队列 | ❌ | ✅ broker_kind 一键切换 |
| 多进程消费 | ❌ | ✅ mp_consume(process_num=N) |
| 执行结果持久化 | ✅ (store) | ✅ (store + funboost MongoDB) |

---

## 运行示例

```bash
# FastAPI + Redis（推荐）
cd examples
pip install nb_cron_nb[redis,fastapi]
uvicorn example_fastapi_redis:app --reload

# Flask + Redis
pip install nb_cron_nb[redis,flask]
python example_flask_redis.py

# 最简示例（无需 Redis）
pip install nb_cron_nb[fastapi]
uvicorn example_memory_simple:app --reload

# 跨 Git 项目示例（重点推荐）
# 终端 1：启动项目 1（业务项目）
cd examples/demo_cross_git_project_manage_corn_tasks
python proj1.py

# 终端 2：启动项目 2（FastAPI 管理后台）
uvicorn proj2_fastapi_cron:app --reload
```

---

## License

MIT - ydf0509
