Coverage for .tox/py311/lib/python3.11/site-packages/pydalec/measurement.py: 97%
145 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-26 22:48 +0200
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-26 22:48 +0200
1"""Class for measurement data for the pydalec mock instrument."""
3from __future__ import annotations
5import datetime
6import enum
7import math
8from decimal import Decimal
9from typing import Literal, Self
11from pydantic import BaseModel, ConfigDict, Field, field_validator
# Number of leading scalar fields in one raw record; Measurement.from_raw_data
# reads them as fields[0] through fields[21].
RAW_DATA_FIXED_FIELD_COUNT = 22
# Number of spectrum samples that follow the fixed fields in one raw record.
SPECTRUM_LENGTH = 190
# Total number of comma-separated fields expected in one raw record.
RAW_DATA_FIELD_COUNT = RAW_DATA_FIXED_FIELD_COUNT + SPECTRUM_LENGTH
def _has_max_decimals(value: float, decimals: int) -> bool:
    """Check if a float value has at most a specified number of decimal places.

    The check is made on the shortest round-trip text form of the float
    (``str(value)``), so 0.1 counts as one decimal place even though its binary
    representation is inexact.

    Args:
        value: Number to inspect.
        decimals: Maximum number of digits allowed after the decimal point.

    Returns:
        True if ``value`` needs no more than ``decimals`` fractional digits.
        NaN and infinities carry no fractional digits and always return True;
        rejecting them, if desired, is the job of a separate range check.
    """
    if not math.isfinite(value):
        return True
    # Inspect the exponent instead of calling quantize(): quantize() raises
    # decimal.InvalidOperation once the result needs more digits than the
    # context precision allows (e.g. 1e30 with one decimal place).
    # normalize() strips trailing zeros, so 1.0 reports exponent 0 rather than -1.
    exponent = Decimal(str(value)).normalize().as_tuple().exponent
    return exponent >= -decimals
def _parse_utc_timestamp(value: str) -> datetime.datetime:
    """Turn a timestamp string emitted by the instrument into a ``datetime``.

    Every 'Z' in the text is rewritten to the explicit '+00:00' offset before
    the string is handed to the ISO 8601 parser. Text without an offset yields
    a naive ``datetime``.
    """
    iso_text = value.replace('Z', '+00:00')
    return datetime.datetime.fromisoformat(iso_text)
def _normalize_serial_number(value: str) -> str:
    """Left-pad short all-digit serial numbers with zeros to four characters.

    Anything else (empty text, text containing non-digits, or text that is
    already four or more characters long) is returned unchanged.
    """
    if len(value) >= 4 or not value.isdigit():
        return value
    return value.rjust(4, '0')
def _validate_range_or_nan(value: float, minimum: float, maximum: float, field_name: str) -> float:
    """Return ``value`` when it is NaN or lies inside ``minimum..maximum`` (inclusive).

    NaN is let through because it marks an unavailable reading.

    Raises:
        ValueError: If ``value`` is a number outside the closed interval.
    """
    out_of_range = value < minimum or value > maximum
    if math.isnan(value) or not out_of_range:
        return value
    raise ValueError(f'{field_name} must be in range {minimum}..{maximum}')
class StatusFlag(enum.IntEnum):
    """Movement state of the instrument during an integration period.

    NOTE(review): Telemetry.status_flag in this module is declared as a plain
    non-negative int rather than as StatusFlag; confirm whether this enum is
    still in use.
    """

    # The instrument did not move while integrating.
    STATIONARY = 0
    # The instrument moved while integrating.
    MOVING = 1
class Coordinates(BaseModel):
    """Geographic position as latitude/longitude in decimal degrees.

    Either component may be NaN; otherwise ``lat`` must lie in -90..90 and
    ``lon`` in -180..180. Unknown fields are rejected.
    """

    model_config = ConfigDict(extra='forbid')

    lat: float
    lon: float

    @field_validator('lat')
    @classmethod
    def _validate_lat(cls, value: float) -> float:
        """Enforce the latitude range while letting NaN through."""
        return _validate_range_or_nan(value, minimum=-90.0, maximum=90.0, field_name='lat')

    @field_validator('lon')
    @classmethod
    def _validate_lon(cls, value: float) -> float:
        """Enforce the longitude range while letting NaN through."""
        return _validate_range_or_nan(value, minimum=-180.0, maximum=180.0, field_name='lon')

    def __str__(self) -> str:
        """Format as '<lat><N|S>,<lon><E|W>' using absolute values with 7 decimals."""
        # NOTE(review): NaN compares False against 0, so a NaN component is
        # rendered with the S/W suffix; confirm that is acceptable.
        lat_suffix = 'N' if self.lat >= 0 else 'S'
        lon_suffix = 'E' if self.lon >= 0 else 'W'
        return f'{abs(self.lat):010.7f}{lat_suffix},{abs(self.lon):010.7f}{lon_suffix}'
class Telemetry(BaseModel):
    """Container for the telemetry part of the measurement data.

    Attributes:
        voltage_volts (float): Voltage reading in volts.
        humidity_mm_hg (float): Humidity reading in mmHg.
        temperature_diode_celsius (float): Temperature reading from diode in degrees Celsius.
        status_flag (int): Non-negative integer holding the instrument's multi-bit
            quality flag (Qflag field).

    Validation:
        - voltage_volts: Must have at most 1 decimal place
        - humidity_mm_hg: Must have at most 1 decimal place
        - temperature_diode_celsius: Must have at most 3 decimal places
        - status_flag: Must be greater than or equal to 0
        - Extra fields are forbidden (model_config sets extra='forbid')

    String Representation:
        Returns a formatted string with all telemetry values in the format:
        "Voltage:XX.X;Humidity:XX.X;Temperature:XX.XXX;Status flag:N"
        where N is the integer value of status_flag. The numeric widths are
        minimums (zero-padded); larger values simply take more characters.
    """

    model_config = ConfigDict(extra='forbid')

    voltage_volts: float
    humidity_mm_hg: float
    temperature_diode_celsius: float
    status_flag: int = Field(ge=0)  # multi-bit quality flag from instrument Qflag field

    @field_validator('voltage_volts')
    @classmethod
    def _validate_voltage_precision(cls, value: float) -> float:
        """Reject voltages carrying more than one decimal place."""
        if not _has_max_decimals(value, 1):
            err_msg: str = f'voltage_volts must have at most 1 decimal place (value: {value})'
            raise ValueError(err_msg)
        return value

    @field_validator('humidity_mm_hg')
    @classmethod
    def _validate_humidity_precision(cls, value: float) -> float:
        """Reject humidity readings carrying more than one decimal place."""
        if not _has_max_decimals(value, 1):
            # The message names the field exactly as declared (lowercase), like
            # the voltage and temperature messages do.
            err_msg: str = f'humidity_mm_hg must have at most 1 decimal place (value: {value})'
            raise ValueError(err_msg)
        return value

    @field_validator('temperature_diode_celsius')
    @classmethod
    def _validate_temp_precision(cls, value: float) -> float:
        """Reject diode temperatures carrying more than three decimal places."""
        if not _has_max_decimals(value, decimals=3):
            err_msg: str = (
                f'temperature_diode_celsius must have at most 3 decimal places (value: {value})'
            )
            raise ValueError(err_msg)
        return value

    def __str__(self) -> str:
        """String representation of the telemetry data.

        Returns a string in the format:
        "Voltage:XX.X;Humidity:XX.X;Temperature:XX.XXX;Status flag:N"
        """
        return (
            f'Voltage:{self.voltage_volts:04.1f};Humidity:{self.humidity_mm_hg:04.1f};Temperature:'
            f'{self.temperature_diode_celsius:06.3f};Status flag:{self.status_flag}'
        )
class Measurement(BaseModel):
    """Container for measurement data from the mock instrument.

    Validation:
        - device_id: Must be 'DALEC'
        - serial_number: Exactly four digits
        - channel_type: One of 'Ed', 'Lu', 'Lsky'
        - utc_time: Must be timezone-aware with a zero UTC offset
        - sat_compass_heading, solar_azimuth_deg, solar_zenith_deg, azimuth_deg:
          NaN or within 0.0..359.9
        - gear_position_deg, relative_azimuth_deg, roll_start_measurement_deg:
          NaN or within -179.9..180.0
        - pitch_start_measurement_deg: NaN or within -90.0..90.0
        - signal_percentage: NaN or within 0.0..100.0
        - int_time: 1..6000
        - dark_counts, max_counts: 0..65535
        - spectrum: Exactly SPECTRUM_LENGTH integers, each within 0..65535
        - Extra fields are forbidden (model_config sets extra='forbid')

    NOTE(review): solar_zenith_deg shares the 0.0..359.9 bearing range with the
    azimuth fields; confirm that this is intended for a zenith angle.
    """

    model_config = ConfigDict(extra='forbid')

    device_id: Literal['DALEC']
    serial_number: str = Field(pattern=r'^\d{4}$')  # four digits, e.g. '0010'
    channel_type: Literal['Ed', 'Lu', 'Lsky']
    utc_time: datetime.datetime
    location: Coordinates
    sat_compass_heading: float
    solar_azimuth_deg: float
    solar_zenith_deg: float
    gear_position_deg: float
    azimuth_deg: float
    relative_azimuth_deg: float
    pitch_start_measurement_deg: float
    roll_start_measurement_deg: float
    telemetry: Telemetry
    int_time: int = Field(ge=1, le=6000)
    signal_percentage: float
    dark_counts: int = Field(ge=0, le=65535)
    max_counts: int = Field(ge=0, le=65535)
    # The length comes from SPECTRUM_LENGTH so that the model and the field
    # count checked in from_raw_data cannot drift apart.
    spectrum: list[int] = Field(
        min_length=SPECTRUM_LENGTH,
        max_length=SPECTRUM_LENGTH,
        description=(
            f'List of {SPECTRUM_LENGTH} integer values representing the spectrum, '
            'each in range 0..65535'
        ),
    )  # Validation in _validate_spectrum_values

    @field_validator('utc_time')
    @classmethod
    def _validate_utc_time(cls, v: datetime.datetime) -> datetime.datetime:
        """Reject naive timestamps and timestamps whose UTC offset is not zero."""
        if v.tzinfo is None or v.utcoffset() != datetime.timedelta(0):
            err_msg = 'utc_time must be timezone-aware UTC'
            raise ValueError(err_msg)
        return v

    @field_validator('spectrum')
    @classmethod
    def _validate_spectrum_values(cls, value: list[int]) -> list[int]:
        """Reject the spectrum if any sample falls outside 0..65535."""
        if any((x < 0 or x > 65535) for x in value):
            err_msg = 'All spectrum values must be in range 0..65535'
            raise ValueError(err_msg)
        return value

    @field_validator(
        'sat_compass_heading',
        'solar_azimuth_deg',
        'solar_zenith_deg',
        'azimuth_deg',
    )
    @classmethod
    def _validate_bearing_fields(cls, value: float, info) -> float:
        """Accept NaN or a value within 0.0..359.9; ``info`` supplies the field name."""
        return _validate_range_or_nan(value, 0.0, 359.9, info.field_name)

    @field_validator('gear_position_deg', 'relative_azimuth_deg', 'roll_start_measurement_deg')
    @classmethod
    def _validate_signed_heading_fields(cls, value: float, info) -> float:
        """Accept NaN or a value within -179.9..180.0; ``info`` supplies the field name."""
        return _validate_range_or_nan(value, -179.9, 180.0, info.field_name)

    @field_validator('pitch_start_measurement_deg')
    @classmethod
    def _validate_pitch_field(cls, value: float) -> float:
        """Accept NaN or a value within -90.0..90.0."""
        return _validate_range_or_nan(value, -90.0, 90.0, 'pitch_start_measurement_deg')

    @field_validator('signal_percentage')
    @classmethod
    def _validate_signal_percentage(cls, value: float) -> float:
        """Accept NaN or a value within 0.0..100.0."""
        return _validate_range_or_nan(value, 0.0, 100.0, 'signal_percentage')

    def __str__(self) -> str:
        """Human-readable multiline string representation of a measurement."""
        spectrum_values = ', '.join(str(value) for value in self.spectrum)
        return (
            f'device_id={self.device_id!r},\n'
            f'serial_number={self.serial_number!r},\n'
            f'channel_type={self.channel_type!r},\n'
            f'utc_time={self.utc_time!r},\n'
            f'location={self.location!r},\n'
            f'sat_compass_heading={self.sat_compass_heading},\n'
            f'solar_azimuth_deg={self.solar_azimuth_deg}, '
            f'solar_zenith_deg={self.solar_zenith_deg},\n'
            f'gear_position_deg={self.gear_position_deg},\n'
            f'azimuth_deg={self.azimuth_deg}, relative_azimuth_deg={self.relative_azimuth_deg},\n'
            'pitch_start_measurement_deg='
            f'{self.pitch_start_measurement_deg}, '
            f'roll_start_measurement_deg={self.roll_start_measurement_deg},\n'
            'telemetry=Telemetry(\n'
            f'\tvoltage_volts={self.telemetry.voltage_volts},\n'
            f'\thumidity_mm_hg={self.telemetry.humidity_mm_hg},\n'
            f'\ttemperature_diode_celsius={self.telemetry.temperature_diode_celsius},\n'
            f'\tstatus_flag={self.telemetry.status_flag},\n'
            '),\n'
            f'int_time={self.int_time},\n'
            f'signal_percentage={self.signal_percentage},\n'
            f'dark_counts={self.dark_counts},\n'
            f'max_counts={self.max_counts},\n'
            f'spectrum=[{spectrum_values}]'
        )

    @property
    def has_valid_position_fix(self) -> bool:
        """Return True if location has non-NaN latitude and longitude.

        This property checks whether the GNSS position fix is available.
        A valid fix requires both lat and lon to be non-NaN values.
        """
        # getattr with a NaN default treats a location object that lacks
        # lat/lon the same as "no fix" instead of raising AttributeError.
        lat = getattr(self.location, 'lat', float('nan'))
        lon = getattr(self.location, 'lon', float('nan'))
        return not (math.isnan(lat) or math.isnan(lon))

    @property
    def has_valid_solar_zenith(self) -> bool:
        """Return True if solar_zenith_deg is non-NaN.

        This property checks whether the solar zenith angle measurement
        is available.
        """
        return not math.isnan(self.solar_zenith_deg)

    @classmethod
    def from_raw_data(cls, raw_data: str) -> Self:
        """Factory method from string to Measurement model.

        The record is split on commas and each field is stripped of surrounding
        whitespace. The first RAW_DATA_FIXED_FIELD_COUNT fields are the scalar
        values in the order used below; the remaining fields are the spectrum.

        Args:
            raw_data: One comma-separated record.

        Returns:
            The validated Measurement.

        Raises:
            ValueError: If the record does not contain RAW_DATA_FIELD_COUNT
                fields, or a field cannot be converted to its numeric type.
            pydantic.ValidationError: If the converted values fail model validation.
        """
        fields = [field.strip() for field in raw_data.strip().split(',')]
        if len(fields) != RAW_DATA_FIELD_COUNT:
            err_msg = f'raw_data must contain {RAW_DATA_FIELD_COUNT} comma-separated fields'
            raise ValueError(err_msg)

        payload = {
            'device_id': fields[0],
            'serial_number': _normalize_serial_number(fields[1]),
            'channel_type': fields[2],
            'utc_time': _parse_utc_timestamp(fields[3]),
            'location': {
                'lat': float(fields[4]),
                'lon': float(fields[5]),
            },
            'sat_compass_heading': float(fields[6]),
            'solar_azimuth_deg': float(fields[7]),
            'solar_zenith_deg': float(fields[8]),
            'gear_position_deg': float(fields[9]),
            'azimuth_deg': float(fields[10]),
            'relative_azimuth_deg': float(fields[11]),
            'pitch_start_measurement_deg': float(fields[12]),
            'roll_start_measurement_deg': float(fields[13]),
            'telemetry': {
                'voltage_volts': float(fields[14]),
                'humidity_mm_hg': float(fields[15]),
                'temperature_diode_celsius': float(fields[16]),
                # The status flag text is parsed as a base-2 number.
                'status_flag': int(fields[17], 2),
            },
            'int_time': int(fields[18]),
            'signal_percentage': float(fields[19]),
            'dark_counts': int(fields[20]),
            'max_counts': int(fields[21]),
            'spectrum': [int(value) for value in fields[RAW_DATA_FIXED_FIELD_COUNT:]],
        }
        return cls.model_validate(payload)