Source code for bci_framework.framework.widgets.connection

"""
===========
Connections
===========
"""

import os
import json
import time
import logging

from openbci_stream.acquisition import Cyton, CytonBase
from PySide2.QtCore import Qt, Signal, QThread, Slot
from PySide2.QtGui import QCursor
from PySide2.QtWidgets import QApplication

from ..dialogs import Dialogs
from ...extensions import properties as prop


########################################################################
[docs]class OpenBCIThread(QThread): """Handle the OpenBCI connection.""" connection_ok = Signal() connection_fail = Signal(object) disconnected_ok = Signal() connected = False # ----------------------------------------------------------------------
[docs] def stop(self) -> None: """Kill the trhrad.""" self.terminate()
# ----------------------------------------------------------------------
[docs] def run(self) -> None: """Connect and configure OpenBCI board.""" try: self.openbci = Cyton(self.mode, self.endpoint, host=self.host, capture_stream=False, daisy=prop.DAISY, montage=self.montage, streaming_package_size=self.streaming_package_size) except: self.connection_fail.emit([]) return if self.openbci.is_recycled: self.connected = True self.connection_ok.emit() return try: self.openbci.command(self.sample_rate) self.openbci.command(self.boardmode) except TimeoutError: self.connection_fail.emit( ['* OpenBCI board could not be connected the same network.']) return # Some time this command not take effect boardmode_setted = False for _ in range(10): response = self.openbci.command(self.boardmode) if response and b'Success' in response: logging.info(f'Bardmode setted in {self.boardmode} mode') boardmode_setted = True break time.sleep(0.1) boardmode = self.openbci.boardmode if not boardmode_setted: logging.warning('Boardmode not setted!') self.session_settings(self.montage, self.bias, self.gain, self.srb1, self.adsinput, self.srb2) if not self.checkBox_send_leadoff: self.openbci.leadoff_impedance( self.montage, pchan=pchan, nchan=nchan) if self.checkBox_test_signal: test_signal = self.comboBox_test_signal test_signal = getattr( CytonBase, f"TEST_{test_signal.replace(' ', '_')}") self.openbci.command(test_signal) else: all_channels = set(range(1, 17 if prop.DAISY else 9)) used_channels = set(self.montage.keys()) deactivated = all_channels.difference(used_channels) if deactivated: self.openbci.deactivate_channel(deactivated) if used_channels: self.openbci.activate_channel(used_channels) if not self.streaming(): try: self.openbci.start_stream() self.connected = True self.connection_ok.emit() if self.mode == 'wifi': self.openbci.set_latency(self.tcp_latency) except Exception as e: if 'NoBrokersAvailable' in str(e): self.connection_fail.emit( ['* Kafka is not running on the remote acquisition system.'])
# ----------------------------------------------------------------------
[docs] def session_settings(self, *args) -> None: """Create a session setting to reuse it after an impedance measurement.""" if hasattr(self, 'last_settings'): channels, bias, gain, srb1, adsinput, srb2 = self.last_settings elif args: self.last_settings = args channels, bias, gain, srb1, adsinput, srb2 = args else: return if self.checkBox_default_settings: self.openbci.command(self.openbci.DEFAULT_CHANNELS_SETTINGS) else: self.openbci.channel_settings(channels, power_down=CytonBase.POWER_DOWN_ON, gain=gain, input_type=adsinput, bias=bias, srb2=srb2, srb1=srb1)
# ----------------------------------------------------------------------
[docs] def disconnect(self) -> None: """Disconnect OpenBCI.""" try: # self.openbci.stop_stream() self.openbci.close() except: pass self.disconnected_ok.emit() self.connected = False
########################################################################
[docs]class Connection: """Widget that handle the OpenBCI connection.""" # ---------------------------------------------------------------------- def __init__(self, core): """""" self.parent_frame = core.main self.core = core # self.parent.pushButton_disconnect.hide() self.openbci = OpenBCIThread() self.openbci.streaming = lambda: getattr(self.core, 'streaming', False) self.openbci.connection_ok.connect(self.connection_ok) self.openbci.connection_fail.connect(self.connection_fail) self.openbci.disconnected_ok.connect(self.disconnected_ok) self.config = { 'mode': self.parent_frame.comboBox_connection_mode, 'port': self.parent_frame.comboBox_port, 'ip': self.parent_frame.comboBox_ip, 'host': self.parent_frame.comboBox_host, 'acquisition_sample_rate': self.parent_frame.comboBox_sample_rate, 'streaming_sample_rate': self.parent_frame.comboBox_streaming_sample_rate, 'boardmode': self.parent_frame.comboBox_boardmode, 'gain': self.parent_frame.comboBox_gain, 'input_type': self.parent_frame.comboBox_input_type, 'bias': self.parent_frame.comboBox_bias, 'srb1': self.parent_frame.comboBox_srb1, 'srb2': self.parent_frame.comboBox_srb2, 'pchan': self.parent_frame.comboBox_pchan, 'nchan': self.parent_frame.comboBox_nchan, 'test_signal_type': self.parent_frame.comboBox_test_signal, 'test_signal': self.parent_frame.checkBox_test_signal, 'tcp_latency': self.parent_frame.spinBox_tcp_latency, } self.load_config() self.update_connections() self.update_environ() self.connect() self.core.config.connect_widgets(self.update_config, self.config) # ----------------------------------------------------------------------
[docs] def on_focus(self) -> None: """Try to autoconnect.""" if getattr(self.core, 'streaming', False) and not self.openbci.connected: self.openbci_connect()
# ----------------------------------------------------------------------
[docs] def load_config(self) -> None: """Load widgets.""" self.core.config.load_widgets('connection', self.config) self.core.update_kafka(self.parent_frame.comboBox_host.currentText())
# ----------------------------------------------------------------------
[docs] def update_config(self, *args, **kwargs) -> None: """Save widgets status.""" self.core.config.save_widgets('connection', self.config)
# ----------------------------------------------------------------------
[docs] def connect(self) -> None: """Connect events.""" self.parent_frame.pushButton_connect.clicked.connect(self.on_connect) self.parent_frame.comboBox_connection_mode.activated.connect( self.update_connections) self.parent_frame.comboBox_host.textActivated.connect(self.load_config)
# ----------------------------------------------------------------------
[docs] def update_connections(self) -> None: """Set widgets for connection modes.""" if 'serial' in self.parent_frame.comboBox_connection_mode.currentText().lower(): for index in range(1, self.parent_frame.comboBox_sample_rate.count()): self.parent_frame.comboBox_sample_rate.model().item(index).setEnabled(False) if self.parent_frame.comboBox_sample_rate.currentIndex() >= 1: self.parent_frame.comboBox_sample_rate.setCurrentIndex(0) for index in range(3, self.parent_frame.comboBox_streaming_sample_rate.count()): self.parent_frame.comboBox_streaming_sample_rate.model().item( index).setEnabled(False) if self.parent_frame.comboBox_streaming_sample_rate.currentIndex() >= 3: self.parent_frame.comboBox_streaming_sample_rate.setCurrentIndex( 2) self.parent_frame.label_port_ip.setText('Port') self.parent_frame.comboBox_ip.hide() self.parent_frame.comboBox_port.show() self.parent_frame.label_latency.setEnabled(False) self.parent_frame.spinBox_tcp_latency.setEnabled(False) else: for index in range(1, self.parent_frame.comboBox_sample_rate.count()): self.parent_frame.comboBox_sample_rate.model().item(index).setEnabled(True) for index in range(3, self.parent_frame.comboBox_streaming_sample_rate.count()): self.parent_frame.comboBox_streaming_sample_rate.model().item(index).setEnabled(True) self.parent_frame.label_port_ip.setText('IP') self.parent_frame.comboBox_ip.show() self.parent_frame.comboBox_port.hide() self.parent_frame.label_latency.setEnabled(True) self.parent_frame.spinBox_tcp_latency.setEnabled(True)
# ----------------------------------------------------------------------
[docs] def on_connect(self, toggled: bool) -> None: """Event to handle connection.""" if toggled: # Connect QApplication.setOverrideCursor(QCursor(Qt.WaitCursor)) self.core.update_kafka( self.parent_frame.comboBox_host.currentText()) self.openbci_connect() else: # Disconnect self.openbci.disconnect()
# ----------------------------------------------------------------------
[docs] def openbci_connect(self) -> None: """Recollect values from GUI.""" if 'serial' in self.parent_frame.comboBox_connection_mode.currentText().lower(): mode = 'serial' endpoint = self.parent_frame.comboBox_port.currentText() os.environ['BCISTREAM_CONNECTION'] = json.dumps('serial') else: mode = 'wifi' endpoint = self.parent_frame.comboBox_ip.currentText() os.environ['BCISTREAM_CONNECTION'] = json.dumps('wifi') host = self.parent_frame.comboBox_host.currentText() sample_rate = self.parent_frame.comboBox_sample_rate.currentText() sample_rate = getattr(CytonBase, f"SAMPLE_RATE_{sample_rate}SPS") streaming_sample_rate = self.parent_frame.comboBox_streaming_sample_rate.currentText() boardmode = self.parent_frame.comboBox_boardmode.currentText() boardmode = getattr(CytonBase, f"BOARD_MODE_{boardmode}") gain = self.parent_frame.comboBox_gain.currentText() gain = getattr(CytonBase, f'GAIN_{gain}') adsinput = self.parent_frame.comboBox_input_type.currentText() adsinput = getattr(CytonBase, f'ADSINPUT_{adsinput}') bias = self.parent_frame.comboBox_bias.currentText() bias = getattr(CytonBase, f"BIAS_{bias}") srb1 = self.parent_frame.comboBox_srb1.currentText() srb1 = getattr(CytonBase, f'SRB1_{srb1}') srb2 = self.parent_frame.comboBox_srb2.currentText() srb2 = getattr(CytonBase, f'SRB2_{srb2}') pchan = self.parent_frame.comboBox_pchan.currentText() pchan = getattr(CytonBase, pchan.replace(' ', '_')) nchan = self.parent_frame.comboBox_nchan.currentText() nchan = getattr(CytonBase, nchan.replace(' ', '_')) tcp_latency = self.parent_frame.spinBox_tcp_latency.value() # channels = self.core.montage.get_mne_montage().ch_names # self.openbci = OpenBCIThread() # self.openbci.connection_ok.connect(self.connection_ok) # self.openbci.connection_fail.connect(self.connection_fail) # self.openbci.disconnected_ok.connect(self.disconnected_ok) self.openbci.mode = mode self.openbci.endpoint = endpoint self.openbci.host = host self.openbci.tcp_latency = tcp_latency self.openbci.montage = prop.CHANNELS self.openbci.streaming_package_size = int(streaming_sample_rate) self.openbci.sample_rate = sample_rate self.openbci.boardmode = boardmode self.openbci.adsinput = adsinput self.openbci.bias = bias self.openbci.srb1 = srb1 self.openbci.srb2 = srb2 self.openbci.pchan = pchan self.openbci.nchan = nchan self.openbci.gain = gain self.openbci.checkBox_send_leadoff = self.parent_frame.checkBox_send_leadoff.isChecked() self.openbci.checkBox_test_signal = self.parent_frame.checkBox_test_signal.isChecked() self.openbci.comboBox_test_signal = self.parent_frame.comboBox_test_signal.currentText() self.openbci.checkBox_default_settings = self.parent_frame.checkBox_default_settings.isChecked() self.parent_frame.pushButton_connect.setText('Connecting...') self.parent_frame.pushButton_connect.setEnabled(False) self.update_environ() self.openbci.start()
# ----------------------------------------------------------------------
[docs] def update_environ(self) -> None: """Update environment variables.""" os.environ['BCISTREAM_HOST'] = json.dumps( self.parent_frame.comboBox_host.currentText()) sps = self.parent_frame.comboBox_sample_rate.currentText() if 'k' in sps.lower(): sps = int(sps.lower().replace('k', '')) * 1000 else: sps = int(sps) os.environ['BCISTREAM_SAMPLE_RATE'] = json.dumps(sps) os.environ['BCISTREAM_STREAMING_PACKAGE_SIZE'] = json.dumps( int(self.parent_frame.comboBox_streaming_sample_rate.currentText())) os.environ['BCISTREAM_BOARDMODE'] = json.dumps( self.parent_frame.comboBox_boardmode.currentText().lower())
# ----------------------------------------------------------------------
[docs] @Slot() def connection_ok(self) -> None: """Event for OpenBCI connected.""" QApplication.restoreOverrideCursor() self.parent_frame.pushButton_connect.setText('Disconnect') self.parent_frame.pushButton_connect.setEnabled(True) self.parent_frame.pushButton_connect.setChecked(True)
# ----------------------------------------------------------------------
[docs] @Slot() def connection_fail(self, reasons): """Event for OpenBCI failed connection.""" QApplication.restoreOverrideCursor() checks = [] if reasons: checks.extend(reasons) else: if 'serial' in self.parent_frame.comboBox_connection_mode.currentText().lower(): checks.extend(['* Check that USB dongle were connected.', '* Verify serial permissions.', ]) if self.parent_frame.comboBox_host.currentText() != 'localhost': checks.extend([f'* The server could not be running, or running on a different IP that {self.parent_frame.comboBox_host.currentText()}', '* This machine must have access to the server or running on the same network.', '* The daemons `stream_eeg` `stream_rpyc` could not be running', # todo ]) if hasattr(self.core, 'conection_message'): checks.extend([self.core.conection_message]) self.core.conection_message = "" checks = '\n'.join(checks) Dialogs.critical_message( self.parent_frame, 'Connection error', f"{checks}") self.parent_frame.pushButton_connect.setText('Connect') self.parent_frame.pushButton_connect.setEnabled(True) self.parent_frame.pushButton_connect.setChecked(False)
# ----------------------------------------------------------------------
[docs] @Slot() def disconnected_ok(self) -> None: """OpenBCI disconnected.""" self.core.stop_kafka() if self.parent_frame.checkBox_test_signal.isChecked(): self.parent_frame.groupBox_settings.setEnabled(False) self.parent_frame.groupBox_leadoff_impedance.setEnabled(False) self.parent_frame.pushButton_connect.setText('Connect') self.parent_frame.pushButton_connect.setEnabled(True) self.parent_frame.pushButton_connect.setChecked(False) self.core.status_bar(right_message=('Disconnected', None)) self.parent_frame.checkBox_view_impedances.setChecked(False) self.parent_frame.stackedWidget_montage.setCurrentIndex(0)