Coverage for .tox/py312/lib/python3.12/site-packages/pydalec/transport/base.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-26 22:48 +0200

1"""Abstract transport interface used by DALEC clients.""" 

2 

3from abc import ABC, abstractmethod 

4from collections import deque 

5 

6from pydalec.measurement import Measurement 

7 

8 

class BaseTransport(ABC):
    """Abstract interface for synchronous DALEC transports.

    Concrete transports implement the connection lifecycle (``connect``,
    ``send``, ``disconnect``) and measurement control
    (``start_measurements``, ``stop_measurements``). This base class only
    provides the shared state: a bounded measurement log and the flag
    behind the ``connected`` property.
    """

    # Bounded buffer of measurements; once full, appending drops the oldest.
    measurement_log: deque[Measurement]
    # Backing flag for ``connected``. Nothing in this class changes it after
    # construction -- presumably subclasses update it in connect()/disconnect().
    _connected: bool

    def __init__(self) -> None:
        """Initialize shared transport state.

        The measurement log starts empty with room for 40 records, and the
        transport starts out disconnected.
        """
        self.measurement_log = deque(maxlen=40)
        # Give the flag a defined value so ``connected`` can be read before
        # any connect() call instead of raising AttributeError.
        self._connected = False

    def set_measurement_log_size(self, size: int) -> None:
        """Resize measurement log while preserving the newest possible records.

        Args:
            size: New maximum number of records to keep; must be at least 1.

        Raises:
            ValueError: If ``size`` is smaller than 1.
        """
        if size < 1:
            err_msg = 'size must be at least 1'
            raise ValueError(err_msg)
        # A bounded deque built from an iterable keeps only the last
        # ``maxlen`` items, so shrinking discards the oldest records first.
        self.measurement_log = deque(self.measurement_log, maxlen=size)

    @property
    def connected(self) -> bool:
        """Return True if the transport is currently connected.

        This is the single source of truth for connection state.
        """
        return self._connected

    @abstractmethod
    def connect(self) -> None:
        """Establish a connection to the transport backend."""

    @abstractmethod
    def send(self, data: str) -> None:
        """Send a command string to the transport backend."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release any transport resources held by the backend."""

    @abstractmethod
    def start_measurements(self) -> None:
        """Start the background process for making and receiving measurements."""

    @abstractmethod
    def stop_measurements(self) -> None:
        """Stop the background process for making and receiving measurements."""