Coverage for arrakis/block.py: 87.2%

226 statements  

coverage.py v7.6.12, created at 2025-08-13 15:09 -0700

# Copyright (c) 2022, California Institute of Technology and contributors
#
# You should have received a copy of the licensing terms for this
# software included in the file "LICENSE" located in the top-level
# directory of this package. If you did not, you can view a copy at
# https://git.ligo.org/ngdd/arrakis-python/-/raw/main/LICENSE

"""Series block representation of timeseries data."""

from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Generic, TypeVar

import numpy
import pyarrow
import pyarrow.compute

from .channel import Channel

if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator, KeysView
    from numbers import Real


ChannelLike = TypeVar("ChannelLike", bound=Channel)


class Time(int, Enum):
    SECONDS = 1_000_000_000
    MILLISECONDS = 1_000_000
    MICROSECONDS = 1_000
    NANOSECONDS = 1
    s = 1_000_000_000
    ms = 1_000_000
    us = 1_000
    ns = 1
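    # Because Time subclasses int, its members behave as plain integers in
    # arithmetic, e.g. 5 * Time.ms == 5_000_000 (nanoseconds).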

class Freq(Enum):
    GHz = 1_000_000_000
    MHz = 1_000_000
    kHz = 1_000  # noqa: N815
    Hz = 1

    def __rmul__(self, other: Real) -> int:  # type: ignore
        return int((self.value / other) * Time.s)  # type: ignore


def time_as_ns(time: float) -> int:
    """Convert a timestamp from seconds to nanoseconds.

    Parameters
    ----------
    time : float
        The timestamp to convert, in seconds.

    Returns
    -------
    int
        The converted timestamp, in nanoseconds.

    """
    seconds = int(time) * Time.s
    nanoseconds = int((time % 1) * Time.s)
    return seconds + nanoseconds
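# For example, time_as_ns(123.25) returns 123_250_000_000. Note that the
# fractional part is scaled in floating point before truncation, so values not
# exactly representable in binary may lose up to a nanosecond to rounding.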

@dataclass(frozen=True)
class Series(Generic[ChannelLike]):
    """Single-channel timeseries data for a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : numpy.ndarray
        The timeseries data.
    channel : Channel
        Channel metadata associated with this timeseries.

    """

    time_ns: int
    data: numpy.ndarray | numpy.ma.MaskedArray
    channel: ChannelLike

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this data, in seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this data, in seconds."""
        return self.time

    @cached_property
    def duration(self) -> float:
        """Series duration in seconds."""
        return self.duration_ns / Time.s

    @cached_property
    def duration_ns(self) -> int:
        """Series duration in nanoseconds."""
        return int((len(self) * Time.s) / self.sample_rate)

    @property
    def dt(self) -> float:
        """The time separation in seconds between successive samples."""
        return 1 / self.sample_rate

    @property
    def name(self) -> str:
        """Channel name."""
        return str(self.channel)

    @property
    def data_type(self) -> numpy.dtype:
        """Data type of the data array's elements."""
        return self.data.dtype

    @property
    def dtype(self) -> numpy.dtype:
        """Data type of the data array's elements."""
        return self.data.dtype

    @property
    def sample_rate(self) -> float:
        """Data rate for this series in samples per second (Hz)."""
        return self.channel.sample_rate

    @cached_property
    def times(self) -> numpy.ndarray:
        """The array of times corresponding to all data points in the series."""
        return numpy.arange(len(self)) * self.dt + self.time

    @property
    def has_nulls(self):
        """Whether the timeseries data contains any null values."""
        return numpy.ma.is_masked(self.data)

    def __len__(self) -> int:
        return len(self.data)


@dataclass(frozen=True)
class SeriesBlock(Generic[ChannelLike]):
    """Series block containing timeseries for channels for a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, numpy.ndarray]
        Mapping between channels and timeseries.
    channels : dict[str, Channel]
        Channel metadata associated with this data block.

    """

    time_ns: int
    data: dict[str, numpy.ndarray] | dict[str, numpy.ma.MaskedArray]
    channels: dict[str, ChannelLike] = field(default_factory=dict)
    _duration_ns: int = field(init=False, default=0)

    def __post_init__(self):
        # various validation checks
        #
        # check that the channel lists are consistent
        assert set(self.data) == set(self.channels), (
            "data and channels dicts have different keys"
        )
        # check that the duration of all Series are consistent
        for channel, data in self.data.items():
            duration_ns = int((len(data) * Time.s) / self.channels[channel].sample_rate)
            if self._duration_ns == 0:
                # NOTE: this is a hacky way to set an attribute of a
                # frozen dataclass
                object.__setattr__(self, "_duration_ns", duration_ns)
            assert duration_ns == self._duration_ns, "Series durations do not agree"

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time

    @cached_property
    def duration(self) -> float:
        """Duration of this block, in seconds."""
        return self._duration_ns / Time.s

    @property
    def duration_ns(self) -> int:
        """Duration of this block, in nanoseconds."""
        return self._duration_ns

    def __getitem__(self, channel: str) -> Series:
        return Series(self.time_ns, self.data[channel], self.channels[channel])

    def __len__(self) -> int:
        return len(self.data)

    def keys(self) -> KeysView[str]:
        return self.data.keys()

    def items(self) -> Generator[tuple[str, Series], None, None]:
        for channel in self.keys():
            yield (channel, self[channel])

    def values(self) -> list[Series]:
        return [self[channel] for channel in self.keys()]

    def filter(self, channels: list[str] | None = None) -> SeriesBlock:
        """Filter a block based on criteria.

        FIXME: more info needed

        Parameters
        ----------
        channels : list[str], optional
            If specified, keep only these channels.

        Returns
        -------
        SeriesBlock
            The filtered series.

        """
        if not channels:
            return self

        data = {channel: self.data[channel] for channel in channels}
        if self.channels:
            channel_dict = {channel: self.channels[channel] for channel in channels}
        else:
            channel_dict = self.channels

        return type(self)(self.time_ns, data, channel_dict)

    def create_gaps(self, channels: Iterable[ChannelLike]) -> SeriesBlock:
        """Add channels with all null values (gaps).

        Parameters
        ----------
        channels : Iterable[Channel]
            The channels to create gaps for. Any channels currently present
            will be ignored.

        Returns
        -------
        SeriesBlock
            The block with additional gaps present.

        """
        series_dict = self.data
        channel_dict = self.channels
        for channel in channels:
            if channel in channel_dict:
                continue
            size = int(channel.sample_rate * self.duration_ns) // Time.s
            series_dict[channel.name] = numpy.ma.masked_all(size, dtype=channel.dtype)
            channel_dict[channel.name] = channel

        return type(self)(self.time_ns, series_dict, channel_dict)

    def to_column_batch(
        self, schema: pyarrow.Schema | None = None
    ) -> pyarrow.RecordBatch:
        """Create a column-based record batch from a series block.

        Parameters
        ----------
        schema : pyarrow.Schema, optional
            Pre-defined schema to use for the record batch. If provided, the schema
            must be compatible with the block's data (matching channel names and types).
            If not provided, a schema will be generated from the block's data.

        Returns
        -------
        pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp
            and channel columns with all channels to publish.

        Raises
        ------
        pyarrow.lib.ArrowInvalid
            If the provided schema is incompatible with the block's data, such as
            missing channels, mismatched channel names, or incompatible data types.

        """
        if schema is None:
            schema = self._generate_column_schema()
        channels = [field.name for field in schema][1:]

        return pyarrow.RecordBatch.from_arrays(
            [
                pyarrow.array([self.time_ns], type=schema.field("time").type),
                *[
                    _numpy_to_arrow_list_array(
                        self.data[channel], schema.field(channel).type
                    )
                    for channel in channels
                ],
            ],
            schema=schema,
        )

    def to_row_batches(self, partitions: dict) -> Iterator[pyarrow.RecordBatch]:
        """Create row-based record batches from a series block.

        Yields
        ------
        tuple
            A (partition ID, record batch) pair for each partition. Each
            record batch has a 'time' column with the timestamp, a
            'channel' column with the channel name, and a 'data' column
            containing the timeseries.

        """
        # group channels by partitions
        channels_by_part = defaultdict(list)
        for channel in self.keys():
            if channel in partitions:
                partition = partitions[channel]
                channels_by_part[partition].append(channel)

        # generate row-based record batches
        for partition_id, channels in channels_by_part.items():
            # all channels in a partition share the same data type
            dtype = self.channels[channels[0]].data_type
            schema = self._generate_row_schema(pyarrow.from_numpy_dtype(dtype))
            series: list[pyarrow.Array] = [
                pyarrow.array(self.data[channel]) for channel in channels
            ]
            yield (
                partition_id,
                pyarrow.RecordBatch.from_arrays(
                    [
                        pyarrow.array(
                            numpy.full(len(channels), self.time_ns),
                            type=schema.field("time").type,
                        ),
                        pyarrow.array(channels, type=schema.field("channel").type),
                        pyarrow.array(series, type=schema.field("data").type),
                    ],
                    schema=schema,
                ),
            )

    @classmethod
    def from_column_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a column-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp
            and channel columns with all channels to publish.
        channels : dict[str, Channel]
            Channel metadata. The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.

        """
        time = batch.column("time")[0].as_py()
        fields: list[pyarrow.Field] = list(batch.schema)
        channel_names = [field.name for field in fields[1:]]
        series_dict = {
            channel: _arrow_to_numpy_array(
                pyarrow.compute.list_flatten(batch.column(channel))
            )
            for channel in channel_names
        }
        channel_dict = {channel: channels[channel] for channel in channel_names}
        return cls(time, series_dict, channel_dict)

    @classmethod
    def from_row_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a row-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp, a
            'channel' column with the channel name, and a 'data' column
            containing the timeseries.
        channels : dict[str, Channel]
            Channel metadata. The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.

        """
        time = batch.column("time")[0].as_py()
        channel_names = batch.column("channel").to_pylist()
        data = batch.column("data")
        series_dict = {}
        channel_dict = {}
        for idx, channel in enumerate(channel_names):
            series_dict[channel] = _arrow_to_numpy_array(data[idx].values)
            channel_dict[channel] = channels[channel]
        return cls(time, series_dict, channel_dict)

    def _generate_column_schema(self) -> pyarrow.Schema:
        fields = [pyarrow.field("time", pyarrow.int64())]
        for channel, arr in self.data.items():
            dtype = pyarrow.from_numpy_dtype(arr.dtype)
            fields.append(pyarrow.field(channel, pyarrow.list_(dtype)))
        return pyarrow.schema(fields)

    def _generate_row_schema(self, dtype: pyarrow.DataType) -> pyarrow.Schema:
        return pyarrow.schema(
            [
                pyarrow.field("time", pyarrow.int64()),
                pyarrow.field("channel", pyarrow.string()),
                pyarrow.field("data", pyarrow.list_(dtype)),
            ]
        )


# backwards compatibility with previous name
DataBlock = SeriesBlock

def concatenate_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Join a sequence of timeseries blocks into a single block.

    If the SeriesBlock arguments are not sequential in time, an
    AssertionError will be raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

    Returns
    -------
    SeriesBlock
        The combined timeseries block.

    """
    channel_dict = blocks[0].channels
    channel_set = set(channel_dict)
    start_time_ns = end_time_ns = blocks[0].time_ns
    duration_ns = 0
    for block in blocks:
        assert set(block.data.keys()) == channel_set, (
            "all blocks must contain the same channel sets"
        )
        assert block.time_ns == end_time_ns, (
            f"block start time ({block.time_ns}) does not match "
            f"concatenated block end time ({end_time_ns})"
        )
        duration_ns += block.duration_ns
        end_time_ns += block.duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    for channel in channel_set:
        series_dict[str(channel)] = numpy.concatenate(
            [block[str(channel)].data for block in blocks]
        )
    return SeriesBlock(start_time_ns, series_dict, channel_dict)

def combine_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Combine multiple SeriesBlocks from the same time into a single SeriesBlock.

    Each block must contain a distinct set of channels, and the time
    properties of each block must agree, otherwise an AssertionError
    will be raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The blocks to combine.

    Returns
    -------
    SeriesBlock
        The combined block.

    """
    time_ns = blocks[0].time_ns
    duration_ns = blocks[0].duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    channel_dict: dict[str, Channel] = {}
    for block in blocks:
        assert block.time_ns == time_ns, "all block times must agree"
        assert block.duration_ns == duration_ns, "all block durations must agree"
        for channel, series in block.items():
            assert channel not in series_dict, (
                f"channel {channel} has already been included from another block"
            )
            series_dict[channel] = series.data
            channel_dict[channel] = series.channel
    return SeriesBlock(time_ns, series_dict, channel_dict)

def _numpy_to_arrow_list_array(
    data: numpy.ndarray | numpy.ma.MaskedArray, field_type: pyarrow.DataType
) -> pyarrow.Array:
    """Convert numpy array to arrow ListArray.

    This function provides significant performance improvements for masked arrays
    by avoiding double conversion. Benchmarks show 25-159x speedup over the
    standard approach of `pyarrow.array([pyarrow.array(data)], type=field_type)`.

    Parameters
    ----------
    data : numpy.ndarray or numpy.ma.MaskedArray
        Input array data to convert. Masked arrays are handled efficiently
        with direct Arrow buffer construction.
    field_type : pyarrow.DataType
        PyArrow list type for the output array.

    Returns
    -------
    pyarrow.Array
        ListArray containing the data as a single-element list, preserving
        mask information for masked arrays as Arrow nulls.

    Notes
    -----
    For masked arrays, this function uses direct ListArray construction from
    components instead of double conversion, which eliminates the performance
    bottleneck that can cause death spirals under high load conditions.

    """
    if isinstance(data, numpy.ma.MaskedArray):
        inner_type = field_type.value_type
        inner_array = pyarrow.array(data.data, mask=data.mask, type=inner_type)
        offsets = pyarrow.array([0, len(data)], type=pyarrow.int32())
        return pyarrow.ListArray.from_arrays(offsets, inner_array, type=field_type)
    return pyarrow.array([data], type=field_type)


def _arrow_to_numpy_array(arrow_array: pyarrow.Array) -> numpy.ndarray:
    """Convert an Arrow array to a numpy ndarray."""
    # no null values
    if arrow_array.null_count == 0:
        return arrow_array.to_numpy()

    bitmap_buffer, data_buffer = arrow_array.buffers()
    offset = arrow_array.offset

    # compute mask from arrow bitmap
    # see https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps
    bitmap = numpy.frombuffer(bitmap_buffer, numpy.uint8, len(bitmap_buffer))
    length = len(arrow_array) + offset
    mask = numpy.unpackbits(bitmap, bitorder="little")[:length]
    if offset > 0:
        mask = mask[offset:]
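    # Arrow validity bitmaps mark valid (non-null) entries with 1, whereas
    # numpy masked arrays use True/1 to mark masked entries, so invert the bits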

    mask = 1 - mask

    # create masked array
    dtype = _arrow_to_numpy_dtype(arrow_array.type)
    data_array = numpy.frombuffer(data_buffer, dtype, length)[offset:]
    array = numpy.ma.array(data_array, mask=mask)
    assert len(array) == len(arrow_array)
    return array


def _arrow_to_numpy_dtype(dtype: pyarrow.DataType) -> numpy.dtype:
    """Return the numpy dtype equivalent of the given Arrow dtype."""
    arrow_dtype = str(dtype)
    if arrow_dtype == "float":
        return numpy.dtype("float32")
    if arrow_dtype == "double":
        return numpy.dtype("float64")
    return numpy.dtype(arrow_dtype)
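# The block below is a minimal usage sketch, not part of the library API: it
# round-trips a masked numpy array through the Arrow helpers above to show how
# gaps become Arrow nulls and come back as a numpy masked array.
if __name__ == "__main__":
    _demo = numpy.ma.masked_array(
        numpy.arange(8, dtype=numpy.float64),
        mask=[0, 0, 1, 0, 0, 0, 1, 0],
    )
    _list_type = pyarrow.list_(pyarrow.float64())
    _as_arrow = _numpy_to_arrow_list_array(_demo, _list_type)
    _recovered = _arrow_to_numpy_array(pyarrow.compute.list_flatten(_as_arrow))
    assert numpy.ma.allequal(_demo, _recovered)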