Source code for pymod.client

"""Client API — async source of truth and a sync facade.

`AsyncClient` owns a `Transport` and a `RetryPolicy`. Its primary public
method is `execute(pdu, ...)` — it sends a request PDU, applies the retry
policy, and returns the response PDU. The transport handles framing, the
client handles retries and per-call overrides.

`Client` is a synchronous facade running an `AsyncClient` on a private
event loop in a daemon thread. CLI users and ad-hoc scripts call sync
methods that block until the underlying coroutine completes.

Both `AsyncClient` and `Client` are safe to share across many concurrent
callers — the underlying transport serializes RTU buses internally and
demultiplexes pipelined TCP responses.

The typed batch `read([Holding(...), Coil(...)])` and `write([...])` APIs
are stubbed here; the planner that powers them is delivered in M4. Until
then, use `execute()` with raw PDUs encoded by `pymod.protocol.pdu`.
"""

from __future__ import annotations

import asyncio
import logging
import threading
from collections.abc import Awaitable, Callable, Coroutine, Sequence
from concurrent.futures import Future as ThreadFuture
from typing import Any, Self, TypeVar

from ._types import ReadItem, ReadResult, WriteItem, WriteResult
from .errors import ModbusError
from .planner import plan_and_execute_reads, plan_and_execute_writes
from .retry import RetryPolicy
from .transport import RtuOverTcpTransport, SerialTransport, TcpTransport
from .transport.base import Transport

_logger = logging.getLogger("pymod.client")

DEFAULT_TIMEOUT_S = 0.5
_DEFAULT_RETRY = RetryPolicy()

_T = TypeVar("_T")


# ---------- AsyncClient ----------------------------------------------------


[docs] class AsyncClient: """Async Modbus client. Construct via the `tcp`, `rtu`, or `rtu_over_tcp` factories.""" def __init__( self, transport: Transport, *, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, ) -> None: self._transport = transport self._default_unit_id = unit_id self._default_timeout_s = timeout_s self._default_retry = retry if retry is not None else _DEFAULT_RETRY # ---------- Factories ----------
[docs] @classmethod def tcp( cls, host: str, port: int = 502, *, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, pipeline: bool = True, connect_timeout_s: float = 3.0, ) -> Self: """Modbus TCP client. One auto-reconnecting connection per (host, port). Pipelining can be disabled per-PLC.""" transport: Transport = TcpTransport( host, port, pipeline=pipeline, connect_timeout_s=connect_timeout_s, ) return cls(transport, unit_id=unit_id, timeout_s=timeout_s, retry=retry)
[docs] @classmethod def rtu( cls, port: str, baudrate: int = 9600, *, bytesize: int = 8, parity: str = "N", stopbits: float = 1, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, inter_frame_delay_s: float | None = None, ) -> Self: """Modbus RTU client over a serial port. Internal lock serializes the bus across callers.""" transport: Transport = SerialTransport( port, baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, inter_frame_delay_s=inter_frame_delay_s, ) return cls(transport, unit_id=unit_id, timeout_s=timeout_s, retry=retry)
[docs] @classmethod def rtu_over_tcp( cls, host: str, port: int, *, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, connect_timeout_s: float = 3.0, ) -> Self: """RTU framing over a TCP socket — for serial-to-Ethernet gateways.""" transport: Transport = RtuOverTcpTransport( host, port, connect_timeout_s=connect_timeout_s ) return cls(transport, unit_id=unit_id, timeout_s=timeout_s, retry=retry)
# ---------- Lifecycle ----------
[docs] async def connect(self) -> None: await self._transport.connect()
[docs] async def close(self) -> None: await self._transport.close()
@property def is_connected(self) -> bool: return self._transport.is_connected async def __aenter__(self) -> Self: await self.connect() return self async def __aexit__(self, *exc_info: object) -> None: await self.close() # ---------- Low-level execute ----------
[docs] async def execute( self, pdu: bytes, *, unit_id: int | None = None, timeout_s: float | None = None, retry: RetryPolicy | None = None, ) -> bytes: """Send a request PDU, return the response PDU. The retry policy is applied to errors listed in `policy.retry_on` (default: `ModbusTimeoutError`, `ModbusConnectionError`). Other errors — notably `ModbusExceptionResponse` subclasses — are NOT retried, on the assumption that retrying an illegal address yields the same illegal address. Per-call overrides default to the values supplied at client construction time. """ effective_unit_id = unit_id if unit_id is not None else self._default_unit_id effective_timeout = timeout_s if timeout_s is not None else self._default_timeout_s policy = retry if retry is not None else self._default_retry last_exc: BaseException | None = None for attempt in range(1, policy.max_attempts + 1): if attempt > 1: await asyncio.sleep(policy.delay_for(attempt - 1)) try: return await self._transport.send_and_receive( pdu, unit_id=effective_unit_id, timeout_s=effective_timeout, ) except policy.retry_on as e: last_exc = e if attempt < policy.max_attempts: _logger.debug( "attempt %d/%d failed (%s: %s); retrying", attempt, policy.max_attempts, type(e).__name__, e, ) continue raise # Unreachable: the loop either returns or raises. assert last_exc is not None raise last_exc
# ---------- Typed batch reads/writes ----------
[docs] async def read( self, items: Sequence[ReadItem], *, unit_id: int | None = None, timeout_s: float | None = None, retry: RetryPolicy | None = None, ) -> list[ReadResult]: """Issue a heterogeneous batch read. Adjacent same-area ranges are coalesced into the minimum number of Modbus reads; per-FC PDU limits are honored by automatic splitting. Returns a list parallel to `items` — one item failing does not abort the batch. """ executor = self._make_pdu_executor(unit_id, timeout_s, retry) return await plan_and_execute_reads(items, executor)
[docs] async def write( self, items: Sequence[WriteItem], *, unit_id: int | None = None, timeout_s: float | None = None, retry: RetryPolicy | None = None, ) -> list[WriteResult]: """Issue a batch write. FC05/06 vs FC15/16 selected per item by value count. Per-item results parallel `items`.""" executor = self._make_pdu_executor(unit_id, timeout_s, retry) return await plan_and_execute_writes(items, executor)
def _make_pdu_executor( self, unit_id: int | None, timeout_s: float | None, retry: RetryPolicy | None, ) -> "Callable[[bytes], Awaitable[bytes]]": eu = unit_id if unit_id is not None else self._default_unit_id et = timeout_s if timeout_s is not None else self._default_timeout_s er = retry if retry is not None else self._default_retry async def _exec(pdu: bytes) -> bytes: return await self.execute(pdu, unit_id=eu, timeout_s=et, retry=er) return _exec
# ---------- Sync facade ---------------------------------------------------- class _BackgroundLoop: """Owns an asyncio event loop running on a daemon thread. Coroutines submitted via `run_coroutine` execute on the loop's thread and the calling (synchronous) thread blocks on the resulting future. """ def __init__(self) -> None: self._loop = asyncio.new_event_loop() self._started = threading.Event() self._thread = threading.Thread( target=self._run, daemon=True, name="pymod-sync-loop", ) self._thread.start() self._started.wait() def _run(self) -> None: asyncio.set_event_loop(self._loop) self._started.set() try: self._loop.run_forever() finally: self._loop.close() def run_coroutine(self, coro: Coroutine[Any, Any, _T]) -> ThreadFuture[_T]: return asyncio.run_coroutine_threadsafe(coro, self._loop) def stop(self) -> None: if not self._thread.is_alive(): return self._loop.call_soon_threadsafe(self._loop.stop) self._thread.join(timeout=5.0) @property def is_alive(self) -> bool: return self._thread.is_alive()
[docs] class Client: """Synchronous facade over `AsyncClient`. Owns a private daemon-thread event loop. Each sync call schedules the underlying coroutine on that loop and blocks the caller until it completes. The event loop is stopped on `close()`. """ def __init__(self, async_client: AsyncClient, loop: _BackgroundLoop) -> None: self._async = async_client self._loop = loop # ---------- Factories ----------
[docs] @classmethod def tcp( cls, host: str, port: int = 502, *, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, pipeline: bool = True, connect_timeout_s: float = 3.0, ) -> Self: loop = _BackgroundLoop() async_client = AsyncClient.tcp( host, port, unit_id=unit_id, timeout_s=timeout_s, retry=retry, pipeline=pipeline, connect_timeout_s=connect_timeout_s, ) return cls(async_client, loop)
[docs] @classmethod def rtu( cls, port: str, baudrate: int = 9600, *, bytesize: int = 8, parity: str = "N", stopbits: float = 1, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, inter_frame_delay_s: float | None = None, ) -> Self: loop = _BackgroundLoop() async_client = AsyncClient.rtu( port, baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, unit_id=unit_id, timeout_s=timeout_s, retry=retry, inter_frame_delay_s=inter_frame_delay_s, ) return cls(async_client, loop)
[docs] @classmethod def rtu_over_tcp( cls, host: str, port: int, *, unit_id: int = 1, timeout_s: float = DEFAULT_TIMEOUT_S, retry: RetryPolicy | None = None, connect_timeout_s: float = 3.0, ) -> Self: loop = _BackgroundLoop() async_client = AsyncClient.rtu_over_tcp( host, port, unit_id=unit_id, timeout_s=timeout_s, retry=retry, connect_timeout_s=connect_timeout_s, ) return cls(async_client, loop)
# ---------- Lifecycle ----------
[docs] def connect(self) -> None: self._loop.run_coroutine(self._async.connect()).result()
[docs] def close(self) -> None: if self._loop.is_alive: try: self._loop.run_coroutine(self._async.close()).result(timeout=5.0) except Exception: pass self._loop.stop()
@property def is_connected(self) -> bool: return self._async.is_connected def __enter__(self) -> Self: self.connect() return self def __exit__(self, *exc_info: object) -> None: self.close() # ---------- execute ----------
[docs] def execute( self, pdu: bytes, *, unit_id: int | None = None, timeout_s: float | None = None, retry: RetryPolicy | None = None, ) -> bytes: return self._loop.run_coroutine( self._async.execute( pdu, unit_id=unit_id, timeout_s=timeout_s, retry=retry ) ).result()
# ---------- Typed reads/writes (M4) ----------
[docs] def read( self, items: Sequence[ReadItem], *, unit_id: int | None = None, timeout_s: float | None = None, retry: RetryPolicy | None = None, ) -> list[ReadResult]: return self._loop.run_coroutine( self._async.read(items, unit_id=unit_id, timeout_s=timeout_s, retry=retry) ).result()
[docs] def write( self, items: Sequence[WriteItem], *, unit_id: int | None = None, timeout_s: float | None = None, retry: RetryPolicy | None = None, ) -> list[WriteResult]: return self._loop.run_coroutine( self._async.write(items, unit_id=unit_id, timeout_s=timeout_s, retry=retry) ).result()