Coverage for .tox/py312/lib/python3.12/site-packages/pydalec/instrument.py: 93%

86 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-26 22:48 +0200

1"""Synchronous client for communicating with a DALEC instrument.""" 

2 

3from __future__ import annotations 

4 

5import time 

6from collections import deque 

7from dataclasses import dataclass 

8from typing import Any, Callable, TypeVar 

9 

10from pydalec.errors import ( 

11 PyDalecNoPositionDataError, 

12 PyDalecNoSolarZenithDataError, 

13) 

14from pydalec.transport.mock import MockTransport 

15from pydalec.transport.tcp import TCPTransport 

16 

17 

class DalecStatus:
    """Mutable status flags tracked by the client for one instrument."""

    def __init__(self) -> None:
        """Create a status object with every flag at its default value."""
        # True while the measurement process has been started through the client.
        self.measuring: bool = False

24 

25 

@dataclass(frozen=True)
class Location:
    """Simple latitude/longitude position fix."""

    # Latitude; values >= 0 are rendered as north (N), negative values as south (S).
    lat: float
    # Longitude; values >= 0 are rendered as east (E), negative values as west (W).
    lon: float

    def __str__(self) -> str:
        """Return string representation of the class.

        Each coordinate is shown as its magnitude followed by a degree sign
        and a hemisphere letter instead of a signed number.

        Returns:
            str: representation of the class, including lat and lon values.
        """
        lat_hemisphere = 'N' if self.lat >= 0 else 'S'
        lon_hemisphere = 'E' if self.lon >= 0 else 'W'
        # abs() rather than negation so that -0.0 is not printed with a sign.
        lat_text = f'{abs(self.lat)}\N{DEGREE SIGN}{lat_hemisphere}'
        lon_text = f'{abs(self.lon)}\N{DEGREE SIGN}{lon_hemisphere}'
        return f'Location(lat={lat_text}, lon={lon_text})'

46 

47 

class Dalec:
    """Client API for synchronous DALEC commands.

    Every operation is delegated to the ``transport`` object given at
    construction. ``status.measuring`` records whether measurements were
    started through this client.
    """

    # Upper bound on the pause between two polls of the measurement log.
    _POLL_INTERVAL_SECS = 0.05

    def __init__(self, transport) -> None:
        """Initialize the client with a transport implementation.

        Args:
            transport: object providing ``connect``, ``disconnect``,
                ``connected``, ``start_measurements``, ``stop_measurements``
                and ``measurement_log``.
        """
        self.transport = transport
        self.status = DalecStatus()

    @classmethod
    def connect_tcp(
        cls,
        host: str = '127.0.0.1',
        port: int = 23,
        data_root_dir: str | None = None,
        max_file_size_kb: int = 51200,
    ) -> Dalec:
        """Create a client connected to a DALEC TCP endpoint.

        All arguments are forwarded unchanged to ``TCPTransport``.

        Returns:
            Dalec: client wrapping the newly created TCP transport.
        """
        return cls(
            transport=TCPTransport(
                host,
                port,
                data_root_dir=data_root_dir,
                max_file_size_kb=max_file_size_kb,
            )
        )

    @classmethod
    def connect_mock(
        cls,
        delay: float = 0.0,
        error_rate: float = 0.0,
        data_root_dir: str | None = None,
        max_file_size_kb: int = 51200,
    ) -> Dalec:
        """Create a client using the in-memory mock transport.

        All arguments are forwarded unchanged to ``MockTransport``.

        Returns:
            Dalec: client wrapping the newly created mock transport.
        """
        return cls(
            MockTransport(
                delay=delay,
                error_rate=error_rate,
                data_root_dir=data_root_dir,
                max_file_size_kb=max_file_size_kb,
            )
        )

    def disconnect(self) -> None:
        """Close the transport connection."""
        self.transport.disconnect()

    @property
    def connected(self) -> bool:
        """Return True if the client is currently connected to the instrument."""
        return self.transport.connected

    def connect(self) -> None:
        """Reconnect to the instrument if currently disconnected."""
        if not self.connected:
            self.transport.connect()

    @property
    def measurement_log(self) -> list[Any]:
        """Return a list of recent measurements from the instrument.

        The list is a fresh copy of the transport's log, so mutating it does
        not affect the transport.
        """
        return list(self.transport.measurement_log)

    def __repr__(self) -> str:
        """Representation of DALEC instance.

        Returns:
            str: formatted string showing the transport information of the DALEC instance.
        """
        return self.__str__()

    def __str__(self) -> str:
        """Return string representation of the class.

        Returns:
            str: representation of the class, including transport information.
        """
        return f'DALEC at {self.transport}'

    def start_measurements(self) -> None:
        """Start the background process for making and receiving measurements."""
        self.transport.start_measurements()
        # Only flagged once the transport call has returned without raising.
        self.status.measuring = True

    def stop_measurements(self) -> None:
        """Stop the background process for making and receiving measurements."""
        self.transport.stop_measurements()
        self.status.measuring = False

    # Result type of the ``extract_value`` callback of _get_measurement_field.
    T = TypeVar('T')

    def _get_measurement_field(
        self,
        timeout_secs: float,
        is_valid: Callable[[Any], bool],
        extract_value: Callable[[Any], T],
        timeout_error_type: type[Exception],
        timeout_error_template: str,
    ) -> T:
        """Poll measurements until a valid value is available or timeout expires.

        If measurements are not already running they are started for the
        duration of the call. Afterwards they are stopped again and the
        transport's measurement log is restored to its content from before
        the call.

        Args:
            timeout_secs: maximum time to wait; must be greater than 0.
            is_valid: predicate selecting a usable measurement.
            extract_value: converts the first usable measurement into the result.
            timeout_error_type: exception class raised when the deadline passes.
            timeout_error_template: message template formatted with
                ``timeout_secs``.

        Returns:
            The value extracted from the first measurement accepted by
            ``is_valid``.

        Raises:
            ValueError: if ``timeout_secs`` is not greater than 0.
            Exception: an instance of ``timeout_error_type`` if no measurement
                was accepted before the deadline.
        """
        if timeout_secs <= 0:
            err_msg = 'timeout must be greater than 0 seconds'
            raise ValueError(err_msg)

        was_measuring: bool = self.status.measuring
        # Snapshot of the log, used to undo the effect of a temporary run.
        saved_log = deque(
            self.transport.measurement_log, maxlen=self.transport.measurement_log.maxlen
        )
        last_seen_measurement = None

        if not was_measuring:
            self.start_measurements()

        deadline = time.monotonic() + timeout_secs
        try:
            while time.monotonic() < deadline:
                measurements = list(self.transport.measurement_log)

                if last_seen_measurement is None:
                    new_measurements = measurements
                else:
                    try:
                        last_index = measurements.index(last_seen_measurement)
                    except ValueError:
                        # The last inspected entry is no longer in the log,
                        # so everything present is treated as unseen.
                        new_measurements = measurements
                    else:
                        new_measurements = measurements[last_index + 1 :]

                for measurement in new_measurements:
                    if is_valid(measurement):
                        return extract_value(measurement)
                    last_seen_measurement = measurement

                # Never sleep past the deadline, so short timeouts are honoured.
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(self._POLL_INTERVAL_SECS, remaining))
        finally:
            if not was_measuring:
                try:
                    self.stop_measurements()
                finally:
                    # Restore the log even when stopping raised.
                    self.transport.measurement_log = deque(saved_log, maxlen=saved_log.maxlen)

        err_msg = timeout_error_template.format(timeout_secs=timeout_secs)
        raise timeout_error_type(err_msg)

    def get_location(self, timeout_secs: float = 10.0) -> Location:
        """Return the first valid GNSS position fix received within timeout.

        Args:
            timeout_secs: maximum time to wait; must be greater than 0.

        Returns:
            Location: latitude/longitude copied from the first measurement
            whose ``has_valid_position_fix`` is true.

        Raises:
            ValueError: if ``timeout_secs`` is not greater than 0.
            PyDalecNoPositionDataError: if no valid fix arrived in time.
        """
        return self._get_measurement_field(
            timeout_secs=timeout_secs,
            is_valid=lambda measurement: measurement.has_valid_position_fix,
            extract_value=lambda measurement: Location(
                lat=measurement.location.lat,
                lon=measurement.location.lon,
            ),
            timeout_error_type=PyDalecNoPositionDataError,
            timeout_error_template='No valid GNSS position fix received within {timeout_secs:.1f}s',
        )

    def get_solar_zenith(self, timeout_secs: float = 10.0) -> float:
        """Return the first valid solar zenith received within timeout.

        Args:
            timeout_secs: maximum time to wait; must be greater than 0.

        Returns:
            float: ``solar_zenith_deg`` of the first measurement whose
            ``has_valid_solar_zenith`` is true.

        Raises:
            ValueError: if ``timeout_secs`` is not greater than 0.
            PyDalecNoSolarZenithDataError: if no valid value arrived in time.
        """
        return self._get_measurement_field(
            timeout_secs=timeout_secs,
            is_valid=lambda measurement: measurement.has_valid_solar_zenith,
            extract_value=lambda measurement: measurement.solar_zenith_deg,
            timeout_error_type=PyDalecNoSolarZenithDataError,
            timeout_error_template='No valid solar zenith received within {timeout_secs:.1f}s',
        )