Metadata-Version: 2.4
Name: piko-cucc
Version: 0.1.7
Summary: Data-Oriented Async Task Orchestrator
Project-URL: Homepage, https://github.com/yingsf/piko
Project-URL: Repository, https://github.com/yingsf/piko
Project-URL: Documentation, https://github.com/yingsf/piko/tree/dev#readme
Author-email: chenjp <chenjp105@x.com>
License: MIT
License-File: LICENSE
Keywords: asyncio,etl,framework,pipeline,scheduler
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.12
Requires-Dist: aiofiles>=25.1.0
Requires-Dist: apscheduler>=3.11.2
Requires-Dist: asyncmy>=0.2.10
Requires-Dist: cloudpickle>=3.1.2
Requires-Dist: croniter>=6.0.0
Requires-Dist: cryptography>=46.0.3
Requires-Dist: dynaconf>=3.2.12
Requires-Dist: fastapi>=0.128.0
Requires-Dist: greenlet>=3.0.0
Requires-Dist: prometheus-client>=0.23.1
Requires-Dist: pydantic>=2.12.5
Requires-Dist: sqlalchemy>=2.0.45
Requires-Dist: structlog>=25.5.0
Requires-Dist: uvicorn>=0.40.0
Description-Content-Type: text/markdown

# Piko: Enterprise-Grade Async Orchestrator

[![PyPI version](https://img.shields.io/pypi/v/piko-cucc.svg)](https://pypi.org/project/piko-cucc/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Piko 专为数据工程设计，旨在解决**高并发 I/O** 与**复杂资源管理**之间的矛盾，通过**微内核设计**与**依赖注入**机制，让开发者能够轻松构建支撑数万 QPS 的流水线。

---

## 🔥 Piko 0.1.6 新特性 (New in v0.1.6)

- **App 实例模式**: `app = PikoApp()`，彻底告别隐式全局变量，支持多实例运行。
- **动态系统配置**: 支持在不重启服务的情况下，通过数据库动态调整轮询间隔等系统参数。
- **增强的连接池**: 自动探活与断线重连（Pre-ping），适应不稳定的网络环境。
- **智能回填策略**: 支持 `SKIP` (跳过) 和 `CATCH_UP` (追赶) 等多种回溯策略。

---

## 前置要求 (Prerequisites)

Piko 依赖 **MySQL** (5.7 或 8.0+) 作为核心组件。

```sql
# 创建数据库
CREATE DATABASE piko_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```

## 安装 (Installation)

```bash
pip install piko-cucc
```

------

## 快速开始 (Quick Start)

### 1. 定义应用 (`app.py`)

Piko 强制要求显式实例化 App 对象。

```python
from piko.app import PikoApp

# 实例化 App
app = PikoApp(name="my_crawler_service")
```

### 2. 编写任务 (`jobs.py`)

使用 `@app.job` 装饰器注册任务。

```Python
from .app import app

@app.job(
    job_id="hello_world",
    cron="* * * * *"  # 每分钟执行
)
async def hello_handler(ctx, scheduled_time):
    print(f"Hello Piko ! Time: {scheduled_time}")
```

### 3. 启动入口 (`main.py`)

```python
from my_project.app import app
import my_project.jobs  # 必须导入任务模块以触发注册

if __name__ == "__main__":
    # 一键启动：自动处理 DB 初始化、Leader 选举、Worker 启动
    app.run()
```

------

## 核心模式 (Core Patterns)

### 场景一：高并发网络 I/O (The Async IO Pattern)

利用 `asyncio` 原生并发能力，单节点轻松支撑数千并发。

```python
import aiohttp
from .app import app

@app.job(job_id="fetch_price", cron="*/1 * * * *")
async def fetch_handler(ctx, scheduled_time):
    # 纯异步非阻塞
    async with aiohttp.ClientSession() as session:
        async with session.get("[https://api.example.com/data](https://api.example.com/data)") as resp:
            data = await resp.json()
            print(f"Data: {data}")
```

### 场景二：资源依赖注入 (Dependency Injection)

通过 `resources` 参数注入连接池，自动管理生命周期，防止资源泄露。

```Python
from piko.core.resource import Resource
from contextlib import asynccontextmanager

class DBPool(Resource):
    @asynccontextmanager
    async def acquire(self, ctx):
        # 模拟借出连接
        yield "db_connection_obj"

@app.job(
    job_id="db_task",
    cron="0 * * * *",
    resources={"db": DBPool}  # 注入资源
)
async def db_handler(ctx, scheduled_time, db):
    # db 参数由框架自动注入
    print(f"Using connection: {db}")
```

### 场景三：动态系统调优 (Dynamic Configuration)

Piko 允许你通过数据库动态调整系统行为。

在 `job_config` 表中插入特殊 ID `piko_system_settings`：

```json
// job_id: piko_system_settings
{
    "poll_interval_s": 5,   // 将轮询间隔调整为 5 秒
    "log_level": "DEBUG"
}
```

Watcher 会自动感知并应用新配置，无需重启服务。

------

## 配置 (Configuration)

推荐使用 `settings.toml` 或环境变量。格式如下：

```toml
[default]
# ============================================================================
# General - 通用配置
# ============================================================================
version = "0.1.6"

# ============================================================================
# Startup Strategy - 启动策略
# ============================================================================
# startup_mode 控制服务启动时的容错行为：
#   - "fail_closed": 严格模式，任何配置或依赖检查失败都立即终止启动（生产推荐）
#   - "fail_open_snapshot": 宽容模式，允许使用快照数据降级启动（仅用于开发/测试）
startup_mode = "fail_closed"

# debug 模式会启用更详细的日志和调试端点，生产环境必须设为 false
debug = false

# ============================================================================
# Database - 数据库连接配置 (Piko 核心)
# ============================================================================
# [注意] 库内部默认留空，配合 init_db 的检查逻辑。
# 这样用户如果没配置环境变量，启动时会看到友好的 FATAL ERROR 提示
# 
# ⚠️ 必须使用异步驱动！
# 推荐使用 mysql+asyncmy (速度最快) 或 mysql+aiomysql (兼容性好)
# ❌ 不要使用 pymysql (这是同步驱动，会阻塞 Event Loop)
#
# 格式示例: 
#   "mysql+asyncmy://user:pass@host:port/dbname?charset=utf8mb4"
mysql_dsn = ""

# mysql_pool_size: 连接池初始大小，影响并发能力
# 设为 10-20 是基于典型中小型服务的经验值
mysql_pool_size = 20

# mysql_max_overflow: 连接池最大溢出连接数
# 允许在高峰期临时创建额外连接，避免请求阻塞
mysql_max_overflow = 10

# mysql_pool_recycle_s: 连接回收时间（秒）
# 强制每 3600 秒回收一次连接，防止连接存在太久被防火墙/MySQL服务端切断
# 这配合代码中的 pool_pre_ping=True 共同解决了 "Lost connection" 问题
mysql_pool_recycle_s = 3600

# ============================================================================
# Scheduler Tuning - 调度器调优参数
# ============================================================================
# poll_interval_s: 主轮询间隔（秒）
# [Piko 新特性] 这是"默认"间隔。
# 系统启动后，ConfigWatcher 会优先读取数据库中的 piko_system_settings 配置。
# 如果数据库没配置，才使用此值。
poll_interval_s = 10

# poll_jitter_s: 轮询抖动时间（秒）
# 随机化轮询时刻，避免多实例在同一时刻同时轮询造成数据库压力峰值
poll_jitter_s = 2

# debounce_s: 防抖窗口（秒）
# 在此时间窗口内重复触发的相同任务会被合并，减少无效调度
debounce_s = 2

# timezone: 时区设置
# 影响 cron 表达式的解析和任务触发时间计算，必须与业务时区一致
timezone = "Asia/Shanghai"

# ap_misfire_grace_s_default: APScheduler misfire 容忍时间（秒）
# 任务错过预定时间后，在此时间窗口内仍会执行；超出则跳过
# 300 秒（5分钟）是平衡及时性与系统压力的经验值
ap_misfire_grace_s_default = 300

# ap_max_instances_default: APScheduler 单任务最大并发实例数
# 设为 1 防止同一任务的多个实例并发执行，避免数据竞争
ap_max_instances_default = 1

# ============================================================================
# Leader Election - 分布式Leader选举
# ============================================================================
# leader_enabled: 是否启用Leader选举
# 多实例部署时必须启用，确保只有一个实例执行调度逻辑
leader_enabled = true

# leader_name: Leader名称/组名
# 同一组内的实例会竞争同一个Leader锁，不同组互不干扰
leader_name = "default"

# leader_lease_s: Leader租约时长（秒）
# Leader必须在此时间内续租，否则被视为失效，其他实例可接管
leader_lease_s = 30

# leader_renew_interval_s: 续租间隔（秒）
# Leader多久续租一次，必须小于 lease 时长且留有余量（当前为 1/3）
# 这样即使某次续租失败，仍有时间在租约到期前重试
leader_renew_interval_s = 10

# ============================================================================
# Concurrency & Compute - 并发与计算资源
# ============================================================================
# cpu_workers: CPU 密集型任务的工作线程数
# 0 表示自动检测（通常为 CPU 核心数），可根据任务特性手动调整
cpu_workers = 0

# per_job_cpu_max: 单任务最大 CPU 并发数
# 限制单个任务可使用的最大线程数，防止某个任务耗尽所有资源
per_job_cpu_max = 8

# ============================================================================
# Persistence - 持久化与缓冲配置
# ============================================================================
# persist_queue_max: 持久化队列最大长度
# 控制内存中待持久化对象的最大数量，防止内存溢出
persist_queue_max = 200

# persist_flush_timeout_s: 强制刷盘超时（秒）
# 即使队列未满，也会在此时间后强制写入，平衡数据丢失风险与 I/O 效率
persist_flush_timeout_s = 60

# persist_disk_fallback_path: 磁盘降级备份路径
# 当主存储（如数据库）不可用时，临时写入本地文件，防止数据丢失
# 注意：此路径应在容器/主机重启后仍可访问（如挂载卷）
persist_disk_fallback_path = "/tmp/piko_fallback.bin"

# ============================================================================
# Observability & Logging - 可观测性与日志
# ============================================================================
# metrics_enabled: 是否启用 Prometheus 等指标采集
# 生产环境强烈建议启用，用于监控调度器健康状况
metrics_enabled = true

# health_port: 健康检查和指标暴露端口
# K8s/Docker 可通过此端口进行 liveness/readiness probe
health_port = 8080

# log_level: 日志级别
# 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
# 生产环境建议 INFO，排查问题时临时调为 DEBUG
log_level = "INFO"

# log_json: 日志格式开关
#   - true: 生产模式，输出结构化 JSON 日志，便于日志聚合平台解析
#   - false: 开发模式，输出彩色文本日志，提升本地调试体验
log_json = false
```

## ⚙️ 进阶：动态系统配置 (Dynamic System Settings)

除了前面的配置文件或环境变量方式，Piko  还引入了 **“控制面与数据面分离”** 的设计理念。你还可以通过数据库实时调整系统的运行时行为，实现 **无停机调优 (Hot Reload)**。

这是通过在 `job_config` 表中插入一个特殊的保留任务 ID `piko_system_settings` 来实现的。

### 1. 静态 vs 动态配置对照表

并不是所有配置都能放入数据库。请务必遵循以下规则，否则配置将无效：

| 配置分类                        | 典型参数                                                     | 存放位置                                          | 生效时机                  | 备注                                                         |
| :------------------------------ | :----------------------------------------------------------- | :------------------------------------------------ | :------------------------ | :----------------------------------------------------------- |
| **基础设施 (Infrastructure)**   | `mysql_dsn`<br>`mysql_pool_size`<br>`leader_name`<br>`startup_mode`<br>`timezone` | **仅** `settings.toml`<br>或 环境变量             | **启动时**                | 涉及连接池初始化、线程启动或时区基准，修改后**必须重启服务**。 |
| **运行时策略 (Runtime Policy)** | `poll_interval_s`<br>`poll_jitter_s`<br>`log_level`<br>`per_job_cpu_max` | `settings.toml` (默认值)<br>**+ 数据库 (覆盖值)** | **即时生效**<br>(< 1分钟) | Watcher 会在下一次心跳时自动加载新值。                       |

### 2. 如何使用动态配置

你不需要重启服务，只需在数据库中插入或更新以下 JSON：

```sql
INSERT INTO job_config (job_id, schema_version, config_json, version, updated_at)
VALUES (
    'piko_system_settings',  -- ⚠️ 系统保留 ID，不可更改
    1,
    '{
        "poll_interval_s": 5,       -- [调优] 将轮询间隔加速到 5秒 (默认10s)
        "poll_jitter_s": 1,         -- [调优] 减少抖动范围
        "log_level": "DEBUG"        -- [排查] 临时开启调试日志，查完改回 INFO
    }',
    1,
    NOW()
) 
ON DUPLICATE KEY UPDATE 
    config_json = VALUES(config_json), 
    version = version + 1;
```

### 3. 常见误区 (Common Pitfalls)

- ❌ **错误做法**：在数据库里修改 `mysql_pool_size` 想扩大连接池。
  - **后果**：无效。连接池在进程启动的第一秒就已经固定了，数据库里的配置会被忽略。
- ❌ **错误做法**：在数据库里修改 `timezone`。
  - **后果**：极度危险。调度器可能产生逻辑分裂（部分任务按旧时区跑，部分按新时区计算），导致任务漏跑或重跑。
- ✅ **正确做法**：仅在数据库中调整 `poll_interval_s` (控制节奏) 和 `log_level` (控制日志量)。