Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ channel.py: 67%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# ===================================================== 

2# Imports 

3# ===================================================== 

4 

5from collections import OrderedDict 

6from typing import Annotated 

7 

8import numpy as np 

9from loguru import logger 

10from pydantic import ( 

11 AliasChoices, 

12 computed_field, 

13 Field, 

14 field_validator, 

15 PrivateAttr, 

16 ValidationInfo, 

17) 

18 

19from mt_metadata import NULL_VALUES 

20from mt_metadata.base import helpers, MetadataBase 

21from mt_metadata.common import ( 

22 BasicLocation, 

23 Comment, 

24 DataQuality, 

25 Fdsn, 

26 Instrument, 

27 TimePeriod, 

28) 

29from mt_metadata.common.units import get_unit_object, Unit 

30from mt_metadata.timeseries import AppliedFilter 

31from mt_metadata.timeseries.filters import ChannelResponse 

32from mt_metadata.utils.exceptions import MTSchemaError 

33from mt_metadata.utils.validators import validate_name 

34 

35 

36# ===================================================== 

37 

38 

39# this is a channel base for channels that have multiple sensors and locations like an 

40# electric dipole. 

41class ChannelBase(MetadataBase): 

42 _channel_type: str = PrivateAttr("base") 

43 channel_number: Annotated[ 

44 int, 

45 Field( 

46 default=0, 

47 description="Channel number on the data logger.", 

48 alias=None, 

49 json_schema_extra={ 

50 "units": None, 

51 "required": True, 

52 "examples": ["1"], 

53 }, 

54 ), 

55 ] 

56 

57 channel_id: Annotated[ 

58 str | None, 

59 Field( 

60 default=None, 

61 description="channel id given by the user or data logger", 

62 alias=None, 

63 json_schema_extra={ 

64 "units": None, 

65 "required": False, 

66 "examples": ["1001.11"], 

67 }, 

68 ), 

69 ] 

70 

71 comments: Annotated[ 

72 Comment, 

73 Field( 

74 default_factory=Comment, 

75 description="Any comments about the channel.", 

76 alias=None, 

77 json_schema_extra={ 

78 "units": None, 

79 "required": False, 

80 "examples": ["ambient air temperature was chilly, ice on cables"], 

81 }, 

82 ), 

83 ] 

84 

85 component: Annotated[ 

86 str, 

87 Field( 

88 default="auxiliary_default", 

89 description="Name of the component measured, can be uppercase and/or lowercase. For now electric channels should start with an 'e' and magnetic channels start with an 'h', followed by the component. If there are multiples of the same channel the name could include an integer. {type}{component}{number} --> Ex01.", 

90 alias=None, 

91 pattern=r"\w+", # At least one word character 

92 json_schema_extra={ 

93 "units": None, 

94 "required": True, 

95 "examples": ["ex"], 

96 }, 

97 ), 

98 ] 

99 

100 measurement_azimuth: Annotated[ 

101 float, 

102 Field( 

103 default=0.0, 

104 description="Horizontal azimuth of the channel in measurement coordinate system spcified in station.orientation.reference_frame. Default reference frame is a geographic right-handed coordinate system with north=0, east=90, vertical=+ downward.", 

105 validation_alias=AliasChoices("measurement_azimuth", "azimuth"), 

106 json_schema_extra={ 

107 "units": "degrees", 

108 "required": True, 

109 "examples": [0.0], 

110 }, 

111 ), 

112 ] 

113 

114 measurement_tilt: Annotated[ 

115 float, 

116 Field( 

117 default=0.0, 

118 description="Vertical tilt of the channel in measurement coordinate system specified in station.orientation.reference_frame. Default reference frame is a geographic right-handed coordinate system with north=0, east=90, vertical=+ downward.", 

119 validation_alias=AliasChoices("measurement_tilt", "dip"), 

120 json_schema_extra={ 

121 "units": "degrees", 

122 "required": True, 

123 "examples": [0], 

124 }, 

125 ), 

126 ] 

127 

128 sample_rate: Annotated[ 

129 float, 

130 Field( 

131 default=0.0, 

132 description="Digital sample rate", 

133 validation_alias=AliasChoices("sample_rate", "sampling_rate"), 

134 json_schema_extra={ 

135 "units": "samples per second", 

136 "required": True, 

137 "examples": [8.0], 

138 }, 

139 ), 

140 ] 

141 

142 translated_azimuth: Annotated[ 

143 float | None, 

144 Field( 

145 default=None, 

146 description="Horizontal azimuth of the channel in translated coordinate system, this should only be used for derived product. For instance if you collected your data in geomagnetic coordinates and then translated them to geographic coordinates you would set measurement_azimuth=0, translated_azimuth=-12.5 for a declination angle of N12.5E.", 

147 alias=None, 

148 json_schema_extra={ 

149 "units": "degrees", 

150 "required": False, 

151 "examples": [0.0], 

152 }, 

153 ), 

154 ] 

155 

156 translated_tilt: Annotated[ 

157 float | None, 

158 Field( 

159 default=None, 

160 description="Tilt of channel in translated coordinate system, this should only be used for derived product. For instance if you collected your data using a tripod you would set measurement_tilt=45, translated_tilt=0 for a vertical component.", 

161 alias=None, 

162 json_schema_extra={ 

163 "units": "degrees", 

164 "required": False, 

165 "examples": [0.0], 

166 }, 

167 ), 

168 ] 

169 

170 type: Annotated[ 

171 str, 

172 Field( 

173 default="base", 

174 description="Data type for the channel, should be a descriptive word that a user can understand.", 

175 alias=None, 

176 json_schema_extra={ 

177 "units": None, 

178 "required": True, 

179 "examples": ["temperature"], 

180 }, 

181 ), 

182 ] 

183 

184 units: Annotated[ 

185 str, 

186 Field( 

187 default="", 

188 description="Units of the data, should be in SI units and represented as the full name of the unit all lowercase. If a complex unit use 'per' and '-'.", 

189 alias=None, 

190 json_schema_extra={ 

191 "units": None, 

192 "required": True, 

193 "examples": ["celsius"], 

194 }, 

195 ), 

196 ] 

197 

198 data_quality: Annotated[ 

199 DataQuality, 

200 Field( 

201 default_factory=DataQuality, 

202 description="Data quality for the channel.", 

203 alias=None, 

204 json_schema_extra={ 

205 "units": None, 

206 "required": False, 

207 "examples": ["DataQuality()"], 

208 }, 

209 ), 

210 ] 

211 

212 filters: Annotated[ 

213 list[AppliedFilter], 

214 Field( 

215 default_factory=list, 

216 description="Filter data for the channel.", 

217 alias=None, 

218 json_schema_extra={ 

219 "units": None, 

220 "required": True, 

221 "examples": [ 

222 "AppliedFilter(name='filter_name', applied=True, stage=1)" 

223 ], 

224 }, 

225 ), 

226 ] 

227 

228 time_period: Annotated[ 

229 TimePeriod, 

230 Field( 

231 default_factory=TimePeriod, 

232 description="Time period for the channel.", 

233 alias=None, 

234 json_schema_extra={ 

235 "units": None, 

236 "required": False, 

237 "examples": ["TimePeriod(start='2020-01-01', end='2020-12-31')"], 

238 }, 

239 ), 

240 ] 

241 

242 fdsn: Annotated[ 

243 Fdsn, 

244 Field( 

245 default_factory=Fdsn, 

246 description="FDSN information for the channel.", 

247 alias=None, 

248 json_schema_extra={ 

249 "units": None, 

250 "required": False, 

251 "examples": ["Fdsn()"], 

252 }, 

253 ), 

254 ] 

255 

256 @field_validator("component", mode="before") 

257 @classmethod 

258 def validate_component(cls, value: str) -> str: 

259 """make sure the value is all lower case""" 

260 if not isinstance(value, str): 

261 raise TypeError(f"Component must be a string not {type(value)}") 

262 

263 return value.lower() 

264 

265 @field_validator("comments", mode="before") 

266 @classmethod 

267 def validate_comments(cls, value, info: ValidationInfo) -> Comment: 

268 """ 

269 Validate that the value is a valid comment. 

270 """ 

271 if isinstance(value, (str, list)): 

272 return Comment(value=value) 

273 return value 

274 

275 @field_validator("units", mode="before") 

276 @classmethod 

277 def validate_units(cls, value: str, info: ValidationInfo) -> str: 

278 """ 

279 validate units base on input string will return the long name 

280 

281 Parameters 

282 ---------- 

283 value : units string 

284 unit string separated by either '/' for division or ' ' for 

285 multiplication. Or 'per' and ' ', respectively 

286 info : ValidationInfo 

287 _description_ 

288 

289 Returns 

290 ------- 

291 str 

292 return the long descriptive name of the unit. For example 'kilometers'. 

293 """ 

294 if value in [None, ""]: 

295 return "" 

296 try: 

297 unit_object = get_unit_object(value) 

298 return unit_object.name 

299 except ValueError as error: 

300 raise KeyError(error) 

301 except KeyError as error: 

302 raise KeyError(error) 

303 

304 @field_validator("type", mode="before") 

305 @classmethod 

306 def validate_type(cls, value, info: ValidationInfo) -> str: 

307 """ 

308 Validate that the type channel 

309 """ 

310 # Get the expected filter type based on the actual class 

311 # Make sure derived classes define their own _filter_type as class variable 

312 expected_type = getattr(cls, "_channel_type", "base").default 

313 

314 if value != expected_type: 

315 logger.warning( 

316 f"Channel type is set to {value}, but should be " 

317 f"{expected_type} for {cls.__name__}." 

318 ) 

319 return expected_type 

320 

321 @field_validator("filters", mode="before") 

322 @classmethod 

323 def parse_filters_string(cls, value): 

324 """Parse string representation of filters into list of AppliedFilter objects""" 

325 if isinstance(value, str): 

326 import ast 

327 import json 

328 import re 

329 from collections import OrderedDict 

330 

331 # Handle string representation of list of dicts 

332 if value.strip().startswith("[") and value.strip().endswith("]"): 

333 try: 

334 # First try json.loads for JSON-style strings (handles true/false/null) 

335 parsed_data = json.loads(value) 

336 except (ValueError, json.JSONDecodeError): 

337 try: 

338 # Fall back to ast.literal_eval for Python-style literals 

339 parsed_data = ast.literal_eval(value) 

340 except (ValueError, SyntaxError): 

341 try: 

342 # Handle OrderedDict function calls with eval and safe namespace 

343 # Clean up newlines between dictionary items by adding commas 

344 cleaned = re.sub(r"}\s+\{", "}, {", value) 

345 

346 # Use eval with a restricted namespace 

347 safe_namespace = { 

348 "__builtins__": {}, 

349 "OrderedDict": OrderedDict, 

350 "True": True, 

351 "False": False, 

352 "None": None, 

353 "true": True, # JSON-style booleans 

354 "false": False, 

355 "null": None, 

356 } 

357 

358 parsed_data = eval(cleaned, safe_namespace) 

359 except Exception as e: 

360 logger.warning(f"Failed to parse filters string: {e}") 

361 return [] 

362 

363 # Convert to AppliedFilter objects 

364 filters = [] 

365 for item in parsed_data: 

366 if isinstance(item, dict) and "applied_filter" in item: 

367 filter_data = item["applied_filter"] 

368 # Convert OrderedDict to regular dict if needed 

369 if hasattr(filter_data, "items"): 

370 filter_data = dict(filter_data) 

371 filters.append(AppliedFilter(**filter_data)) 

372 elif isinstance(item, dict): 

373 # Direct dict representation 

374 filters.append(AppliedFilter(**item)) 

375 

376 return filters 

377 

378 # Handle single filter string representations 

379 elif value.strip(): 

380 logger.warning(f"Unknown filter string format: {value}") 

381 return [] 

382 elif isinstance(value, list): 

383 # Assume list of AppliedFilter objects or dicts 

384 filters = [] 

385 for item in value: 

386 if isinstance(item, AppliedFilter): 

387 filters.append(item) 

388 elif isinstance(item, dict): 

389 filters.append(AppliedFilter(**item)) 

390 else: 

391 logger.warning(f"Unknown filter list item type: {type(item)}") 

392 return filters 

393 

394 return value 

395 

396 @field_validator("filters", mode="after") 

397 @classmethod 

398 def validate_filters(cls, value, info): 

399 """sort the filters by stage number and check for duplicates""" 

400 # Get the instance being validated 

401 instance = info.data if hasattr(info, "data") else None 

402 

403 # Sort filters by stage number, treating None as 0 

404 value.sort(key=lambda f: f.stage if f.stage is not None else 0) 

405 

406 # TEMPORARILY DISABLED: Check for duplicates 

407 # There's a known issue in MTH5 serialization that causes filter duplication 

408 # This will be re-enabled once the MTH5 duplication bug is fixed 

409 # TODO: Re-enable duplicate validation after fixing MTH5 filter duplication 

410 # seen = set() 

411 # for f in value: 

412 # if f.name in seen: 

413 # raise ValueError(f"Duplicate filter found: {f.name}") 

414 # seen.add(f.name) 

415 

416 return value 

417 

418 def add_filter( 

419 self, 

420 applied_filter: AppliedFilter | None = None, 

421 name: str | None = None, 

422 applied: bool = True, 

423 stage: int | None = None, 

424 comments: Comment | str | None = None, 

425 ) -> None: 

426 """ 

427 Add a filter to the filter list. 

428 

429 Parameters 

430 ---------- 

431 name : str 

432 Name of the filter. 

433 applied : bool, optional 

434 Whether the filter has been applied, by default True. 

435 stage : int | None, optional 

436 Stage of the filter in the processing chain, by default None. 

437 """ 

438 if applied_filter is not None: 

439 if not isinstance(applied_filter, AppliedFilter): 

440 raise TypeError("applied_filter must be an instance of AppliedFilter") 

441 

442 # Check if filter with this name already exists 

443 if any(f.name == applied_filter.name for f in self.filters): 

444 logger.debug( 

445 f"Filter '{applied_filter.name}' already exists, skipping duplicate" 

446 ) 

447 return 

448 

449 if applied_filter.stage is None: 

450 applied_filter.stage = len(self.filters) + 1 

451 self.filters.append(applied_filter) 

452 else: 

453 if name is None: 

454 raise ValueError("name must be provided if applied_filter is None") 

455 if not isinstance(name, str): 

456 raise TypeError("name must be a string") 

457 

458 # Check if filter with this name already exists 

459 if any(f.name == name for f in self.filters): 

460 logger.debug(f"Filter '{name}' already exists, skipping duplicate") 

461 return 

462 

463 if stage is None: 

464 stage = len(self.filters) + 1 

465 

466 # Build kwargs for AppliedFilter, excluding None comments to use default 

467 filter_kwargs = {"name": name, "applied": applied, "stage": stage} 

468 if comments is not None: 

469 filter_kwargs["comments"] = comments 

470 

471 self.filters.append(AppliedFilter(**filter_kwargs)) 

472 

473 # Sort filters and validate for duplicates 

474 self._sort_filters() 

475 # Note: Skipping duplicate validation since we already check above 

476 

477 @computed_field 

478 @property 

479 def filter_names(self) -> list[str]: 

480 """ 

481 List of filter names applied to the channel. 

482 

483 Returns 

484 ------- 

485 list[str] 

486 List of filter names. 

487 """ 

488 return [f.name for f in self.filters] 

489 

490 def remove_filter(self, name: str, reset_stages: bool = True) -> None: 

491 """ 

492 Remove a filter from the filter list. 

493 

494 Parameters 

495 ---------- 

496 name : str 

497 Name of the filter to remove. 

498 reset_stages : bool, optional 

499 Whether to reset the stages of the remaining filters, by default True. 

500 """ 

501 

502 new_list = [] 

503 for f in self.filters: 

504 if f.name == name: 

505 continue 

506 if reset_stages: 

507 f.stage = len(new_list) + 1 

508 new_list.append(f) 

509 self.filters = new_list 

510 

511 def get_filter(self, name: str) -> AppliedFilter | None: 

512 """ 

513 Get a filter from the filter list by name. 

514 

515 Parameters 

516 ---------- 

517 name : str 

518 Name of the filter to get. 

519 

520 Returns 

521 ------- 

522 AppliedFilter | None 

523 The filter with the given name, or None if not found. 

524 """ 

525 for f in self.filters: 

526 if f.name == name: 

527 return f 

528 logger.warning(f"Could not find filter {name} in channel filters") 

529 return None 

530 

531 def _sort_filters(self) -> None: 

532 """ 

533 Sort the list of filters applied to the channel by stage number. 

534 

535 Returns 

536 ------- 

537 None 

538 """ 

539 # Sort filters by stage number, treating None as 0 

540 self.filters.sort(key=lambda f: f.stage if f.stage is not None else 0) 

541 

542 def _validate_no_duplicates(self) -> None: 

543 """ 

544 Check for duplicate filter names and raise an error if found. 

545 

546 Returns 

547 ------- 

548 None 

549 

550 Raises 

551 ------ 

552 ValueError 

553 If duplicate filter names are found. 

554 """ 

555 seen = set() 

556 for f in self.filters: 

557 if f.name in seen: 

558 raise ValueError(f"Duplicate filter found: {f.name}") 

559 seen.add(f.name) 

560 

561 def channel_response(self, filters_dict): 

562 """ 

563 full channel response from a dictionary of filter objects 

564 """ 

565 

566 mt_filter_list = [] 

567 for applied_filter in self.filters: 

568 try: 

569 mt_filter = filters_dict[applied_filter.name] 

570 mt_filter_list.append(mt_filter) 

571 except KeyError: 

572 msg = f"Could not find {applied_filter.name} in filters dictionary, skipping" 

573 logger.error(msg) 

574 continue 

575 # compute instrument sensitivity and units in/out 

576 return ChannelResponse(filters_list=mt_filter_list) 

577 

578 @property 

579 def unit_object(self) -> Unit: 

580 """ 

581 Some channels have a unit object that is used to convert between units. 

582 This is a property that returns the unit object for the channel. 

583 The unit object is created using the units attribute of the channel. 

584 The unit object is used to convert between units and to get the unit 

585 

586 Returns 

587 ------- 

588 Unit 

589 BaseModel object with unit attributes 

590 """ 

591 return get_unit_object(self.units) 

592 

593 def _validate_filtered_applied( 

594 self, applied: list | np.typing.NDArray | str | None 

595 ) -> list: 

596 applied_values = _applied_values_map(treat_null_values_as=False) 

597 # the returned type from a hdf5 dataset is a numpy array. 

598 if isinstance(applied, np.ndarray): 

599 return applied.tolist() 

600 

601 # sets an empty list to one default value 

602 if isinstance(applied, list) and len(applied) == 0: 

603 return [] 

604 

605 # Handle string case 

606 if isinstance(applied, str): 

607 # Handle simple strings 

608 if applied in applied_values.keys(): 

609 return [ 

610 applied_values[applied], 

611 ] 

612 

613 # Handle string-lists (e.g. from json) 

614 if applied.find("[") >= 0: 

615 applied = applied.replace("[", "").replace("]", "") 

616 if applied.count(",") > 0: 

617 return [ss.strip().lower() for ss in applied.split(",")] 

618 else: 

619 return [ss.lower() for ss in applied.split()] 

620 elif isinstance(applied, list): 

621 return applied 

622 elif isinstance(applied, tuple): 

623 return list(applied) 

624 else: 

625 msg = f"Input applied cannot be of type {type(applied)}" 

626 logger.error(msg) 

627 raise MTSchemaError(msg) 

628 

629 def _validate_filtered_name( 

630 self, names: list | np.typing.NDArray | str | None 

631 ) -> list: 

632 if names is None: 

633 return [] 

634 

635 if isinstance(names, str): 

636 return [ss.strip().lower() for ss in names.split(",")] 

637 elif isinstance(names, list): 

638 return [ss.strip().lower() for ss in names] 

639 elif isinstance(names, np.ndarray): 

640 names = names.astype(np.str_) 

641 return [ss.strip().lower() for ss in names] 

642 else: 

643 msg = "names must be a string or list of strings not {0}, type {1}" 

644 logger.error(msg.format(names, type(names))) 

645 raise MTSchemaError(msg.format(names, type(names))) 

646 

647 def _find_filter_keys(self, meta_dict: dict) -> str | None: 

648 """ 

649 Search for filter-related keys in the meta_dict. 

650 

651 Parameters 

652 ---------- 

653 meta_dict : dict 

654 Dictionary to search for filter keys. 

655 

656 Returns 

657 ------- 

658 str | None 

659 Returns 'filter' if any keys have 'filter' as base (before '.'), 

660 'filtered' if any keys have 'filtered' as base (before '.'), 

661 or None if no filter-related keys are found. 

662 """ 

663 keys = list(meta_dict.keys()) 

664 

665 for key in keys: 

666 # Split by '.' and check the base key 

667 base_key = key.split(".")[0] 

668 

669 # Check for 'filter' base (legacy format) 

670 if base_key == "filter": 

671 return "filter" 

672 

673 # Check for 'filtered' base (old format) 

674 if base_key == "filtered": 

675 return "filtered" 

676 

677 return None 

678 

679 def from_dict(self, meta_dict: dict, skip_none: bool = False) -> None: 

680 """ 

681 fill attributes from a dictionary but need to make it 

682 backwards compatible with accepting filtered.applied and 

683 filtered.name as lists. 

684 

685 Parameters 

686 ---------- 

687 meta_dict : dict 

688 dictionary of attributes to set. 

689 skip_none : bool, optional 

690 If True, skip attributes with None values, by default False. 

691 

692 Raises 

693 ------- 

694 MTSchemaError 

695 If the input dictionary is not valid. 

696 

697 """ 

698 if not isinstance(meta_dict, (dict, OrderedDict)): 

699 msg = f"Input must be a dictionary not {type(meta_dict)}" 

700 logger.error(msg) 

701 raise MTSchemaError(msg) 

702 keys = list(meta_dict.keys()) 

703 if len(keys) == 1: 

704 if isinstance(meta_dict[keys[0]], (dict, OrderedDict)): 

705 class_name = keys[0] 

706 if class_name.lower() != validate_name(self.__class__.__name__): 

707 msg = ( 

708 "name of input dictionary is not the same as class type " 

709 f"input = {class_name}, class type = {self.__class__.__name__}" 

710 ) 

711 logger.debug(msg, class_name, self.__class__.__name__) 

712 meta_dict = helpers.flatten_dict(meta_dict[class_name]) 

713 else: 

714 meta_dict = helpers.flatten_dict(meta_dict) 

715 

716 else: 

717 logger.debug( 

718 f"Assuming input dictionary is of type {self.__class__.__name__}", 

719 ) 

720 meta_dict = helpers.flatten_dict(meta_dict) 

721 

722 # Use helper method to detect filter format 

723 filter_format = self._find_filter_keys(meta_dict) 

724 

725 # Handle different filter formats based on detection 

726 if filter_format == "filtered": 

727 # Handle old format filters using f-string formatting 

728 old_format_applied = meta_dict.pop(f"{filter_format}.applied", None) 

729 old_format_names = meta_dict.pop(f"{filter_format}.name", None) 

730 

731 if old_format_applied is not None and old_format_names is not None: 

732 filter_applied = self._validate_filtered_applied(old_format_applied) 

733 filter_name = self._validate_filtered_name(old_format_names) 

734 if filter_applied and filter_name: 

735 logger.warning( 

736 f"{filter_format}.applied and {filter_format}.name are deprecated, use filters as a list of AppliedFilter objects instead" 

737 ) 

738 if len(filter_applied) != len(filter_name): 

739 msg = ( 

740 f"{filter_format}.applied and {filter_format}.name must be the same length, " 

741 f"got {len(filter_applied)} and {len(filter_name)}" 

742 ) 

743 logger.error(msg) 

744 raise MTSchemaError(msg) 

745 for name, applied in zip(filter_name, filter_applied): 

746 self.add_filter(name=name, applied=applied) 

747 

748 elif filter_format == "filter": 

749 # Handle legacy single 'filter' attribute - just remove and warn 

750 legacy_filter = meta_dict.pop(filter_format, None) 

751 if legacy_filter is not None: 

752 logger.warning( 

753 f"The '{filter_format}' attribute is deprecated and will be ignored. Use 'filters' as a list of AppliedFilter objects instead." 

754 ) 

755 

756 # Handle new format filters separately to combine with old format 

757 new_format_filters = meta_dict.pop("filters", None) 

758 

759 for name, value in meta_dict.items(): 

760 if skip_none: 

761 if value in NULL_VALUES: 

762 continue 

763 self.update_attribute(name, value) 

764 

765 # Process new format filters after other attributes, adding to existing filters 

766 if new_format_filters is not None: 

767 if isinstance(new_format_filters, str): 

768 self.filters = new_format_filters 

769 else: 

770 for filter_dict in new_format_filters: 

771 if isinstance(filter_dict, dict): 

772 # Create AppliedFilter from dict using from_dict method to handle nested attributes 

773 applied_filter = AppliedFilter() 

774 applied_filter.from_dict(filter_dict) 

775 self.add_filter(applied_filter=applied_filter) 

776 elif isinstance(filter_dict, AppliedFilter): 

777 self.add_filter(applied_filter=filter_dict) 

778 elif isinstance(filter_dict, str): 

779 logger.warning( 

780 f"String filter format not supported in add_filters: {filter_dict}" 

781 ) 

782 else: 

783 logger.warning(f"Unknown filter format: {type(filter_dict)}") 

784 

785 

786# this would be a normal channel that has a single sensor and location. 

787class Channel(ChannelBase): 

788 sensor: Annotated[ 

789 Instrument, 

790 Field( 

791 default_factory=Instrument, 

792 description="Sensor for the channel.", 

793 alias=None, 

794 json_schema_extra={ 

795 "units": None, 

796 "required": False, 

797 "examples": "Instrument()", 

798 }, 

799 ), 

800 ] 

801 

802 location: Annotated[ 

803 BasicLocation, 

804 Field( 

805 default_factory=BasicLocation, 

806 description="Location information for the channel.", 

807 alias=None, 

808 json_schema_extra={ 

809 "units": None, 

810 "required": False, 

811 "examples": [ 

812 "BasicLocation(latitude=0.0, longitude=0.0, elevation=0.0)" 

813 ], 

814 }, 

815 ), 

816 ] 

817 

818 

819def _applied_values_map(treat_null_values_as: bool = True) -> dict: 

820 """ 

821 helper function to simplify logic in applied setter. 

822 

823 Notes: 

824 The logic in the setter was getting quite complicated handling many types. 

825 A reasonable solution seemed to be to map each of the allowed values to a bool 

826 via dict and then use this dict when setting applied values. 

827 

828 :return: dict 

829 Mapping of all tolerated single-values for setting applied booleans 

830 """ 

831 null_values = [None, "none", "None", "NONE", "null"] 

832 null_values_map = {x: treat_null_values_as for x in null_values} 

833 true_values = [True, 1, "1", "True", "true"] 

834 true_values_map = {x: True for x in true_values} 

835 false_values = [False, 0, "0", "False", "false"] 

836 false_values_map = {x: False for x in false_values} 

837 values_map = {**null_values_map, **true_values_map, **false_values_map} 

838 return values_map