Metadata-Version: 2.4
Name: harness-im-bridge
Version: 0.1.0
Summary: Multi-platform IM channel bridge with unified message abstraction for AI agents and bots
Project-URL: Repository, https://github.com/orcakit/harness-im-bridge
Project-URL: Issues, https://github.com/orcakit/harness-im-bridge/issues
Author: orcakit
License: MIT
License-File: LICENSE
Keywords: bot,bridge,channel,dingtalk,discord,feishu,im,qq,wechat
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Communications :: Chat
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.11
Requires-Dist: aiohttp>=3.9
Requires-Dist: dingtalk-stream>=0.24.3
Requires-Dist: discord-py>=2.3
Requires-Dist: httpx>=0.27.0
Requires-Dist: lark-oapi>=1.5.3
Requires-Dist: paho-mqtt>=2.0.0
Requires-Dist: pydantic>=2.0
Requires-Dist: python-dotenv>=1.0
Requires-Dist: python-socketio[asyncio-client]>=5.0
Requires-Dist: python-telegram-bot>=21.0
Requires-Dist: websocket-client>=1.6.0
Requires-Dist: websockets>=12.0
Requires-Dist: wecom-aibot-sdk>=1.0.7
Provides-Extra: dev
Requires-Dist: harness-agent>=0.1; extra == 'dev'
Requires-Dist: langchain-core>=1.4.0; extra == 'dev'
Requires-Dist: langchain-openai>=1.2.2; extra == 'dev'
Requires-Dist: langgraph>=1.2.1; extra == 'dev'
Requires-Dist: mypy>=1.9; extra == 'dev'
Requires-Dist: openai>=2.38.0; extra == 'dev'
Requires-Dist: pre-commit>=3.7; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: examples
Requires-Dist: harness-agent>=0.1; extra == 'examples'
Requires-Dist: langchain-core>=1.4.0; extra == 'examples'
Requires-Dist: langchain-openai>=1.2.2; extra == 'examples'
Requires-Dist: langgraph>=1.2.1; extra == 'examples'
Requires-Dist: openai>=2.38.0; extra == 'examples'
Description-Content-Type: text/markdown

<p align="center">
  <img src="assets/images/banner.jpeg" alt="Harness IM Bridge Banner" width="100%" />
</p>

<h1 align="center">Harness IM Bridge</h1>

<p align="center">
  <strong>Multi-platform IM channel bridge — unified message abstraction for AI agents and bots.</strong>
</p>

<p align="center">
  <a href="https://pypi.org/project/harness-im-bridge/"><img src="https://img.shields.io/pypi/v/harness-im-bridge" alt="PyPI" /></a>
  <a href="https://github.com/orcakit/harness-im-bridge/actions/workflows/ci.yml"><img src="https://github.com/orcakit/harness-im-bridge/actions/workflows/ci.yml/badge.svg" alt="CI" /></a>
  <a href="https://pypi.org/project/harness-im-bridge/"><img src="https://img.shields.io/pypi/pyversions/harness-im-bridge" alt="Python" /></a>
  <a href="LICENSE"><img src="https://img.shields.io/badge/License-MIT-yellow.svg" alt="License: MIT" /></a>
</p>

<p align="center">
  <b>English</b> · <a href="README_CN.md">中文</a>
</p>

---

## Highlights

- **9 Platforms, One Processor** — Write one async generator function, run your bot on Feishu, QQ, WeChat Work, DingTalk, Discord, WeChat, and more
- **Streaming-First** — Delta events accumulate token-by-token; `COMPLETED` triggers merge, `<think>` cleanup, and send
- **Platform Constraints Handled** — Reply timeouts, rate limits, and typing indicators managed transparently per platform
- **Proactive Push** — Bot-initiated messages for notifications, webhooks, and scheduled tasks
- **LLM Integration Ready** — Works with OpenAI streaming, LangGraph agents, and any async generator pattern

---

## Overview

Harness IM Bridge lets you write one processor function and deploy your AI bot across 9 messaging platforms simultaneously. The library handles platform-specific transport (WebSocket, HTTP, Socket.IO), message format translation, media upload/download, rate limiting, and reply timeout strategies — so your bot logic stays clean and platform-agnostic.

```python
async def my_bot(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Received: {message.text}")
    yield MessageEvent.completed()
```

---

## Core Technology

| Component | Technology | Purpose |
|-----------|-----------|---------|
| Message Model | Pydantic v2 | Typed `InboundMessage` / `MessageEvent` / `ContentPart` |
| Transport | Platform SDKs + aiohttp | WebSocket, HTTP callback, Socket.IO, streaming |
| Orchestration | asyncio + Worker pools | Multi-channel queues with session-aware batching |
| Constraints | Sliding window + timeout guards | Transparent rate limiting and reply timeout handling |
| Media | Per-platform abstraction | Unified upload/download across IM platforms |
| Registry | Decorator + filesystem discovery | `@channel` auto-registration for custom channels |

---

## Features

- **Unified Model** — `InboundMessage` / `MessageEvent` / `ContentPart` across all platforms
- **Async Generator Pattern** — Simple processor: receive messages, yield responses
- **Multi-Channel** — Run multiple platforms simultaneously with session-aware batching
- **Streaming Support** — DELTA events for token-by-token LLM output, merged on COMPLETED
- **`<think>` Auto-Filtering** — Reasoning model thinking content stripped automatically
- **Platform Constraints** — Reply timeouts, rate limits, typing indicators per platform
- **Proactive Push** — Bot-initiated messages for notifications and scheduled tasks
- **Media Handling** — Upload/download abstraction per platform (images, files, audio, video)
- **Session Awareness** — Same-session serial, cross-session parallel, prevents message reordering
- **Message Batching** — Rapid consecutive messages from the same user are auto-batched
- **Extensible** — `@channel` decorator + filesystem discovery for custom channels
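
The batching behavior in the list above can be pictured with a small stdlib-only sketch. It assumes a simple policy (merge a message into the previous batch when it arrives within `window` seconds of the previous message); the function name `batch_messages`, the `window` parameter, and the policy itself are hypothetical and may differ from what the library actually does.

```python
def batch_messages(messages: list[tuple[float, str]], window: float) -> list[str]:
    """Merge messages that arrive within `window` seconds of the previous one.

    `messages` is a list of (arrival_time_seconds, text) pairs from ONE sender,
    already sorted by arrival time. This is an illustrative policy only.
    """
    batches: list[list[str]] = []
    last_time: float | None = None
    for arrived_at, text in messages:
        if last_time is not None and arrived_at - last_time <= window:
            batches[-1].append(text)   # close enough: extend the current batch
        else:
            batches.append([text])     # gap too large: start a new batch
        last_time = arrived_at
    # Each batch is handed to the processor as a single combined text.
    return ["\n".join(batch) for batch in batches]
```

With a 1-second window, two messages sent 0.3 s apart would reach the processor as one combined input, while a message sent several seconds later would start a new batch.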

---

## Supported Platforms

| Platform | channel_type | Transport | Text | Image | File | Push |
|----------|-------------|-----------|:----:|:-----:|:----:|:----:|
| Feishu (Lark) | `feishu` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| QQ | `qq` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| WeChat Work | `wecom` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| DingTalk | `dingtalk` | Stream + REST | ✅ | ✅ | ✅ | ✅ |
| Discord | `discord` | discord.py | ✅ | ✅ | ✅ | ✅ |
| WeChat iLink | `weixin` | HTTP Long-Poll | ✅ | ✅ | ✅ | ✅ |
| AgentChat | `agentchat` | Socket.IO | ✅ | ✅ | ✅ | ✅ |
| Yuanbao | `yuanbao` | HTTP API | ✅ | ✅ | ✅ | ✅ |
| Dashboard | `dashboard` | Console (stdout) | ✅ | — | — | — |

---

## Quick Start

### Installation

```bash
pip install harness-im-bridge
```

### Minimal Echo Bot

```python
import asyncio
from collections.abc import AsyncIterator
from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.dashboard import DashboardConfig

async def echo(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {message.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=echo)
    await manager.start()
    channel_id = await manager.add_channel("dashboard", DashboardConfig())
    manager.enqueue(channel_id, "Hello World")
    await asyncio.sleep(0.5)
    await manager.stop()

asyncio.run(main())
```

### Real Platform (QQ Bot)

```python
import asyncio
from collections.abc import AsyncIterator
from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.qq import QQConfig

async def bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=bot)
    await manager.start()
    channel_id = await manager.add_qq_channel(
        QQConfig(app_id="YOUR_APP_ID", token="YOUR_TOKEN", secret="YOUR_SECRET")
    )
    await asyncio.Event().wait()

asyncio.run(main())
```

### Multi-Platform

```python
import asyncio
import os
from collections.abc import AsyncIterator
from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.feishu import FeishuConfig
from harness_im_bridge.channels.qq import QQConfig
from harness_im_bridge.channels.dingtalk import DingTalkConfig

async def unified_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"[{msg.channel_type}] {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=unified_bot, workers_per_channel=4)
    await manager.start()

    await manager.add_feishu_channel(
        FeishuConfig(app_id=os.environ["FEISHU_APP_ID"], app_secret=os.environ["FEISHU_APP_SECRET"])
    )
    await manager.add_qq_channel(
        QQConfig(app_id=os.environ["QQ_APP_ID"], token=os.environ["QQ_TOKEN"], secret=os.environ["QQ_SECRET"])
    )
    await manager.add_dingtalk_channel(
        DingTalkConfig(app_key=os.environ["DINGTALK_APP_KEY"], app_secret=os.environ["DINGTALK_APP_SECRET"])
    )
    await asyncio.Event().wait()

asyncio.run(main())
```

---

## Streaming + LLM Integration

### Event Types

| Event | Purpose | Channel Behavior |
|-------|---------|-----------------|
| `MESSAGE` | Complete message | Send immediately |
| `DELTA` | Token-by-token LLM output | Accumulate, merge on COMPLETED |
| `TOOL_START` | Tool call starting | Show status hint (configurable) |
| `TOOL_END` | Tool call finished | Silent by default |
| `TYPING` | Typing indicator | Platform typing status |
| `ERROR` | Error occurred | Send error message |
| `COMPLETED` | Stream ended | Flush delta buffer → clean `<think>` → send |
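
To make the table concrete, here is a minimal stdlib-only sketch of how a channel might treat `MESSAGE`, `DELTA`, and `COMPLETED` events. The `EventType`, `Event`, and `deliver` names are stand-ins invented for this illustration, not the library's own classes, and the real implementation also handles the other event types and `<think>` cleanup.

```python
from dataclasses import dataclass
from enum import Enum


class EventType(Enum):
    MESSAGE = "message"
    DELTA = "delta"
    COMPLETED = "completed"


@dataclass
class Event:
    type: EventType
    text: str = ""


def deliver(events: list[Event]) -> list[str]:
    """Return the messages a channel would send for a sequence of events."""
    sent: list[str] = []
    buffer: list[str] = []
    for event in events:
        if event.type is EventType.MESSAGE:
            sent.append(event.text)            # complete message: send immediately
        elif event.type is EventType.DELTA:
            buffer.append(event.text)          # token fragment: hold until COMPLETED
        elif event.type is EventType.COMPLETED:
            if buffer:
                sent.append("".join(buffer))   # flush the merged delta buffer
                buffer.clear()
    return sent
```

The point of the design is that users on platforms without message editing receive one merged reply instead of one message per token.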

### OpenAI Streaming

```python
from collections.abc import AsyncIterator

from openai import AsyncOpenAI
from harness_im_bridge import InboundMessage, MessageEvent

client = AsyncOpenAI(api_key="sk-xxx")

async def chat_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.typing()

    stream = await client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": msg.text}], stream=True
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield MessageEvent.delta(chunk.choices[0].delta.content)

    yield MessageEvent.completed()
```

### `<think>` Auto-Filtering

Reasoning models (DeepSeek-R1, QwQ, etc.) output `<think>...</think>` blocks. Harness IM Bridge strips these automatically — users see only the final answer.
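
As a rough illustration of the idea, the filtering can be approximated with a non-greedy regular expression over the merged text. This is a sketch under that assumption, not the library's actual code; `strip_think` is a hypothetical helper name, and an unclosed `<think>` tag is deliberately left untouched here.

```python
import re

# Non-greedy match so that several separate <think> blocks are removed
# individually; DOTALL lets a block span multiple lines.
_THINK_BLOCK = re.compile(r"<think>.*?</think>", re.DOTALL)


def strip_think(text: str) -> str:
    """Remove complete <think>...</think> blocks and trim surrounding whitespace."""
    return _THINK_BLOCK.sub("", text).strip()
```

Running the filter on the merged buffer rather than on each delta matters: a tag can be split across two streamed chunks, so per-chunk filtering would miss it.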

---

## Platform Constraints

Different IM platforms have reply timeouts and rate limits. Harness IM Bridge handles them transparently:

| Platform | Reply Timeout | Rate Limit | Strategy |
|----------|--------------|------------|----------|
| WeChat Work | 10s | ≤2 per 5s | Placeholder on timeout |
| WeChat Official | 5s | ≤5 per min | Placeholder + typing refresh |
| QQ | None | ≤20 per min | Rate-limit wait |
| Feishu | None | Relaxed | No constraint |
| Discord | None | ≤5 per 5s | Rate-limit wait |
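
The "Rate-limit wait" strategy can be sketched as a sliding-window limiter. The class below is a hypothetical, stdlib-only illustration (the name `SlidingWindowLimiter` and its methods are not taken from the library); it takes the current time as an argument so the logic is easy to test.

```python
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `max_sends` sends within any `window` seconds."""

    def __init__(self, max_sends: int, window: float) -> None:
        self.max_sends = max_sends
        self.window = window
        self._sent_at: deque[float] = deque()

    def wait_time(self, now: float) -> float:
        """Seconds to wait before a send at time `now` is allowed (0.0 if allowed)."""
        # Drop timestamps that have already left the window.
        while self._sent_at and now - self._sent_at[0] >= self.window:
            self._sent_at.popleft()
        if len(self._sent_at) < self.max_sends:
            return 0.0
        # The oldest send must age out of the window before the next one.
        return self._sent_at[0] + self.window - now

    def record(self, now: float) -> None:
        """Register that a send happened at time `now`."""
        self._sent_at.append(now)
```

In an async sender, one plausible use is to sleep for `wait_time(time.monotonic())` seconds before each send and then call `record()`; a limit of 2 sends per 5 seconds corresponds to `SlidingWindowLimiter(2, 5.0)`.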

Configure constraints per channel:

```python
from harness_im_bridge import ChannelConstraints

constraints = ChannelConstraints(
    reply_timeout=10.0,
    send_rate_limit=(2, 5.0),
    timeout_strategy="placeholder",
    show_thinking=False,
    show_tool_hints=True,
)
```
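
For intuition, a `"placeholder"` timeout strategy could work roughly like the sketch below: if the answer is not ready within `reply_timeout`, a placeholder is sent first and the real answer follows when it arrives. The helper names (`reply_with_placeholder`, `answer_after`) and the placeholder text are assumptions made for this example, not part of the library's API.

```python
import asyncio
from collections.abc import Awaitable


async def answer_after(delay: float, text: str) -> str:
    """Stand-in for a slow processor: produce `text` after `delay` seconds."""
    await asyncio.sleep(delay)
    return text


async def reply_with_placeholder(
    work: Awaitable[str],
    reply_timeout: float,
    placeholder: str = "Still working on it...",
) -> list[str]:
    """Return the messages that would be sent: an optional placeholder, then the answer."""
    sent: list[str] = []
    task = asyncio.ensure_future(work)
    try:
        # shield() keeps the underlying task running when wait_for times out.
        answer = await asyncio.wait_for(asyncio.shield(task), timeout=reply_timeout)
    except asyncio.TimeoutError:
        sent.append(placeholder)   # answer the platform within its reply window
        answer = await task        # then wait for the real result
    sent.append(answer)
    return sent
```

The key detail is `asyncio.shield`: without it, `wait_for` would cancel the in-flight work on timeout and the real answer would be lost.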

---

## Proactive Push

```python
# channel_id is the UUID returned by add_*_channel()
await manager.push_text(channel_id, "user_open_id", "Your build completed ✅")

# Push rich content
from harness_im_bridge import TextContent, ImageContent
await manager.push_content(channel_id, "user_open_id", [
    TextContent(text="📊 Daily Report:"),
    ImageContent(url="https://charts.example.com/daily.png"),
])
```

---

## Multi-Tenant Support

Run multiple instances of the same platform — each with its own `tenant_id` for session isolation:

```python
from harness_im_bridge import InboundMessage, MessageEvent
from harness_im_bridge.channels.feishu import FeishuConfig

# Two Feishu tenants, session keys auto-isolated as feishu-{tenant_id}-{sender_id}
t1_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t1", app_secret="secret1", tenant_id="tenant1")
)
t2_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t2", app_secret="secret2", tenant_id="tenant2")
)

# InboundMessage carries tenant context
async def bot(msg: InboundMessage):
    print(msg.channel_type)  # "feishu"
    print(msg.tenant_id)     # "tenant1" or "tenant2"
    print(msg.channel_id)    # UUID instance ID
    yield MessageEvent.text(f"[{msg.tenant_id}] {msg.text}")
    yield MessageEvent.completed()
```

---

## Custom Channels

```python
from dataclasses import dataclass
from harness_im_bridge import BaseChannel, ChannelConfig, InboundMessage, ContentPart, MessageProcessor
from harness_im_bridge.registry import channel

@dataclass
class TelegramConfig(ChannelConfig):
    bot_token: str = ""

@channel("telegram")
class TelegramChannel(BaseChannel):
    channel_type = "telegram"

    def __init__(self, processor: MessageProcessor, config: TelegramConfig, *,
                 channel_id: str | None = None, tenant_id: str | None = None):
        super().__init__(processor, channel_id=channel_id, tenant_id=tenant_id)
        self._bot_token = config.bot_token

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def _send_text(self, to_handle: str, text: str, meta=None) -> None: ...
    async def _send_content(self, to_handle: str, parts: list[ContentPart], meta=None) -> None: ...
    async def _send_media(self, to_handle: str, media: ContentPart, meta=None) -> None: ...
    def parse_inbound(self, raw_payload) -> InboundMessage: ...
```
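
Decorator-based registration of this kind generally boils down to a mapping from a type string to a class. The sketch below shows the general pattern with hypothetical names (`CHANNEL_REGISTRY`, `register_channel`, `DemoChannel`); it does not reproduce the library's registry, which also performs filesystem discovery.

```python
from typing import Callable, TypeVar

T = TypeVar("T", bound=type)

# Maps a channel type string to the class that implements it.
CHANNEL_REGISTRY: dict[str, type] = {}


def register_channel(channel_type: str) -> Callable[[T], T]:
    """Class decorator that records the decorated class under `channel_type`."""

    def register(cls: T) -> T:
        if channel_type in CHANNEL_REGISTRY:
            raise ValueError(f"channel type {channel_type!r} is already registered")
        CHANNEL_REGISTRY[channel_type] = cls
        return cls  # return the class unchanged so it can be used normally

    return register


@register_channel("demo")
class DemoChannel:
    """Placeholder class used only to show registration."""
```

A manager can then look up `CHANNEL_REGISTRY["demo"]` and instantiate the class from a type string, which is what makes a string-keyed call such as `add_channel("dashboard", ...)` possible in principle.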

---

## Architecture

```
┌─────────────────┐
│  IM Platform    │   Feishu / QQ / WeChat Work / DingTalk / Discord / ...
└────────┬────────┘
         │ WebSocket / HTTP / Socket.IO
         ▼
┌─────────────────┐
│  BaseChannel    │   parse_inbound() → InboundMessage
│  (impl)         │   send_text() / send_content()
└────────┬────────┘
         │ enqueue
         ▼
┌─────────────────┐
│ ChannelManager  │   Multi-channel queues + Worker pool + Session lock
└────────┬────────┘
         │ InboundMessage
         ▼
┌─────────────────┐
│ Your Processor  │   async def(InboundMessage) -> AsyncIterator[MessageEvent]
└────────┬────────┘
         │ MessageEvent (MESSAGE / DELTA / TOOL_START / ...)
         ▼
┌─────────────────┐
│ BaseChannel     │   Constraints: timeout guard + rate limiter + typing
│ (send reply)    │   Delta merge → platform API delivery
└─────────────────┘
```
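
The "Session lock" step in the diagram (same session serial, different sessions in parallel) can be illustrated with one `asyncio.Lock` per session key. This is a simplified sketch with hypothetical names (`run_sessions`, `handle`); the real manager combines this idea with queues and worker pools.

```python
import asyncio
from collections import defaultdict


async def run_sessions(messages: list[tuple[str, str]]) -> list[str]:
    """Process (session_key, text) pairs and return a log of start/end events."""
    locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
    log: list[str] = []

    async def handle(session_key: str, text: str) -> None:
        # Messages sharing a session key queue up on the same lock, so they
        # are handled one at a time and in arrival order.
        async with locks[session_key]:
            log.append(f"start {session_key}:{text}")
            await asyncio.sleep(0.01)  # simulated processing time
            log.append(f"end {session_key}:{text}")

    # Different session keys use different locks and therefore overlap.
    await asyncio.gather(*(handle(key, text) for key, text in messages))
    return log
```

For the input `[("a", "1"), ("a", "2"), ("b", "1")]`, the second message of session `a` starts only after the first has ended, while session `b` starts without waiting for session `a`.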

---

## Development

```bash
make install      # Install dependencies
make test         # Run tests
make test-quick   # Unit tests only (fast)
make lint         # Lint + type check
make format       # Format code
make build        # Build wheel
```

---

## Related Projects

| Project | Description |
|---------|-------------|
| [harness-agent](https://github.com/orcakit/harness-agent) | Production-grade AI agent platform built on LangChain Deep Agents |
| [harness-memory](https://github.com/orcakit/harness-memory) | Pluggable memory system with hierarchical recall and FTS |
| [harness-browser](https://github.com/orcakit/harness-browser) | AI-friendly browser automation via CDP |

---

## License

[MIT](LICENSE)
