Data analysis¶
This feature enables the user to develop real-time data analysis. It consists of a complete Python-powered environment with a set of custom methods for agile development.
Extensions > New Extension > Data analysis
Bare minimum¶
[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

if __name__ == '__main__':
    Analysis()
Data stream access¶
The data stream is accessed asynchronously with the loop_consumer decorator from bci_framework.extensions.data_analysis. This decorator requires the Kafka topics to access. There are 2 topics available for loop_consumer: eeg and marker.
[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @loop_consumer('eeg', 'marker')
    def stream(self):
        print('Incoming data...')

if __name__ == '__main__':
    Analysis()
The decorated method receives optional arguments: data, topic, frame and latency.

- data: (eeg, aux) if the topic is eeg, marker_value if the topic is marker.
- topic: the topic of the incoming stream object from Kafka (data.topic).
- frame: the incoming frame counter.
- latency: the latency of the stream.
[ ]:
@loop_consumer('eeg')
def stream(self, data, topic, frame, latency):
    eeg, aux = data

    print(f'Incoming data #{frame}')
    print(f'EEG{eeg.shape}')
    print(f'AUX{aux.shape}')
    print(f'Topic: {topic}')
    print(f'Latency: {latency}')
The above code will execute on every data stream input, and the code below will execute only when a marker is streamed.
[ ]:
@loop_consumer('marker')
def stream(self, data, topic, frame):
    marker_value = data
    print(f'Incoming marker: {marker_value}')
It is not possible to use the loop_consumer decorator in more than one place, so the topic argument can be used for flow control.
[ ]:
@loop_consumer('eeg', 'marker')
def stream(self, data, topic, frame):
    if topic == 'eeg':
        eeg, aux = data
        print("EEG data incoming...")
    elif topic == 'marker':
        marker_value = data
        print("Marker incoming...")
Simulate data stream¶
Using fake_loop_consumer instead of loop_consumer, it is possible to create a fake data stream.
[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, fake_loop_consumer
import logging

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @fake_loop_consumer('eeg')
    def stream(self):
        logging.debug('Incoming data...')

if __name__ == '__main__':
    Analysis()
Built-in methods¶
Buffer / Sliding window¶
We can use self.create_buffer to implement an automatic buffer with a fixed time view; for example, a buffer of 30 seconds:
[ ]:
self.create_buffer(seconds=30)
The data can be accessed with self.buffer_eeg and self.buffer_aux.
[ ]:
class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        eeg = self.buffer_eeg
        aux = self.buffer_aux
The self.create_buffer method receives other arguments like aux_shape, fill and resampling.
[ ]:
self.create_buffer(seconds=30, aux_shape=3, fill=0, resampling=1000)
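As a framework-free illustration of what a fixed time-view buffer does, here is a sketch with collections.deque. This is a hypothetical analogue, not BCI-Framework's actual implementation, and the sampling rate is an assumed value:

```python
from collections import deque

SAMPLE_RATE = 250   # assumed sampling rate in Hz
SECONDS = 30        # fixed time view, as in create_buffer(seconds=30)
SIZE = SAMPLE_RATE * SECONDS

# A deque with maxlen drops the oldest sample when a new one arrives,
# behaving like an automatic sliding window over the last 30 seconds.
buffer_eeg = deque([0] * SIZE, maxlen=SIZE)  # analogue of fill=0

for sample in range(100):  # stream 100 new samples in
    buffer_eeg.append(sample)

print(len(buffer_eeg))   # 7500: the window length never grows
print(buffer_eeg[-1])    # 99: the newest sample
```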
Resampling¶
Resampling is defined when the buffer is created, with the resampling argument. This value is not used strictly; instead, a near and optimal value is calculated based on the sampling rate and the buffer size.
[ ]:
class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30, resampling=1000)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        eeg = self.buffer_eeg_resampled
        aux = self.buffer_aux_resampled

        print(f'EEG{eeg.shape}')
        print(f'AUX{aux.shape}')
The resampling does not affect the buffer; both versions of the data are accessible at all times.
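The "near and optimal" behavior can be illustrated with a small framework-free sketch. The actual calculation inside BCI-Framework may differ; this only shows why the requested value gets adjusted:

```python
# Hypothetical sketch: derive a resampled length close to the requested
# one by picking an integer decimation factor over the buffer.
def near_resampling(sample_rate, seconds, requested):
    total = sample_rate * seconds              # samples held by the buffer
    factor = max(1, round(total / requested))  # integer decimation step
    return total // factor                     # actual resampled length

print(near_resampling(1000, 30, 1000))  # 1000: the requested value fits exactly
print(near_resampling(250, 30, 1000))   # 937: a nearby value, not exactly 1000
```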
Data slicing referenced by markers¶
This consists of a method that crops the available data relative to a marker reference. The @marker_slicing decorator does the trick. It receives the markers, a t0 that indicates how many seconds to crop before the marker, and the duration of the slice.
[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, marker_slicing

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Needs to be greater than the duration of the slice.
        self.create_buffer(seconds=30, aux_shape=3)
        self.slicing()

    @marker_slicing(['Right', 'Left'], t0=-2, duration=6)
    def slicing(self, eeg, aux, timestamp, marker):
        print(eeg.shape)
        print(aux.shape)
        print(timestamp.shape)
        print(marker)
        print()

if __name__ == '__main__':
    Analysis()
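The windowing logic behind the decorator can be sketched without the framework. This hypothetical helper only illustrates the idea (keep t0 seconds before the marker, for duration seconds), not the real implementation:

```python
# Hypothetical sketch of marker-referenced slicing.
def slice_by_marker(timestamps, samples, marker_t, t0, duration):
    """Keep samples whose timestamp falls in [marker_t + t0, marker_t + t0 + duration)."""
    start = marker_t + t0
    return [s for t, s in zip(timestamps, samples) if start <= t < start + duration]

timestamps = [i * 0.5 for i in range(20)]  # 10 s sampled at 2 Hz, for illustration
samples = list(range(20))

# Marker at t=4 s with t0=-2 and duration=6 -> window [2 s, 8 s)
print(slice_by_marker(timestamps, samples, marker_t=4, t0=-2, duration=6))
# [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
```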
The above code will be executed each time a slice is available. The buffer must be greater than the duration of the desired slice. The slicing method receives four arguments:

- eeg: EEG data of shape (channels, time).
- aux: auxiliary data of shape (aux, time).
- timestamp: the timestamp vector.
- marker: the marker that triggered the crop.

Receive markers¶
The markers can be accessed by specifying the marker topic in the loop_consumer:
[ ]:
@loop_consumer('marker')
Send commands, annotations and feedback¶
The commands are used to communicate outputs to the real world or to other systems; they can also be read in the Stimuli delivery to create neurofeedback applications. To activate this feature, just pass the enable_produser argument as True to the DataAnalysis subclass.
[ ]:
if __name__ == '__main__':
    Analysis(enable_produser=True)
Once the producer is activated, the methods self.send_command, self.send_feedback and self.send_annotation are available.
[ ]:
@loop_consumer('eeg')
def stream(self):
    eeg = self.buffer_eeg_resampled
    aux = self.buffer_aux_resampled

    [...]  # amazing data analysis

    self.send_command('MyCommand', value=45)
The self.send_annotation method also receives the optional argument duration.
[ ]:
self.send_annotation('Data record start')
self.send_annotation('The subject yawn', duration=5)
The self.send_feedback method receives any kind of Python data structure as keyword arguments.
[ ]:
feed = {'var1': 0,
        'var2': True,
        'var3': 'Right',
        }

self.send_feedback(**feed)
self.send_feedback(a=0, b=2.3, c='Left')
A generic producer is also available:
[ ]:
self.generic_produser(topic, data)
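Kafka transports bytes, so whatever structure is produced must be serialized on the way in and deserialized on the way out. Here is a framework-free sketch of that round trip, where an in-memory queue stands in for the broker and the function name merely mirrors the framework's:

```python
import pickle
from queue import Queue

broker = Queue()  # stand-in for a Kafka topic

def generic_produser(topic, data):
    # Serialize any Python structure to bytes before producing.
    broker.put((topic, pickle.dumps(data)))

def consume():
    # Deserialize the payload back into a Python structure.
    topic, payload = broker.get()
    return topic, pickle.loads(payload)

generic_produser('spectrum', {'amplitude': [0.1, 0.2], 'frequency': [10, 11]})
print(consume())  # ('spectrum', {'amplitude': [0.1, 0.2], 'frequency': [10, 11]})
```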
Communication between analysis process¶
Let’s build a script that acts like a Kafka transformer: it reads the raw EEG data, calculates the EEG spectrum using the Fourier transform, and injects the result back into the stream. The same pattern applies to other advanced processing tasks, such as classification with neural networks.
[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer
from bci_framework.extensions import properties as prop
from gcpds.utils.processing import fourier

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30, resampling=1000)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        W, EEG = fourier(self.buffer_eeg, fs=prop.SAMPLE_RATE, axis=1)
        data = {'amplitude': EEG,
                'frequency': W}
        self.generic_produser('spectrum', data)

if __name__ == '__main__':
    Analysis(enable_produser=True)
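The heart of that transformer is just a Fourier transform. As a framework-free illustration, here is a naive pure-stdlib DFT, assumed to behave like gcpds’ fourier only in spirit:

```python
import cmath
import math

def spectrum(signal, fs):
    """Return (frequencies, amplitudes) for the positive half of a naive DFT."""
    n = len(signal)
    freqs = [k * fs / n for k in range(n // 2 + 1)]
    amps = [abs(sum(x * cmath.exp(-2j * math.pi * k * i / n)
                    for i, x in enumerate(signal))) / n
            for k in range(n // 2 + 1)]
    return freqs, amps

# One cycle of a sine sampled with 8 points at fs=8 Hz -> peak at 1 Hz.
sig = [math.sin(2 * math.pi * i / 8) for i in range(8)]
freqs, amps = spectrum(sig, fs=8)
print(freqs[amps.index(max(amps))])  # 1.0
```

In the real extension, the (frequency, amplitude) pair is what gets packed into the dictionary and produced to the spectrum topic.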
Now, in another script, we will write a Kafka consumer; this script will consume from the previously created stream.
[ ]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @loop_consumer('spectrum')
    def stream(self, data):
        data = data.value['data']
        EEG = data['amplitude']
        W = data['frequency']

if __name__ == '__main__':
    Analysis()
These examples are available by default in the framework extensions explorer. The spectrum topic must be created before it can be used:
[ ]:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic spectrum
Framework integration¶
BCI-Framework can execute any number of scripts as independent processes; the system will handle interruptions and show information about CPU and memory usage.
Data analysis > Data analysis