Metadata-Version: 2.4
Name: volc-flink-cli
Version: 1.2.0
Summary: Volcano Engine Flink Stream Computing Python Client CLI
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Requires-Dist: requests>=2.0.0
Requires-Dist: volcengine-python-sdk>=5.0.0
Requires-Dist: tos>=2.9.0
Requires-Dist: confluent-kafka==2.0.2; python_version < "3.12"
Requires-Dist: confluent-kafka>=2.3.0; python_version >= "3.12"

# Volcano Engine Flink AI Skills + volc-flink-cli

本仓库将 `volc-flink-cli` 与 `skills` 技能体系整合在一起，目标是沉淀面向 AI 场景的“火山引擎 Flink 技能项目”：

- `volc_flink`：可执行的 CLI（同时包含 Python client），用于真实调用火山引擎 Flink API
- `skills/`：面向 AI 的技能定义与评测脚本，用于让 AI 以“技能路由 + 工具调用”的方式完成 Flink 运维与开发任务

## 目录

- [安装](#安装)
- [快速开始](#快速开始)
- [AI Skills（skills）](#ai-skillsskills)
- [常用场景](#常用场景)
  - [草稿（Drafts）](#草稿drafts)
  - [任务（Jobs）](#任务jobs)
  - [监控（Monitor）](#监控monitor)
  - [Catalog（元数据）](#catalog元数据)
  - [Sessions（Session 集群）](#sessionssession-集群)
  - [Files（TOS 文件）](#filestos-文件)
  - [连接（Conn / Kafka）](#连接conn--kafka)
- [配置与安全](#配置与安全)
- [Python API](#python-api)
- [开发说明](#开发说明)
- [常见问题](#常见问题)

## 安装

### 环境要求

- Python 3.7+

### 安装方式

#### 方式 A：从 PyPI 安装（推荐：无代码仓库也可用）

PyPI 地址：<https://pypi.org/project/volc-flink-cli/>

```bash
pip install volc-flink-cli

# 验证安装
volc_flink --version
volc_flink --help
volc_flink login status
```

需要 SQL/CDC 模板示例时：建议直接从你的业务仓库/本地文件提供 `--sql-file` / `--cdc-file`；如果确实需要本项目示例文件，可选择克隆仓库后参考 `skills/examples/`（示例仅用于演示结构，发布生产任务前请替换为真实参数与连接信息）。

如需使用 `conn kafka` 能力：本项目已在依赖中处理 `confluent-kafka` 的 Python 版本差异；若你使用 Python 3.12，建议使用 `confluent-kafka>=2.3.0`（通常可直接安装 wheel，避免本地编译 `librdkafka`）。

#### 方式 B：从源码安装（本仓库）

```bash
pip install .
```

如需使用 `conn kafka` 能力，请确保已安装 `confluent-kafka`（本仓库 `pyproject.toml` 已包含依赖；若你在 Python 3.12 环境下手动安装，建议 `confluent-kafka>=2.3.0` 以获得 wheel，避免本地编译 `librdkafka`）。

开发模式（可编辑安装）：

```bash
pip install -e .
```

安装完成后获得命令：`volc_flink`。

## 快速开始

### 1）登录

```bash
volc_flink login --region cn-beijing
```

也支持从环境变量提供登录信息（credentials 文件缺失/无效时兜底）：

```bash
export VOLCENGINE_ACCESS_KEY="xxx"
export VOLCENGINE_SECRET_KEY="yyy"
export VOLCENGINE_REGION="cn-beijing"  # optional
```

查看当前登录与默认配置状态：

```bash
volc_flink login status
volc_flink config show
```

### 2）选定项目（推荐）

先列举项目：

```bash
volc_flink projects list
```

设置默认项目（之后大部分命令可省略 `-p/--project` 或 `--project-id`）：

```bash
volc_flink config set-default-project --project-id YOUR_PROJECT_ID
```

也可以按项目名设置：

```bash
volc_flink config set-default-project --name YOUR_PROJECT_NAME
```

### 3）查看帮助

```bash
volc_flink --help
volc_flink drafts --help
volc_flink jobs --help
```

## 常用场景

### 草稿（Drafts）

#### 1）列目录 / 列草稿

列草稿目录（用于创建草稿时选择目录）：

```bash
volc_flink drafts dirs
```

创建草稿目录：

```bash
volc_flink drafts mkdir --name "/a/b/c"
```

删除草稿目录：

```bash
volc_flink drafts rmr --name "/a/b/c"
volc_flink drafts rmr --id "2037507470509535234"
```

删除草稿：

```bash
volc_flink drafts rm --id "2037518350102020097"
volc_flink drafts rm --name "my_draft"
volc_flink drafts rm --name "/a/b/c/my_draft.sql"
```

列草稿（用于查询草稿 ID、类型、目录等）：

```bash
volc_flink drafts apps --directory "/a/b/c"
```

兼容命令（等价于 `drafts apps`）：

```bash
volc_flink drafts list --directory "/a/b/c"
```

#### 2）校验 SQL 草稿（Validate）

说明：Validate 不只做语法解析，也可能会触发 connector/catalog 的校验，因此失败不一定是 SQL 语法错误。

```bash
# 通过草稿 ID 校验
volc_flink drafts validate --draft-id YOUR_DRAFT_ID

# 通过草稿名称或路径校验（自动解析草稿ID）
volc_flink drafts validate --draft "my_draft"
volc_flink drafts validate --draft "/a/b/c/my_draft.sql"
```

#### 3）创建草稿（SQL）

```bash
volc_flink drafts create \
  --name "test-streaming-sql" \
  --directory "/a/b/c" \
  --job-type FLINK_STREAMING_SQL \
  --engine-version FLINK_VERSION_1_17 \
  --sql "SELECT 1;"
```

#### 4）创建草稿（JAR：本地上传到 TOS）

说明：如果你是通过 PyPI 安装且本机没有本仓库代码，请将 `--jar-path` 替换为你自己的 JAR 文件路径；或自行下载一个可运行的 Flink 示例 JAR 到本地后再上传。

```bash
volc_flink drafts create \
  --name "test-batch-jar" \
  --directory "/a/b/c" \
  --job-type FLINK_BATCH_JAR \
  --engine-version FLINK_VERSION_1_20 \
  --jar-path ./WordCount-flink-1-20.jar \
  --main-class org.apache.flink.streaming.examples.wordcount.WordCount
```

#### 5）创建草稿（CDC：YAML Pipeline）

说明：

- CDC 草稿类型为 `FLINK_CDC_JAR`（等价参数：`--type cdc`）
- 当前 CDC 仅支持 `FLINK_VERSION_1_16`，CDC 版本默认 `v3.4`
- `--directory` 必须是已存在的草稿目录；可用 `drafts dirs` 查询，或用 `drafts mkdir` 创建

```bash
# 1) 选择/创建草稿目录
volc_flink drafts dirs
volc_flink drafts mkdir --name "/数据开发文件夹/CDC"

# 2) 创建 CDC 草稿（推荐：文件方式，最稳妥，不受 shell 转义影响）
volc_flink drafts create \
  --type cdc \
  --directory "/数据开发文件夹/CDC" \
  -n "mysql-to-paimon-cdc" \
  --engine-version FLINK_VERSION_1_16 \
  --cdc-version v3.4 \
  --cdc-file ./path/to/job.yml

# 3) 发布为 Job（仅提交发布，不自动启动）
volc_flink drafts publish --draft-id YOUR_DRAFT_ID --resource-pool YOUR_RESOURCE_POOL_NAME
```

可选：内联 YAML（适合复制粘贴；用 heredoc 避免 `${...}` 被 shell 展开）：

```bash
volc_flink drafts create \
  --type cdc \
  --directory "/数据开发文件夹/CDC" \
  -n "mysql-to-paimon-cdc" \
  --engine-version FLINK_VERSION_1_16 \
  --cdc-version v3.4 \
  --cdc "$(cat <<'YAML'
sources:
- source:
    type: mysql
    hostname: ${mysql_hostname}
    port: ${mysql_port}
    username: ${mysql_username}
    password: ${mysql_password}
    tables: fin_db.*\\.balance.*
    server-id: ${mysql_server_id}
    schema-change.enabled: true
sink:
  type: paimon-las
  commit.user: ${paimon_las_commit_user}
route:
- source-table: fin_db.*\\.balance.*
  sink-table: fin_db.balance
pipeline:
  name: ${pipeline_name}
  parallelism: ${pipeline_parallelism}
YAML
)"
```

MySQL CDC 额外提醒：需要用户自行上传 MySQL Driver（建议 `8.0.27`）到平台依赖资源中。

#### 6）依赖与动态参数

Dependency 字段为 JSON 字符串，例如：

```json
{"jars":["tos://flink-cwz-paimon/flink-client/resrouce/jars/WordCount-flink-1-20.jar"]}
```

给草稿添加依赖（支持 tos 路径或本地路径，本地会自动上传并输出 `tos://...` 路径）：

```bash
volc_flink drafts dependency add \
  --draft "/a/b/c/jar_job" \
  --jar tos://YOUR_BUCKET/path/to/jars/dep1.jar \
  --jar ./path/to/dep2.jar
```

动态参数（DynamicOptions）为 String-String KV JSON，例如：

```json
{"parallelism.default":"4","taskmanager.numberOfTaskSlots":"4"}
```

设置/删除动态参数：

```bash
volc_flink drafts params show --draft "/a/b/c/jar_job"
volc_flink drafts params set --draft "/a/b/c/jar_job" --kv parallelism.default=4
volc_flink drafts params unset --draft "/a/b/c/jar_job" --key parallelism.default
```

#### 7）更新与发布

更新 SQL：

```bash
volc_flink drafts update --draft "/a/b/c/sql_job" --sql-file /path/to/job.sql
```

发布草稿：

```bash
volc_flink drafts publish --draft "/a/b/c/sql_job" --resource-pool YOUR_RESOURCE_POOL_NAME
```

### 资源池（Resource Pools）

发布草稿（`drafts publish`）需要指定资源池（`--resource-pool` 或 `--resource-pool-id`）。

列举资源池：

```bash
volc_flink resource-pools list

# 指定项目（可选；不传则使用默认项目）
volc_flink resource-pools list -p "YOUR_PROJECT_NAME"
volc_flink resource-pools list --project-id "YOUR_PROJECT_ID"
```

查看资源池详情：

```bash
volc_flink resource-pools detail --resource-pool-id "YOUR_RESOURCE_POOL_ID"
volc_flink resource-pools detail --name "YOUR_RESOURCE_POOL_NAME"
```

### 任务（Jobs）

列举任务与详情：

```bash
volc_flink jobs list
volc_flink jobs detail --job-id YOUR_JOB_ID
```

查询任务运行记录/事件：

```bash
volc_flink jobs instances --job-id YOUR_JOB_ID --limit 20
volc_flink jobs events --job-id YOUR_JOB_ID --limit 50
```

启动/停止/重启（支持 `--inspect` 轮询状态）：

```bash
volc_flink jobs start --job-id YOUR_JOB_ID --inspect --timeout 300
volc_flink jobs stop --job-id YOUR_JOB_ID --inspect --timeout 300
volc_flink jobs restart --job-id YOUR_JOB_ID --inspect --timeout 300
```

Rescale（调整规格并自动发布重启）：

```bash
volc_flink jobs rescale --job-id YOUR_JOB_ID --parallelism 32 --tm-spec 4C16GB --tm-slots 4 --jm-spec 2C8GB --inspect --timeout 300
```

### 监控（Monitor）

查询事件：

```bash
volc_flink monitor events --job-id YOUR_JOB_ID --limit 50
```

查询日志（默认选择最新实例，支持时间范围、cursor、tail）：

```bash
volc_flink monitor logs --job-id YOUR_JOB_ID --level ERROR --since "2025-01-01 12:00:00" --until "2025-01-01 13:00:00"
volc_flink monitor logs --job-id YOUR_JOB_ID --follow --interval 2
volc_flink monitor logs --job-id YOUR_JOB_ID --cursor YOUR_CURSOR
```

查询 FlinkUI：

```bash
volc_flink monitor flinkui url --job-id YOUR_JOB_ID
volc_flink monitor flinkui overview --job-id YOUR_JOB_ID
volc_flink monitor flinkui exceptions --job-id YOUR_JOB_ID
```

### Catalog（元数据）

列出 Catalog 列表：

```bash
volc_flink catalog tree
```

展开某个 Catalog（以列表形式列出该 Catalog 下的 Database）：

```bash
volc_flink catalog tree --catalog-id 45
```

展开某个 Database（列出 Table）：

```bash
volc_flink catalog tree --catalog-id 45 --database test_db
```

查看详细信息：

```bash
volc_flink catalog catalogs list
volc_flink catalog catalogs show --catalog-id 45
volc_flink catalog databases show --catalog-id 45 --database test_db
volc_flink catalog tables show --catalog-id 45 --database test_db --table prds_pk1
```

其中 `catalog tables show` 默认会输出表基础信息，以及字段列表的 `name`、`type`、`comment`、`nullable`、`primary-key` 五列；如需排查接口原始返回，可追加 `--raw`。

### Sessions（Session 集群）

列举 Session 集群：

```bash
volc_flink sessions list
```

创建 Session 集群：

```bash
volc_flink sessions create \
  --name "abcdef" \
  --resource-pool-id "YOUR_RESOURCE_POOL_ID" \
  --vcu 10 \
  --min-replica 1 \
  --max-replica 10 \
  --engine-version "FLINK_VERSION_1_17"
```

启动/停止：

```bash
volc_flink sessions start --id YOUR_SESSION_ID
volc_flink sessions stop --id YOUR_SESSION_ID
```

获取 FlinkUI（工具会自动获取 token 并通过网关拼接 URL）：

```bash
volc_flink sessions ui --id YOUR_SESSION_ID
```

### Files（TOS 文件）

说明：Files 模块使用 `config set-tos-jar-prefix` 配置的 `tos://...` 目录作为根目录，对其下对象进行管理。

列出文件：

```bash
volc_flink files list
volc_flink files list --prefix "sub/dir" --limit 50
```

上传/更新：

```bash
volc_flink files upload --file ./a.jar
volc_flink files upload --file ./a.jar --name "custom/a.jar"
volc_flink files update --name "custom/a.jar" --file ./a.jar
```

查看详情与获取下载链接：

```bash
volc_flink files detail --name "custom/a.jar"
volc_flink files url --name "custom/a.jar" --expires 3600
```

### 连接（Conn / Kafka）

说明：`conn` 与 `connection` 是等价命令别名，目前优先支持 Kafka 连接与消息消费能力。

新增/更新 Kafka instance（保存默认 topic/group-id，可选）：

```bash
volc_flink conn kafka instance add \
  --name demo_kafka \
  --topic events \
  --group-id test_group
```

为 instance 增加接入点（endpoint）。支持两种方式：

1) 交互式（不传 endpoint 连接参数时会提示输入）

```bash
volc_flink conn kafka endpoint add --instance demo_kafka
```

2) 参数式（不进入交互；至少需要 `--bootstrap-servers`）

```bash
volc_flink conn kafka endpoint add \
  --instance demo_kafka \
  --name private \
  --bootstrap-servers broker1:9092,broker2:9092 \
  --security-protocol PLAINTEXT
```

删除 instance / endpoint：

```bash
volc_flink conn kafka instance rm --instance demo_kafka
volc_flink conn kafka endpoint rm --instance demo_kafka --endpoint private
```

列出 Kafka instances：

```bash
volc_flink conn kafka instance
```

列出 Kafka endpoints：

```bash
volc_flink conn kafka endpoint
volc_flink conn kafka endpoint --instance demo_kafka
```

查看 Kafka endpoint 详情（默认不打印明文密码）：

```bash
volc_flink conn kafka endpoint detail --instance demo_kafka --endpoint public
volc_flink conn kafka endpoint detail --instance demo_kafka --endpoint public --show-password
```

按时间范围消费消息：

```bash
volc_flink conn kafka messages consume \
  --instance demo_kafka \
  --endpoint private \
  --limit 10
```

如果不传 `--partition`，默认会从所有 partition 消费；如果不传 `--start`，默认从最近 3 小时开始消费；如果不传 `--limit`，默认返回 10 条消息。

只消费单个 partition：

```bash
volc_flink conn kafka messages consume \
  --partition 0 \
  --instance demo_kafka \
  --endpoint private \
  --limit 10
```

指定结束时间：

```bash
volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --end 2024-07-01T01:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --limit 10
```

覆盖默认 Topic / Group：

```bash
volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --topic other_topic \
  --group-id other_group \
  --limit 10
```

输出原始 JSON：

```bash
volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --limit 10 \
  --raw
```

调整超时时间：

```bash
volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --poll-timeout 5 \
  --max-idle-polls 12 \
  --offsets-timeout 30 \
  --limit 10
```

使用别名命令：

```bash
volc_flink connection kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --limit 10
```

说明：

- `consume` 使用保存的 Kafka instance 与 endpoint 建立消费者
- 如存在多个 instance，建议通过 `--instance` 显式指定
- 如一个 instance 下存在多个 endpoint，建议通过 `--endpoint` 显式指定
- `--partition` 不传时默认从所有 partition 消费
- `--start` 支持 ISO8601 时间或时间戳；不传时默认最近 3 小时
- `--end` 不传时默认消费到当前时间
- `--limit` 不传时默认返回 10 条消息
- `--poll-timeout`、`--max-idle-polls`、`--offsets-timeout` 可用于调大超时时间
- 如未配置 Kafka instance/endpoint，命令会提示先执行 `conn kafka instance add` 与 `conn kafka endpoint add`

## AI Skills（skills）

`skills/` 目录提供面向 AI 的 Flink 技能体系（统一入口 + 子技能路由 + 评测脚本），与 `volc_flink` CLI 配合使用：技能负责“理解/规划/路由”，CLI 负责“真实执行/查询”。

入口文档：

- `skills/README.md`：技能体系说明与目录结构
- `skills/SKILL.md`：统一入口技能定义（路由）

## 配置与安全

### 配置文件位置

- Windows：`C:\Users\YourUsername\.volc_flink\credentials.json`（兼容旧目录：`.volcano_flink`）
- Linux/macOS：`~/.volc_flink/credentials.json`（兼容旧目录：`~/.volcano_flink/credentials.json`）

也可通过 `VOLC_FLINK_CONFIG_DIR` 指定配置目录位置（便于容器/CI）。

常用环境变量（credentials 文件缺失/无效时兜底）：

- `VOLCENGINE_ACCESS_KEY` / `VOLCENGINE_SECRET_KEY` / `VOLCENGINE_REGION`
- `VOLCENGINE_FLINK_PROJECT` / `VOLCENGINE_FLINK_PROJECT_ID` / `VOLCENGINE_FLINK_TOS_PREFIX`

### 安全说明

- 密钥信息存储在用户本地目录，文件权限设置为仅当前用户可读写（0600）
- 建议定期更换访问密钥
- 生产环境建议使用更安全的密钥管理方案

## Python API

```python
from volcano_flink_client.auth.auth import AuthManager
from volcano_flink_client.api import ApiClient
from volcano_flink_client.projects.projects import ProjectManager

auth_manager = AuthManager()
credentials = auth_manager.login(access_key="YOUR_AK", secret_key="YOUR_SK", region="cn-beijing")
api_client = ApiClient(credentials)

project_manager = ProjectManager(api_client)
projects = project_manager.list_projects()
for p in projects:
    print(p.name, p.id)
```

## 开发说明

### 代码结构

```
skills/                  # AI 技能定义与评测（与 volc_flink CLI 整合）
volcano_flink_client/
├── api.py               # HTTP 请求与签名
├── auth/                # 认证管理
├── projects/            # 项目操作
├── resource_pools/      # 资源池操作
├── drafts/              # 草稿操作
├── jobs/                # 任务操作
├── catalogs/            # Catalog 元数据
└── sessions/            # Session 集群

volcano_flink_cli/       # CLI 命令实现（argparse 子命令）
volcano_flink.py         # CLI 入口（volc_flink）
```

### 本地验证

```bash
python integration_test.py
```

## 常见问题

### Q：如何获取 AK/SK？

进入火山引擎控制台的访问控制页面创建与管理访问密钥：<https://console.volcengine.com/iam/accesskey/>

### Q：命令忘了怎么用？

```bash
volc_flink --help
volc_flink <module> --help
volc_flink <module> <cmd> --help
```

### Q：登录失败怎么办？

- 确认 AK/SK 正确
- 确认网络可访问火山引擎 OpenAPI
- 确认账号具备对应权限
