Coverage for .tox/py313/lib/python3.13/site-packages/pydalec/transport/base.py: 100%
31 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-26 22:48 +0200
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-26 22:48 +0200
1"""Abstract transport interface used by DALEC clients."""
3from abc import ABC, abstractmethod
4from collections import deque
6from pydalec.measurement import Measurement
class BaseTransport(ABC):
    """Abstract interface for synchronous DALEC transports.

    Concrete transports implement the abstract methods declared here and
    share the bounded ``measurement_log`` created in ``__init__``.
    """

    # Bounded buffer of measurements (a deque with ``maxlen`` set).
    measurement_log: deque[Measurement]
    # Backing field for the ``connected`` property. The class-level default
    # keeps the property readable (as False) before any instance assignment,
    # instead of raising AttributeError.
    # NOTE(review): concrete transports are presumably expected to update
    # this in connect()/disconnect() -- confirm against the implementations.
    _connected: bool = False

    def __init__(self) -> None:
        """Initialize shared transport state."""
        self.measurement_log = deque(maxlen=40)

    def set_measurement_log_size(self, size: int) -> None:
        """Resize measurement log while preserving the newest possible records.

        Args:
            size: New maximum number of records to keep; must be at least 1.

        Raises:
            ValueError: If ``size`` is smaller than 1.
        """
        if size < 1:
            err_msg = 'size must be at least 1'
            raise ValueError(err_msg)
        # Rebuilding a bounded deque from the old one keeps only the last
        # ``size`` items, i.e. the most recently appended records.
        self.measurement_log = deque(self.measurement_log, maxlen=size)

    @property
    def connected(self) -> bool:
        """Return True if the transport is currently connected.

        This is the single source of truth for connection state. It reports
        False until ``_connected`` has been set to True on the instance.
        """
        return self._connected

    @abstractmethod
    def connect(self) -> None:
        """Establish a connection to the transport backend."""

    @abstractmethod
    def send(self, data: str) -> None:
        """Send a command string to the transport backend."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release any transport resources held by the backend."""

    @abstractmethod
    def start_measurements(self) -> None:
        """Start the background process for making and receiving measurements."""

    @abstractmethod
    def stop_measurements(self) -> None:
        """Stop the background process for making and receiving measurements."""