# Source code for bci_framework.default_extensions.Tutorial_Fourier_stream_consumer.main

"""
=================
Analysis Consumer
=================
"""

from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer
from typing import TypeVar

# Placeholder type used to annotate the messages handed to consumer methods
# (see the `data` parameter of `AnalysisConsumer.stream`).
# NOTE(review): the TypeVar is declared with the name 'Kafka' while it is bound
# to `KAFKA_STREAM`; static type checkers expect both names to match -- confirm
# nothing relies on the current name before renaming.
KAFKA_STREAM = TypeVar('Kafka')


########################################################################
class AnalysisConsumer(DataAnalysis):
    """Analysis Consumer.

    Tutorial consumer that listens to the ``spectrum`` topic and unpacks the
    amplitude and frequency arrays carried by each message.
    """

    # ----------------------------------------------------------------------
    def __init__(self, *args, **kwargs):
        """Initialize the base class and start consuming.

        All positional and keyword arguments are forwarded unchanged to
        ``DataAnalysis.__init__``.

        ``stream`` is invoked here without arguments; presumably the
        ``loop_consumer`` decorator supplies ``data`` for every incoming
        message -- verify against the decorator's implementation.
        """
        super().__init__(*args, **kwargs)
        self.stream()

    # ----------------------------------------------------------------------
    @loop_consumer('spectrum')  # consume only the `spectrum` topic
    def stream(self, data: KAFKA_STREAM):
        """Unpack one message from the ``spectrum`` topic.

        Parameters
        ----------
        data
            Incoming message; its ``value`` attribute must be a mapping with
            a ``'data'`` entry that holds the ``'amplitude'`` and
            ``'frequency'`` fields.
        """
        data = data.value['data']  # read the data

        # The values are only extracted, not processed: this is the place
        # where the analysis of each message would go.
        EEG = data['amplitude']
        W = data['frequency']
# Script entry point: build the consumer only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    AnalysisConsumer()