# Copyright (c) 2022, California Institute of Technology and contributors
#
# You should have received a copy of the licensing terms for this
# software included in the file "LICENSE" located in the top-level
# directory of this package. If you did not, you can view a copy at
# https://git.ligo.org/ngdd/arrakis-python/-/raw/main/LICENSE

"""Series block representation of timeseries data."""

from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Generic, TypeVar

import numpy
import pyarrow
import pyarrow.compute

from .channel import Channel

if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator, KeysView
    from numbers import Real


ChannelLike = TypeVar("ChannelLike", bound=Channel)


class Time(int, Enum):
    """Time units, as conversion factors to nanoseconds."""

    SECONDS = 1_000_000_000
    MILLISECONDS = 1_000_000
    MICROSECONDS = 1_000
    NANOSECONDS = 1
    s = 1_000_000_000
    ms = 1_000_000
    us = 1_000
    ns = 1


class Freq(Enum):
    """Frequency units, in hertz."""

    GHz = 1_000_000_000
    MHz = 1_000_000
    kHz = 1_000  # noqa: N815
    Hz = 1

    def __rmul__(self, other: Real) -> int:  # type: ignore
        """Return the period in nanoseconds of the rate ``other * self``."""
        return int(Time.s / (other * self.value))  # type: ignore


def time_as_ns(time: float) -> int:
    """Convert a timestamp from seconds to nanoseconds.

    Parameters
    ----------
    time : float
        The timestamp to convert, in seconds.

    Returns
    -------
    int
        The converted timestamp, in nanoseconds.
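
    Examples
    --------
    >>> time_as_ns(1.5)
    1500000000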
66 """
67 seconds = int(time) * Time.s
68 nanoseconds = int((time % 1)) * Time.s
69 return seconds + nanoseconds


@dataclass(frozen=True)
class Series(Generic[ChannelLike]):
    """Single-channel timeseries data for a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : numpy.ndarray
        The timeseries data.
    channel : Channel
        Channel metadata associated with this timeseries.
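
    Examples
    --------
    A minimal sketch; the ``Channel`` constructor arguments shown here are
    assumed for illustration (see ``arrakis.channel.Channel`` for the
    actual signature):

    >>> channel = Channel("H1:TEST", sample_rate=16, data_type=numpy.float64)
    >>> series = Series(
    ...     time_ns=1_000_000_000,
    ...     data=numpy.arange(16, dtype=numpy.float64),
    ...     channel=channel,
    ... )
    >>> series.duration
    1.0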
85 """
87 time_ns: int
88 data: numpy.ndarray | numpy.ma.MaskedArray
89 channel: ChannelLike

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this data, in seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this data, in seconds."""
        return self.time

    @cached_property
    def duration(self) -> float:
        """Series duration in seconds."""
        return self.duration_ns / Time.s

    @cached_property
    def duration_ns(self) -> int:
        """Series duration in nanoseconds."""
        return int((len(self) * Time.s) / self.sample_rate)

    @property
    def dt(self) -> float:
        """The time separation in seconds between successive samples."""
        return 1 / self.sample_rate

    @property
    def name(self) -> str:
        """Channel name."""
        return str(self.channel)

    @property
    def data_type(self) -> numpy.dtype:
        """Data type of the data array's elements."""
        return self.data.dtype

    @property
    def dtype(self) -> numpy.dtype:
        """Data type of the data array's elements."""
        return self.data.dtype

    @property
    def sample_rate(self) -> float:
        """Data rate for this series in samples per second (Hz)."""
        return self.channel.sample_rate

    @cached_property
    def times(self) -> numpy.ndarray:
        """The array of times corresponding to all data points in the series."""
        return numpy.arange(len(self)) * self.dt + self.time

    @property
    def has_nulls(self) -> bool:
        """Whether the timeseries data contains any null values."""
        return numpy.ma.is_masked(self.data)

    def __len__(self) -> int:
        return len(self.data)


@dataclass(frozen=True)
class SeriesBlock(Generic[ChannelLike]):
    """Block of timeseries data for multiple channels at a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, numpy.ndarray]
        Mapping between channel names and timeseries.
    channels : dict[str, Channel]
        Channel metadata associated with this data block.
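
    Examples
    --------
    A minimal sketch; the ``Channel`` constructor arguments are assumed for
    illustration:

    >>> channel = Channel("H1:TEST", sample_rate=16, data_type=numpy.float64)
    >>> block = SeriesBlock(
    ...     1_000_000_000,
    ...     {"H1:TEST": numpy.zeros(16)},
    ...     {"H1:TEST": channel},
    ... )
    >>> block.duration
    1.0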
163 """
165 time_ns: int
166 data: dict[str, numpy.ndarray] | dict[str, numpy.ma.MaskedArray]
167 channels: dict[str, ChannelLike] = field(default_factory=dict)
168 _duration_ns: int = field(init=False, default=0)

    def __post_init__(self):
        # various validation checks
        #
        # check that the channel lists are consistent
        assert set(self.data) == set(self.channels), (
            "data and channels dicts have different keys"
        )
        # check that the durations of all series are consistent
        for channel, data in self.data.items():
            duration_ns = int((len(data) * Time.s) / self.channels[channel].sample_rate)
            if self._duration_ns == 0:
                # NOTE: this is a hacky way to set an attribute of a
                # frozen dataclass
                object.__setattr__(self, "_duration_ns", duration_ns)
            assert duration_ns == self._duration_ns, "Series durations do not agree"

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time

    @cached_property
    def duration(self) -> float:
        """Duration of this block, in seconds."""
        return self._duration_ns / Time.s

    @property
    def duration_ns(self) -> int:
        """Duration of this block, in nanoseconds."""
        return self._duration_ns

    def __getitem__(self, channel: str) -> Series:
        return Series(self.time_ns, self.data[channel], self.channels[channel])

    def __len__(self) -> int:
        return len(self.data)

    def keys(self) -> KeysView[str]:
        return self.data.keys()

    def items(self) -> Generator[tuple[str, Series], None, None]:
        for channel in self.keys():
            yield (channel, self[channel])

    def values(self) -> list[Series]:
        return [self[channel] for channel in self.keys()]

    def filter(self, channels: list[str] | None = None) -> SeriesBlock:
        """Filter the block, keeping only the requested channels.

        Parameters
        ----------
        channels : list[str], optional
            If specified, keep only these channels. If not specified,
            return the block unchanged.

        Returns
        -------
        SeriesBlock
            The filtered block.
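
        Examples
        --------
        Assuming ``block`` is a ``SeriesBlock`` containing a channel named
        ``"H1:TEST"``:

        >>> subset = block.filter(channels=["H1:TEST"])
        >>> list(subset.keys())
        ['H1:TEST']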
237 """
238 if not channels:
239 return self
241 data = {channel: self.data[channel] for channel in channels}
242 if self.channels:
243 channel_dict = {channel: self.channels[channel] for channel in channels}
244 else:
245 channel_dict = self.channels
247 return type(self)(self.time_ns, data, channel_dict)

    def create_gaps(self, channels: Iterable[ChannelLike]) -> SeriesBlock:
        """Add channels with all null values (gaps).

        Parameters
        ----------
        channels : Iterable[Channel]
            The channels to create gaps for. Any channels already present
            in the block are ignored.

        Returns
        -------
        SeriesBlock
            The block with the additional gaps present.
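
        Examples
        --------
        Assuming ``block`` is a ``SeriesBlock`` and ``extra_channel`` is a
        ``Channel`` not already contained in it:

        >>> gappy = block.create_gaps([extra_channel])
        >>> gappy[extra_channel.name].has_nulls
        True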
263 """
264 series_dict = self.data
265 channel_dict = self.channels
266 for channel in channels:
267 if channel in channel_dict:
268 continue
269 size = int(channel.sample_rate * self.duration_ns) // Time.s
270 series_dict[channel.name] = numpy.ma.masked_all(size, dtype=channel.dtype)
271 channel_dict[channel.name] = channel
273 return type(self)(self.time_ns, series_dict, channel_dict)

    def to_column_batch(
        self, schema: pyarrow.Schema | None = None
    ) -> pyarrow.RecordBatch:
        """Create a column-based record batch from a series block.

        Parameters
        ----------
        schema : pyarrow.Schema, optional
            Pre-defined schema to use for the record batch. If provided, the
            schema must be compatible with the block's data (matching channel
            names and types). If not provided, a schema will be generated from
            the block's data.

        Returns
        -------
        pyarrow.RecordBatch
            A record batch, with a 'time' column containing the timestamp
            and one column per channel containing its timeseries.

        Raises
        ------
        pyarrow.lib.ArrowInvalid
            If the provided schema is incompatible with the block's data, such
            as missing channels, mismatched channel names, or incompatible
            data types.
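
        Examples
        --------
        Assuming ``block`` is the single-channel block from the class-level
        example:

        >>> batch = block.to_column_batch()
        >>> batch.schema.names
        ['time', 'H1:TEST']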
299 """
300 if schema is None:
301 schema = self._generate_column_schema()
302 channels = [field.name for field in schema][1:]
304 return pyarrow.RecordBatch.from_arrays(
305 [
306 pyarrow.array([self.time_ns], type=schema.field("time").type),
307 *[
308 _numpy_to_arrow_list_array(
309 self.data[channel], schema.field(channel).type
310 )
311 for channel in channels
312 ],
313 ],
314 schema=schema,
315 )

    def to_row_batches(
        self, partitions: dict
    ) -> Iterator[tuple[str, pyarrow.RecordBatch]]:
        """Create row-based record batches from a series block.

        Parameters
        ----------
        partitions : dict
            Mapping from channel name to the ID of the partition the channel
            belongs to. Channels without a partition entry are skipped.

        Yields
        ------
        tuple[str, pyarrow.RecordBatch]
            Pairs of partition ID and record batch, one pair per partition.
            Each record batch has a 'time' column with the timestamp, a
            'channel' column with the channel names, and a 'data' column
            containing the timeseries.
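
        Examples
        --------
        Assuming ``block`` is the single-channel block from the class-level
        example:

        >>> for partition_id, batch in block.to_row_batches({"H1:TEST": "part0"}):
        ...     print(partition_id, batch.schema.names)
        part0 ['time', 'channel', 'data']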
327 """
328 # group channels by partitions
329 channels_by_part = defaultdict(list)
330 for channel in self.keys():
331 if channel in partitions:
332 partition = partitions[channel]
333 channels_by_part[partition].append(channel)
335 # generate column-based record batches
336 for partition_id, channels in channels_by_part.items():
337 # all channels have the same data type
338 dtype = self.channels[channels[0]].data_type
339 schema = self._generate_row_schema(pyarrow.from_numpy_dtype(dtype))
340 series: list[numpy.ndarray] = [
341 pyarrow.array(self.data[channel]) for channel in channels
342 ]
343 yield (
344 partition_id,
345 pyarrow.RecordBatch.from_arrays(
346 [
347 pyarrow.array(
348 numpy.full(len(channels), self.time_ns),
349 type=schema.field("time").type,
350 ),
351 pyarrow.array(channels, type=schema.field("channel").type),
352 pyarrow.array(series, type=schema.field("data").type),
353 ],
354 schema=schema,
355 ),
356 )

    @classmethod
    def from_column_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a column-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column containing the timestamp
            and one column per channel containing its timeseries.
        channels : dict[str, Channel]
            Channel metadata. The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.
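
        Examples
        --------
        Rebuilding a block from the batch produced by ``to_column_batch``:

        >>> batch = block.to_column_batch()
        >>> restored = SeriesBlock.from_column_batch(batch, block.channels)
        >>> restored.time_ns == block.time_ns
        True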
382 """
383 time = batch.column("time")[0].as_py()
384 fields: list[pyarrow.field] = list(batch.schema)
385 channel_names = [field.name for field in fields[1:]]
386 series_dict = {
387 channel: _arrow_to_numpy_array(
388 pyarrow.compute.list_flatten(batch.column(channel))
389 )
390 for channel in channel_names
391 }
392 channel_dict = {channel: channels[channel] for channel in channel_names}
393 return cls(time, series_dict, channel_dict)

    @classmethod
    def from_row_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a row-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp, a
            'channel' column with the channel names, and a 'data' column
            containing the timeseries.
        channels : dict[str, Channel]
            Channel metadata. The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.
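
        Examples
        --------
        Rebuilding blocks from the batches produced by ``to_row_batches``:

        >>> blocks = [
        ...     SeriesBlock.from_row_batch(batch, block.channels)
        ...     for _, batch in block.to_row_batches({"H1:TEST": "part0"})
        ... ]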
420 """
421 time = batch.column("time")[0].as_py()
422 channel_names = batch.column("channel").to_pylist()
423 data = batch.column("data")
424 series_dict = {}
425 channel_dict = {}
426 for idx, channel in enumerate(channel_names):
427 series_dict[channel] = _arrow_to_numpy_array(data[idx].values)
428 channel_dict[channel] = channels[channel]
429 return cls(time, series_dict, channel_dict)

    def _generate_column_schema(self) -> pyarrow.Schema:
        fields = [pyarrow.field("time", pyarrow.int64())]
        for channel, arr in self.data.items():
            dtype = pyarrow.from_numpy_dtype(arr.dtype)
            fields.append(pyarrow.field(channel, pyarrow.list_(dtype)))
        return pyarrow.schema(fields)

    def _generate_row_schema(self, dtype: pyarrow.DataType) -> pyarrow.Schema:
        return pyarrow.schema(
            [
                pyarrow.field("time", pyarrow.int64()),
                pyarrow.field("channel", pyarrow.string()),
                pyarrow.field("data", pyarrow.list_(dtype)),
            ]
        )


# backwards compatibility with previous name
DataBlock = SeriesBlock


def concatenate_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Join a sequence of timeseries blocks into a single block.

    If the SeriesBlock arguments are not sequential in time, an
    AssertionError will be raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

    Returns
    -------
    SeriesBlock
        The combined timeseries block.
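
    Examples
    --------
    Assuming ``block1`` and ``block2`` contain the same channels and are
    contiguous in time (``block2`` starts where ``block1`` ends):

    >>> combined = concatenate_blocks(block1, block2)
    >>> combined.duration == block1.duration + block2.duration
    True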
468 """
469 channel_dict = blocks[0].channels
470 channel_set = set(channel_dict)
471 start_time_ns = end_time_ns = blocks[0].time_ns
472 duration_ns = 0
473 for block in blocks:
474 assert set(block.data.keys()) == channel_set, (
475 "all blocks must contain the same channel sets"
476 )
477 assert block.time_ns == end_time_ns, (
478 f"block start time ({block.time_ns}) does not match "
479 f"concatenated block end time ({end_time_ns})"
480 )
481 duration_ns += block.duration_ns
482 end_time_ns += block.duration_ns
483 series_dict: dict[str, numpy.ndarray] = {}
484 for channel in channel_set:
485 series_dict[str(channel)] = numpy.concatenate(
486 [block[str(channel)].data for block in blocks]
487 )
488 return SeriesBlock(start_time_ns, series_dict, channel_dict)


def combine_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Combine blocks spanning the same time range into a single block.

    Each block must contain a distinct set of channels, and the time
    properties of each block must agree; otherwise an AssertionError
    will be raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The blocks to combine.

    Returns
    -------
    SeriesBlock
        The combined block.
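
    Examples
    --------
    Assuming ``block_a`` and ``block_b`` cover the same time span but hold
    disjoint sets of channels:

    >>> merged = combine_blocks(block_a, block_b)
    >>> len(merged) == len(block_a) + len(block_b)
    True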
508 """
509 time_ns = blocks[0].time_ns
510 duration_ns = blocks[0].duration_ns
511 series_dict: dict[str, numpy.ndarray] = {}
512 channel_dict: dict[str, Channel] = {}
513 for block in blocks:
514 assert block.time_ns == time_ns, "all block times must agree"
515 assert block.duration_ns == duration_ns, "all block durations must agree"
516 for channel, series in block.items():
517 assert channel not in series_dict, (
518 f"channel {channel} has already been included from another block"
519 )
520 series_dict[channel] = series.data
521 channel_dict[channel] = series.channel
522 return SeriesBlock(time_ns, series_dict, channel_dict)


def _numpy_to_arrow_list_array(
    data: numpy.ndarray | numpy.ma.MaskedArray, field_type: pyarrow.DataType
) -> pyarrow.Array:
    """Convert a numpy array to an Arrow ListArray.

    This function provides significant performance improvements for masked
    arrays by avoiding double conversion. Benchmarks show a 25-159x speedup
    over the standard approach of
    `pyarrow.array([pyarrow.array(data)], type=field_type)`.

    Parameters
    ----------
    data : numpy.ndarray or numpy.ma.MaskedArray
        Input array data to convert. Masked arrays are handled efficiently
        with direct Arrow buffer construction.
    field_type : pyarrow.DataType
        PyArrow list type for the output array.

    Returns
    -------
    pyarrow.Array
        ListArray containing the data as a single-element list, preserving
        mask information for masked arrays as Arrow nulls.

    Notes
    -----
    For masked arrays, this function uses direct ListArray construction from
    components instead of double conversion, which eliminates the performance
    bottleneck that can cause death spirals under high load conditions.
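
    Examples
    --------
    A masked element becomes an Arrow null inside the list:

    >>> arr = numpy.ma.masked_array([1.0, 2.0], mask=[False, True])
    >>> list_type = pyarrow.list_(pyarrow.float64())
    >>> _numpy_to_arrow_list_array(arr, list_type).to_pylist()
    [[1.0, None]]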
554 """
555 if isinstance(data, numpy.ma.MaskedArray):
556 inner_type = field_type.value_type
557 inner_array = pyarrow.array(data.data, mask=data.mask, type=inner_type)
558 offsets = pyarrow.array([0, len(data)], type=pyarrow.int32())
559 return pyarrow.ListArray.from_arrays(offsets, inner_array, type=field_type)
560 return pyarrow.array([data], type=field_type)


def _arrow_to_numpy_array(arrow_array: pyarrow.Array) -> numpy.ndarray:
    """Convert an Arrow array to a numpy ndarray.

    If the Arrow array contains nulls, a ``numpy.ma.MaskedArray`` is
    returned, with the Arrow validity bitmap translated into the mask.
    """
    # no null values
    if arrow_array.null_count == 0:
        return arrow_array.to_numpy()

    bitmap_buffer, data_buffer = arrow_array.buffers()
    offset = arrow_array.offset

    # compute mask from arrow bitmap
    # see https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps
    bitmap = numpy.frombuffer(bitmap_buffer, numpy.uint8, len(bitmap_buffer))
    length = len(arrow_array) + offset
    mask = numpy.unpackbits(bitmap, bitorder="little")[:length]
    if offset > 0:
        mask = mask[offset:]
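    # Arrow validity bits are 1 for valid values, while numpy masks use
    # True (1) for masked values, so invert the unpacked bits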
    mask = 1 - mask

    # create masked array
    dtype = _arrow_to_numpy_dtype(arrow_array.type)
    data_array = numpy.frombuffer(data_buffer, dtype, length)[offset:]
    array = numpy.ma.array(data_array, mask=mask)
    assert len(array) == len(arrow_array)
    return array


def _arrow_to_numpy_dtype(dtype: pyarrow.DataType) -> numpy.dtype:
    """Return the numpy dtype equivalent of the given Arrow dtype."""
    arrow_dtype = str(dtype)
    if arrow_dtype == "float":
        return numpy.dtype("float32")
    if arrow_dtype == "double":
        return numpy.dtype("float64")
    return numpy.dtype(arrow_dtype)