Coverage for arrakis/channel.py: 93.9%

66 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-13 15:09 -0700

1# Copyright (c) 2022, California Institute of Technology and contributors 

2# 

3# You should have received a copy of the licensing terms for this 

4# software included in the file "LICENSE" located in the top-level 

5# directory of this package. If you did not, you can view a copy at 

6# https://git.ligo.org/ngdd/arrakis-python/-/raw/main/LICENSE 

7 

8"""Channel information.""" 

9 

10from __future__ import annotations 

11 

12import json 

13from dataclasses import asdict, dataclass 

14from functools import cached_property 

15from typing import TYPE_CHECKING 

16 

17import numpy 

18 

19if TYPE_CHECKING: 

20 import pyarrow 

21 

22 

@dataclass(frozen=True)
class Channel:
    """Metadata associated with a channel.

    Channels have the form {domain}:*.

    Two channels compare equal when their name, data type and sample
    rate match. The optional ``time``, ``publisher`` and
    ``partition_id`` fields are only compared when they are set on both
    sides, and ``expected_latency`` never takes part in comparisons.
    Hashing uses the three required fields only, so channels that
    compare equal always share a hash.

    Parameters
    ----------
    name : str
        The name associated with this channel.
    data_type : numpy.dtype
        The data type associated with this channel.
    sample_rate : float
        The sampling rate associated with this channel.
    time : int, optional
        The timestamp when this metadata became active.
    publisher : str
        The publisher associated with this channel.
    partition_id : str, optional
        The partition ID associated with this channel.
    expected_latency: int, optional
        Expected publication latency for this channel's data, in
        seconds.

    """

    name: str
    data_type: numpy.dtype | str
    sample_rate: float
    time: int | None = None
    publisher: str | None = None
    partition_id: str | None = None
    expected_latency: int | None = None

    @property
    def dtype(self):
        """Alias for ``data_type``."""
        return self.data_type

    def __post_init__(self) -> None:
        # Cast to a numpy dtype object, as raw types like numpy.float64
        # (or plain strings) are not numpy.dtype instances. The dataclass
        # is frozen, so the attribute has to be set through
        # object.__setattr__.
        object.__setattr__(self, "data_type", numpy.dtype(self.data_type))
        self.validate()

    def validate(self) -> None:
        """Check that the channel name is well-formed.

        Raises
        ------
        ValueError
            If the name does not split into exactly two components
            around ":".

        """
        components = self.name.split(":")
        if len(components) != 2:
            msg = "channel is malformed, needs to be in the form {domain}:*"
            raise ValueError(msg)

    def __repr__(self) -> str:
        return f"<{self.name}, {self.sample_rate} Hz, {self.data_type}>"

    def __str__(self) -> str:
        return self.name

    def __eq__(self, other: object) -> bool:
        # let Python fall back to its default handling for foreign types
        # instead of failing on a missing attribute
        if not isinstance(other, Channel):
            return NotImplemented

        # name, data type and sample rate are always required to match
        is_equal = (
            self.name == other.name
            and self.dtype == other.dtype
            and self.sample_rate == other.sample_rate
        )

        # optional fields match only if both are defined
        if self.time is not None and other.time is not None:
            is_equal &= self.time == other.time
        if self.publisher and other.publisher:
            is_equal &= self.publisher == other.publisher
        if self.partition_id and other.partition_id:
            is_equal &= self.partition_id == other.partition_id

        return is_equal

    def __hash__(self) -> int:
        # Only the fields that __eq__ always compares may contribute:
        # optional fields are ignored by __eq__ when unset on one side,
        # so including them would let equal channels hash differently.
        return hash((self.name, self.data_type, self.sample_rate))

    @cached_property
    def domain(self) -> str:
        """The domain associated with this channel."""
        return self.name.split(":", 1)[0]

    def to_json(self, time: int | None = None) -> str:
        """Serialize channel metadata to JSON.

        Fields whose value is None are left out of the payload.

        Parameters
        ----------
        time : int, optional
            If specified, the timestamp when this metadata became active.

        Returns
        -------
        str
            The JSON-serialized channel.

        """
        # generate dict from dataclass and adjust fields
        # to be JSON compatible. In addition, store the
        # channel name, as well as updating the timestamp
        # if passed in.
        obj = asdict(self)
        obj["data_type"] = numpy.dtype(self.data_type).name
        if time is not None:
            obj["time"] = time
        obj = {k: v for k, v in obj.items() if v is not None}
        return json.dumps(obj)

    @classmethod
    def from_json(cls, payload: str) -> Channel:
        """Create a Channel from its JSON representation.

        Parameters
        ----------
        payload : str
            The JSON-serialized channel.

        Returns
        -------
        Channel
            The newly created channel.

        """
        obj = json.loads(payload)
        obj["data_type"] = numpy.dtype(obj["data_type"])
        return cls(**obj)

    @classmethod
    def from_field(cls, field: pyarrow.field) -> Channel:
        """Create a Channel from Arrow Flight field metadata.

        The sample rate is read from the ``b"rate"`` entry of the
        field's metadata.

        Parameters
        ----------
        field : pyarrow.field
            The channel field containing relevant metadata.

        Returns
        -------
        Channel
            The newly created channel.

        """
        data_type = numpy.dtype(_list_dtype_to_str(field.type))
        sample_rate = float(field.metadata[b"rate"].decode())
        return cls(field.name, data_type, sample_rate)

158 

159 

160def _list_dtype_to_str(dtype: pyarrow.ListType) -> str: 

161 """Return a string representation of the list's inner data type. 

162 

163 Note that this does not always match the string representation 

164 of Arrow's internal data types, to match the behavior across 

165 different languages. 

166 

167 Parameters 

168 ---------- 

169 dtype : pyarrow.ListType 

170 The list data type to inspect. 

171 

172 Returns 

173 ------- 

174 str 

175 A string representation of the list's inner data type. 

176 

177 """ 

178 inner_dtype = str(dtype.value_type) 

179 if inner_dtype == "float": 

180 return "float32" 

181 if inner_dtype == "double": 

182 return "float64" 

183 return inner_dtype