Coverage for .tox/py312/lib/python3.12/site-packages/pydalec/transport/tcp.py: 85%
193 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-04 16:06 +0200
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-04 16:06 +0200
1"""Synchronous Telnet transport used by the DALEC client."""
3import datetime
4import io
5import logging
6import threading
7from collections import deque
8from dataclasses import dataclass
9from pathlib import Path
10from typing import Literal, Union
12from pydantic import ValidationError
13from telnetlib3.sync import TelnetConnection
15from pydalec.errors import PyDalecConnectionError
16from pydalec.measurement import Measurement
17from pydalec.transport.base import BaseTransport
# Stream names accepted by DataSink; the name is also used as the file extension.
19_LINE_STREAM_OPTIONS = Literal['raw', 'error']
# Logger for this module, named after the module via __name__.
20_LOGGER = logging.getLogger(__name__)
23@dataclass
24class StreamState:
25 """Track the current file handle and rollover state for one stream."""
27 handle: io.TextIOBase | None = None
28 path: Path | None = None
29 size_bytes: int = 0
30 day_key: str | None = None
class DataSink:
    """Persist incoming DALEC lines to per-day stream files.

    Each stream ('raw' or 'error') is written to
    ``<data_root_dir>/<YYYYMMDD>/DALEC_<timestamp>.<stream>``. A new file is
    started when the UTC day of the incoming timestamp changes or when the
    next line would push the current file past the configured size limit.
    Passing ``data_root_dir=None`` disables the sink: ``store_line`` and
    ``close_all_streams`` then return without doing anything.
    """

    def __init__(
        self,
        data_root_dir: str | Path | None,
        max_file_size_kb: int,
    ):
        """Configure data sink and prepare directories when enabled.

        Args:
            data_root_dir: Directory under which the day folders are created,
                or None to disable persistence.
            max_file_size_kb: Rollover threshold in units of 1024 bytes. Only
                validated when the sink is enabled.

        Raises:
            ValueError: If the sink is enabled and ``max_file_size_kb`` is not
                greater than 0.
        """
        self._enabled = data_root_dir is not None
        self._data_root_dir: Path | None = None
        self._max_file_size_bytes = 0
        self._stream_states: dict[_LINE_STREAM_OPTIONS, StreamState] = {
            'raw': StreamState(),
            'error': StreamState(),
        }
        # Serialises all access to the stream states and their file handles.
        self._lock = threading.Lock()

        if data_root_dir is None:
            return

        if max_file_size_kb <= 0:
            err_msg = 'max_file_size_kb must be greater than 0'
            raise ValueError(err_msg)

        self._max_file_size_bytes = max_file_size_kb * 1024
        self._data_root_dir = Path(data_root_dir).expanduser().resolve()
        self._data_root_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _format_iso8601_utc(timestamp: datetime.datetime) -> str:
        """Return *timestamp* as ISO 8601 UTC with microseconds and a 'Z' suffix."""
        utc_timestamp = timestamp.astimezone(datetime.timezone.utc)
        return utc_timestamp.isoformat(timespec='microseconds').replace('+00:00', 'Z')

    @staticmethod
    def _format_filename_timestamp_utc(timestamp: datetime.datetime) -> str:
        """Return filesystem-safe ISO8601 basic UTC timestamp for filenames."""
        utc_timestamp = timestamp.astimezone(datetime.timezone.utc)
        return utc_timestamp.strftime('%Y%m%dT%H%M%S.%fZ')

    def store_line(
        self,
        stream: _LINE_STREAM_OPTIONS,
        message: str,
        timestamp: datetime.datetime,
    ) -> None:
        """Store a single incoming line to the matching stream file.

        The line is written as ``'<ISO timestamp> <message>\\n'`` and flushed
        immediately. An ``OSError`` raised while preparing or writing the file
        is logged and swallowed, so the line is dropped instead of the error
        reaching the caller.

        Args:
            stream: Which stream file the line belongs to.
            message: Line content, without a trailing newline.
            timestamp: Reception time; converted to UTC for the line prefix,
                the day folder and the file name.
        """
        if not self._enabled:
            return

        iso_timestamp = self._format_iso8601_utc(timestamp)
        line = f'{iso_timestamp} {message}\n'
        # Size is tracked in encoded bytes because that is what lands on disk.
        line_size = len(line.encode('utf-8'))

        with self._lock:
            try:
                self._ensure_stream_ready(
                    stream=stream,
                    timestamp=timestamp,
                    incoming_line_size=line_size,
                )
                state = self._stream_states[stream]
                handle = state.handle
                if handle is None:
                    return
                handle.write(line)
                handle.flush()
                state.size_bytes += line_size
            except OSError:
                _LOGGER.exception('Failed to persist incoming %s line', stream)

    def _ensure_stream_ready(
        self,
        stream: _LINE_STREAM_OPTIONS,
        timestamp: datetime.datetime,
        incoming_line_size: int,
    ) -> None:
        """Make sure *stream* has an open file that may receive the next line.

        Closes the current file when the UTC day changed or when appending
        ``incoming_line_size`` bytes would exceed the size limit, then opens a
        new file if none is open. Must be called with ``self._lock`` held.

        Raises:
            OSError: If a file cannot be closed, created or inspected.
        """
        if self._data_root_dir is None:
            return

        state = self._stream_states[stream]
        day_key = timestamp.astimezone(datetime.timezone.utc).strftime('%Y%m%d')
        if state.day_key is not None and state.day_key != day_key:
            self._close_stream(stream)
            # _close_stream replaces the state object, so fetch the new one.
            state = self._stream_states[stream]

        if state.handle is not None:
            next_size = state.size_bytes + incoming_line_size
            if next_size > self._max_file_size_bytes:
                self._close_stream(stream)
                state = self._stream_states[stream]

        if state.handle is not None:
            return

        day_dir = self._data_root_dir / day_key
        day_dir.mkdir(parents=True, exist_ok=True)
        timestamp_label = self._format_filename_timestamp_utc(timestamp)
        file_name = f'DALEC_{timestamp_label}.{stream}'
        path = day_dir / file_name
        handle = path.open('a', encoding='utf-8', newline='')
        try:
            size_bytes = path.stat().st_size
        except OSError:
            # Do not leak the handle or publish a half-initialised state.
            handle.close()
            raise

        # Publish the new state only once every field is known.
        self._stream_states[stream] = StreamState(
            handle=handle,
            path=path,
            size_bytes=size_bytes,
            day_key=day_key,
        )

    def _close_stream(self, stream: _LINE_STREAM_OPTIONS) -> None:
        """Close the file of *stream* (if any) and reset its state.

        The state is reset even when ``close()`` raises, so a handle that
        failed to close is never reused for later writes.
        """
        state = self._stream_states[stream]
        try:
            if state.handle is not None:
                state.handle.close()
        finally:
            self._stream_states[stream] = StreamState()

    def close_all_streams(self) -> None:
        """Close any open stream file handles.

        Both streams are always attempted; an error closing the 'raw' stream
        no longer prevents the 'error' stream from being closed.
        """
        if not self._enabled:
            return
        with self._lock:
            try:
                self._close_stream('raw')
            finally:
                self._close_stream('error')
class TCPTransport(BaseTransport):
    """Telnet-based synchronous transport for DALEC commands.

    A daemon thread reads lines from the connection. Lines that parse as a
    ``Measurement`` are appended to ``measurement_log``; every other line is
    queued as a command reply. Each line is also handed to a ``DataSink``
    ('raw' for measurements, 'error' for everything else).
    """

    def __init__(
        self,
        host: str,
        port: int = 23,
        data_root_dir: str | Path | None = None,
        max_file_size_kb: int = 10240,
    ):
        """Connect to a DALEC endpoint over Telnet.

        Args:
            host: Host name or address of the instrument.
            port: TCP port of the Telnet service.
            data_root_dir: Root directory for persisted lines, or None to
                disable persistence.
            max_file_size_kb: Size limit passed to the data sink.

        Raises:
            PyDalecConnectionError: If the connection is refused.
            ValueError: If ``data_root_dir`` is given and ``max_file_size_kb``
                is not greater than 0.
        """
        super().__init__()
        self._connected = False
        self._data_sink = DataSink(data_root_dir=data_root_dir, max_file_size_kb=max_file_size_kb)
        self._connection = TelnetConnection(host, port, connect_minwait=0.0, encoding='utf8')
        self._host: str = host
        self._port: int = port
        # connect() already translates ConnectionRefusedError into
        # PyDalecConnectionError, so no second translation is needed here.
        self.connect()

    def _setup_background_reader(self) -> None:
        """Create the reply queue and locks, then start the reader thread."""
        self._measurement_log_lock = threading.Lock()
        self._responses: deque[str | None] = deque()
        self._responses_lock = threading.Lock()
        # Daemon thread so a blocked readline() never keeps the process alive.
        self._reader_thread = threading.Thread(target=self._read_incoming_data, daemon=True)
        self._reader_thread.start()

    def set_measurement_log_size(self, size: int) -> None:
        """Resize measurement log while holding the measurement lock."""
        with self._measurement_log_lock:
            super().set_measurement_log_size(size)

    def send(self, data: str) -> None:
        """Send a single command line to the instrument.

        Replies queued so far are discarded first, then *data* is written
        followed by CRLF and the connection is flushed.
        """
        with self._responses_lock:
            self._responses.clear()
        self._connection.write(data + '\r\n')
        self._connection.flush()

    def _get_reply(self) -> Union[str, None]:
        """Return the oldest response from the instrument.

        Returns:
            The oldest queued reply, or None when the queue is empty.
        """
        with self._responses_lock:
            return self._responses.popleft() if self._responses else None

    def disconnect(self) -> None:
        """Close the telnet connection and stop the background reader.

        Does nothing when already disconnected. Waits up to one second for the
        reader thread and then closes the data sink's files.
        """
        if not self._connected:
            return

        _LOGGER.info('Disconnecting from DALEC at %s:%s', self._host, self._port)

        self._connection.close()
        self._connected = False
        self._reader_thread.join(timeout=1)
        self._data_sink.close_all_streams()

    def connect(self) -> None:
        """Reconnect to the DALEC endpoint if currently disconnected.

        Raises:
            PyDalecConnectionError: If the connection is refused.
        """
        if self._connected:
            return

        try:
            self._connection.connect()
        except ConnectionRefusedError:
            err_msg = (
                f'Unable to connect to DALEC at {self._host}:{self._port}. '
                'Check instrument power/connection and that DALECview is not connected.'
            )
            raise PyDalecConnectionError(err_msg) from None

        self._connected = True
        self._setup_background_reader()
        _LOGGER.info('Connected to DALEC at %s:%s', self._host, self._port)

    def _read_incoming_data(self) -> None:
        """Read lines until the connection ends and dispatch each non-empty one.

        The loop stops on ``EOFError``/``RuntimeError`` from ``readline()`` or
        when an empty result is returned.
        """
        while True:
            try:
                raw_message = self._connection.readline()
            except (EOFError, RuntimeError):
                break
            if not raw_message:
                break

            if isinstance(raw_message, str):
                text = raw_message
            else:
                # Undecodable bytes are replaced rather than raising, because
                # an uncaught UnicodeDecodeError would end this thread.
                text = bytes(raw_message).decode('utf-8', errors='replace')

            message = text.strip()
            if message:
                self._handle_incoming_data(message)

    def _handle_incoming_data(self, message: str) -> None:
        """Route one received line to the measurement log or the reply queue.

        Lines that fail to parse as a ``Measurement`` are stored in the
        'error' stream and queued as replies; parsed measurements are stored
        in the 'raw' stream and appended to ``measurement_log``.
        """
        received_at = self._utc_now()
        try:
            measurement = Measurement.from_raw_data(message)
        except (ValidationError, ValueError):
            _LOGGER.debug('Received non-measurement line: %s', message)
            self._data_sink.store_line(
                stream='error',
                message=message,
                timestamp=received_at,
            )
            with self._responses_lock:
                self._responses.append(message)
            return

        self._data_sink.store_line(
            stream='raw',
            message=message,
            timestamp=received_at,
        )

        with self._measurement_log_lock:
            self.measurement_log.append(measurement)

    @staticmethod
    def _utc_now() -> datetime.datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.datetime.now(datetime.timezone.utc)

    def start_measurements(self) -> None:
        """Send command to start making measurements.

        The measurement log is cleared before the command is written.
        """
        with self._measurement_log_lock:
            self.measurement_log.clear()
        _LOGGER.debug('Sending START command')
        self._connection.write('START\r\n')
        self._connection.flush()

    def stop_measurements(self) -> None:
        """Send command to stop making measurements."""
        _LOGGER.debug('Sending STOP command')
        self._connection.write('STOP\r\n')
        self._connection.flush()

    def __repr__(self) -> str:
        """Representation of TCPTransport instance.

        Returns:
            str: formatted string showing the host and port of the TCPTransport instance.
        """
        return self.__str__()

    def __str__(self) -> str:
        """String representation of TCPTransport instance.

        Returns:
            str: formatted string showing the host and port of the TCPTransport instance.
        """
        return f'TCPTransport ({self._host}:{self._port})'