Metadata-Version: 2.4
Name: modbuslink
Version: 1.4.2
Summary: 现代化、功能强大、开发者友好且高度可扩展的Python Modbus库 | Modern, powerful, developer-friendly and highly scalable Python Modbus library
Author-email: Miraitowa-la <2056978412@qq.com>
License: MIT
Project-URL: Homepage, https://github.com/Miraitowa-la/ModbusLink
Project-URL: Repository, https://github.com/Miraitowa-la/ModbusLink
Project-URL: Issues, https://github.com/Miraitowa-la/ModbusLink/issues
Keywords: modbus,industrial,automation,communication,protocol,tcp,rtu,ascii,serial,plc,scada,iot
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Hardware
Classifier: Topic :: Communications
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE.txt
Requires-Dist: pyserial>=3.5
Requires-Dist: pyserial-asyncio>=0.6
Requires-Dist: typing_extensions>=4.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-mock>=3.0; extra == "dev"
Requires-Dist: black>=22.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Requires-Dist: mypy>=1.0; extra == "dev"
Dynamic: license-file

# ModbusLink

<div align="center">

[![PyPI Downloads](https://static.pepy.tech/badge/modbuslink)](https://pepy.tech/projects/modbuslink)
[![PyPI version](https://badge.fury.io/py/modbuslink.svg)](https://badge.fury.io/py/modbuslink)
[![Python Version](https://img.shields.io/pypi/pyversions/modbuslink.svg)](https://pypi.org/project/modbuslink/)
[![License](https://img.shields.io/github/license/Miraitowa-la/ModbusLink)](LICENSE.txt)

**A Modern, High-Performance Python Modbus Library**

*Industrial-grade • Developer-friendly • Production-ready*

[English](#) | [中文版](README-zh_CN.md) | [Documentation](https://miraitowa-la.github.io/ModbusLink/en/index.html) | [Examples](#examples)

</div>

---


## ✨ Introduction

**ModbusLink** is a Python Modbus protocol library designed specifically for modern industrial automation. It abandons the complexity of traditional libraries, adopting a clear layered architecture (separation of Transport and Application layers) and modern Python features (Type Hints, Asyncio).

Whether building a high-performance **TCP Server** or developing an **RTU Client** connecting multiple sensors, ModbusLink provides robust and efficient support.

### Core Features
- **Full Async Support**: Built on Python's native `asyncio` to easily handle high-concurrency connections, while also providing a complete **Synchronous** interface.
- **Decoupled Architecture**: Separation of Transport layer and Protocol layer (Client/Server) supports dependency injection, making it easy to test and extend.
- **Bilingual Support**: Built-in English/Chinese logs and exception messages, switchable via the `language` module.
- **Type Safety**: The codebase is fully covered by Type Hints, offering an excellent development experience with IDE autocompletion.
- **Comprehensive Protocol**: Supports Modbus TCP, RTU, and ASCII protocols, covering both Client (Master) and Server (Slave) modes.
- **Advanced Coding**: Built-in `PayloadCoder` for easy reading/writing of complex data types like Float32, Int64, and String.

---

## 📝 Version Description 

- `1.2.x` and earlier versions
    - Not recommended for use
- `1.3.x`
    - [`1.3.0`](notes/1.3.0.md) RS485 mode support (#1)
    - [`1.3.1`](notes/1.3.1.md) Log language switching support (#2)
    - [`1.3.2`](notes/1.3.2.md) Language Switching Support Optimization (#2)
    - [`1.3.3`](notes/1.3.3.md) Adding missing functions for asynchronous client (#3)
    - [`1.3.4`](notes/1.3.4.md) Asynchronous concurrent safety fix (#4)
- `1.4.x`
    - [`1.4.0`](notes/1.4.0.md) Refactoring: Solves project redundancy issues, unifies naming conventions, and improves the performance and stability of core components
    - [`1.4.1`](notes/1.4.1.md) RS485 mode support restored
    - [`1.4.2`](notes/1.4.2.md) TCP transport flush() method - Resolves transaction ID mismatch after timeout

---

## 🚀 Quick Start

### Installation

Install the latest version using pip:

```bash
pip install modbuslink
```

### 30-Second Quick Experience

Here is a simple synchronous TCP client example reading holding registers:

```python
from modbuslink.client import SyncModbusClient
from modbuslink.transport import SyncTcpTransport

# 1. Create a transport layer instance
transport = SyncTcpTransport(host='127.0.0.1', port=502)

# 2. Create a client instance (Dependency Injection)
with SyncModbusClient(transport) as client:
    # 3. Read 10 holding registers starting at address 0 (Slave ID: 1)
    data = client.read_holding_registers(slave_id=1, start_address=0, quantity=10)
    print(f"Read Result: {data}")
```

---

## 📚 Comprehensive User Guide

The core design philosophy of ModbusLink is **"Decoupling"**. Client logic (`Client`) is separated from communication methods (`Transport`). This means you need to instantiate a transport object first and then pass it to the client object.

### 1. TCP Client

Suitable for connecting to PLCs or meters via Ethernet (Modbus TCP).

#### TCP Synchronous Client (Sync)

The most intuitive programming style, suitable for simple scripts or non-high-concurrency applications.

```python
from modbuslink.client import SyncModbusClient
from modbuslink.transport import SyncTcpTransport

def main():
    # 1. Configure Transport (IP and Port)
    transport = SyncTcpTransport(host='192.168.1.10', port=502, timeout=2.0)

    # 2. Inject Transport to create Client
    with SyncModbusClient(transport) as client:
        print("Connected successfully!")

        # Read 10 Holding Registers (Function Code 0x03)
        # slave_id: Unit ID (Usually 1 or 255 in TCP)
        regs = client.read_holding_registers(slave_id=1, start_address=100, quantity=10)
        print(f"Register Values: {regs}")

        # Write Single Coil (Function Code 0x05)
        client.write_single_coil(slave_id=1, address=0, value=True)
        print("Coil write complete")

if __name__ == "__main__":
    main()
```

#### TCP Asynchronous Client (Async)

Based on `asyncio`, suitable for scenarios requiring high performance, high concurrency, or integration into FastAPI/GUI applications.

```python
import asyncio
from modbuslink.client import AsyncModbusClient
from modbuslink.transport import AsyncTcpTransport

async def main():
    # 1. Configure Async Transport
    transport = AsyncTcpTransport(host='192.168.1.10', port=502)

    # 2. Use async with to automatically manage connections
    async with AsyncModbusClient(transport) as client:
        # Read Float32 (Automatically handles merging 2 registers and decoding)
        # Supports specifying byte order, defaults to Big Endian
        temp = await client.read_float32(slave_id=1, start_address=200)
        print(f"Current Temperature: {temp:.2f}°C")

        # Write Multiple Registers
        await client.write_multiple_registers(slave_id=1, start_address=300, values=[10, 20, 30])

if __name__ == "__main__":
    asyncio.run(main())
```

### 2. RTU Client

Suitable for connecting industrial devices via RS485/RS232 serial ports.

#### RTU Synchronous Client (Sync)

```python
from modbuslink.client import SyncModbusClient
from modbuslink.transport import SyncRtuTransport

# 1. Configure Serial Parameters
transport = SyncRtuTransport(
    port='COM3',        # Windows: COMx, Linux: /dev/ttyUSBx
    baudrate=9600,
    bytesize=8,
    parity='N',
    stopbits=1,
    timeout=1.0
)

# 2. Create Client
with SyncModbusClient(transport) as client:
    # Read Input Registers (Function Code 0x04)
    data = client.read_input_registers(slave_id=1, start_address=0, quantity=5)
    print(f"Sensor Data: {data}")
```

#### RTU Asynchronous Client (Async)

Does not block the main thread while waiting for serial IO.

```python
import asyncio
from modbuslink.client import AsyncModbusClient
from modbuslink.transport import AsyncRtuTransport

async def main():
    # Async Serial Transport
    transport = AsyncRtuTransport(port='/dev/ttyUSB0', baudrate=115200)

    async with AsyncModbusClient(transport) as client:
        # Write Int32 (Automatically encoded into 2 registers)
        await client.write_int32(slave_id=2, start_address=10, value=50000)
        print("Parameter set successfully")

if __name__ == "__main__":
    asyncio.run(main())
```

### 3. ASCII Client

Suitable for certain legacy devices where data is transmitted as ASCII characters.

#### ASCII Synchronous Client (Sync)

```python
from modbuslink.client import SyncModbusClient
from modbuslink.transport import SyncAsciiTransport

# Configure for ASCII mode, usually 7 data bits
transport = SyncAsciiTransport(port='COM1', baudrate=9600, bytesize=7, parity='E', stopbits=1)

with SyncModbusClient(transport) as client:
    # Read Coils
    status = client.read_coils(slave_id=1, start_address=0, quantity=8)
    print(f"Device Status: {status}")
```

#### ASCII Asynchronous Client (Async)

```python
import asyncio
from modbuslink.client import AsyncModbusClient
from modbuslink.transport import AsyncAsciiTransport

async def main():
    transport = AsyncAsciiTransport(port='/dev/ttyS0', baudrate=9600)
    
    async with AsyncModbusClient(transport) as client:
        # Write String (ModbusLink automatically encodes it into registers)
        await client.write_string(slave_id=1, start_address=100, value="HELLO")

if __name__ == "__main__":
    asyncio.run(main())
```

### 4. Client RS485 Mode 

Suitable for RS485 communication mode, usually additional control lines (DE/RE) need to be configured.
`SyncRtuTransport`/`AsyncRtuTransport`/`SyncAsciiTransport`/`AsyncAsciiTransport` can all be configured for RS585 mode. 

```python
# As an example of the synchronous RTU client (for other clients, simply modify "SyncRtuTransport" )
from serial.rs485 import RS485Settings
from modbuslink import SyncRtuTransport

rs485_settings = RS485Settings(
    rts_level_for_tx=True,
    rts_level_for_rx=False,
    delay_before_tx=0.001,
    delay_before_rx=0.001,
)
transport = SyncRtuTransport('/dev/ttyUSB0', rs485_mode=rs485_settings)
```

### 5. Server Examples

ModbusLink's server implementations are all based on Async IO, capable of efficiently handling multi-client concurrency (TCP) or fast responses (RTU/ASCII).

#### TCP Server

Create a server listening on local port 502 with data callbacks.

```python
import asyncio
from modbuslink.server import AsyncTcpModbusServer, ModbusDataStore

async def run_server():
    # 1. Initialize Data Store (Define register sizes)
    store = ModbusDataStore(
        coils_size=1000,
        holding_registers_size=1000
    )

    # 2. Set Initial Data
    store.write_holding_registers(address=0, values=[123, 456, 789])

    # 3. Add Callback: Triggered when a client writes to holding registers
    def on_register_write(address, values):
        print(f"[Callback] Client wrote registers @ {address}: {values}")
    
    store.add_callback('holding_registers', on_register_write)

    # 4. Start Server
    server = AsyncTcpModbusServer(host='0.0.0.0', port=502, data_store=store)
    
    print("TCP Server started, listening on port 502...")
    try:
        await server.serve_forever()
    except asyncio.CancelledError:
        await server.stop()

if __name__ == "__main__":
    asyncio.run(run_server())
```

#### RTU Server

Turn your computer into a Modbus RTU Slave device.

```python
import asyncio
from modbuslink.server import AsyncRtuModbusServer, ModbusDataStore

async def run_server():
    store = ModbusDataStore()
    
    # Initialize Serial Server
    # slave_id: Address of this device as a slave
    server = AsyncRtuModbusServer(
        port='COM4',
        baudrate=9600,
        slave_id=1, 
        data_store=store
    )

    print("RTU Server running...")
    await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(run_server())
```

#### ASCII Server

```python
import asyncio
from modbuslink.server import AsyncAsciiModbusServer, ModbusDataStore

async def run_server():
    store = ModbusDataStore()
    
    server = AsyncAsciiModbusServer(
        port='/dev/ttyUSB0',
        baudrate=9600,
        slave_id=1,
        data_store=store
    )

    print("ASCII Server running...")
    await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(run_server())
```

### Advanced Tip: Concurrent Data Collection

This is one of the most powerful features of ModbusLink. Using `asyncio`, you can wait for IO responses from multiple devices simultaneously instead of waiting serially, drastically reducing total polling time.

```python
import asyncio
from modbuslink.client import AsyncModbusClient
from modbuslink.transport import AsyncTcpTransport

async def collect_data(device_ip, slave_id):
    """Logic to collect data from a single device"""
    transport = AsyncTcpTransport(host=device_ip, port=502)
    async with AsyncModbusClient(transport) as client:
        try:
            # Read data
            data = await client.read_holding_registers(slave_id, 0, 10)
            return {"ip": device_ip, "data": data, "status": "ok"}
        except Exception as e:
            return {"ip": device_ip, "error": str(e), "status": "error"}

async def main():
    # Assume 3 devices
    devices = ["192.168.1.101", "192.168.1.102", "192.168.1.103"]
    
    # 1. Create task list
    tasks = [collect_data(ip, slave_id=1) for ip in devices]
    
    print("Starting concurrent collection...")
    # 2. Execute concurrently (gather)
    results = await asyncio.gather(*tasks)
    
    # 3. Process results
    for res in results:
        print(res)

if __name__ == "__main__":
    asyncio.run(main())
```

### Advanced Tip: Error Recovery & Buffer Flushing

In unstable network environments, devices may occasionally respond slowly, causing timeouts. ModbusLink provides a `flush()` method to clear stale data from the receive buffer, preventing "late" responses from affecting subsequent requests.

#### Sync Client Error Recovery

```python
from modbuslink.client import SyncModbusClient
from modbuslink.transport import SyncTcpTransport
from modbuslink import TimeOutError, InvalidReplyError


def robust_read():
    """Robust read with error recovery"""
    transport = SyncTcpTransport(host='192.168.1.100', port=502, timeout=2.0)
    client = SyncModbusClient(transport)
    
    with client:
        try:
            # Try to read data
            data = client.read_holding_registers(slave_id=1, start_address=0, quantity=10)
            return data
        except (TimeOutError, InvalidReplyError) as e:
            print(f"Error: {e}")
            
            # Flush receive buffer, discard stale data
            discarded = transport.flush()
            print(f"Flushed {discarded} bytes of stale data")
            
            # Retry
            data = client.read_holding_registers(slave_id=1, start_address=0, quantity=10)
            return data
```

#### Async Client - Home Assistant Integration Pattern

Suitable for scenarios requiring long-term stable operation, such as Home Assistant integration:

```python
import asyncio
from modbuslink.client import AsyncModbusClient
from modbuslink.transport import AsyncTcpTransport
from modbuslink import TimeOutError, InvalidReplyError


async def home_assistant_polling():
    """Home Assistant style device polling"""
    transport = AsyncTcpTransport(host='192.168.1.100', port=502, timeout=3.0)
    client = AsyncModbusClient(transport)
    
    poll_interval = 60  # Poll every 60 seconds
    max_consecutive_errors = 3
    consecutive_errors = 0
    
    async with client:
        while True:
            try:
                # Read device data
                data = await client.read_holding_registers(
                    slave_id=1,
                    start_address=0,
                    quantity=10
                )
                
                print(f"Device data: {data}")
                consecutive_errors = 0  # Reset error counter on success
                
            except TimeOutError:
                consecutive_errors += 1
                print(f"Timeout (consecutive errors: {consecutive_errors}/{max_consecutive_errors})")
                
                # Flush buffer to prevent stale responses from affecting next request
                discarded = await transport.flush()
                if discarded > 0:
                    print(f"Flushed {discarded} bytes of stale data")
                
                # Reconnect if too many consecutive errors
                if consecutive_errors >= max_consecutive_errors:
                    print("Too many consecutive errors, reconnecting...")
                    await transport.close()
                    await asyncio.sleep(1)
                    await transport.open()
                    consecutive_errors = 0
                    
            except InvalidReplyError as e:
                print(f"Invalid reply: {e}")
                # Flush buffer on transaction ID mismatch
                await transport.flush()
            
            # Wait for next poll
            await asyncio.sleep(poll_interval)


if __name__ == "__main__":
    asyncio.run(home_assistant_polling())
```

---

## ️ Production-Grade Features

### Comprehensive Error Handling

ModbusLink defines a clear exception hierarchy to facilitate fault diagnosis in industrial fields:

- `ConnectError`: Connection failed (Network interruption, Serial port occupied).
- `TimeOutError`: Device response timeout.
- `CrcError` / `LrcError`: Data validation failed, automatic noise filtering.
- `ModbusException`: Functional exception returned by the device (e.g., Illegal Address 0x02).

### Advanced Logging & Debugging

Built-in bilingual logging system supporting protocol-level HEX debug mode:

```python
from modbuslink.common.logging import ModbusLogger

# Enable debug mode, output raw data frames
ModbusLogger.setup_logging(enable_debug=True, language="EN")
ModbusLogger.enable_protocol_debug()
```

### Performance Benchmarking

ModbusLink's async core design allows it to easily handle high-concurrency scenarios. The following code demonstrates how to complete hundreds of Modbus requests in seconds using `asyncio`:

```python
import asyncio
import time
from modbuslink.client import AsyncModbusClient
from modbuslink.transport import AsyncTcpTransport

async def performance_benchmark():
    """Measure ModbusLink limits"""
    
    # Connect to a high-performance Modbus simulator or server
    transport = AsyncTcpTransport(host='127.0.0.1', port=502)
    client = AsyncModbusClient(transport)
    
    async with client:
        print("Starting benchmark...")
        start_time = time.time()
        
        # Create 100 concurrent read tasks (Non-blocking)
        # Key: All requests are issued almost simultaneously, not one by one
        tasks = [
            client.read_holding_registers(slave_id=1, start_address=i, quantity=1) 
            for i in range(100)
        ]
        
        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        duration = end_time - start_time
        
        # Statistics
        print(f"Performance Results:")
        print(f"  • Concurrent Ops: {len(tasks)}")
        print(f"  • Total Time: {duration:.3f} s")
        print(f"  • Throughput: {len(tasks)/duration:.1f} Ops/s")
        print(f"  • Avg Latency: {duration*1000/len(tasks):.2f} ms")

if __name__ == "__main__":
    asyncio.run(performance_benchmark())
```

---

## 📈 Supported Modbus Functions

Full implementation of the **Modbus Specification**:

| Code | Name | Description | Use Case |
|------|------|-------------|----------|
| **0x01** | Read Coils | Read 1-2000 coil states | Digital Output (Pumps, Valves, Motors) |
| **0x02** | Read Discrete Inputs | Read 1-2000 input states | Digital Sensors (Limit Switches, Buttons) |
| **0x03** | Read Holding Registers | Read 1-125 register values | Analog Output (Setpoints, Parameters) |
| **0x04** | Read Input Registers | Read 1-125 input values | Analog Input (Temperature, Pressure) |
| **0x05** | Write Single Coil | Write one coil | Control Single Device (Start Pump) |
| **0x06** | Write Single Register | Write one register | Set Single Parameter (Temp Setpoint) |
| **0x0F** | Write Multiple Coils | Write 1-1968 coils | Batch Control (Production Sequence) |
| **0x10** | Write Multiple Registers | Write 1-123 registers | Batch Parameters (Recipe Download) |

### Project Architecture

**Clean, Maintainable, Scalable** codebase structure:

```
ModbusLink/
├── src/modbuslink/
│   ├── client/                    # 📱 Client Layer
│   │   ├── sync_client.py         # Sync Modbus Client
│   │   └── async_client.py        # Async Modbus Client
│   │
│   ├── server/                    # 🖥️ Server Layer
│   │   ├── data_store.py          # Data Store Class
│   │   ├── base_server.py         # Server Base Class
│   │   ├── tcp_server.py          # TCP Server
│   │   ├── rtu_server.py          # RTU Server
│   │   └── ascii_server.py        # ASCII Server
│   │
│   ├── transport/                 # 🚚 Transport Layer
│   │   ├── base_transport.py      # Sync/Async Transport Interface
│   │   ├── tcp_transport.py       # Sync/Async TCP Implementation
│   │   ├── rtu_transport.py       # Sync/Async RTU Implementation
│   │   └── ascii_transport.py     # Sync/Async ASCII Implementation
│   │
│   ├── utils/                     # 🔧 Utility Layer
│   │   ├── crc.py                 # CRC16 Checksum (RTU)
│   │   ├── lrc.py                 # LRC Checksum (ASCII)
│   │   └── coder.py               # Data Type Conversion
│   │
│   └── common/                    # 🛠️ Common Components
│       ├── language.py            # Unified Language Configuration
│       ├── exceptions.py          # Custom Exception System
│       └── logging.py             # Advanced Logging System
│
├── examples/                      # 📚 Usage Examples
│   ├── sync_tcp_example.py        # Sync TCP Client Example
│   ├── async_tcp_example.py       # Async TCP Client Example
│   ├── sync_rtu_example.py        # Sync RTU Client Example
│   ├── async_rtu_example.py       # Async RTU Client Example
│   ├── sync_ascii_example.py      # Sync ASCII Client Example
│   ├── async_ascii_example.py     # Async ASCII Client Example
│   ├── async_tcp_server_example.py    # TCP Server Example
│   ├── async_rtu_server_example.py    # RTU Server Example
│   └── async_ascii_server_example.py  # ASCII Server Example
│
└── docs/                          # 📜 Documentation
```

---

## 📚 Examples

Explore **Real-World Scenarios** in the [examples](examples/) directory.

---

## System Requirements

- Python: 3.9 or higher
- Dependencies:
    - `pyserial >= 3.5`: Serial communication basis
    - `pyserial-asyncio >= 0.6`: Async serial support
    - `typing_extensions`: Type compatibility support
- OS: Windows, Linux, macOS (Cross-platform support)

---

## 📜 License and Contribution

**MIT License** - Commercial use allowed. See [LICENSE.txt](LICENSE.txt) for details.

### Contribution Guide

**Contributions are welcome!** Please:

1. 🍿 **Fork** the repository
2. 🌱 **Create** a feature branch
3. ✨ **Add** tests for new functionality
4. 📝 **Update** documentation
5. 🚀 **Submit** a pull request

### Community and Support

- 💬 **GitHub Issues**: Bug reports and feature requests
- 📧 **Email Support**: Technical questions and consulting
- 📚 **Documentation**: Comprehensive guides and API reference
- 🎆 **Examples**: Production-ready code samples

---

<div align="center">

**Built with ❤️ for the Industrial Automation Community**

*ModbusLink - Connecting Industrial Systems with Modern Python*

</div>
