Metadata-Version: 2.4
Name: by-framework
Version: 0.1.10
Summary: Distributed Agent scheduling framework
License-File: LICENSE
Requires-Python: >=3.12
Requires-Dist: httpx>=0.28.1
Requires-Dist: redis>=7.0.0
Requires-Dist: typing-extensions>=4.0.0
Provides-Extra: dev
Requires-Dist: isort>=5.13.0; extra == 'dev'
Requires-Dist: pre-commit>=4.0.0; extra == 'dev'
Requires-Dist: pyink>=24.0.0; extra == 'dev'
Requires-Dist: pylint>=3.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.3.0; extra == 'dev'
Description-Content-Type: text/markdown

# 🚀 by-framework-python

<div align="center">

[![Version](https://img.shields.io/badge/version-0.1.10-blue.svg)](pyproject.toml)
[![Python](https://img.shields.io/badge/python-3.12+-yellow.svg)](pyproject.toml)
[![Redis](https://img.shields.io/badge/redis-7.0+-red.svg)](pyproject.toml)
[![License](https://img.shields.io/badge/license-Apache_2.0-green.svg)](LICENSE)

</div>

<div align="center">

[**English**](README.md) | [**中文**](README_zh.md)

</div>

---

**by-framework** is a distributed Agent scheduling framework built on Redis Streams. It provides worker orchestration, session-scoped runtime state, and plugin-based agent capability registration for AI agent systems.

---

## Table of Contents

- [✨ Key Features](#-key-features)
- [🏗️ Architecture](#️-architecture)
- [📦 Installation](#-installation)
- [🚀 Quick Start](#-quick-start)
- [💡 Deep Dive](#-deep-dive)
- [🔌 Plugin System](#-plugin-system)
- [📡 Sending Tasks](#-sending-tasks)
- [🧪 Examples](#-examples)
- [🛠️ Configuration Reference](#️-configuration-reference)
- [📚 API Reference](#-api-reference)
- [🧩 Advanced Capabilities](#-advanced-capabilities)
- [🚀 Deployment Guide](#-deployment-guide)

---

## ✨ Key Features

- ⚡ **Native Async**: Built on Python `asyncio`, perfectly suited for I/O-intensive Agent tasks.
- 🧩 **Highly Plugin-Based**: Plugin system with agent config registration for tools, prompts, skills, and lifecycle hooks.
- 📊 **State Management**: Complete `AgentContext` support for easy streaming output, state transitions, and artifact handling.
- 🔄 **Decoupled Architecture**: Separates control streams from data streams for scalable Worker orchestration.
- 🎯 **Agent Type Routing**: Workers declare supported `agent_types` via `get_agent_types()` for routing and liveness checks.

---

## 🏗️ Architecture

The system uses an event-driven, highly decoupled design:

```
┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│   Client    │──────▶│  Redis Input │──────▶│   Gateway    │
│ (Gateway)   │       │     MQ       │       │   Worker     │
└─────────────┘       └──────────────┘       └──────┬───────┘
       ▲                                            │
       │                                            │
       │                                            ▼
┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│  Consumer   │◀──────│  Redis Data  │◀──────│   Business   │
│ / Backend   │       │   Streams    │       │   Logic      │
└─────────────┘       └──────────────┘       └──────────────┘
```

### Core Components

- **Access Layer**: `GatewayClient` publishes control commands to Redis Input MQ.
- **Scheduling Layer**: Redis Streams consumer groups provide competing consumption and routing across the Worker cluster.
- **Execution Layer**: `GatewayWorker` actively pulls tasks and executes business logic in isolated workspaces.
- **Output Layer**: Data is asynchronously pushed to session-scoped data streams for downstream consumers.

### Worker Routing Semantics

There are three layers of routing semantics:

- **membership**: The Worker declares which `agent_types` it supports. This is a static relationship, updated only at startup and during graceful shutdown.
- **online / heartbeat**: Whether the Worker can currently accept tasks. Only online Workers are considered valid send targets.
- **worker_id lock**: Prevents duplicate startup of the same `worker_id`. This is an instance-level mutex, not part of agent-type routing.

The production main path uses the agent-type stream:

- The client writes to `byai_gateway:ctrl:agent_type:{agent_type}`
- Multiple Workers under the same agent type consume competitively via a Redis consumer group
- Before sending, the client only checks that at least one online Worker exists for the agent type
- No specific Worker is pre-selected before sending

The debug or direct-control path uses the worker stream:

- When `target_worker_id` is explicitly provided, messages are written to `byai_gateway:ctrl:worker:{worker_id}`
- This path is intended only for debugging, direct dispatch, or worker-level control commands
- The client explicitly checks that the worker is online before sending
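The stream key layout behind the two paths can be sketched with plain helpers (the function names are illustrative, not part of the framework API; the key patterns come from the paths above):

```python
BYAI_PREFIX = "byai_gateway"


def agent_type_stream_key(agent_type: str) -> str:
    """Control stream shared by all Workers that declare this agent type."""
    return f"{BYAI_PREFIX}:ctrl:agent_type:{agent_type}"


def worker_stream_key(worker_id: str) -> str:
    """Direct control stream for one Worker instance (debug/direct path)."""
    return f"{BYAI_PREFIX}:ctrl:worker:{worker_id}"
```

The client picks `worker_stream_key(...)` only when `target_worker_id` is given; otherwise all traffic for an agent type lands on the shared stream.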

### Data Flow

```
User Request
    ↓
Gateway (write to control stream)
    ↓
Worker (consume control stream, process task)
    ↓
Redis Stream (write to session data stream)
    ↓
Backend or consumer (read session data stream)
    ↓
Frontend (render real-time AI response)
```
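On the consumer side, each data-stream entry must be decoded before rendering. A minimal sketch, assuming each entry carries an `event_type` field plus a JSON-encoded `payload` field (the actual entry layout is framework-internal):

```python
import json
from typing import Any


def decode_data_entry(fields: dict[bytes, bytes]) -> dict[str, Any]:
    """Decode one raw stream entry into an event dict (assumed field layout)."""
    event_type = fields.get(b"event_type", b"").decode("utf-8")
    payload = json.loads(fields.get(b"payload", b"{}"))
    return {"event_type": event_type, "payload": payload}
```

A backend would feed entries read via `XREAD`/`XREADGROUP` through a helper like this before forwarding events to the frontend.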

---

## 📦 Installation

### Prerequisites

- Python 3.12+
- Redis 7.0+ (for message queue)

### Install via pip

```bash
# Basic installation
pip install by-framework

# Editable install with dev extras (from a source checkout)
pip install -e ".[dev]"
```

### Install via uv (Recommended)

```bash
# From the cloned project root, install all dependencies
cd by-framework-python
uv sync
```

### Workspace Development

This repository uses `uv workspace` to manage local package development.

Common commands:

```bash
# Sync workspace dependencies in repository root
uv sync

# Run package tests
uv run pytest tests
```

## 🚀 Quick Start

### 1. Create a Simple Agent Worker

Create `my_agent.py`:

```python
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class MyAssistant(GatewayWorker):
    def get_agent_types(self):
        # Declare the Agent types this Worker can handle
        return ["weather_agent", "chat_agent"]

    async def process_command(self, command, context: AgentContext):
        # Send streaming text chunks
        await context.emit_chunk("Processing your request...\n")

        # Simulate time-consuming operation
        await asyncio.sleep(0.5)

        # Update task state
        await context.emit_state("thinking")

        # Read request content from the command payload
        user_input = (
            command.content if isinstance(command.content, str) else str(command.content)
        )

        # Send thinking process
        await context.emit_chunk(f"I received: {user_input}\n")
        await asyncio.sleep(0.3)

        # Send final result
        await context.emit_chunk("This is my reply!")

        return {
            "status": "success",
            "message": "Task completed",
            "data": {"answer": "The weather is sunny today"}
        }

if __name__ == "__main__":
    run_worker(
        worker_class=MyAssistant,
        worker_id="worker-01",
        redis_host="localhost",
        redis_port=6379,
    )
```

### 2. Start Redis

```bash
docker run -d -p 6379:6379 redis:7-alpine
```

### 3. Start Worker

```bash
cd by-framework-python
uv run python my_agent.py
```

### 4. Send a Test Task

Create `send_task.py`:

```python
import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def send_task():
    # Initialize shared Redis client and registry
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)

    # ByaiGatewayClient accepts redis_client / registry
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session-001",
        content="How's the weather in Beijing today?",
    )

    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(send_task())
```

Run:

```bash
uv run python send_task.py
```

---

## 💡 Deep Dive

### GatewayWorker Base Class

`GatewayWorker` is the base class for all custom Workers. You need to implement the following methods:

| Method | Required | Description |
|--------|----------|-------------|
| `get_agent_types()` | Yes | Returns list of Agent types this Worker can handle |
| `process_command(command, context)` | Yes | Handle specific business logic |

### AgentContext

`AgentContext` provides the ability to interact with the runtime environment:

```python
from by_framework import AgentContext, ArtifactEvent


async def process_command(self, command, context: AgentContext):
    # 1. Send streaming output
    await context.emit_chunk("Processing...")

    # 2. Send artifacts/structured data
    await context.emit_artifact(ArtifactEvent(url="https://example.com/result.json"))

    # 3. Get message ID and session ID
    msg_id = context.message_id
    session_id = context.session_id

    # 4. Call other Agents (supports suspending current task waiting for reply)
    result = await context.call_agent(
        target_agent_type="translator_agent",
        content="Hello",
        wait_for_reply=True
    )
```

### Commands and Message Protocol

#### AskAgentCommand (Task Command)

```python
from by_framework.core.protocol.commands import AskAgentCommand
from by_framework.core.protocol.message_header import MessageHeader

command = AskAgentCommand(
    header=MessageHeader(
        message_id="msg_123",
        session_id="sess_456",
        target_agent_type="weather_agent"
    ),
    content="Query Beijing weather",
    extra_payload={
        "location": "Beijing"
    }
)
```

#### Common EventType Values

| Event Type | Description |
|------------|-------------|
| `answerDelta` | Incremental answer content |
| `reasoningLogDelta` | Reasoning or intermediate log output |
| `appStreamResponse` | Marks stream completion |
| `taskCreate` | Task creation related event |
| `taskStop` | Task termination related event |

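As an illustration, a downstream consumer might fold these events into a final answer like this (the event dict shape and the `type`/`text` keys are assumptions for the sketch; only the event-type names come from the table above):

```python
def fold_answer(events: list[dict]) -> str:
    """Concatenate answerDelta chunks until the stream-completion marker."""
    parts: list[str] = []
    for event in events:
        if event["type"] == "appStreamResponse":
            break  # stream finished
        if event["type"] == "answerDelta":
            parts.append(event["text"])
        # reasoningLogDelta, taskCreate, taskStop would be handled elsewhere
    return "".join(parts)
```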
---

## 🔌 Plugin System

Plugins are the foundation of By-Framework's extensibility. You can register tools, prompt templates, etc. through plugins.

### Plugin Directory Structure

```
my_plugins/
├── weather_plugin.py
├── calculator_plugin.py
└── custom_hooks.py
```

### Writing a Plugin

Create `my_cool_plugin.py`:

```python
from by_framework import AgentConfig, AgentContext, Plugin, PluginBuildContext, PluginManifest
from typing import Any

class WeatherPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(
            plugin_id="weather_plugin",
            version="1.0.0",
        ))

    async def register_agent_configs(self, build_context: PluginBuildContext) -> list[AgentConfig]:
        # Plugin registers capabilities by returning a list of AgentConfig
        config = AgentConfig(
            agent_id="weather_assistant",
            tools={
                "get_current_weather": self._get_weather,
                "get_forecast": self._get_forecast
            },
            prompts={
                "system_prompt": "You are a weather assistant..."
            }
        )
        return [config]

    async def _get_weather(self, city: str) -> dict[str, Any]:
        """Get current weather"""
        # In real projects, this would call a real weather API
        return {
            "city": city,
            "temperature": 25,
            "condition": "Sunny",
            "humidity": 60
        }

    async def _get_forecast(self, city: str, days: int = 3) -> list[dict]:
        """Get weather forecast"""
        return [
            {"day": 1, "high": 28, "low": 18, "condition": "Sunny"},
            {"day": 2, "high": 26, "low": 16, "condition": "Cloudy"},
            {"day": 3, "high": 24, "low": 14, "condition": "Overcast"}
        ][:days]

    # Plugin lifecycle hooks
    async def on_task_start(self, context: AgentContext):
        """Called when task starts"""
        print(f"Task {context.message_id} started")

    async def on_task_complete(self, context: AgentContext, result: Any):
        """Called when task completes successfully"""
        print(f"Task {context.message_id} completed")

    async def on_task_error(self, context: AgentContext, error: Exception):
        """Called when task encounters an error"""
        print(f"Task {context.message_id} error: {error}")
```

### Using Plugins

Method 1: Pass via plugin_list parameter

```python
from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_list=[WeatherPlugin()]
)
```

Method 2: Configure via plugin_configurator callback

```python
from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

def configure_plugins(registry):
    registry.register_bundle(WeatherPlugin())

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_configurator=configure_plugins
)
```

Method 3: Load plugin modules from a directory

```python
run_worker(
    worker_class=MyAssistant,
    plugin_dir="./my_plugins"  # Scans .py files in this directory once at startup
)
```

---

## 📡 Sending Tasks

### Using ByaiGatewayClient

`ByaiGatewayClient` is a wrapper around `GatewayClient` that uses the shared Byai codec for message serialization by default and supports higher-level message protocols.

```python
import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def main():
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session_123",
        user_code="user_123",
        content="Query today's weather in Beijing",
    )

    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(main())
```

### Sending Path Explanation

`GatewayClient.send_message(...)` has two modes:

- Default agent-type mode: writes to the agent-type stream based on `target_agent_type` and, when `require_online_worker=True`, verifies that an online worker exists.
- Direct worker mode: when `target_worker_id` is provided, writes directly to that worker's stream; suitable for debugging or direct control.

This means:

- `response.target_worker_id` may be empty in agent-type mode, because the actual worker is determined only when a consumer in that agent type's group reads the message.
- When canceling a task that has already started executing, the execution registry fills in the `worker_id` once the worker actually begins processing.

---

## 🧪 Examples

### Example 1: Basic Streaming Output

```python
import asyncio

from by_framework import AgentContext, GatewayWorker


class StreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["streaming_demo"]

    async def process_command(self, command, context: AgentContext):
        text = "This is a sample text for streaming output."

        for char in text:
            await context.emit_chunk(char)
            await asyncio.sleep(0.05)

        return {"status": "done"}
```

### Example 2: Registering Plugin Capabilities

Tools, prompts, and skills are registered through the plugin mechanism and exposed through `AgentConfig`.

```python
from by_framework import AgentConfig, AgentContext, GatewayWorker, Plugin, PluginBuildContext, PluginManifest


class CalculatorPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(plugin_id="calculator"))

    async def register_agent_configs(
        self, build_context: PluginBuildContext
    ) -> list[AgentConfig]:
        return [
            AgentConfig(
                agent_id="tool_demo",
                tools={"calculate": self.calculate},
            )
        ]

    async def calculate(self, a: float, b: float, op: str) -> float:
        if op == "+":
            return a + b
        if op == "-":
            return a - b
        if op == "*":
            return a * b
        if op == "/":
            return a / b if b != 0 else 0
        return 0


class ToolAgent(GatewayWorker):
    def get_agent_types(self):
        return ["tool_demo"]

    async def process_command(self, command, context: AgentContext):
        config = context.get_agent_config("tool_demo")
        await context.emit_chunk(
            f"Registered tools: {list(config.tools.keys()) if config else []}"
        )
        return {"status": "success"}
```

## 🧩 Advanced Capabilities

### User-in-the-Loop Flows

Workers can suspend execution and wait for user input through `context.ask_user(...)`. The follow-up reply is delivered back to the worker as a `ResumeCommand`.

```python
from by_framework import AgentContext, AskUserEvent, GatewayWorker, ResumeCommand


class ApprovalAgent(GatewayWorker):
    def get_agent_types(self):
        return ["approval_agent"]

    async def process_command(self, command, context: AgentContext):
        if isinstance(command, ResumeCommand):
            await context.emit_chunk(f"User replied: {command.content}")
            return {"status": "completed"}

        return await context.ask_user(
            AskUserEvent(prompt="Please confirm the deployment window.")
        )
```

### Scatter-Gather Dispatch

`dispatch_group(...)` can enqueue multiple subtasks under one task group, and `collect_group_results(...)` can gather the callback payloads when they finish.

```python
tasks = [
    {"target_agent_type": "researcher", "content": "Collect references"},
    {"target_agent_type": "writer", "content": "Draft the summary"},
]

group = await context.dispatch_group(tasks, wait_for_reply=True)
results = await context.collect_group_results(group["task_group_id"])
```

### Byai Typed Worker Layer

If your business payloads use BaiYing message objects, `ByaiWorker` and `ByaiAgentContext` provide typed content decoding/encoding on top of the generic worker runtime.

### Service Discovery Utilities

The repository also ships Redis-backed service discovery and a discovery-aware HTTP client:

- `ServiceRegistry` for service registration and heartbeat
- `DiscoveryClient` for cached service lookup and load balancing
- `DiscoveryHttpClient` for node-switching HTTP retries on discovered instances

These utilities live in [src/by_framework/core/discovery.py](src/by_framework/core/discovery.py) and [src/by_framework/util/discovery_http_client.py](src/by_framework/util/discovery_http_client.py).

---

## 🛠️ Configuration Reference

### run_worker Function Parameters

The `run_worker` function supports a rich set of configuration options:

| Parameter | Type | Description | Default |
| :--- | :--- | :--- | :--- |
| `worker_class` | `Type[GatewayWorker]` | **Required**. Business Worker class. | - |
| `worker_id` | `str` | Unique identifier for Worker instance. | `"worker-1"` |
| `redis_host` | `str` | Redis server address. | `"localhost"` |
| `redis_port` | `int` | Redis port. | `6379` |
| `redis_db` | `int` | Redis database number. | `0` |
| `redis_password` | `str` | Redis password (optional). | `None` |
| `redis_username` | `str` | Redis username (optional). | `None` |
| `workspace_dir` | `str` | Local working directory for task execution. | `"/tmp/gateway-workspace"` |
| `consumer_group` | `str` | Redis consumer group name. | `"agent_engines"` |
| `max_concurrency` | `int` | Maximum concurrent tasks per Worker. | `50` |
| `fetch_count` | `int` | Number of messages to batch fetch from Redis each time. | `10` |
| `redis_max_connections` | `int` | Maximum Redis connections. | `max_concurrency + 10` |
| `plugin_list` | `List[Plugin]` | Explicitly passed plugin list. | `None` |
| `plugin_configurator` | `Callable` | Plugin configuration callback function. | `None` |
| `plugin_hook_timeout_seconds` | `float` | Default timeout for plugin hooks. | `None` |
| `plugin_log_hook_stats_on_shutdown` | `bool` | Whether to log plugin hook stats on shutdown. | `True` |
| `plugin_dir` | `str` | Directory scanned once at startup for `.py` plugin modules. | `None` |

### Environment Variables

| Environment Variable | Description | Default |
|---------|------|-------|
| `BYAI_WORKER_CONCURRENCY` | Maximum concurrency | `50` |
| `BYAI_WORKER_FETCH_COUNT` | Batch fetch count | `10` |
| `BYAI_REDIS_MAX_CONNECTIONS` | Redis max connections | `max_concurrency + 10` |

---

## 📚 API Reference

### GatewayWorker

```python
class GatewayWorker:
    def get_agent_types(self) -> list[str]:
        """Return the list of Agent types this Worker can handle."""

    async def process_command(self, command, context: AgentContext) -> Any:
        """Process a command and return its result."""
```

### AgentContext

```python
class AgentContext:
    session_id: str
    trace_id: str
    current_agent_id: str
    message_id: str
    parent_message_id: str

    async def emit_chunk(self, event: Union[StreamChunkEvent, str], event_type: Optional[str] = None):
        """Send text chunk or streaming event"""

    async def emit_state(self, event: Union[StateChangeEvent, str], event_type: Optional[str] = None):
        """Send state update"""

    async def emit_artifact(self, event: Union[ArtifactEvent, str], event_type: Optional[str] = None):
        """Send artifact/attachment"""

    async def ask_user(self, event: Union[AskUserEvent, str]) -> dict:
        """Send waiting for input request to user"""

    async def call_agent(self, target_agent_type: str, content: object, **kwargs) -> dict:
        """Call other Agent"""

    async def dispatch_group(self, tasks: list[dict], **kwargs) -> dict:
        """Dispatch task group"""

    async def get_active_workers(self) -> Dict[str, Any]:
        """Get all active workers in cluster"""
```

### GatewayClient / ByaiGatewayClient

```python
class GatewayClient:
    async def send_message(
        self,
        target_agent_type: str,
        session_id: str,
        content: Any,
        user_code: str = "",
        action_type: str = "ASK_AGENT",
        metadata: Optional[dict] = None,
        target_worker_id: Optional[str] = None,
        require_online_worker: bool = True,
    ) -> SendMessageResponse:
        """Send message, return response object"""

    async def cancel_task(self, message_id: str, session_id: str, reason: str = "") -> CancelTaskResponse:
        """Cancel specified task"""
```

## 🚀 Deployment Guide

### Single Machine Deployment

1. **Prepare Environment**

```bash
# Install dependencies
cd by-framework-python
uv sync
```

2. **Start Redis**

```bash
docker run -d --name gateway-redis \
  -p 6379:6379 \
  --restart unless-stopped \
  redis:7-alpine
```

3. **Start Worker**

```bash
uv run python -m by_framework \
  --worker-class my_agent.MyAgent \
  --worker-id worker-01 \
  --redis-host localhost
```

### Multi-Worker Deployment

To scale horizontally, run multiple worker processes with different `worker_id` values while sharing the same Redis instance and `target_agent_type` streams.

### Production Environment Recommendations

1. **Use Connection Pool**

```python
run_worker(
    worker_class=MyAgent,
    redis_max_connections=50
)
```

2. **Configure Monitoring**

```python
import logging

from by_framework.common.logger import setup_logging

setup_logging(level=logging.INFO, use_json=True)
```

### FAQ

**Q: How to ensure tasks are not lost?**

A: Redis Streams provides a persistence mechanism. Workers acknowledge completed messages with `XACK`; unacknowledged messages are redelivered to another consumer.

**Q: How to implement Worker load balancing?**

A: Multiple Workers join the same consumer group on a shared stream, and Redis automatically distributes messages among the consumers in the group.
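The competing-consumption behavior can be illustrated with a small in-memory model (a conceptual sketch only; in production Redis performs this distribution natively via consumer groups and `XREADGROUP`):

```python
from collections import deque


def distribute(messages: list[str], consumers: list[str]) -> dict[str, list[str]]:
    """Each message goes to exactly one consumer, mimicking a consumer group."""
    pending = deque(messages)
    delivered: dict[str, list[str]] = {c: [] for c in consumers}
    i = 0
    while pending:
        # In Redis, whichever consumer reads first claims the message;
        # here we round-robin to keep the model deterministic.
        delivered[consumers[i % len(consumers)]].append(pending.popleft())
        i += 1
    return delivered
```

Every message is delivered once, so adding consumers to the group scales throughput without duplicating work.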

---

## 🗺️ Roadmap

- [ ] **Observability Dashboard**: Integrated UI for monitoring worker health and task streams.
- [ ] **Advanced Sandbox**: WASM-based execution environment for enhanced isolation.
- [ ] **Long-term Memory**: Native support for vector-database backed session memory.
- [ ] **Native LangGraph Integration**: Enhanced adapter for complex stateful multi-agent flows.

## 🤝 Contributing

Issues and Pull Requests are welcome! Please check our [CONTRIBUTING.md](CONTRIBUTING.md) for details.

---

## 📄 License

This project is licensed under Apache 2.0 License - see [LICENSE](LICENSE) file for details.

---

Maintained by **byai team**.

Questions or suggestions? Feel free to contact us!
