Coverage for arrakis/channel.py: 93.9%
66 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-13 15:09 -0700
1# Copyright (c) 2022, California Institute of Technology and contributors
2#
3# You should have received a copy of the licensing terms for this
4# software included in the file "LICENSE" located in the top-level
5# directory of this package. If you did not, you can view a copy at
6# https://git.ligo.org/ngdd/arrakis-python/-/raw/main/LICENSE
8"""Channel information."""
10from __future__ import annotations
12import json
13from dataclasses import asdict, dataclass
14from functools import cached_property
15from typing import TYPE_CHECKING
17import numpy
19if TYPE_CHECKING:
20 import pyarrow
@dataclass(frozen=True)
class Channel:
    """Metadata associated with a channel.

    Channels have the form {domain}:*.

    Parameters
    ----------
    name : str
        The name associated with this channel.
    data_type : numpy.dtype
        The data type associated with this channel.
    sample_rate : float
        The sampling rate associated with this channel.
    time : int, optional
        The timestamp when this metadata became active.
    publisher : str, optional
        The publisher associated with this channel.
    partition_id : str, optional
        The partition ID associated with this channel.
    expected_latency : int, optional
        Expected publication latency for this channel's data, in
        seconds.

    Notes
    -----
    Equality always requires ``name``, ``data_type`` and ``sample_rate`` to
    match. ``time``, ``publisher`` and ``partition_id`` are only compared
    when both channels define them; ``expected_latency`` is not compared.
    Hashing uses only the always-compared fields, so channels that compare
    equal also hash equally.

    """

    name: str
    data_type: numpy.dtype | str
    sample_rate: float
    time: int | None = None
    publisher: str | None = None
    partition_id: str | None = None
    expected_latency: int | None = None

    @property
    def dtype(self) -> numpy.dtype:
        """The numpy data type of this channel (alias of ``data_type``)."""
        return self.data_type

    def __post_init__(self) -> None:
        # Cast to a numpy dtype object, as raw types like numpy.float64 (or
        # strings such as "float64") are not numpy.dtype instances. The
        # dataclass is frozen, so object.__setattr__ is needed to normalize
        # the field in place.
        object.__setattr__(self, "data_type", numpy.dtype(self.data_type))
        self.validate()

    def validate(self) -> None:
        """Check that the channel name has the form ``{domain}:*``.

        Raises
        ------
        ValueError
            If the name does not split into exactly two parts on ``:``.

        """
        components = self.name.split(":")
        if len(components) != 2:
            msg = "channel is malformed, needs to be in the form {domain}:*"
            raise ValueError(msg)

    def __repr__(self) -> str:
        return f"<{self.name}, {self.sample_rate} Hz, {self.data_type}>"

    def __str__(self) -> str:
        return self.name

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Channel):
            # Defer to Python's default handling instead of raising
            # AttributeError on objects that lack the channel fields.
            return NotImplemented

        # name, data type and sample rate are always required to match
        if not (
            self.name == other.name
            and self.dtype == other.dtype
            and self.sample_rate == other.sample_rate
        ):
            return False

        # optional fields match only if both are defined; publisher and
        # partition_id use truthiness, so empty strings count as undefined.
        # expected_latency does not take part in equality.
        if (
            self.time is not None
            and other.time is not None
            and self.time != other.time
        ):
            return False
        if (
            self.publisher
            and other.publisher
            and self.publisher != other.publisher
        ):
            return False
        if (
            self.partition_id
            and other.partition_id
            and self.partition_id != other.partition_id
        ):
            return False
        return True

    def __hash__(self) -> int:
        # Hash only the fields __eq__ always compares. Without this, the
        # dataclass would generate a hash over every field, so two channels
        # that compare equal (e.g. time=None vs. time=5) could hash
        # differently and misbehave in sets and dict keys.
        return hash((self.name, self.dtype, self.sample_rate))

    @cached_property
    def domain(self) -> str:
        """The domain associated with this channel."""
        return self.name.split(":", 1)[0]

    def to_json(self, time: int | None = None) -> str:
        """Serialize channel metadata to JSON.

        Parameters
        ----------
        time : int, optional
            If specified, the timestamp when this metadata became active.
            Overrides the channel's own ``time`` in the output.

        Returns
        -------
        str
            A JSON object containing every field that is not None, with
            ``data_type`` stored as the numpy dtype name.

        """
        # generate dict from dataclass and adjust fields
        # to be JSON compatible. In addition, store the
        # channel name, as well as updating the timestamp
        # if passed in.
        obj = asdict(self)
        obj["data_type"] = numpy.dtype(self.data_type).name
        if time is not None:
            obj["time"] = time
        obj = {k: v for k, v in obj.items() if v is not None}
        return json.dumps(obj)

    @classmethod
    def from_json(cls, payload: str) -> Channel:
        """Create a Channel from its JSON representation.

        Parameters
        ----------
        payload : str
            The JSON-serialized channel.

        Returns
        -------
        Channel
            The newly created channel.

        """
        obj = json.loads(payload)
        obj["data_type"] = numpy.dtype(obj["data_type"])
        return cls(**obj)

    @classmethod
    def from_field(cls, field: pyarrow.field) -> Channel:
        """Create a Channel from Arrow Flight field metadata.

        Parameters
        ----------
        field : pyarrow.field
            The channel field containing relevant metadata. Its type is
            read as a list type, and its metadata must contain a ``b"rate"``
            entry holding the sample rate.

        Returns
        -------
        Channel
            The newly created channel.

        """
        data_type = numpy.dtype(_list_dtype_to_str(field.type))
        sample_rate = float(field.metadata[b"rate"].decode())
        return cls(field.name, data_type, sample_rate)
160def _list_dtype_to_str(dtype: pyarrow.ListType) -> str:
161 """Return a string representation of the list's inner data type.
163 Note that this does not always match the string representation
164 of Arrow's internal data types, to match the behavior across
165 different languages.
167 Parameters
168 ----------
169 dtype : pyarrow.ListType
170 The list data type to inspect.
172 Returns
173 -------
174 str
175 A string representation of the list's inner data type.
177 """
178 inner_dtype = str(dtype.value_type)
179 if inner_dtype == "float":
180 return "float32"
181 if inner_dtype == "double":
182 return "float64"
183 return inner_dtype