Worker Management
允许管理员在不重启 Worker 进程的前提下,动态控制 Worker 的订阅权限与生命周期。 支持暂停、恢复、优雅下线、强制下线,以及基于 Denylist 的 agent_type 订阅管控。
概述
by-framework 是基于 Redis Streams 构建的分布式 Agent 调度引擎。 Worker 以竞争消费方式从控制流(Control Stream)获取任务,横向扩展无需改代码。
本功能在原有「纯订阅模型」基础上新增管控层,管理员通过 WorkerManager API
可以在运行时对任意 Worker 施加生命周期控制或订阅限制,无需重启进程、无需改动任何 Worker 代码。
消息一旦被 Worker 的
XREADGROUP 消费,所有权即归该 Worker。
若 Worker 在处理途中被驱逐或崩溃,该消息超时失败,框架不做重投递——适合 LLM Agent 避免重复调用大模型。
双通道机制
管控命令通过两条独立通道投递,互为冗余,确保指令必达:
Push 通道(即时)
WorkerManager 调用 XADD 把命令写入
byai_gateway:ctrl:worker:{worker_id},
Worker 的 XREADGROUP 循环立即消费并响应。
Pull 通道(心跳兜底)
同时把状态写入 byai_gateway:registry:worker:admin:{worker_id} HASH。
即使 Push 消息因网络抖动未被消费,Worker 的心跳线程每 5 秒读取一次该 HASH 并同步状态,确保指令最终生效。
新增 Redis Key
worker:admin:{worker_id} 在 Worker 重启后仍然存在。若希望重启后恢复默认行为,
需调用 clear_worker_admin_state() 或 resume_worker()。
WorkerManager API
所有操作均通过 WorkerManager 完成,已封装双通道投递逻辑。
from by_framework import WorkerManager from by_framework.common.redis_client import get_redis redis = get_redis() manager = WorkerManager(redis) # ── 生命周期管控 ────────────────────────────────────────── # 暂停 Worker(停止消费新消息,in-flight 任务继续跑完) await manager.suspend_worker("worker-abc", reason="maintenance") # 恢复 Worker await manager.resume_worker("worker-abc") # 优雅下线(等 in-flight 任务完成后进程自然退出) await manager.evict_worker("worker-abc", reason="scale down") # 强制下线(立即退出,in-flight 任务超时失败) await manager.evict_worker("worker-abc", force=True, reason="emergency") # ── 订阅权限管控 ────────────────────────────────────────── # 禁止某个 Worker 订阅某个 agent_type(≤5s 内生效) await manager.deny_worker_for_type("gpt-4o", "worker-abc") # 恢复订阅权限 await manager.allow_worker_for_type("gpt-4o", "worker-abc") # ── 查询 ────────────────────────────────────────────────── # 查看 Worker 当前 admin 状态 state = await manager.get_worker_admin_state("worker-abc") # → {"lifecycle": "suspended", "reason": "maintenance", "updated_at": "1749000000"} # 查看某个 agent_type 的 denylist denied = await manager.get_type_denylist("gpt-4o") # → ["worker-abc", "worker-xyz"] # 清除 admin 状态(恢复默认行为) await manager.clear_worker_admin_state("worker-abc")
新增协议命令
以下命令由 WorkerManager 自动封装,通常不需要直接使用。
在实现自定义管控逻辑时可直接导入。
| 命令类 | Action Type | 参数 | 用途 |
|---|---|---|---|
SuspendWorkerCommand |
SUSPEND_WORKER |
reason: str |
暂停 Worker 消费 |
ResumeWorkerCommand |
RESUME_WORKER |
— | 恢复 Worker 消费 |
EvictWorkerCommand |
EVICT_WORKER |
reason: str, force: bool |
驱逐 Worker(graceful / force) |
from by_framework import SuspendWorkerCommand, ResumeWorkerCommand, EvictWorkerCommand cmd = EvictWorkerCommand(force=True, reason="OOM kill") payload = cmd.to_redis_payload() # → dict for XADD
生命周期管控操作
暂停 / 恢复
suspend_worker() 让 Worker 停止从 control stream 取新消息,
但正在执行的任务(in-flight)不会被中断,LLM 调用会继续完成。
适合在维护窗口期间临时冻结流量。
resume_worker(),即时生效。
优雅下线(Graceful Evict)
Worker 收到 EvictWorkerCommand(force=False) 后,停止消费新消息,
等待所有 in-flight 任务完成,然后进程自然退出。
无需手动 kill 进程。
强制下线(Force Evict)
Worker 收到 EvictWorkerCommand(force=True) 后,立即退出进程。
in-flight 任务被中断,客户端等待超时后收到失败响应。
XREADGROUP 消费但未 XACK 的消息
留在 PEL(Pending Entry List)中。框架不做重投递(at-most-once 语义),
消息超时失败,客户端需自行处理重试。
Denylist 订阅管控
基于黑名单(Denylist)语义,默认开放:
key 不存在 = 该 agent_type 对所有 Worker 开放;加入 denylist = 禁止订阅该类型消息。
# 只允许 worker-new 处理 gpt-4o 任务,其余 worker 全部禁止 all_workers = ["worker-a", "worker-b", "worker-c"] for w in all_workers: await manager.deny_worker_for_type("gpt-4o", w) # 验证 denylist denied = await manager.get_type_denylist("gpt-4o") # → ["worker-a", "worker-b", "worker-c"] # 灰度完成后,恢复所有权限 for w in all_workers: await manager.allow_worker_for_type("gpt-4o", w)
deny_worker_for_type 最多有 5 秒延迟——
Worker 的心跳线程每 5s 刷新一次内存中的 denylist 缓存,刷新前仍可能消费该 agent_type 的消息。
实现原理
Worker 启动后,_active_agent_type_streams() 从内存
frozenset[denied_agent_types] 中过滤订阅列表(纯内存 O(N),无 Redis 调用),
心跳线程每次刷新该缓存。这避免了高消息速率下每次 consume 循环调用 N 次 SISMEMBER 的开销。
Worker 侧行为对照
或 ≤5s(兜底)
或 ≤5s(兜底)
或 ≤5s(兜底)
或 ≤5s(兜底)
注意事项
Worker 重启后状态保留
registry:worker:admin:{worker_id} HASH 无 TTL,Worker 重启后仍存在。
若希望重启后的 Worker 以默认状态(active)运行,需在重启前或重启后调用:
await manager.clear_worker_admin_state("worker-abc") # 或 await manager.resume_worker("worker-abc")
不会中断正在执行的 LLM 调用
suspend 和 evict(non-force)只影响新消息的消费,
不会中断正在进行的 process_command() 调用、LLM 流式输出或任何 in-flight 操作。
at-most-once 消息语义
消息被 XREADGROUP 消费后即归属该 Worker。
Worker 崩溃或被 force-evict 时,消息留在 PEL 中,框架不做重投递。
客户端等待超时后收到失败响应,重试由应用层决策——这避免了 LLM 调用被重复触发。
evict 后无需手动 kill
Worker 收到 evict 命令后会自行完成退出流程(graceful 等任务完成,force 立即退出),
无需额外 kill 进程操作。