Metadata-Version: 2.4
Name: harness-gateway
Version: 0.8.1
Summary: Multi-platform IM channel bridge with unified message abstraction for AI agents and bots
Project-URL: Homepage, https://github.com/orcakit/harness-gateway
Project-URL: Repository, https://github.com/orcakit/harness-gateway
Project-URL: Issues, https://github.com/orcakit/harness-gateway/issues
Project-URL: Changelog, https://github.com/orcakit/harness-gateway/blob/main/CHANGELOG.md
Author: orcakit
License: MIT
License-File: LICENSE
Keywords: bot,bridge,channel,dingtalk,discord,feishu,im,qq,wechat
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Communications :: Chat
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.11
Requires-Dist: aiohttp>=3.9
Requires-Dist: dingtalk-stream>=0.24.3
Requires-Dist: discord-py>=2.3
Requires-Dist: httpx>=0.27.0
Requires-Dist: lark-oapi>=1.5.3
Requires-Dist: paho-mqtt>=2.0.0
Requires-Dist: pydantic>=2.0
Requires-Dist: python-dotenv>=1.0
Requires-Dist: python-socketio[asyncio-client]>=5.0
Requires-Dist: python-telegram-bot>=21.0
Requires-Dist: websocket-client>=1.6.0
Requires-Dist: websockets>=12.0
Requires-Dist: wecom-aibot-sdk>=1.0.7
Provides-Extra: dev
Requires-Dist: langchain-core>=1.4.0; extra == 'dev'
Requires-Dist: langchain-openai>=1.2.2; extra == 'dev'
Requires-Dist: langgraph>=1.2.1; extra == 'dev'
Requires-Dist: mypy>=1.9; extra == 'dev'
Requires-Dist: openai>=2.38.0; extra == 'dev'
Requires-Dist: orcakit-harness-agent>=0.1; extra == 'dev'
Requires-Dist: pre-commit>=3.7; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: examples
Requires-Dist: langchain-core>=1.4.0; extra == 'examples'
Requires-Dist: langchain-openai>=1.2.2; extra == 'examples'
Requires-Dist: langgraph>=1.2.1; extra == 'examples'
Requires-Dist: openai>=2.38.0; extra == 'examples'
Requires-Dist: orcakit-harness-agent>=0.1; extra == 'examples'
Description-Content-Type: text/markdown

<p align="center">
  <img src="assets/images/banner.jpeg" alt="Harness Gateway Banner" width="100%" />
</p>

<h1 align="center">Harness Gateway</h1>

<p align="center">
  <strong>Multi-platform IM channel bridge — unified message abstraction for AI agents and bots.</strong>
</p>

<p align="center">
  <a href="https://pypi.org/project/harness-gateway/"><img src="https://img.shields.io/pypi/v/harness-gateway" alt="PyPI" /></a>
  <a href="https://github.com/orcakit/harness-gateway/actions/workflows/ci.yml"><img src="https://github.com/orcakit/harness-gateway/actions/workflows/ci.yml/badge.svg" alt="CI" /></a>
  <a href="https://pypi.org/project/harness-gateway/"><img src="https://img.shields.io/pypi/pyversions/harness-gateway" alt="Python" /></a>
  <a href="LICENSE"><img src="https://img.shields.io/badge/License-MIT-yellow.svg" alt="License: MIT" /></a>
</p>

<p align="center">
  <b>English</b> · <a href="README_CN.md">中文</a>
</p>

---

## Highlights

- **9 Platforms, One Processor** — Write one async generator, run your bot on Feishu, QQ, WeCom, DingTalk, WeChat iLink, Yuanbao, Xiaoyi, MQTT, and Telegram
- **Streaming-First** — `DELTA` events accumulate token-by-token; `COMPLETED` triggers merge, `<think>` cleanup, and send
- **Platform Constraints Handled** — Reply timeouts, rate limits, and typing indicators managed per platform
- **Proactive Push** — Bot-initiated messages via `ChannelSubject` routing for notifications and scheduled tasks
- **LLM Integration Ready** — OpenAI streaming, LangGraph agents, and any async-generator processor pattern

---

## Overview

Harness Gateway lets you write one processor function and deploy your AI bot across multiple messaging platforms. The library handles platform transport (WebSocket, HTTP, MQTT, long-poll), message normalization, media persistence, rate limiting, and reply-timeout strategies — so your bot logic stays platform-agnostic.

```python
async def my_bot(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Received: {message.text}")
    yield MessageEvent.completed()
```

---

## Core Technology

| Component | Technology | Purpose |
|-----------|-----------|---------|
| Message Model | Pydantic v2 | Typed `InboundMessage` / `MessageEvent` / `ContentPart` |
| Routing | `ChannelSubject` | Unified outbound handle (`subject_id` + platform metadata) |
| Transport | Platform SDKs + aiohttp | WebSocket, Stream, HTTP, MQTT, long-poll |
| Orchestration | asyncio + worker pools | Multi-channel queues with session-aware batching |
| Constraints | Sliding window + timeout guards | Rate limits, reply timeouts, typing keepalive |
| Media | `MediaBackend` | Pluggable storage; per-channel `fetch_remote_media` for auth |

---

## Features

- **Unified Model** — `InboundMessage` / `MessageEvent` / `ContentPart` across all platforms
- **Async Generator Pattern** — Receive messages, yield response events
- **Multi-Channel** — Run several platforms simultaneously with session-aware batching
- **Streaming Support** — `DELTA` for token-by-token LLM output, merged on `COMPLETED`
- **`<think>` Filtering** — Reasoning-model thinking blocks stripped by default
- **Localized Tool Hints** — Pass `tool_hint_text` in event metadata, or fall back to templates
- **Platform Constraints** — Per-channel reply timeouts, rate limits, typing indicators
- **Proactive Push** — `push_text` / `push_content` / `push_to_all` via `ChannelSubject`
- **Media Handling** — Inbound auto-persist + outbound `load_media_bytes` through `MediaBackend`
- **Multi-Tenant** — Optional `tenant_id` per channel instance for session isolation
- **Extensible** — Subclass `BaseChannel` and register via `add_channel(instance)`

---

## Supported Platforms

| Platform | `channel_type` | Transport | Text | Image | File | Push |
|----------|---------------|-----------|:----:|:-----:|:----:|:----:|
| Feishu (Lark) | `feishu` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| QQ | `qq` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| WeCom (Enterprise WeChat) | `wecom` | WebSocket + stream reply | ✅ | ⚠️ | ⚠️ | ✅ |
| DingTalk | `dingtalk` | Stream + REST | ✅ | ✅ | ✅ | ✅ |
| WeChat iLink | `weixin` | HTTP long-poll | ✅ | ⚠️ | ⚠️ | ✅ |
| Yuanbao (腾讯元宝) | `yuanbao` | HTTP REST | ✅ | ✅ | ✅ | ✅ |
| Xiaoyi (小艺) | `xiaoyi` | WebSocket (primary + backup) | ✅ | ✅ | ✅ | ✅ |
| MQTT | `mqtt` | pub/sub broker | ✅ | ⚠️ | ⚠️ | ✅ |
| Telegram | `telegram` | long-polling | ✅ | ✅ | ✅ | ✅ |

> ✅ = supported · ⚠️ = URL or text fallback (no native bytes on some platforms)

See [README_CN.md](README_CN.md) for a detailed capability matrix (group chat, audio/video, Markdown, etc.).

### Examples

| Platform | Example script | Key env vars |
|----------|---------------|--------------|
| Telegram | `examples/telegram_bot.py` | `TELEGRAM_BOT_TOKEN` |
| MQTT | `examples/mqtt_bot.py` | `MQTT_HOST`, `MQTT_USERNAME`, `MQTT_PASSWORD` |
| Xiaoyi | `examples/xiaoyi_bot.py` | `XIAOYI_AK`, `XIAOYI_SK`, `XIAOYI_AGENT_ID` |
| QQ | `examples/qq_bot.py` | `QQ_APP_ID`, `QQ_TOKEN`, `QQ_SECRET` |
| Feishu | `examples/feishu_bot.py` | `FEISHU_APP_ID`, `FEISHU_APP_SECRET` |
| WeCom | `examples/wecom_bot.py` | `WECOM_BOT_ID`, `WECOM_SECRET` |
| Weixin | `examples/weixin_bot.py` | `WEIXIN_ACCOUNT_ID`, `WEIXIN_TOKEN` |
| Multi-channel | `examples/all_channels.py` | Combined credentials |

Copy `.env.example` to `.env` and fill in credentials before running examples.

---

## Quick Start

### Installation

```bash
pip install harness-gateway

# With example / agent integration deps
pip install "harness-gateway[examples]"
```

### Minimal Echo Bot (Telegram)

```python
import asyncio
import os
from collections.abc import AsyncIterator

from harness_gateway import ChannelManager, InboundMessage, MessageEvent
from harness_gateway.channels.telegram import TelegramConfig

async def echo(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {message.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=echo)
    await manager.start()
    await manager.add_telegram_channel(
        TelegramConfig(bot_token=os.environ["TELEGRAM_BOT_TOKEN"])
    )
    await asyncio.Event().wait()

asyncio.run(main())
```

### Multi-Platform

```python
import asyncio
import os
from collections.abc import AsyncIterator

from harness_gateway import ChannelManager, InboundMessage, MessageEvent
from harness_gateway.channels.dingtalk import DingTalkConfig
from harness_gateway.channels.feishu import FeishuConfig
from harness_gateway.channels.qq import QQConfig

async def unified_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"[{msg.channel_type}] {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=unified_bot, workers_per_channel=4)
    await manager.start()

    await manager.add_feishu_channel(
        FeishuConfig(app_id=os.environ["FEISHU_APP_ID"], app_secret=os.environ["FEISHU_APP_SECRET"])
    )
    await manager.add_qq_channel(
        QQConfig(app_id=os.environ["QQ_APP_ID"], token=os.environ["QQ_TOKEN"], secret=os.environ["QQ_SECRET"])
    )
    await manager.add_dingtalk_channel(
        DingTalkConfig(app_key=os.environ["DINGTALK_APP_KEY"], app_secret=os.environ["DINGTALK_APP_SECRET"])
    )
    await asyncio.Event().wait()

asyncio.run(main())
```

---

## Streaming + LLM Integration

### Event Types

| Event | Purpose | Channel Behavior |
|-------|---------|-----------------|
| `MESSAGE` | Complete message | Send immediately |
| `DELTA` | Token-by-token LLM output | Accumulate, merge on `COMPLETED` |
| `TOOL_START` | Tool call starting | Status hint (configurable) |
| `TOOL_END` | Tool call finished | Optional completion hint |
| `TYPING` | Typing indicator | Platform typing status |
| `ERROR` | Error occurred | Send error message |
| `COMPLETED` | Stream ended | Flush delta buffer → strip thinking → send |

### OpenAI Streaming

```python
from openai import AsyncOpenAI
from harness_gateway import InboundMessage, MessageEvent

client = AsyncOpenAI(api_key="sk-xxx")

async def chat_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.typing()

    stream = await client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": msg.text}], stream=True
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield MessageEvent.delta(chunk.choices[0].delta.content)

    yield MessageEvent.completed()
```

### Localized Tool Hints

By default, `TOOL_START` / `TOOL_END` use English templates from `ChannelConstraints`. Pass a pre-localized string in event metadata to override:

```python
yield MessageEvent.tool_start(
    "read_file",
    tool_hint_text="🔧 正在调用工具：读取文件",
)
```

Disable hints per channel: `ChannelConstraints(show_tool_hints=False)`.

### `<think>` Auto-Filtering

Reasoning models (DeepSeek-R1, QwQ, etc.) emit `<think>...</think>` blocks. Harness Gateway strips them before delivery unless `show_thinking=True`.

---

## Platform Constraints

Default constraints vary by platform. Override via `add_*_channel(..., constraints=...)` or `channel.constraints` at runtime.

| Platform | Reply Timeout | Rate Limit | Typing Keepalive | Notes |
|----------|--------------|------------|------------------|-------|
| WeCom | 10s | 2 / 5s | — | Stream reply + placeholder on timeout |
| WeChat iLink | 5s | 5 / 60s | 5s | `sendtyping` API |
| Telegram | — | 20 / 60s | 4s | When `show_typing=True` |
| MQTT | — | 30 / 60s | — | URL-based media payload |
| Xiaoyi | — | 20 / 60s | — | WebSocket dual connection |
| Feishu / QQ / DingTalk / Yuanbao | — | — | — | Use custom constraints as needed |

```python
from harness_gateway import ChannelConstraints

constraints = ChannelConstraints(
    reply_timeout=10.0,
    send_rate_limit=(2, 5.0),
    timeout_strategy="placeholder",
    show_thinking=False,
    show_tool_hints=True,
    tool_hint_template="🔧 Calling tool: {tool_name}",
)
```

---

## Media Backend

Inbound media is downloaded (with per-platform auth when needed), saved through `MediaBackend`, and stamped on `ContentPart.local_path`. Outbound sends read bytes via `load_media_bytes` (`data` → `local_path` → `url`).

```python
from harness_gateway import ChannelManager, FileSystemMediaBackend

media = FileSystemMediaBackend("/tmp/harness-gateway/media")
manager = ChannelManager(processor=my_bot, media_backend=media)
```

Override `fetch_remote_media()` on your channel subclass when platform URLs require auth headers.

---

## Proactive Push

Outbound routing uses `ChannelSubject` — auto-tracked on every inbound message (`channel_subject.subject_id` identifies the user).

```python
# Single user (subject must have interacted before, or supply routing metadata)
subject = channel.get_subject("user_open_id")
if subject:
    await manager.push_text(channel_id, subject, "Your build completed ✅")

# Rich content
from harness_gateway import ImageContent, TextContent

await manager.push_content(channel_id, subject, [
    TextContent(text="📊 Daily Report:"),
    ImageContent(url="https://charts.example.com/daily.png"),
])

# Broadcast to all known subjects on a channel
await manager.push_to_all(channel_id, "👋 Scheduled greeting!")
```

---

## Multi-Tenant Support

Run multiple instances of the same platform with isolated `tenant_id`:

```python
from harness_gateway.channels.feishu import FeishuConfig

t1_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t1", app_secret="secret1", tenant_id="tenant1")
)
t2_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t2", app_secret="secret2", tenant_id="tenant2")
)

async def bot(msg: InboundMessage):
    # msg.tenant_id → "tenant1" or "tenant2"
    # msg.channel_id → UUID instance key
    yield MessageEvent.text(f"[{msg.tenant_id}] {msg.text}")
    yield MessageEvent.completed()
```

---

## Custom Channels

Subclass `BaseChannel`, implement `start` / `stop` / `_send_*` / `parse_inbound`, then register:

```python
from dataclasses import dataclass

from harness_gateway import BaseChannel, ChannelConfig, ChannelSubject, ContentPart, InboundMessage

@dataclass
class MyPlatformConfig(ChannelConfig):
    api_key: str = ""

class MyPlatformChannel(BaseChannel):
    channel_type = "myplatform"

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def _send_text(self, subject: ChannelSubject, text: str) -> None: ...
    async def _send_content(self, subject: ChannelSubject, parts: list[ContentPart]) -> None: ...
    async def _send_media(self, subject: ChannelSubject, media: ContentPart) -> None: ...
    def parse_inbound(self, raw_payload) -> InboundMessage: ...

# Register an instance
channel = MyPlatformChannel(processor, MyPlatformConfig(api_key="..."))
channel_id = await manager.add_channel(channel)
```

See `AGENTS.md` for the full channel implementation specification.

---

## Architecture

```
┌─────────────────┐
│  IM Platform    │   Feishu / QQ / WeCom / DingTalk / Telegram / MQTT / ...
└────────┬────────┘
         │ WebSocket / HTTP / MQTT
         ▼
┌─────────────────┐
│  BaseChannel    │   parse_inbound() → InboundMessage
│  (impl)         │   _send_text() / _send_content() / _send_media()
└────────┬────────┘
         │ enqueue
         ▼
┌─────────────────┐
│ ChannelManager  │   Queues + workers + session batching + MediaBackend
└────────┬────────┘
         │ InboundMessage
         ▼
┌─────────────────┐
│ Your Processor  │   async def(InboundMessage) → AsyncIterator[MessageEvent]
└────────┬────────┘
         │ MessageEvent (MESSAGE / DELTA / TOOL_START / ...)
         ▼
┌─────────────────┐
│ BaseChannel     │   Constraints + delta merge → platform API
└─────────────────┘
```

---

## Development

**Prerequisites:** Python 3.11+, [uv](https://docs.astral.sh/uv/)

```bash
git clone https://github.com/orcakit/harness-gateway.git
cd harness-gateway
make install           # uv sync --extra dev
make all               # lint + typecheck + test (CI ship bar)
```

| Command | Description |
|---------|-------------|
| `make lint` | Ruff check + format check |
| `make format` | Ruff auto-fix + format |
| `make typecheck` | mypy src |
| `make test` | pytest -m "not integration" |
| `make test-integration` | pytest -m integration (requires network) |
| `make build` | wheel + sdist → dist/ |
| `make clean` | remove build artifacts and caches |

---

## Contributing

Contributions are welcome! Please read [CONTRIBUTING.md](CONTRIBUTING.md) and run `make all` before opening a PR.

Security issues: see [SECURITY.md](SECURITY.md).

---

## Related Projects

| Project | Description |
|---------|-------------|
| [Octop](https://github.com/orcakit/orca) | Self-hosted multi-user AI control plane |
| [harness-agent](https://github.com/orcakit/harness-agent) | Production-grade AI agent platform built on LangChain Deep Agents |
| [harness-memory](https://github.com/orcakit/harness-memory) | Pluggable memory system with hierarchical recall and FTS |
| [harness-browser](https://github.com/orcakit/harness-browser) | AI-friendly browser automation via CDP |

---

## License

[MIT](LICENSE)
