Data analysis

This feature enables the user to develop real-time data analysis. It consists of a complete Python-powered environment with a set of custom methods for agile development.

Extensions > New Extension > Data analysis

Bare minimum

[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

if __name__ == '__main__':
    Analysis()

Data stream access

The data stream is accessed asynchronously with the loop_consumer decorator from bci_framework.extensions.data_analysis. This decorator requires the Kafka topics to access.

There are two topics available for loop_consumer: eeg and marker.

[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @loop_consumer('eeg', 'marker')
    def stream(self):
        print('Incoming data...')

if __name__ == '__main__':
    Analysis()

The decorated method can receive the following optional arguments: data, kafka_stream, topic, frame and latency.

data: (eeg, aux) if the topic is eeg; marker_value if the topic is marker.
kafka_stream: The stream object from Kafka.
topic: The topic of the Kafka stream; this value is also available as data.topic.
frame: Incremental counter of the streamed data.
latency: The time between acquisition and read.
[ ]:
@loop_consumer('eeg')
def stream(self, data, topic, frame, latency):
    eeg, aux = data

    print(f'Incoming data #{frame}')
    print(f'EEG{eeg.shape}')
    print(f'AUX{aux.shape}')
    print(f'Topic: {topic}')
    print(f'Latency: {latency}')

The above code executes on every data stream input, while the code below executes only when a marker is streamed.

[ ]:
@loop_consumer('marker')
def stream(self, data, topic, frame):
    marker_value = data

    print(f'Incoming marker: {marker_value}')

It is not possible to use the loop_consumer decorator in more than one place, so the topic argument can be used for flow control.

[ ]:
@loop_consumer('eeg', 'marker')
def stream(self, data, topic, frame):

    if topic == 'eeg':
        eeg, aux = data
        print("EEG data incoming...")

    elif topic == 'marker':
        marker_value = data
        print("Marker incoming...")

Simulate data stream

Using fake_loop_consumer instead of loop_consumer makes it possible to create a fake data stream.

[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, fake_loop_consumer
import logging

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @fake_loop_consumer('eeg')
    def stream(self):
        logging.debug('Incoming data...')

if __name__ == '__main__':
    Analysis()

Built-in methods

Buffer / Sliding window

We can use self.create_buffer to implement an automatic buffer with a fixed time view, for example, a buffer of 30 seconds:

[ ]:
self.create_buffer(seconds=30)

The data can be accessed with self.buffer_eeg and self.buffer_aux.

[ ]:
class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        eeg = self.buffer_eeg
        aux = self.buffer_aux
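
Conceptually, the buffer behaves like a fixed-length sliding window over the incoming samples. The following is a minimal NumPy sketch of the idea, purely illustrative and not the framework's actual implementation (the class and method names are assumptions):

```python
import numpy as np

class SlidingBuffer:
    """Fixed-length sliding window over streamed samples (illustrative only)."""

    def __init__(self, channels, seconds, sample_rate, fill=0):
        self.size = seconds * sample_rate
        self.buffer = np.full((channels, self.size), fill, dtype=float)

    def append(self, chunk):
        # Shift old samples left and write the new chunk at the end,
        # so the buffer always holds the most recent `seconds` of data.
        n = chunk.shape[1]
        self.buffer = np.roll(self.buffer, -n, axis=1)
        self.buffer[:, -n:] = chunk

buf = SlidingBuffer(channels=8, seconds=30, sample_rate=250)
buf.append(np.ones((8, 100)))  # push 100 new samples per channel
```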

The self.create_buffer method receives other arguments: aux_shape, fill and resampling.

[ ]:
self.create_buffer(seconds=30, aux_shape=3, fill=0, resampling=1000)

aux_shape: The dimension of the auxiliary data, 3 by default.
fill: Value used to initialize the buffer, 0 by default.
resampling: The target number of samples after resampling.

Resampling

The resampling is defined when the buffer is created, with the resampling argument. This value is not used strictly; instead, a near-optimal value is calculated based on the sampling rate and the buffer size.

[ ]:
class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30, resampling=1000)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        eeg = self.buffer_eeg_resampled
        aux = self.buffer_aux_resampled

        print(f'EEG{eeg.shape}')
        print(f'AUX{aux.shape}')

The resampling does not affect the buffer; both versions of the data are accessible at all times.
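
The exact rule the framework uses to pick the near-optimal value is internal, but one plausible strategy is to choose, among the sizes that decimate the buffer evenly, the one closest to the requested value. The function below is a hypothetical sketch of that idea, not the framework's code:

```python
def nearest_resampling(sample_rate, seconds, requested):
    """Pick a resampled size near `requested` that divides the buffer evenly."""
    total = sample_rate * seconds  # samples held by the buffer
    # Sizes reachable by exact integer decimation of the buffer
    candidates = [total // d for d in range(1, total + 1) if total % d == 0]
    # Return the candidate closest to the requested resampling
    return min(candidates, key=lambda n: abs(n - requested))

nearest_resampling(1000, 30, 1000)  # 30000 samples decimate evenly to 1000
```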

Data slicing referenced by markers

This method crops the available data using a marker as reference. The @marker_slicing decorator does the trick: it receives the markers, a t0 that indicates how many seconds to crop before the marker, and the duration of the slice.

[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, marker_slicing

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Needs to be greater than the duration of the slice.
        self.create_buffer(seconds=30, aux_shape=3)
        self.slicing()

    @marker_slicing(['Right', 'Left'], t0=-2, duration=6)
    def slicing(self, eeg, aux, timestamp, marker):

        print(eeg.shape)
        print(aux.shape)
        print(timestamp.shape)
        print(marker)
        print()

if __name__ == '__main__':
    Analysis()

The above code will be executed each time a slice is available. The buffer must be greater than the duration of the desired slice.

eeg: The cropped EEG data (channels, time).
aux: The cropped AUX data (aux, time).
timestamp: The timestamp vector.
marker: The marker that triggered the crop.
latency: The time between acquisition and read.
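
The cropping itself can be pictured as simple index arithmetic over the buffer and its timestamp vector. The following self-contained sketch (function name and signature are assumptions for illustration, not the framework's code) slices a window around a marker timestamp:

```python
import numpy as np

def slice_by_marker(eeg, timestamp, marker_time, t0, duration, fs):
    """Crop `eeg` starting `t0` seconds relative to the marker (illustrative)."""
    start = marker_time + t0               # t0 is negative to start before the marker
    i = np.searchsorted(timestamp, start)  # first sample index of the slice
    n = int(duration * fs)                 # number of samples covered by the slice
    return eeg[:, i:i + n], timestamp[i:i + n]

fs = 250
timestamp = np.arange(0, 30, 1 / fs)              # 30 s of timestamps
eeg = np.random.normal(size=(8, timestamp.size))  # 8-channel fake EEG
slice_eeg, slice_t = slice_by_marker(eeg, timestamp,
                                     marker_time=10, t0=-2, duration=6, fs=fs)
```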

Receive markers

The markers can be accessed by specifying the marker topic in loop_consumer.

[ ]:
@loop_consumer('marker')

Send commands, annotations and feedbacks

The commands are used to communicate outputs to the real world or to other systems; they can also be read in the Stimuli delivery to create neurofeedback applications. To activate this feature, just pass the enable_produser argument as True to the DataAnalysis subclass.

[ ]:
if __name__ == '__main__':
    Analysis(enable_produser=True)

Once the producer is activated, the methods self.send_command, self.send_feedback and self.send_annotation are available.

[ ]:
@loop_consumer('eeg')
def stream(self):
    eeg = self.buffer_eeg_resampled
    aux = self.buffer_aux_resampled

    [...]  # amazing data analysis

    self.send_command('MyCommand', value=45)

The self.send_annotation method also receives the optional argument duration.

[ ]:
self.send_annotation('Data record start')
self.send_annotation('The subject yawn', duration=5)

The self.send_feedback method receives any kind of Python data structure.

[ ]:
feed = {'var1': 0,
        'var2': True,
        'var3': 'Right',
       }
self.send_feedback(**feed)

self.send_feedback(a=0, b=2.3, c='Left')

A generic producer is also available:

[ ]:
self.generic_produser(topic, data)

Communication between analysis process

Let’s build a script that acts like a Kafka transformer: it reads the raw EEG data, calculates the EEG spectrum using the Fourier transform, and injects the result back into the stream. The same pattern can be applied to other advanced processing tasks, such as classification with neural networks.

[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer
from bci_framework.extensions import properties as prop
from gcpds.utils.processing import fourier

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30, resampling=1000)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        W, EEG = fourier(self.buffer_eeg, fs=prop.SAMPLE_RATE, axis=1)
        data = {'amplitude': EEG,
                'frequency': W}
        self.generic_produser('spectrum', data)

if __name__ == '__main__':
    Analysis(enable_produser=True)
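
The fourier helper above comes from gcpds.utils.processing. As a rough, self-contained approximation of what such a per-channel spectrum computation does (assumed behavior, not the actual library), one could write:

```python
import numpy as np

def spectrum(eeg, fs):
    """Per-channel amplitude spectrum; a stand-in for a `fourier`-style helper."""
    n = eeg.shape[1]
    W = np.fft.rfftfreq(n, d=1 / fs)        # frequency axis in Hz
    EEG = np.abs(np.fft.rfft(eeg, axis=1))  # amplitude for each channel
    return W, EEG

eeg = np.random.normal(size=(16, 1000))  # 16 channels, 1000 samples
W, EEG = spectrum(eeg, fs=250)
```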

Now, in another script, we will write a Kafka consumer that consumes from the previously created stream.

[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer
from bci_framework.extensions import properties as prop
from gcpds.utils.processing import fourier

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @loop_consumer('spectrum')
    def stream(self, data):
        data = data.value['data']

        EEG = data['amplitude']
        W = data['frequency']

if __name__ == '__main__':
    Analysis()

These examples are available by default in the framework's extensions explorer.

The spectrum topic must be created before it can be used:

[ ]:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic spectrum

Framework integration

BCI-Framework can execute any number of scripts as independent processes; the system handles their interruption and shows information about CPU and memory usage.

Data analysis > Data analysis
