Coverage for src / tracekit / exporters / hdf5.py: 63%
70 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""HDF5 export functionality.
3This module provides trace export to HDF5 format with metadata attributes.
6Example:
7 >>> from tracekit.exporters.hdf5 import export_hdf5
8 >>> export_hdf5(trace, "output.h5")
10References:
11 HDF5 specification (https://www.hdfgroup.org/)
12"""
from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import Any

import numpy as np

try:
    import h5py

    HAS_H5PY = True
except ImportError:
    HAS_H5PY = False

from tracekit.core.types import DigitalTrace, WaveformTrace
def export_hdf5(
    data: WaveformTrace | DigitalTrace | dict[str, WaveformTrace | DigitalTrace],
    path: str | Path,
    *,
    compression: str | None = "gzip",
    compression_opts: int = 4,
    include_metadata: bool = True,
) -> None:
    """Write one or more traces to a new HDF5 file.

    Args:
        data: Either a single WaveformTrace/DigitalTrace, or a mapping of
            dataset names to traces.
        path: Output file path (created/overwritten).
        compression: Compression algorithm ("gzip", "lzf", None).
        compression_opts: Compression level (1-9, used for gzip only).
        include_metadata: Attach trace metadata as dataset attributes.

    Raises:
        ImportError: If h5py is not installed.

    Example:
        >>> export_hdf5(trace, "waveform.h5")
        >>> export_hdf5({"ch1": ch1, "ch2": ch2}, "channels.h5")
    """
    if not HAS_H5PY:
        raise ImportError("h5py is required for HDF5 export. Install with: pip install h5py")

    out_path = Path(path)

    # Normalize the single-trace case to a one-entry mapping under the
    # default dataset name "trace".
    traces: dict[str, WaveformTrace | DigitalTrace]
    if isinstance(data, WaveformTrace | DigitalTrace):
        traces = {"trace": data}
    else:
        traces = data

    with h5py.File(out_path, "w") as hf:
        # File-level provenance attributes.
        hf.attrs["created"] = datetime.now().isoformat()
        hf.attrs["tracekit_version"] = "1.0"
        hf.attrs["format"] = "tracekit_hdf5"

        for dataset_name, dataset_trace in traces.items():
            _write_trace_dataset(
                hf,
                dataset_name,
                dataset_trace,
                compression,
                compression_opts,
                include_metadata,
            )
def _write_trace_dataset(
    f: h5py.File,
    name: str,
    trace: WaveformTrace | DigitalTrace,
    compression: str | None,
    compression_opts: int,
    include_metadata: bool,
) -> None:
    """Write a single trace into *f* as a dataset named *name*.

    Args:
        f: Open HDF5 file object.
        name: Dataset name.
        trace: Trace whose samples and metadata are written.
        compression: Compression algorithm, or None for no compression.
        compression_opts: Compression level (used for gzip only).
        include_metadata: When True, mirror trace metadata into attributes.
    """
    is_waveform = isinstance(trace, WaveformTrace)
    # Waveform samples are stored as float64; digital samples as booleans.
    target_dtype = np.float64 if is_waveform else np.bool_

    ds_kwargs: dict[str, Any] = {}
    if compression:
        ds_kwargs["compression"] = compression
        # compression_opts is only meaningful for gzip (level 1-9).
        if compression == "gzip":
            ds_kwargs["compression_opts"] = compression_opts

    dataset = f.create_dataset(name, data=trace.data.astype(target_dtype), **ds_kwargs)

    if include_metadata:
        meta = trace.metadata

        # Required metadata fields.
        dataset.attrs["sample_rate"] = meta.sample_rate
        dataset.attrs["time_base"] = meta.time_base

        # Optional fields: only write those that are actually present.
        if meta.vertical_scale is not None:
            dataset.attrs["vertical_scale"] = meta.vertical_scale
        if meta.vertical_offset is not None:
            dataset.attrs["vertical_offset"] = meta.vertical_offset
        if meta.acquisition_time is not None:
            dataset.attrs["acquisition_time"] = meta.acquisition_time.isoformat()
        if meta.source_file is not None:
            dataset.attrs["source_file"] = str(meta.source_file)
        if meta.channel_name is not None:
            dataset.attrs["channel_name"] = meta.channel_name
        if meta.trigger_info:
            for trig_key, trig_value in meta.trigger_info.items():
                dataset.attrs[f"trigger_{trig_key}"] = trig_value

        # Type indicator so readers can reconstruct the right trace class.
        dataset.attrs["trace_type"] = "waveform" if is_waveform else "digital"
def export_measurement_results(
    results: dict[str, Any],
    path: str | Path,
    *,
    group_name: str = "measurements",
) -> None:
    """Export measurement results to HDF5.

    Arrays become datasets; scalars become attributes. One level of nesting
    is supported: a dict value creates a sub-group with the same rule applied
    to its items.

    Args:
        results: Dictionary of measurement results.
        path: Output file path.
        group_name: HDF5 group name for results.

    Raises:
        ImportError: If h5py is not installed.

    Example:
        >>> results = measure(trace)
        >>> export_measurement_results(results, "measurements.h5")
    """
    if not HAS_H5PY:
        # Include the install hint for consistency with export_hdf5.
        raise ImportError("h5py is required for HDF5 export. Install with: pip install h5py")

    path = Path(path)

    # Append mode ("a") so results can be added to a file that already
    # contains exported traces.
    with h5py.File(path, "a") as f:
        grp = f.require_group(group_name)

        for name, value in results.items():
            if isinstance(value, dict):
                # Nested dict (e.g. value/unit pairs): arrays -> datasets,
                # scalars -> attributes on a sub-group.
                sub_grp = grp.require_group(name)
                for k, v in value.items():
                    if isinstance(v, np.ndarray):
                        sub_grp.create_dataset(k, data=v)
                    else:
                        sub_grp.attrs[k] = v
            elif isinstance(value, np.ndarray):
                grp.create_dataset(name, data=value)
            else:
                grp.attrs[name] = value
def append_trace(
    path: str | Path,
    name: str,
    trace: WaveformTrace | DigitalTrace,
    *,
    compression: str | None = "gzip",
    compression_opts: int = 4,
) -> None:
    """Append trace to existing HDF5 file.

    Args:
        path: HDF5 file path (created if it does not exist; append mode).
        name: Dataset name for new trace.
        trace: Trace to append.
        compression: Compression algorithm.
        compression_opts: Compression level (1-9, used for gzip only).
            Previously hard-coded to 4; the default preserves that behavior.

    Raises:
        ImportError: If h5py is not installed.

    Example:
        >>> append_trace("data.h5", "ch3", channel3_trace)
    """
    if not HAS_H5PY:
        # Include the install hint for consistency with export_hdf5.
        raise ImportError("h5py is required for HDF5 export. Install with: pip install h5py")

    path = Path(path)

    with h5py.File(path, "a") as f:
        _write_trace_dataset(f, name, trace, compression, compression_opts, True)
215__all__ = [
216 "append_trace",
217 "export_hdf5",
218 "export_measurement_results",
219]