Traffic Manager#
TrafficManager is the communication layer between your device class and the physical transport (Serial, VISA, TCP/IP, and others).
It standardizes connection handling and command dispatch so device implementations can focus on device behavior instead of low-level I/O details.
What It Does#
A traffic manager is responsible for:
Opening and closing a connection to a device resource.
Sending one command and returning one response.
Applying timeout behavior.
Providing consistent error handling through a shared interface.
Supporting context manager usage (with ...) for safe cleanup.
In PlestyLib, the abstract base class is plestylib.traffic.TrafficManager.
Lifecycle and Interface#
The public API is:
open(...)
send_command(command, timeout=None, ...)
close()
When you implement a custom transport, you override three private hooks:
_open(...)
_send_command(command, timeout=None, ...)
_close()
The base class wraps these hooks with common behaviors:
In-use resource protection (the same address cannot be opened by more than one manager instance at a time).
Error capture and reporting.
__enter__ and __exit__ support.
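The wrapping described above follows the template-method pattern: public methods add the shared behavior and delegate the transport work to the private hooks. The sketch below is a simplified, hypothetical stand-in, not PlestyLib's actual code. SketchTrafficManager and EchoManager are illustrative names, and the sketch assumes that open() returns False when the address is already in use; the real base class may report this differently.

```python
from typing import Any, Optional


class SketchTrafficManager:
    """Hypothetical stand-in for the base class (illustration only)."""

    # Addresses currently held by an open manager (shared by all instances).
    _in_use: set = set()

    def __init__(self, address: str, timeout: float = 5.0):
        self.address = address
        self.timeout = timeout
        self.is_open = False

    def open(self, **kwargs: Any) -> bool:
        # In-use protection: refuse an address another instance already holds.
        if self.address in SketchTrafficManager._in_use:
            return False
        try:
            self._open(**kwargs)
        except Exception as exc:
            # Error capture and reporting (assumed behavior: report, return False).
            print(f"open failed for {self.address}: {exc}")
            return False
        SketchTrafficManager._in_use.add(self.address)
        self.is_open = True
        return True

    def send_command(self, command: str, timeout: Optional[float] = None) -> Any:
        if not self.is_open:
            raise RuntimeError("send_command() called before open()")
        return self._send_command(command, timeout=timeout)

    def close(self) -> None:
        if self.is_open:
            try:
                self._close()
            finally:
                # Release the address even if the transport close fails.
                self.is_open = False
                SketchTrafficManager._in_use.discard(self.address)

    def __enter__(self) -> "SketchTrafficManager":
        if not self.open():
            raise RuntimeError(f"could not open {self.address}")
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # Hooks that a concrete transport overrides.
    def _open(self, **kwargs: Any) -> None:
        raise NotImplementedError

    def _send_command(self, command: str, timeout: Optional[float] = None) -> Any:
        raise NotImplementedError

    def _close(self) -> None:
        raise NotImplementedError


class EchoManager(SketchTrafficManager):
    """Toy transport that echoes the command back."""

    def _open(self, **kwargs: Any) -> None:
        pass

    def _send_command(self, command: str, timeout: Optional[float] = None) -> str:
        return f"echo:{command}"

    def _close(self) -> None:
        pass
```

Because the shared behavior lives in the public methods, a subclass such as EchoManager only has to supply the three hooks.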
Built-in Implementations#
Common built-in traffic managers include:
SerialTrafficManager for serial ports.
VisaTrafficManager for VISA resources.
TcpIpTrafficManager for TCP/IP connections.
Demo 1: Use the Built-in SerialTrafficManager#
```python
from plestylib.traffic.serial import SerialTrafficManager

manager = SerialTrafficManager(
    port="/dev/ttyUSB0",  # Example on Linux
    baudrate=9600,
    timeout=5,
    write_termination="\r",
    read_termination="\r\n",
)

try:
    opened = manager.open(parity="none", stopbits="one", bytesize=8)
    if not opened:
        raise RuntimeError("Failed to open serial port")

    # Command format depends on your device protocol.
    response = manager.send_command("MEAS:VOLT?")
    print("Response:", response)
finally:
    manager.close()
```
Notes for SerialTrafficManager:
It appends write_termination automatically.
It reads until read_termination.
Current response logic treats lines containing OK as successful.
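The three notes above can be illustrated with a small, self-contained sketch. It is not the SerialTrafficManager implementation: frame_command, read_until, and is_success are hypothetical helper names, and an in-memory byte stream stands in for the serial port.

```python
import io
from typing import BinaryIO


def frame_command(command: str, write_termination: str = "\r") -> bytes:
    """Append the write termination and encode the command for the wire."""
    return (command + write_termination).encode("ascii")


def read_until(stream: BinaryIO, read_termination: str = "\r\n") -> str:
    """Read byte by byte until the termination arrives or the stream runs dry."""
    terminator = read_termination.encode("ascii")
    buffer = bytearray()
    while not buffer.endswith(terminator):
        chunk = stream.read(1)
        if not chunk:  # timeout or end of stream
            break
        buffer += chunk
    text = buffer.decode("ascii", errors="replace")
    if text.endswith(read_termination):
        text = text[: -len(read_termination)]
    return text


def is_success(line: str) -> bool:
    """Treat any line containing OK as a successful response."""
    return "OK" in line


# An in-memory stream plays the role of the device's reply.
device = io.BytesIO(b"OK 1.23\r\n")
wire = frame_command("MEAS:VOLT?")
reply = read_until(device)
print(wire, reply, is_success(reply))
```

With a real port, pyserial's Serial.read_until provides a comparable read loop. Note that a substring check for OK would also accept a line such as "NOT OK", so a device with richer status replies may need a stricter test.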
Demo 2: Implement Your Own Serial Traffic Manager#
Use this pattern if your serial device protocol differs from the default behavior.
```python
from typing import Any

import serial

from plestylib.traffic import TrafficManager


class MySerialTrafficManager(TrafficManager):
    def __init__(self, port: str, baudrate: int = 115200, timeout: int = 3):
        super().__init__(address=port, timeout=timeout)
        self.port = port
        self.baudrate = baudrate

    def _open(self) -> None:
        self.inst = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            timeout=self.timeout,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
        )
        # Discard stale bytes left over from a previous session.
        self.inst.reset_input_buffer()
        self.inst.reset_output_buffer()

    def _send_command(self, command: str, timeout=None) -> Any:
        # Apply the per-call timeout, or fall back to the default so an
        # earlier override does not leak into later commands.
        self.inst.timeout = self.timeout if timeout is None else timeout

        wire = (command + "\n").encode("utf-8")
        self.inst.write(wire)

        # Read one full line response for this example protocol.
        raw = self.inst.readline()
        if not raw:
            # An empty read means nothing arrived before the timeout.
            return False
        return raw.decode("utf-8", errors="replace").strip()

    def _close(self) -> None:
        self.inst.close()
```
Then use it through the standard public API:
```python
tm = MySerialTrafficManager("/dev/ttyUSB0", baudrate=115200)
with tm:
    print(tm.send_command("*IDN?"))
```
Integration with Device Classes#
In a synchronized device implementation, traffic managers are usually called inside low-level query and write paths:
Device receives a standardized operation.
Command solver builds the protocol command string.
Traffic manager sends/receives bytes or text.
Command solver decodes response into standardized value.
This separation keeps each layer simple and testable.
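The four steps above can be sketched with stand-in classes. Everything here is hypothetical: FakeTrafficManager, VoltageCommandSolver, and Voltmeter are illustrative names rather than PlestyLib classes, and the fake transport returns canned replies so the flow runs without hardware.

```python
from typing import Dict, List, Optional


class FakeTrafficManager:
    """In-memory transport that answers from a table of canned replies."""

    def __init__(self, replies: Dict[str, str]):
        self.replies = dict(replies)
        self.sent: List[str] = []

    def send_command(self, command: str, timeout: Optional[float] = None) -> str:
        self.sent.append(command)
        return self.replies.get(command, "")


class VoltageCommandSolver:
    """Maps standardized operations to protocol strings and back."""

    _commands = {"read_voltage": "MEAS:VOLT?"}

    def encode(self, operation: str) -> str:
        return self._commands[operation]

    def decode(self, operation: str, response: str) -> float:
        return float(response)


class Voltmeter:
    """Device layer: knows operations, not wire formats or transports."""

    def __init__(self, traffic: FakeTrafficManager, solver: VoltageCommandSolver):
        self.traffic = traffic
        self.solver = solver

    def query(self, operation: str) -> float:
        command = self.solver.encode(operation)           # solver builds the command
        response = self.traffic.send_command(command)     # transport sends and receives
        return self.solver.decode(operation, response)    # solver decodes the reply


traffic = FakeTrafficManager({"MEAS:VOLT?": "1.25"})
meter = Voltmeter(traffic, VoltageCommandSolver())
print(meter.query("read_voltage"))
```

Swapping the fake transport for a real traffic manager would leave the device and solver code unchanged, which is what makes each layer testable on its own.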
Best Practices#
Keep transport-specific logic inside traffic managers only.
Keep protocol formatting in command solvers, not in traffic managers.
Always use with manager: or try/finally to guarantee close().
Use explicit timeout values suitable for your device.
Validate port/resource settings early in _open().
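As one way to apply the last practice, a custom manager could call a small validation helper at the top of its _open() hook. The sketch below is an assumption-laden example: validate_serial_settings and ALLOWED_BAUDRATES are hypothetical names, and the list of accepted baud rates should be adjusted to what your device supports.

```python
from typing import Optional

# Hypothetical whitelist; replace with the rates your device supports.
ALLOWED_BAUDRATES = (9600, 19200, 38400, 57600, 115200)


def validate_serial_settings(port: str, baudrate: int, timeout: Optional[float]) -> None:
    """Raise ValueError before any I/O happens if a setting is unusable."""
    if not isinstance(port, str) or not port.strip():
        raise ValueError("port must be a non-empty string")
    if baudrate not in ALLOWED_BAUDRATES:
        raise ValueError(f"unsupported baudrate: {baudrate}")
    if timeout is None or timeout <= 0:
        raise ValueError("timeout must be a positive number of seconds")


# Valid settings pass silently; invalid ones fail with a clear message.
validate_serial_settings("/dev/ttyUSB0", 9600, 5)
try:
    validate_serial_settings("", 9600, 5)
except ValueError as exc:
    print("rejected:", exc)
```

Failing before the port is touched keeps configuration mistakes separate from genuine communication errors.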