Coverage for .tox/py314/lib/python3.14/site-packages/pydalec/transport/tcp.py: 85%

193 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-04 16:06 +0200

1"""Synchronous Telnet transport used by the DALEC client.""" 

2 

3import datetime 

4import io 

5import logging 

6import threading 

7from collections import deque 

8from dataclasses import dataclass 

9from pathlib import Path 

10from typing import Literal, Union 

11 

12from pydantic import ValidationError 

13from telnetlib3.sync import TelnetConnection 

14 

15from pydalec.errors import PyDalecConnectionError 

16from pydalec.measurement import Measurement 

17from pydalec.transport.base import BaseTransport 

18 

19_LINE_STREAM_OPTIONS = Literal['raw', 'error'] 

20_LOGGER = logging.getLogger(__name__) 

21 

22 

23@dataclass 

24class StreamState: 

25 """Track the current file handle and rollover state for one stream.""" 

26 

27 handle: io.TextIOBase | None = None 

28 path: Path | None = None 

29 size_bytes: int = 0 

30 day_key: str | None = None 

31 

32 

33class DataSink: 

34 """Persist incoming DALEC lines to per-day stream files.""" 

35 

36 def __init__( 

37 self, 

38 data_root_dir: str | Path | None, 

39 max_file_size_kb: int, 

40 ): 

41 """Configure data sink and prepare directories when enabled.""" 

42 self._enabled = data_root_dir is not None 

43 self._data_root_dir: Path | None = None 

44 self._max_file_size_bytes = 0 

45 self._stream_states: dict[_LINE_STREAM_OPTIONS, StreamState] = { 

46 'raw': StreamState(), 

47 'error': StreamState(), 

48 } 

49 self._lock = threading.Lock() 

50 

51 if data_root_dir is None: 

52 return 

53 

54 if max_file_size_kb <= 0: 

55 err_msg = 'max_file_size_kb must be greater than 0' 

56 raise ValueError(err_msg) 

57 

58 self._max_file_size_bytes = max_file_size_kb * 1024 

59 self._data_root_dir = Path(data_root_dir).expanduser().resolve() 

60 self._data_root_dir.mkdir(parents=True, exist_ok=True) 

61 

62 @staticmethod 

63 def _format_iso8601_utc(timestamp: datetime.datetime) -> str: 

64 utc_timestamp = timestamp.astimezone(datetime.timezone.utc) 

65 return utc_timestamp.isoformat(timespec='microseconds').replace('+00:00', 'Z') 

66 

67 @staticmethod 

68 def _format_filename_timestamp_utc(timestamp: datetime.datetime) -> str: 

69 """Return filesystem-safe ISO8601 basic UTC timestamp for filenames.""" 

70 utc_timestamp = timestamp.astimezone(datetime.timezone.utc) 

71 return utc_timestamp.strftime('%Y%m%dT%H%M%S.%fZ') 

72 

73 def store_line( 

74 self, 

75 stream: _LINE_STREAM_OPTIONS, 

76 message: str, 

77 timestamp: datetime.datetime, 

78 ) -> None: 

79 """Store a single incoming line to the matching stream file.""" 

80 if not self._enabled: 

81 return 

82 

83 iso_timestamp = self._format_iso8601_utc(timestamp) 

84 line = f'{iso_timestamp} {message}\n' 

85 line_size = len(line.encode('utf-8')) 

86 

87 with self._lock: 

88 try: 

89 self._ensure_stream_ready( 

90 stream=stream, 

91 timestamp=timestamp, 

92 incoming_line_size=line_size, 

93 ) 

94 state = self._stream_states[stream] 

95 handle = state.handle 

96 if handle is None: 

97 return 

98 handle.write(line) 

99 handle.flush() 

100 state.size_bytes += line_size 

101 except OSError: 

102 _LOGGER.exception('Failed to persist incoming %s line', stream) 

103 

104 def _ensure_stream_ready( 

105 self, 

106 stream: _LINE_STREAM_OPTIONS, 

107 timestamp: datetime.datetime, 

108 incoming_line_size: int, 

109 ) -> None: 

110 if self._data_root_dir is None: 

111 return 

112 

113 state = self._stream_states[stream] 

114 day_key = timestamp.astimezone(datetime.timezone.utc).strftime('%Y%m%d') 

115 if state.day_key is not None and state.day_key != day_key: 

116 self._close_stream(stream) 

117 state = self._stream_states[stream] 

118 

119 if state.handle is not None: 

120 next_size = state.size_bytes + incoming_line_size 

121 if next_size > self._max_file_size_bytes: 

122 self._close_stream(stream) 

123 state = self._stream_states[stream] 

124 

125 if state.handle is not None: 

126 return 

127 

128 day_dir = self._data_root_dir / day_key 

129 day_dir.mkdir(parents=True, exist_ok=True) 

130 timestamp_label = self._format_filename_timestamp_utc(timestamp) 

131 file_name = f'DALEC_{timestamp_label}.{stream}' 

132 path = day_dir / file_name 

133 handle = path.open('a', encoding='utf-8', newline='') 

134 

135 state.handle = handle 

136 state.path = path 

137 state.size_bytes = path.stat().st_size 

138 state.day_key = day_key 

139 

140 def _close_stream(self, stream: _LINE_STREAM_OPTIONS) -> None: 

141 state = self._stream_states[stream] 

142 if state.handle is not None: 

143 state.handle.close() 

144 self._stream_states[stream] = StreamState() 

145 

146 def close_all_streams(self) -> None: 

147 """Close any open stream file handles.""" 

148 if not self._enabled: 

149 return 

150 with self._lock: 

151 self._close_stream('raw') 

152 self._close_stream('error') 

153 

154 

155class TCPTransport(BaseTransport): 

156 """Telnet-based synchronous transport for DALEC commands.""" 

157 

158 def __init__( 

159 self, 

160 host: str, 

161 port: int = 23, 

162 data_root_dir: str | Path | None = None, 

163 max_file_size_kb: int = 10240, 

164 ): 

165 """Connect to a DALEC endpoint over Telnet.""" 

166 super().__init__() 

167 self._connected = False 

168 self._data_sink = DataSink(data_root_dir=data_root_dir, max_file_size_kb=max_file_size_kb) 

169 self._connection = TelnetConnection(host, port, connect_minwait=0.0, encoding='utf8') 

170 self._host: str = host 

171 self._port: int = port 

172 try: 

173 self.connect() 

174 except ConnectionRefusedError as e: 

175 self._connected = False 

176 err_msg = f'Unable to connect to DALEC at {host}:{port}' 

177 raise PyDalecConnectionError(err_msg) from e 

178 

179 def _setup_background_reader(self) -> None: 

180 self._measurement_log_lock = threading.Lock() 

181 self._responses: deque[str | None] = deque() 

182 self._responses_lock = threading.Lock() 

183 self._reader_thread = threading.Thread(target=self._read_incoming_data, daemon=True) 

184 self._reader_thread.start() 

185 

186 def set_measurement_log_size(self, size: int) -> None: 

187 """Resize measurement log while holding the measurement lock.""" 

188 with self._measurement_log_lock: 

189 super().set_measurement_log_size(size) 

190 

191 def send(self, data: str) -> None: 

192 """Send a single command line to the instrument.""" 

193 with self._responses_lock: 

194 self._responses.clear() 

195 self._connection.write(data + '\r\n') 

196 self._connection.flush() 

197 

198 def _get_reply(self) -> Union[str, None]: 

199 """Return the oldest response from the instrument.""" 

200 with self._responses_lock: 

201 return self._responses.popleft() if self._responses else None 

202 

203 def disconnect(self) -> None: 

204 """Close the telnet connection and stop the background reader.""" 

205 if not self._connected: 

206 return 

207 

208 _LOGGER.info('Disconnecting from DALEC at %s:%s', self._host, self._port) 

209 

210 self._connection.close() 

211 self._connected = False 

212 self._reader_thread.join(timeout=1) 

213 self._data_sink.close_all_streams() 

214 

215 def connect(self) -> None: 

216 """Reconnect to the DALEC endpoint if currently disconnected.""" 

217 if self._connected: 

218 return 

219 else: 

220 try: 

221 self._connection.connect() 

222 except ConnectionRefusedError: 

223 err_msg = ( 

224 f'Unable to connect to DALEC at {self._host}:{self._port}. ' 

225 'Check instrument power/connection and that DALECview is not connected.' 

226 ) 

227 raise PyDalecConnectionError(err_msg) from None 

228 else: 

229 self._connected = True 

230 self._setup_background_reader() 

231 _LOGGER.info('Connected to DALEC at %s:%s', self._host, self._port) 

232 

233 def _read_incoming_data(self) -> None: 

234 while True: 

235 try: 

236 raw_message = self._connection.readline() 

237 except (EOFError, RuntimeError): 

238 break 

239 if not raw_message: 

240 break 

241 

242 if isinstance(raw_message, str): 

243 message = raw_message.strip() 

244 elif isinstance(raw_message, bytes): 

245 message = raw_message.decode('utf-8').strip() 

246 else: 

247 message = bytes(raw_message).decode('utf-8').strip() 

248 

249 if message: 

250 self._handle_incoming_data(message) 

251 

252 def _handle_incoming_data(self, message: str) -> None: 

253 received_at = self._utc_now() 

254 try: 

255 measurement = Measurement.from_raw_data(message) 

256 except (ValidationError, ValueError): 

257 _LOGGER.debug('Received non-measurement line: %s', message) 

258 self._data_sink.store_line( 

259 stream='error', 

260 message=message, 

261 timestamp=received_at, 

262 ) 

263 with self._responses_lock: 

264 self._responses.append(message) 

265 return 

266 

267 self._data_sink.store_line( 

268 stream='raw', 

269 message=message, 

270 timestamp=received_at, 

271 ) 

272 

273 with self._measurement_log_lock: 

274 self.measurement_log.append(measurement) 

275 

276 @staticmethod 

277 def _utc_now() -> datetime.datetime: 

278 return datetime.datetime.now(datetime.timezone.utc) 

279 

280 def start_measurements(self) -> None: 

281 """Send command to start making measurements.""" 

282 with self._measurement_log_lock: 

283 self.measurement_log.clear() 

284 _LOGGER.debug('Sending START command') 

285 self._connection.write('START\r\n') 

286 self._connection.flush() 

287 

288 def stop_measurements(self) -> None: 

289 """Send command to stop making measurements.""" 

290 _LOGGER.debug('Sending STOP command') 

291 self._connection.write('STOP\r\n') 

292 self._connection.flush() 

293 

294 def __repr__(self) -> str: 

295 """Representation of TCPTransport instance. 

296 

297 Returns: 

298 str: formatted string showing the host and port of the TCPTransport instance. 

299 """ 

300 return self.__str__() 

301 

302 def __str__(self) -> str: 

303 """String representation of TCPTransport instance. 

304 

305 Returns: 

306 str: formatted string showing the host and port of the TCPTransport instance. 

307 """ 

308 return f'TCPTransport ({self._host}:{self._port})'