Coverage for .tox/py312/lib/python3.12/site-packages/pydalec/measurement.py: 97%

145 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-26 22:48 +0200

1"""Class for measurement data for the pydalec mock instrument.""" 

2 

3from __future__ import annotations 

4 

5import datetime 

6import enum 

7import math 

8from decimal import Decimal 

9from typing import Literal, Self 

10 

11from pydantic import BaseModel, ConfigDict, Field, field_validator 

12 

13RAW_DATA_FIXED_FIELD_COUNT = 22 

14SPECTRUM_LENGTH = 190 

15RAW_DATA_FIELD_COUNT = RAW_DATA_FIXED_FIELD_COUNT + SPECTRUM_LENGTH 

16 

17 

18def _has_max_decimals(value: float, decimals: int) -> bool: 

19 """Check if a float value has at most a specified number of decimal places.""" 

20 if math.isnan(value): 

21 return True 

22 decimal_value = Decimal(str(value)) 

23 quantized_value: Decimal = decimal_value.quantize(Decimal(1).scaleb(-decimals)) 

24 return decimal_value == quantized_value 

25 

26 

27def _parse_utc_timestamp(value: str) -> datetime.datetime: 

28 """Parse a UTC timestamp emitted by the instrument.""" 

29 return datetime.datetime.fromisoformat(value.replace('Z', '+00:00')) 

30 

31 

32def _normalize_serial_number(value: str) -> str: 

33 """Normalize instrument serial numbers to the model's four-digit format.""" 

34 return value.zfill(4) if value.isdigit() and len(value) < 4 else value 

35 

36 

37def _validate_range_or_nan(value: float, minimum: float, maximum: float, field_name: str) -> float: 

38 """Accept NaN for unavailable readings; otherwise enforce the numeric range.""" 

39 if math.isnan(value): 

40 return value 

41 if value < minimum or value > maximum: 

42 err_msg = f'{field_name} must be in range {minimum}..{maximum}' 

43 raise ValueError(err_msg) 

44 return value 

45 

46 

47class StatusFlag(enum.IntEnum): 

48 """Status indicator used in telemetry data. 

49 

50 0 means stationary during integration, 1 means moving during integration. 

51 """ 

52 

53 STATIONARY = 0 

54 MOVING = 1 

55 

56 

57class Coordinates(BaseModel): 

58 """Position (lat/lon) in decimal degrees.""" 

59 

60 model_config = ConfigDict(extra='forbid') 

61 

62 lat: float 

63 lon: float 

64 

65 @field_validator('lat') 

66 @classmethod 

67 def _validate_lat(cls, value: float) -> float: 

68 return _validate_range_or_nan(value, -90.0, 90.0, 'lat') 

69 

70 @field_validator('lon') 

71 @classmethod 

72 def _validate_lon(cls, value: float) -> float: 

73 return _validate_range_or_nan(value, -180.0, 180.0, 'lon') 

74 

75 def __str__(self) -> str: 

76 """String representation in the format 'lat,lon' with N/S and E/W.""" 

77 latitude: str = f'{abs(self.lat):010.7f}N' if self.lat >= 0 else f'{abs(self.lat):010.7f}S' 

78 longitude: str = f'{abs(self.lon):010.7f}E' if self.lon >= 0 else f'{abs(self.lon):010.7f}W' 

79 return f'{latitude},{longitude}' 

80 

81 

82class Telemetry(BaseModel): 

83 """Container for telemetry data part of the measurement data. 

84 

85 Attributes: 

86 voltage_volts (float): Voltage reading in volts. 

87 humidity_mm_hg (float): Humidity reading in mmHg. 

88 temperature_diode_celsius (float): Temperature reading from diode in degrees Celsius. 

89 status_flag (StatusFlag): Status indicator where 0 = stationary and 1 = moving 

90 during integration. 

91 

92 Validation: 

93 - voltage_volts: Must have at most 1 decimal place 

94 - humidity_mm_hg: Must have at most 1 decimal place 

95 - temperature_diode_celsius: Must have at most 3 decimal places 

96 - Extra fields are forbidden (model_config sets extra='forbid') 

97 

98 String Representation: 

99 Returns a formatted string with all telemetry values in the format: 

100 "Voltage:XXXX.X;Humidity:XX.X;Temperature:XX.XXX;Status flag:[STATIONARY|MOVING]" 

101 """ 

102 

103 model_config = ConfigDict(extra='forbid') 

104 

105 voltage_volts: float 

106 humidity_mm_hg: float 

107 temperature_diode_celsius: float 

108 status_flag: int = Field(ge=0) # multi-bit quality flag from instrument Qflag field 

109 

110 @field_validator('voltage_volts') 

111 @classmethod 

112 def _validate_voltage_precision(cls, value: float) -> float: 

113 if not _has_max_decimals(value, 1): 

114 err_msg: str = f'voltage_volts must have at most 1 decimal place (value: {value})' 

115 raise ValueError(err_msg) 

116 return value 

117 

118 @field_validator('humidity_mm_hg') 

119 @classmethod 

120 def _validate_humidity_precision(cls, value: float) -> float: 

121 if not _has_max_decimals(value, 1): 

122 err_msg: str = f'Humidity_mm_hg must have at most 1 decimal place (value: {value})' 

123 raise ValueError(err_msg) 

124 return value 

125 

126 @field_validator('temperature_diode_celsius') 

127 @classmethod 

128 def _validate_temp_precision(cls, value: float) -> float: 

129 if not _has_max_decimals(value, decimals=3): 

130 err_msg: str = ( 

131 f'temperature_diode_celsius must have at most 3 decimal places (value: {value})' 

132 ) 

133 raise ValueError(err_msg) 

134 return value 

135 

136 def __str__(self) -> str: 

137 """String representation of the telemetry data. 

138 

139 Returns a string in the format: 

140 "Voltage:XXXX.X;Humidity:XX.X;Temperature:XX.XXX;Status flag:X" 

141 """ 

142 return ( 

143 f'Voltage:{self.voltage_volts:04.1f};Humidity:{self.humidity_mm_hg:04.1f};Temperature:' 

144 f'{self.temperature_diode_celsius:06.3f};Status flag:{self.status_flag}' 

145 ) 

146 

147 

148class Measurement(BaseModel): 

149 """Container for measurement data from the mock instrument.""" 

150 

151 model_config = ConfigDict(extra='forbid') 

152 

153 device_id: Literal['DALEC'] 

154 serial_number: str = Field(pattern=r'^\d{4}$') # four digits, e.g. '0010' 

155 channel_type: Literal['Ed', 'Lu', 'Lsky'] 

156 utc_time: datetime.datetime 

157 location: Coordinates 

158 sat_compass_heading: float 

159 solar_azimuth_deg: float 

160 solar_zenith_deg: float 

161 gear_position_deg: float 

162 azimuth_deg: float 

163 relative_azimuth_deg: float 

164 pitch_start_measurement_deg: float 

165 roll_start_measurement_deg: float 

166 telemetry: Telemetry 

167 int_time: int = Field(ge=1, le=6000) 

168 signal_percentage: float 

169 dark_counts: int = Field(ge=0, le=65535) 

170 max_counts: int = Field(ge=0, le=65535) 

171 spectrum: list[int] = Field( 

172 min_length=190, 

173 max_length=190, 

174 description='List of 190 integer values representing the spectrum, each in range 0..65535', 

175 ) # Validation in _validate_spectrum_values 

176 

177 @field_validator('utc_time') 

178 @classmethod 

179 def _validate_utc_time(cls, v: datetime.datetime) -> datetime.datetime: 

180 if v.tzinfo is None or v.utcoffset() != datetime.timedelta(0): 

181 err_msg = 'utc_time must be timezone-aware UTC' 

182 raise ValueError(err_msg) 

183 return v 

184 

185 @field_validator('spectrum') 

186 @classmethod 

187 def _validate_spectrum_values(cls, value: list[int]) -> list[int]: 

188 if any((x < 0 or x > 65535) for x in value): 

189 err_msg = 'All spectrum values must be in range 0..65535' 

190 raise ValueError(err_msg) 

191 return value 

192 

193 @field_validator( 

194 'sat_compass_heading', 

195 'solar_azimuth_deg', 

196 'solar_zenith_deg', 

197 'azimuth_deg', 

198 ) 

199 @classmethod 

200 def _validate_bearing_fields(cls, value: float, info) -> float: 

201 return _validate_range_or_nan(value, 0.0, 359.9, info.field_name) 

202 

203 @field_validator('gear_position_deg', 'relative_azimuth_deg', 'roll_start_measurement_deg') 

204 @classmethod 

205 def _validate_signed_heading_fields(cls, value: float, info) -> float: 

206 return _validate_range_or_nan(value, -179.9, 180.0, info.field_name) 

207 

208 @field_validator('pitch_start_measurement_deg') 

209 @classmethod 

210 def _validate_pitch_field(cls, value: float) -> float: 

211 return _validate_range_or_nan(value, -90.0, 90.0, 'pitch_start_measurement_deg') 

212 

213 @field_validator('signal_percentage') 

214 @classmethod 

215 def _validate_signal_percentage(cls, value: float) -> float: 

216 return _validate_range_or_nan(value, 0.0, 100.0, 'signal_percentage') 

217 

218 def __str__(self) -> str: 

219 """Human-readable multiline string representation of a measurement.""" 

220 spectrum_values = ', '.join(str(value) for value in self.spectrum) 

221 return ( 

222 f'device_id={self.device_id!r},\n' 

223 f'serial_number={self.serial_number!r},\n' 

224 f'channel_type={self.channel_type!r},\n' 

225 f'utc_time={self.utc_time!r},\n' 

226 f'location={self.location!r},\n' 

227 f'sat_compass_heading={self.sat_compass_heading},\n' 

228 f'solar_azimuth_deg={self.solar_azimuth_deg}, ' 

229 f'solar_zenith_deg={self.solar_zenith_deg},\n' 

230 f'gear_position_deg={self.gear_position_deg},\n' 

231 f'azimuth_deg={self.azimuth_deg}, relative_azimuth_deg={self.relative_azimuth_deg},\n' 

232 'pitch_start_measurement_deg=' 

233 f'{self.pitch_start_measurement_deg}, ' 

234 f'roll_start_measurement_deg={self.roll_start_measurement_deg},\n' 

235 'telemetry=Telemetry(\n' 

236 f'\tvoltage_volts={self.telemetry.voltage_volts},\n' 

237 f'\thumidity_mm_hg={self.telemetry.humidity_mm_hg},\n' 

238 f'\ttemperature_diode_celsius={self.telemetry.temperature_diode_celsius},\n' 

239 f'\tstatus_flag={self.telemetry.status_flag},\n' 

240 '),\n' 

241 f'int_time={self.int_time},\n' 

242 f'signal_percentage={self.signal_percentage},\n' 

243 f'dark_counts={self.dark_counts},\n' 

244 f'max_counts={self.max_counts},\n' 

245 f'spectrum=[{spectrum_values}]' 

246 ) 

247 

248 @property 

249 def has_valid_position_fix(self) -> bool: 

250 """Return True if location has non-NaN latitude and longitude. 

251 

252 This property checks whether the GNSS position fix is available. 

253 A valid fix requires both lat and lon to be non-NaN values. 

254 """ 

255 lat = getattr(self.location, 'lat', float('nan')) 

256 lon = getattr(self.location, 'lon', float('nan')) 

257 return not (math.isnan(lat) or math.isnan(lon)) 

258 

259 @property 

260 def has_valid_solar_zenith(self) -> bool: 

261 """Return True if solar_zenith_deg is non-NaN. 

262 

263 This property checks whether the solar zenith angle measurement 

264 is available. 

265 """ 

266 return not math.isnan(self.solar_zenith_deg) 

267 

268 @classmethod 

269 def from_raw_data(cls, raw_data: str) -> Self: 

270 """Factory method from string to Measurement model.""" 

271 fields = [field.strip() for field in raw_data.strip().split(',')] 

272 if len(fields) != RAW_DATA_FIELD_COUNT: 

273 err_msg = f'raw_data must contain {RAW_DATA_FIELD_COUNT} comma-separated fields' 

274 raise ValueError(err_msg) 

275 

276 payload = { 

277 'device_id': fields[0], 

278 'serial_number': _normalize_serial_number(fields[1]), 

279 'channel_type': fields[2], 

280 'utc_time': _parse_utc_timestamp(fields[3]), 

281 'location': { 

282 'lat': float(fields[4]), 

283 'lon': float(fields[5]), 

284 }, 

285 'sat_compass_heading': float(fields[6]), 

286 'solar_azimuth_deg': float(fields[7]), 

287 'solar_zenith_deg': float(fields[8]), 

288 'gear_position_deg': float(fields[9]), 

289 'azimuth_deg': float(fields[10]), 

290 'relative_azimuth_deg': float(fields[11]), 

291 'pitch_start_measurement_deg': float(fields[12]), 

292 'roll_start_measurement_deg': float(fields[13]), 

293 'telemetry': { 

294 'voltage_volts': float(fields[14]), 

295 'humidity_mm_hg': float(fields[15]), 

296 'temperature_diode_celsius': float(fields[16]), 

297 'status_flag': int(fields[17], 2), 

298 }, 

299 'int_time': int(fields[18]), 

300 'signal_percentage': float(fields[19]), 

301 'dark_counts': int(fields[20]), 

302 'max_counts': int(fields[21]), 

303 'spectrum': [int(value) for value in fields[RAW_DATA_FIXED_FIELD_COUNT:]], 

304 } 

305 return cls.model_validate(payload)