Source code for bci_framework.kafka_scripts.record

"""
======
Record
======

Kafka consumer that saves EEG streams in HDF5 format using the `Data storage
handler <https://openbci-stream.readthedocs.io/en/latest/notebooks/07-data_storage_handler.html>`_
defined in `OpenBCI-Stream <https://openbci-stream.readthedocs.io/en/latest/index.html>`_.
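
Example
-------
A minimal, hypothetical sketch of inspecting a record produced by this
consumer: the file name is a placeholder, ``h5py`` is assumed to be
installed, and the dataset layout is whatever ``HDF5Writer`` created::

    import h5py

    # List the datasets stored in the record file.
    with h5py.File('record-<date>.h5', 'r') as f:
        print(list(f.keys()))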
"""

import signal
import atexit
import os
import sys
from typing import TypeVar

# When running inside BCI-Framework, redirect stdout and stderr to log files
# in the records directory.
if home := os.getenv('BCISTREAM_HOME'):
    sys.stderr = open(os.path.join(home, 'records', 'log.stderr'), 'w')
    sys.stdout = open(os.path.join(home, 'records', 'log.stdout'), 'w')

from datetime import datetime
from openbci_stream.utils import HDF5Writer

from bci_framework.extensions import properties as prop
from bci_framework.extensions.data_analysis.utils import loop_consumer

KafkaStream = TypeVar('KafkaStream')


########################################################################
class RecordTransformer:
    """Kafka consumer, implemented as a BCI-Framework extension, that writes
    the EEG, marker, and annotation streams into an HDF5 record."""

    # ----------------------------------------------------------------------
    def __init__(self):
        """Configure the HDF5 writer, write the header, and start consuming."""
        now = datetime.now()
        filename = now.strftime('%x-%X').replace('/', '_').replace(':', '_')

        # `expanduser` resolves the fallback `~/.bciframework` to the user's home.
        records_dir = os.path.join(os.path.expanduser(os.environ.get(
            'BCISTREAM_HOME', '~/.bciframework')), 'records')
        os.makedirs(records_dir, exist_ok=True)

        self.writer = HDF5Writer(os.path.join(
            records_dir, f'record-{filename}.h5'))

        header = {'sample_rate': prop.SAMPLE_RATE,
                  'streaming_sample_rate': prop.STREAMING_PACKAGE_SIZE,
                  'datetime': now.timestamp(),
                  'montage': prop.MONTAGE_NAME,
                  'channels': prop.CHANNELS,
                  }
        self.writer.add_header(header, prop.HOST)

        # Close the file cleanly on interruption or normal exit.
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)
        atexit.register(self.stop)

        self.save_data()

    # ----------------------------------------------------------------------
    @loop_consumer('eeg', 'marker', 'annotation')
    def save_data(self, kafka_stream: KafkaStream, topic: str, **kwargs) -> None:
        """Write data on every stream package.

        Parameters
        ----------
        kafka_stream
            Kafka stream with deserialized data.
        topic
            The topic of the stream.
        """
        if not self.writer.f.isopen:
            return

        if topic == 'eeg':
            dt = kafka_stream.value['context']['binary_created']
            eeg, aux = kafka_stream.value['data']
            self.writer.add_eeg(eeg, dt)
            self.writer.add_aux(aux)

        elif topic == 'marker':
            dt = kafka_stream.value['datetime']
            marker = kafka_stream.value['marker']
            self.writer.add_marker(marker, dt)

        elif topic == 'annotation':
            onset = kafka_stream.value['onset']
            duration = kafka_stream.value['duration']
            description = kafka_stream.value['description']
            self.writer.add_annotation(onset, duration, description)

    # ----------------------------------------------------------------------
    def stop(self, *args, **kwargs) -> None:
        """Close the HDF5 writer."""
        self.writer.close()


if __name__ == '__main__':
    RecordTransformer()
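
# A hedged usage sketch: with bci_framework installed and the EEG stream
# properties plus the Kafka broker already configured, this recorder can be
# launched directly as a module:
#
#     python -m bci_framework.kafka_scripts.record
#
# The resulting ``record-*.h5`` file is written to ``$BCISTREAM_HOME/records``
# (or ``~/.bciframework/records`` when the variable is not set).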