Coverage for .tox/py313/lib/python3.13/site-packages/pydalec/instrument.py: 93%
86 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-26 22:48 +0200
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-26 22:48 +0200
1"""Synchronous client for communicating with a DALEC instrument."""
3from __future__ import annotations
5import time
6from collections import deque
7from dataclasses import dataclass
8from typing import Any, Callable, TypeVar
10from pydalec.errors import (
11 PyDalecNoPositionDataError,
12 PyDalecNoSolarZenithDataError,
13)
14from pydalec.transport.mock import MockTransport
15from pydalec.transport.tcp import TCPTransport
class DalecStatus:
    """Instrument status flags kept by the client.

    Attributes:
        measuring: whether measurements are currently marked as running.
            Starts as False.
    """

    def __init__(self) -> None:
        """Initialize the status with default values."""
        # Nothing is measuring until the flag is explicitly switched on.
        self.measuring: bool = False

    def __repr__(self) -> str:
        """Return a debug representation that exposes the current flags.

        Returns:
            str: text such as ``DalecStatus(measuring=False)``.
        """
        return f'{type(self).__name__}(measuring={self.measuring!r})'
@dataclass(frozen=True)
class Location:
    """Simple latitude/longitude position fix.

    Attributes:
        lat: latitude; values >= 0 are rendered as north, negative as south.
        lon: longitude; values >= 0 are rendered as east, negative as west.
    """

    lat: float
    lon: float

    def __str__(self) -> str:
        """Return string representation of the class.

        The sign of each coordinate is replaced by a hemisphere letter, so
        ``Location(-33.9, 18.4)`` renders as ``Location(lat=33.9°S, lon=18.4°E)``.

        Returns:
            str: representation of the class, including lat and lon values.
        """
        # A named escape takes single braces; doubling them (as one would to
        # escape a literal brace in an f-string) is not a valid \N{...} escape.
        degree = '\N{DEGREE SIGN}'
        lat_text = f'{self.lat}{degree}N' if self.lat >= 0 else f'{-self.lat}{degree}S'
        lon_text = f'{self.lon}{degree}E' if self.lon >= 0 else f'{-self.lon}{degree}W'
        return f'Location(lat={lat_text}, lon={lon_text})'
class Dalec:
    """Client API for synchronous DALEC commands.

    All communication is delegated to a transport object; the client itself
    only tracks whether measurements are running (``self.status``) and offers
    helpers that poll the transport's measurement log for a usable value.
    """

    # Upper bound, in seconds, for the pause between two polls of the
    # transport's measurement log.
    _POLL_INTERVAL_SECS = 0.05

    def __init__(self, transport) -> None:
        """Initialize the client with a transport implementation.

        Args:
            transport: object providing the transport operations used by this
                class (``connect``, ``disconnect``, ``connected``,
                ``start_measurements``, ``stop_measurements`` and
                ``measurement_log``).
        """
        self.transport = transport
        self.status = DalecStatus()

    @classmethod
    def connect_tcp(
        cls,
        host: str = '127.0.0.1',
        port: int = 23,
        data_root_dir: str | None = None,
        max_file_size_kb: int = 51200,
    ):
        """Create a client connected to a DALEC TCP endpoint.

        All arguments are forwarded unchanged to ``TCPTransport``.

        Returns:
            A new client instance wrapping the TCP transport.
        """
        return cls(
            transport=TCPTransport(
                host,
                port,
                data_root_dir=data_root_dir,
                max_file_size_kb=max_file_size_kb,
            )
        )

    @classmethod
    def connect_mock(
        cls,
        delay: float = 0.0,
        error_rate: float = 0.0,
        data_root_dir: str | None = None,
        max_file_size_kb: int = 51200,
    ):
        """Create a client using the in-memory mock transport.

        All arguments are forwarded unchanged to ``MockTransport``.

        Returns:
            A new client instance wrapping the mock transport.
        """
        return cls(
            MockTransport(
                delay=delay,
                error_rate=error_rate,
                data_root_dir=data_root_dir,
                max_file_size_kb=max_file_size_kb,
            )
        )

    def disconnect(self) -> None:
        """Close the transport connection."""
        self.transport.disconnect()

    @property
    def connected(self) -> bool:
        """Return True if the client is currently connected to the instrument."""
        return self.transport.connected

    def connect(self) -> None:
        """Reconnect to the instrument if currently disconnected."""
        if not self.connected:
            self.transport.connect()

    @property
    def measurement_log(self):
        """Return a list of recent measurements from the instrument.

        The list is a fresh copy of the transport's log, so mutating it does
        not affect the transport.
        """
        return list(self.transport.measurement_log)

    def __repr__(self) -> str:
        """Representation of DALEC instance.

        Returns:
            str: formatted string showing the transport information of the DALEC instance.
        """
        return self.__str__()

    def __str__(self) -> str:
        """Return string representation of the class.

        Returns:
            str: representation of the class, including transport information.
        """
        return f'DALEC at {self.transport}'

    def start_measurements(self) -> None:
        """Start the background process for making and receiving measurements."""
        self.transport.start_measurements()
        # Only flag as measuring once the transport call has returned.
        self.status.measuring = True

    def stop_measurements(self) -> None:
        """Stop the background process for making and receiving measurements."""
        self.transport.stop_measurements()
        self.status.measuring = False

    T = TypeVar('T')

    def _get_measurement_field(
        self,
        timeout_secs: float,
        is_valid: Callable[[Any], bool],
        extract_value: Callable[[Any], T],
        timeout_error_type: type[Exception],
        timeout_error_template: str,
    ) -> T:
        """Poll measurements until a valid value is available or timeout expires.

        If measurements were not already running, they are started for the
        duration of the call and stopped afterwards, and the transport's
        measurement log is put back to the content it had on entry.

        Args:
            timeout_secs: maximum time to wait; must be greater than 0.
            is_valid: predicate selecting the first usable measurement.
            extract_value: converts the selected measurement to the result.
            timeout_error_type: exception class raised when time runs out.
            timeout_error_template: message template for that exception;
                formatted with the ``timeout_secs`` keyword.

        Returns:
            The value produced by ``extract_value`` for the first measurement
            accepted by ``is_valid``.

        Raises:
            ValueError: if ``timeout_secs`` is not greater than 0 (this
                includes NaN).
            Exception: an instance of ``timeout_error_type`` if no valid
                measurement was seen before the deadline.
        """
        # `not x > 0` instead of `x <= 0` so that NaN is rejected as well:
        # every comparison with NaN is False, which used to let it slip through.
        if not timeout_secs > 0:
            err_msg = 'timeout must be greater than 0 seconds'
            raise ValueError(err_msg)

        was_measuring: bool = self.status.measuring
        # Snapshot of the log so a temporary measurement run leaves no trace.
        saved_log = deque(
            self.transport.measurement_log, maxlen=self.transport.measurement_log.maxlen
        )
        last_seen_measurement = None

        if not was_measuring:
            self.start_measurements()

        deadline = time.monotonic() + timeout_secs
        try:
            while time.monotonic() < deadline:
                measurements = list(self.transport.measurement_log)

                if last_seen_measurement is None:
                    new_measurements = measurements
                else:
                    try:
                        last_index = measurements.index(last_seen_measurement)
                        new_measurements = measurements[last_index + 1 :]
                    except ValueError:
                        # The last inspected entry is no longer in the log;
                        # re-inspect everything that is there now.
                        new_measurements = measurements

                for measurement in new_measurements:
                    if is_valid(measurement):
                        return extract_value(measurement)
                    last_seen_measurement = measurement

                # Never sleep past the deadline: a fixed pause would make
                # short timeouts overshoot by up to one poll interval.
                remaining = deadline - time.monotonic()
                if remaining > 0:
                    time.sleep(min(self._POLL_INTERVAL_SECS, remaining))
        finally:
            if not was_measuring:
                try:
                    self.stop_measurements()
                finally:
                    # Restore the log even if stopping raised.
                    self.transport.measurement_log = deque(saved_log, maxlen=saved_log.maxlen)

        err_msg = timeout_error_template.format(timeout_secs=timeout_secs)
        raise timeout_error_type(err_msg)

    def get_location(self, timeout_secs: float = 10.0) -> Location:
        """Return the first valid GNSS position fix received within timeout.

        Args:
            timeout_secs: maximum time to wait; must be greater than 0.

        Returns:
            Location: latitude/longitude copied from the first measurement
            whose ``has_valid_position_fix`` is truthy.

        Raises:
            ValueError: if ``timeout_secs`` is not greater than 0.
            PyDalecNoPositionDataError: if no valid fix arrives in time.
        """
        return self._get_measurement_field(
            timeout_secs=timeout_secs,
            is_valid=lambda measurement: measurement.has_valid_position_fix,
            extract_value=lambda measurement: Location(
                lat=measurement.location.lat,
                lon=measurement.location.lon,
            ),
            timeout_error_type=PyDalecNoPositionDataError,
            timeout_error_template='No valid GNSS position fix received within {timeout_secs:.1f}s',
        )

    def get_solar_zenith(self, timeout_secs: float = 10.0) -> float:
        """Return the first valid solar zenith received within timeout.

        Args:
            timeout_secs: maximum time to wait; must be greater than 0.

        Returns:
            float: ``solar_zenith_deg`` of the first measurement whose
            ``has_valid_solar_zenith`` is truthy.

        Raises:
            ValueError: if ``timeout_secs`` is not greater than 0.
            PyDalecNoSolarZenithDataError: if no valid value arrives in time.
        """
        return self._get_measurement_field(
            timeout_secs=timeout_secs,
            is_valid=lambda measurement: measurement.has_valid_solar_zenith,
            extract_value=lambda measurement: measurement.solar_zenith_deg,
            timeout_error_type=PyDalecNoSolarZenithDataError,
            timeout_error_template='No valid solar zenith received within {timeout_secs:.1f}s',
        )