import pickle
import logging
import json
from typing import Optional
import numpy as np
from kafka import KafkaProducer
from openbci_stream.utils import interpolate_datetime
from bci_framework.extensions import properties as prop
# from .utils import loop_consumer, fake_loop_consumer, thread_this, subprocess_this, marker_slice
# Logger configuration: label the root logger, let only WARNING and above
# through, and disable the 'matplotlib.font_manager' logger entirely.
logging.root.name = "DataAnalysis"
logging.root.setLevel(logging.WARNING)
logging.getLogger('matplotlib.font_manager').disabled = True
########################################################################
########################################################################
class DataAnalysis:
    """Rolling EEG/AUX buffers with an optional Kafka producer.

    The instance keeps fixed-size NumPy buffers (`buffer_eeg`, `buffer_aux`,
    `buffer_timestamp_`) that are fed through `update_buffer`, and can
    publish commands, annotations and feedback when it is created with
    ``enable_produser=True``.

    Notes
    -----
    `update_buffer` has two modes. While `_pivot` is ``None`` (the value set
    by `__init__`) the buffers scroll: old samples are shifted out and new
    ones appended at the end. When `_pivot` holds an integer the buffers are
    written circularly starting at that column; in that mode `_pivot_aux`
    must also be set by the caller before AUX data is pushed, since
    `__init__` does not create it.
    """

    # Logged when a message is sent from an instance without a producer.
    _NO_PRODUCER_MESSAGE = (
        "To send commands add the argument 'enable_produser=True' on the declaration of EEGStream")

    # ----------------------------------------------------------------------
    def __init__(self, enable_produser=False):
        """Initialize the analysis object.

        Parameters
        ----------
        enable_produser
            If true, try to create the Kafka producer used by the `send_*`
            methods.
        """
        self._pivot = None
        if enable_produser:
            self._enable_commands()

    # ----------------------------------------------------------------------
    def _enable_commands(self):
        """Create the Kafka producer, or store `None` if it cannot be built.

        Failure is deliberately non-fatal: the error is logged and the
        `send_*` methods then only log that Kafka is unavailable.
        """
        try:
            # NOTE: values are pickled; consumers must only unpickle messages
            # coming from a trusted broker.
            self.kafka_producer = KafkaProducer(
                bootstrap_servers=[f'{prop.HOST}:9092'],
                compression_type='gzip',
                value_serializer=pickle.dumps,
            )
        except Exception:
            logging.error('Kafka not available!')
            self.kafka_producer = None

    # ----------------------------------------------------------------------
    def send_command(self, command: str, value: Optional[dict] = None) -> None:
        """Publish a command on the 'command' topic.

        Parameters
        ----------
        command
            Command name.
        value
            Payload sent along with the command; an empty dict when omitted.
        """
        payload = {
            'command': command,
            'value': {} if value is None else value,
        }
        self.generic_produser('command', payload)

    # ----------------------------------------------------------------------
    def send_annotation(self, description: str, duration: Optional[int] = 0) -> None:
        """Publish an annotation on the 'annotation' topic.

        Parameters
        ----------
        description
            Annotation text.
        duration
            Annotation duration.
        """
        self.generic_produser('annotation', {
            'action': 'annotation',
            'duration': duration,
            'description': description,
        })

    # ----------------------------------------------------------------------
    def send_feedback(self, kwargs) -> None:
        """Publish `kwargs` on the 'feedback' topic."""
        self.generic_produser('feedback', kwargs)

    # ----------------------------------------------------------------------
    def generic_produser(self, topic: str, data: object) -> None:
        """Publish `data` on `topic` through the Kafka producer.

        Nothing is raised when the producer is missing: an error is logged
        if the producer was never enabled or if Kafka was not reachable when
        it was enabled.

        Parameters
        ----------
        topic
            Kafka topic name.
        data
            Object handed to the producer (serialized by the producer).
        """
        if not hasattr(self, 'kafka_producer'):
            logging.error(self._NO_PRODUCER_MESSAGE)
            return

        if self.kafka_producer is None:
            logging.error('Kafka not available!')
            return

        self.kafka_producer.send(topic, data)

    # ----------------------------------------------------------------------
    @staticmethod
    def _write_circular(buffer: np.ndarray, start: int, block: np.ndarray) -> int:
        """Write `block` into `buffer` columns from `start`, wrapping around.

        Returns the column index that follows the last written sample.
        """
        width = buffer.shape[1]
        size = block.shape[1]
        # Modular column indexes make the wrap-around explicit: samples that
        # do not fit at the end continue from column 0.
        columns = (start + np.arange(size)) % width
        buffer[:, columns] = block
        return int((start + size) % width)

    # ----------------------------------------------------------------------
    def update_buffer(self, eeg: np.ndarray, aux: np.ndarray, timestamp: float) -> None:
        """Update the buffers.

        With `_pivot` set to ``None`` the buffers scroll and the new samples
        are placed at the end. Otherwise the samples are written circularly
        at `_pivot` (EEG) and `_pivot_aux` (AUX), and both pivots advance;
        in that mode the timestamp buffer is not touched.

        Parameters
        ----------
        eeg
            The new EEG array, samples along axis 1.
        aux
            The new AUX array, samples along axis 1, or `None`.
        timestamp
            Timestamp stored for the last sample of the package.
        """
        c = eeg.shape[1]

        if self._pivot is None:
            # An empty package must be skipped: `[-0:]` would address the
            # whole buffer instead of nothing.
            if c:
                self.buffer_eeg = np.roll(self.buffer_eeg, -c, axis=1)
                self.buffer_eeg[:, -c:] = eeg

                # Only the last sample of each package carries a timestamp;
                # the remaining new slots are zeroed.
                self.buffer_timestamp_ = np.roll(
                    self.buffer_timestamp_, -c, axis=0)
                self.buffer_timestamp_[-c:] = np.zeros(c)
                self.buffer_timestamp_[-1] = timestamp

                if hasattr(self, 'buffer_eeg_split'):
                    self.buffer_eeg_split = np.roll(self.buffer_eeg_split, -c)

            if aux is not None:
                d = aux.shape[1]
                if d:
                    self.buffer_aux = np.roll(self.buffer_aux, -d, axis=1)
                    self.buffer_aux[:, -d:] = aux
            return

        self._pivot = self._write_circular(self.buffer_eeg, self._pivot, eeg)

        if aux is not None:
            # NOTE: `_pivot_aux` is not created by `__init__`; whoever
            # enables pivot mode has to initialize it as well.
            self._pivot_aux = self._write_circular(
                self.buffer_aux, self._pivot_aux, aux)

    # ----------------------------------------------------------------------
    def _get_factor_near_to(self, x: int, n: Optional[int] = 1000) -> int:
        """Get the integer number factor of `x` nearest to `n`.

        This factor is used to fast resampling. Only divisors of `x` in a
        window of about ten values around ``x // n`` are considered.

        Parameters
        ----------
        x
            Samples.
        n
            Near factor

        Returns
        -------
        int
            Factor.
        """
        center = x // n
        divisors = np.arange(max(1, center - 10), center + 10)
        candidates = x / divisors
        # Non-integer quotients are not factors of `x`; zero them out so
        # they only win when nothing better exists.
        candidates[candidates % 1 != 0] = 0
        return int(candidates[np.argmin(np.abs(candidates - n))])

    # ----------------------------------------------------------------------
    def _create_resampled_buffer(self, x: int, resampling: Optional[int] = 1000) -> np.ndarray:
        """Create `buffer_eeg_split`, the mask that selects resampled columns.

        The mask has `x` entries; about `resampling` evenly spaced ones are
        set to 1 and the rest stay 0.

        Parameters
        ----------
        x
            Buffer length in samples.
        resampling
            The resampling size.
        """
        f = self._get_factor_near_to(x, resampling)
        self.buffer_eeg_split = np.zeros(x)
        index = np.linspace(0, x, f + 1).astype(int)[:-1]
        self.buffer_eeg_split[index] = 1

    # ----------------------------------------------------------------------
    def create_buffer(self, seconds: Optional[int] = 30, aux_shape: Optional[int] = 3, fill: Optional[int] = 0, resampling: Optional[int] = 1000):
        """Create a buffer with fixed time length.

        Since the `loop_consumer` iterator only return the last data package,
        the object `buffer_eeg` and `buffer_aux` will retain a longer (in
        time) data.

        Parameters
        ----------
        seconds
            How many seconds will content the buffer.
        aux_shape
            Define the shape of aux array.
        fill
            Initialize buffer with this value.
        resampling
            The resampling size.
        """
        chs = len(prop.CHANNELS)
        # One integer length shared by every buffer, so the resampling mask
        # always lines up with the data buffers.
        samples = int(prop.SAMPLE_RATE * abs(seconds))

        self._create_resampled_buffer(samples, resampling=resampling)

        self.buffer_eeg = np.full((chs, samples), fill, dtype=float)
        self.buffer_aux = np.full((aux_shape, samples), fill, dtype=float)
        self.buffer_timestamp_ = np.zeros(samples)

    # ----------------------------------------------------------------------
    def _resampled_columns(self) -> np.ndarray:
        """Return the column indexes flagged with 1 in `buffer_eeg_split`."""
        return np.flatnonzero(self.buffer_eeg_split == 1)

    # ----------------------------------------------------------------------
    @property
    def buffer_eeg_resampled(self):
        """Columns of `buffer_eeg` selected by the resampling mask."""
        return self.buffer_eeg[:, self._resampled_columns()]

    # ----------------------------------------------------------------------
    @property
    def buffer_aux_resampled(self):
        """Columns of `buffer_aux` selected by the resampling mask."""
        return self.buffer_aux[:, self._resampled_columns()]

    # ----------------------------------------------------------------------
    @property
    def buffer_timestamp_resampled(self):
        """Entries of `buffer_timestamp` selected by the resampling mask.

        Returns an empty array when there are no timestamps.
        """
        t = self.buffer_timestamp
        if t.shape[0]:
            return t[self._resampled_columns()]
        return np.array([])

    # ----------------------------------------------------------------------
    @property
    def buffer_timestamp(self):
        """Timestamp buffer passed through `interpolate_datetime`.

        Falls back to the raw `buffer_timestamp_` when the interpolation
        fails.
        """
        try:
            return interpolate_datetime(self.buffer_timestamp_)
        except Exception:
            return self.buffer_timestamp_