Python SDK New Feature Ops / Admin

Worker Management

允许管理员在不重启 Worker 进程的前提下,动态控制 Worker 的订阅权限与生命周期。 支持暂停、恢复、优雅下线、强制下线,以及基于 Denylist 的 agent_type 订阅管控。

概述

by-framework 是基于 Redis Streams 构建的分布式 Agent 调度引擎。 Worker 以竞争消费方式从控制流(Control Stream)获取任务,横向扩展无需改代码。

本功能在原有「纯订阅模型」基础上新增管控层,管理员通过 WorkerManager API 可以在运行时对任意 Worker 施加生命周期控制或订阅限制,无需重启进程、无需改动任何 Worker 代码。

消息语义:at-most-once
消息一旦被 Worker 的 XREADGROUP 消费,所有权即归该 Worker。 若 Worker 在处理途中被驱逐或崩溃,该消息超时失败,框架不做重投递——适合 LLM Agent 避免重复调用大模型。

双通道机制

管控命令通过两条独立通道投递,互为冗余,确保指令必达:

WorkerManager admin.suspend() admin.evict() Push 通道 Pull 兜底 ctrl:worker:{worker_id} Redis Stream · XADD 立即推送 registry:worker:admin:{id} Redis HASH · HSET 持久存储 · 心跳轮询 Worker Heartbeat 5s XREADGROUP 即时 ≤5s 兜底
1

Push 通道(即时)

WorkerManager 调用 XADD 把命令写入 byai_gateway:ctrl:worker:{worker_id}, Worker 的 XREADGROUP 循环立即消费并响应。

2

Pull 通道(心跳兜底)

同时把状态写入 byai_gateway:registry:worker:admin:{worker_id} HASH。 即使 Push 消息因网络抖动未被消费,Worker 的心跳线程每 5 秒读取一次该 HASH 并同步状态,确保指令最终生效。

新增 Redis Key

byai_gateway:registry:worker:admin:{worker_id}
HASH
存储 Worker 的管理员控制状态。无 TTL,持久存在,需显式清除。
lifecycle: active | suspended | evicted reason: string updated_at: unix_timestamp
byai_gateway:registry:agent_type:denied:{agent_type}
SET
被禁止订阅该 agent_type 的 worker_id 集合。 Denylist 语义:key 不存在 = 所有 Worker 均可订阅,默认开放,向后兼容。
member: worker_id
worker:admin:{worker_id} 在 Worker 重启后仍然存在。若希望重启后恢复默认行为, 需调用 clear_worker_admin_state()resume_worker()

WorkerManager API

所有操作均通过 WorkerManager 完成,已封装双通道投递逻辑。

python
from by_framework import WorkerManager
from by_framework.common.redis_client import get_redis

redis = get_redis()
manager = WorkerManager(redis)

# ── 生命周期管控 ──────────────────────────────────────────

# 暂停 Worker(停止消费新消息,in-flight 任务继续跑完)
await manager.suspend_worker("worker-abc", reason="maintenance")

# 恢复 Worker
await manager.resume_worker("worker-abc")

# 优雅下线(等 in-flight 任务完成后进程自然退出)
await manager.evict_worker("worker-abc", reason="scale down")

# 强制下线(立即退出,in-flight 任务超时失败)
await manager.evict_worker("worker-abc", force=True, reason="emergency")

# ── 订阅权限管控 ──────────────────────────────────────────

# 禁止某个 Worker 订阅某个 agent_type(≤5s 内生效)
await manager.deny_worker_for_type("gpt-4o", "worker-abc")

# 恢复订阅权限
await manager.allow_worker_for_type("gpt-4o", "worker-abc")

# ── 查询 ──────────────────────────────────────────────────

# 查看 Worker 当前 admin 状态
state = await manager.get_worker_admin_state("worker-abc")
# → {"lifecycle": "suspended", "reason": "maintenance", "updated_at": "1749000000"}

# 查看某个 agent_type 的 denylist
denied = await manager.get_type_denylist("gpt-4o")
# → ["worker-abc", "worker-xyz"]

# 清除 admin 状态(恢复默认行为)
await manager.clear_worker_admin_state("worker-abc")

新增协议命令

以下命令由 WorkerManager 自动封装,通常不需要直接使用。 在实现自定义管控逻辑时可直接导入。

命令类 Action Type 参数 用途
SuspendWorkerCommand SUSPEND_WORKER reason: str 暂停 Worker 消费
ResumeWorkerCommand RESUME_WORKER 恢复 Worker 消费
EvictWorkerCommand EVICT_WORKER reason: str, force: bool 驱逐 Worker(graceful / force)
python · 直接使用示例
from by_framework import SuspendWorkerCommand, ResumeWorkerCommand, EvictWorkerCommand

cmd = EvictWorkerCommand(force=True, reason="OOM kill")
payload = cmd.to_redis_payload()  # → dict for XADD

生命周期管控操作

暂停 / 恢复

suspend_worker() 让 Worker 停止从 control stream 取新消息, 但正在执行的任务(in-flight)不会被中断,LLM 调用会继续完成。 适合在维护窗口期间临时冻结流量。

暂停后,Worker 进程仍在运行并保持心跳,Redis 中的 Worker 在线状态不变。 恢复只需调用 resume_worker(),即时生效。

优雅下线(Graceful Evict)

Worker 收到 EvictWorkerCommand(force=False) 后,停止消费新消息, 等待所有 in-flight 任务完成,然后进程自然退出。 无需手动 kill 进程。

强制下线(Force Evict)

Worker 收到 EvictWorkerCommand(force=True) 后,立即退出进程。 in-flight 任务被中断,客户端等待超时后收到失败响应。

Force evict 时,已被 XREADGROUP 消费但未 XACK 的消息 留在 PEL(Pending Entry List)中。框架不做重投递(at-most-once 语义), 消息超时失败,客户端需自行处理重试。

Denylist 订阅管控

基于黑名单(Denylist)语义,默认开放:
key 不存在 = 该 agent_type 对所有 Worker 开放;加入 denylist = 禁止订阅该类型消息。

python · 典型场景:灰度流量管控
# 只允许 worker-new 处理 gpt-4o 任务,其余 worker 全部禁止
all_workers = ["worker-a", "worker-b", "worker-c"]
for w in all_workers:
    await manager.deny_worker_for_type("gpt-4o", w)

# 验证 denylist
denied = await manager.get_type_denylist("gpt-4o")
# → ["worker-a", "worker-b", "worker-c"]

# 灰度完成后,恢复所有权限
for w in all_workers:
    await manager.allow_worker_for_type("gpt-4o", w)
deny_worker_for_type 最多有 5 秒延迟—— Worker 的心跳线程每 5s 刷新一次内存中的 denylist 缓存,刷新前仍可能消费该 agent_type 的消息。

实现原理

Worker 启动后,_active_agent_type_streams() 从内存 frozenset[denied_agent_types] 中过滤订阅列表(纯内存 O(N),无 Redis 调用), 心跳线程每次刷新该缓存。这避免了高消息速率下每次 consume 循环调用 N 次 SISMEMBER 的开销。

Worker 侧行为对照

操作
感知时机
Worker 行为
suspend_worker()
即时(push)
或 ≤5s(兜底)
停止从 control stream 取新消息;in-flight 任务继续执行完毕
resume_worker()
即时(push)
或 ≤5s(兜底)
恢复正常消费
evict_worker()
即时(push)
或 ≤5s(兜底)
停止消费 → 等所有 in-flight 任务完成 → 进程自然退出
evict_worker(force=True)
即时(push)
或 ≤5s(兜底)
立即退出进程;in-flight 任务中断,客户端超时失败
deny_worker_for_type()
≤5s(心跳刷新 denylist 缓存)
从活跃订阅流列表中移除该 agent_type,不再消费该类型消息
allow_worker_for_type()
≤5s(心跳刷新 denylist 缓存)
重新订阅该 agent_type

注意事项

Worker 重启后状态保留

registry:worker:admin:{worker_id} HASH 无 TTL,Worker 重启后仍存在。 若希望重启后的 Worker 以默认状态(active)运行,需在重启前或重启后调用:

python
await manager.clear_worker_admin_state("worker-abc")
# 或
await manager.resume_worker("worker-abc")

不会中断正在执行的 LLM 调用

suspendevict(non-force)只影响新消息的消费, 不会中断正在进行的 process_command() 调用、LLM 流式输出或任何 in-flight 操作。

at-most-once 消息语义

消息被 XREADGROUP 消费后即归属该 Worker。 Worker 崩溃或被 force-evict 时,消息留在 PEL 中,框架不做重投递。 客户端等待超时后收到失败响应,重试由应用层决策——这避免了 LLM 调用被重复触发。

evict 后无需手动 kill

Worker 收到 evict 命令后会自行完成退出流程(graceful 等任务完成,force 立即退出), 无需额外 kill 进程操作。