"""
=====================
Visual working memory
=====================
"""
from bci_framework.extensions.stimuli_delivery import StimuliAPI
from bci_framework.extensions.stimuli_delivery.utils import keypress
from bci_framework.extensions.stimuli_delivery.utils import Widgets as w
from bci_framework.extensions.stimuli_delivery.utils import Tone as t
from bci_framework.extensions.stimuli_delivery.utils import Units as u
from browser import document, timer, html, window
from points import get_points
import time
import random
from typing import Literal
import logging
# Palette for the squares: ten distinct hex colors. `build_markers`
# concatenates this list with itself and shuffles the result before assigning
# one color per square.
# NOTE(review): values appear to match matplotlib's default 'tab10' cycle --
# confirm if the origin matters.
COLORS = [
    '#1f77b4',
    '#ff7f0e',
    '#2ca02c',
    '#d62728',
    '#9467bd',
    '#8c564b',
    '#e377c2',
    '#7f7f7f',
    '#bcbd22',
    '#17becf',
]
# Arrow glyph rendered by `VisualWorkingMemory.cue` for each hemifield name.
UNICODE_CUES = {
    'Right': '🡪',
    'Left': '🡨',
}
########################################################################
class VisualWorkingMemory(StimuliAPI):
    """Visual working memory: change detection task.

    The dashboard exposes the square geometry, the duration of every trial
    step, the task levels (number of squares per hemifield) and the
    presentation settings. A trial is the sequence built in `build_trials`:
    `soa` -> `cue` -> `memory_array` -> `retention` -> `test_array`.
    """

    # ----------------------------------------------------------------------
    def __init__(self, *args, **kwargs):
        """Build the stimuli areas and populate the dashboard widgets."""
        super().__init__(*args, **kwargs)
        self.add_stylesheet('styles.css')

        self.build_areas()
        self.show_cross()

        self.dashboard <= w.label(
            'Visual working memory - Change detection task', 'headline4')
        self.dashboard <= html.BR()

        # Markers
        self.dashboard <= w.label('Squares', 'headline4')
        self.dashboard <= w.slider(
            label='Squares size:',
            min=0.1,
            max=5,
            step=0.01,
            value=1.17,
            unit='dva',
            id='size'
        )
        self.dashboard <= w.slider(
            label='Minimum distance between Squares:',
            min=1,
            max=10,
            step=0.01,
            value=3.5,
            unit='dva',
            id='distance'
        )

        # Intervals
        self.dashboard <= w.label('Intervals', 'headline4')
        self.dashboard <= w.slider(
            label='Inter trial break:',
            min=0,
            max=3000,
            value=2000,
            unit='ms',
            id='break',
        )
        self.dashboard <= w.slider(
            label='Arrow cue:',
            min=100,
            max=1000,
            value=200,
            unit='ms',
            id='cue',
        )
        self.dashboard <= w.slider(
            label='Memory array:',
            min=50,
            max=500,
            value=100,
            unit='ms',
            id='memory_array',
        )
        self.dashboard <= w.slider(
            label='Retention interval:',
            min=500,
            max=1500,
            value=900,
            unit='ms',
            id='retention'
        )
        self.dashboard <= w.slider(
            label='Test array:',
            min=500,
            max=5000,
            value=2000,
            unit='ms',
            id='test_array'
        )

        # Trial
        self.dashboard <= w.label('Trial', 'headline4')
        self.dashboard <= w.checkbox(
            label='Task level:',
            # Levels 1, 2 and 4 are the ones checked by default.
            options=[[f'{i} squares', i in [1, 2, 4]] for i in range(1, 7)],
            on_change=None,
            id='task_level',
        )
        self.dashboard <= w.slider(
            label='Numbers of trials per level:',
            min=1,
            max=500,
            value=100,
            discrete=True,
            markers=True,
            id='trials'
        )

        # Presentation
        self.dashboard <= w.label('Presentation', 'headline4')
        self.dashboard <= w.slider(
            label='Distance from monitor:',
            min=0.1,
            max=3,
            step=0.01,
            value=0.3,
            unit='m',
            id='d'
        )
        self.dashboard <= w.slider(
            label='Monitor DPI:',
            min=1,
            max=450,
            step=1,
            value=141,
            unit='dpi',
            id='dpi'
        )

        self.dashboard <= w.switch(
            label='Record EEG',
            checked=False,
            id='record',
        )
        self.dashboard <= w.switch(
            label='External marker synchronizer',
            checked=False,
            on_change=self.synchronizer,
            # Own id: sharing 'record' with the switch above would make
            # `w.get_value('record')` in `start`/`stop` ambiguous.
            id='synchronizer',
        )

        self.dashboard <= w.button(
            'Test shapes', raised=False, outlined=True, on_click=self.test_shapes)
        self.dashboard <= w.button('Start run', on_click=self.start)
        self.dashboard <= w.button('Stop run', on_click=self.stop)

    # ----------------------------------------------------------------------
    def test_shapes(self) -> None:
        """Preview the size and distribution of the squares.

        Builds a fixed four-square array for the right cue and shows it
        immediately, outside of any pipeline.
        """
        self.clear()
        keypress(self.handle_response)
        u(d=w.get_value('d'), dpi=w.get_value('dpi'))
        self.build_squares_area()
        self.soa(cue='Right', shapes=4, change=True)
        self.memory_array(cue='Right', shapes=4, change=True)

    # ----------------------------------------------------------------------
    def start(self):
        """Start the run.

        A run consists of the consecutive execution of all trials. Recording
        is started first when the 'Record EEG' switch is on; the pipeline
        itself is launched 2000 ms later.
        """
        if w.get_value('record'):
            self.start_record()

        self.build_trials()
        timer.set_timeout(lambda: self.run_pipeline(
            self.pipeline_trial, self.trials, callback=self.clear), 2000)

    # ----------------------------------------------------------------------
    def stop(self):
        """Stop the pipeline execution and clear the view.

        When recording is enabled, the record is stopped 2000 ms later.
        """
        self.stop_pipeline()
        self.clear()
        if w.get_value('record'):
            timer.set_timeout(self.stop_record, 2000)

    # ----------------------------------------------------------------------
    def build_trials(self) -> None:
        """Define the `trials` and the `pipeline_trial`.

        `self.trials` is a shuffled list of dicts with the keys `cue`
        ('Right' or 'Left'), `shapes` (number of squares per hemifield) and
        `change` (whether the test array differs from the memory array).

        `self.pipeline_trial` is a list of couples `(callable, duration)`
        that define a single trial; the durations are read from the
        dashboard sliders.
        """
        # Checkbox labels look like '4 squares': the first character is the
        # level.
        levels = [int(v[:1]) for v in w.get_value('task_level')]
        shapes_per_trial = levels * w.get_value('trials')

        cues = [random.choice(['Right', 'Left']) for _ in shapes_per_trial]
        changes = [random.choice([0, 1]) for _ in shapes_per_trial]

        self.trials = [
            {'cue': cue, 'shapes': shapes, 'change': bool(change)}
            for shapes, cue, change in zip(shapes_per_trial, cues, changes)
        ]
        random.shuffle(self.trials)

        self.pipeline_trial = [
            (self.soa, w.get_value('break')),
            (self.cue, w.get_value('cue')),
            (self.memory_array, w.get_value('memory_array')),
            (self.retention, w.get_value('retention')),
            (self.test_array, w.get_value('test_array')),
        ]

    # ----------------------------------------------------------------------
    def soa(self, cue: Literal['Right', 'Left'], shapes: int, change: bool) -> None:
        """Stimulus onset asynchrony: prepare the next trial while hidden.

        Clears the view, rebuilds the placeholders and the (still hidden)
        squares and, for a change trial, precomputes the replacement colors.
        """
        self.clear()
        u(d=w.get_value('d'), dpi=w.get_value('dpi'))
        self.build_squares_area()
        self.build_markers(shapes)
        if change:
            self.prepare_shuffle(cue)

    # ----------------------------------------------------------------------
    def cue(self, cue: Literal['Right', 'Left'], shapes: int, change: bool) -> None:
        """Show the arrow cue that indicates the target hemifield."""
        # The placeholder is created once and reused on later trials.
        if not hasattr(self, 'cue_placeholder'):
            self.cue_placeholder = html.SPAN('', id='cue', style={
                'font-size': u.dva(8),
                'padding-top': f'calc(50vh - {u.dva(8, scale=1/50)})',
            })
            self.stimuli_area <= self.cue_placeholder

        self.cue_placeholder.html = UNICODE_CUES[cue]
        self.cue_placeholder.style = {'display': 'flex'}

    # ----------------------------------------------------------------------
    def memory_array(self, cue: Literal['Right', 'Left'], shapes: int, change: bool) -> None:
        """Hide the cue and show the initial array."""
        if hasattr(self, 'cue_placeholder'):
            self.cue_placeholder.style = {'display': 'none'}
        self._set_visible_markers(True)

    # ----------------------------------------------------------------------
    def retention(self, cue: Literal['Right', 'Left'], shapes: int, change: bool) -> None:
        """Remove the array from view."""
        self._set_visible_markers(False)

    # ----------------------------------------------------------------------
    def test_array(self, cue: Literal['Right', 'Left'], shapes: int, change: bool) -> None:
        """Show the array again, recolored when `change` is set.

        Also shows the response buttons and registers the keyboard handler.
        """
        if change:  # Show an array with differences
            targets = {'Right': self.markers_r, 'Left': self.markers_l}.get(cue)
            if targets is not None:
                for i, color in enumerate(self.shuffled_colors):
                    targets[i][0].style = {'background-color': color}

        self._set_visible_markers(True)

        # The buttons are created once and reused on later trials.
        if not hasattr(self, 'button_different'):
            # NOTE(review): `manual_trial` is not defined in this class;
            # confirm it is provided by `StimuliAPI`.
            self.button_different = w.button(
                'Different (q)', unelevated=False, outlined=True,
                on_click=lambda: self.manual_trial('DIFFERENT'),
                Class='test-button', id='different')
            self.button_identical = w.button(
                'Identical (p)', unelevated=False, outlined=True,
                on_click=lambda: self.manual_trial('IDENTICAL'),
                Class='test-button', id='identical')
            self.stimuli_area <= self.button_different
            self.stimuli_area <= self.button_identical

        self.button_identical.style = {'display': 'block'}
        self.button_different.style = {'display': 'block'}
        keypress(self.handle_response)

    # ----------------------------------------------------------------------
    def handle_response(self, response: str) -> None:
        """Capture the subject keyboard response.

        'q' is reported as DIFFERENT, 'p' as IDENTICAL and anything else as
        NO RESPONSE.
        """
        if response == 'q':
            print("DIFFERENT")
        elif response == 'p':
            print("IDENTICAL")
        else:
            print("NO RESPONSE")

    # ----------------------------------------------------------------------
    def _set_visible_markers(self, visible: bool) -> None:
        """Toggle the visibility of the squares of both hemifields."""
        # Nothing to toggle until `build_markers` has run at least once.
        if not hasattr(self, 'markers_r'):
            return

        display = 'block' if visible else 'none'
        for marker, _ in self.markers_r + self.markers_l:
            marker.style = {'display': display}

    # ----------------------------------------------------------------------
    def clear(self) -> None:
        """Hide the squares, the cue and the response buttons."""
        self._set_visible_markers(False)
        if hasattr(self, 'cue_placeholder'):
            self.cue_placeholder.style = {'display': 'none'}
        if hasattr(self, 'button_different'):
            self.button_identical.style = {'display': 'none'}
            self.button_different.style = {'display': 'none'}

    # ----------------------------------------------------------------------
    def build_markers(self, shapes: int) -> None:
        """Create the (hidden) squares for both hemifields.

        Positions come from `get_points`, once per hemifield. Colors are
        taken from a shuffled copy of `COLORS + COLORS`: the right squares
        use the first entries and the left squares the ones that follow.
        The result is stored in `self.markers_r` and `self.markers_l` as
        lists of `[element, color]`.
        """
        size = w.get_value('size')
        distance = w.get_value('distance')
        points_r = get_points(shapes, distance, size)
        points_l = get_points(shapes, distance, size)

        colors = COLORS + COLORS
        random.shuffle(colors)

        # Drop the squares of the previous trial.
        for element in document.select('.bci-marker'):
            element.remove()

        self.markers_r = self._add_markers('r', points_r, colors, 0, size)
        # The left squares continue in the palette where the right ones
        # stopped.
        self.markers_l = self._add_markers(
            'l', points_l, colors, len(self.markers_r), size)

    # ----------------------------------------------------------------------
    def _add_markers(self, side: Literal['r', 'l'], points, colors: list, offset: int, size) -> list:
        """Create one hidden square per point and attach it to its placeholder.

        `side` selects the `.markers_<side>-placeholder` container and the
        element id prefix; `offset` is the index of the first color to use.
        Returns the list of `[element, color]` couples.
        """
        prefix = {'r': 'right', 'l': 'left'}[side]
        placeholder = document.select_one(f'.markers_{side}-placeholder')
        side_length = u.dva(size)
        half_side = u.dva(size, scale=0.5)

        markers = []
        for index, point in enumerate(points):
            color = colors[offset + index]
            marker = html.DIV('', Class='bci-marker', id=f'{prefix}_{index}', style={
                'background-color': color,
                'width': side_length,
                'height': side_length,
                # Shift by half a side so the point is the square center.
                'left': f'calc({u.dva(point[0])} - {half_side})',
                'top': f'calc({u.dva(point[1])} - {half_side})',
                'position': 'absolute',
                'display': 'none',
            })
            placeholder <= marker
            markers.append([marker, color])
        return markers

    # ----------------------------------------------------------------------
    def prepare_shuffle(self, cue: Literal['Right', 'Left'], differences: int = 1) -> None:
        """Choose the replacement colors for a change trial.

        The first `differences` squares of the cued hemifield are the ones
        that change. Candidates are the palette colors not on screen plus
        the colors that are on screen exactly once. The result is stored in
        `self.shuffled_colors`; every entry differs from the color it
        replaces.
        """
        right = [color for _, color in self.markers_r]
        left = [color for _, color in self.markers_l]
        displayed = right + left

        single = [c for c in displayed if displayed.count(c) == 1]
        usable = list(set(COLORS) - set(displayed)) + single

        if cue == 'Right':
            original = right[:differences]
        elif cue == 'Left':
            original = left[:differences]

        mix = original + usable
        # `mix` starts with the original colors, so at least one shuffle is
        # needed; keep shuffling until no position keeps its color.
        while any(a == b for a, b in zip(original, mix[:differences])):
            random.shuffle(mix)
        self.shuffled_colors = mix[:differences]

    # ----------------------------------------------------------------------
    def synchronizer(self, value: bool) -> None:
        """Show or hide the synchronizer."""
        if value:
            self.show_synchronizer()
        else:
            self.hide_synchronizer()

    # ----------------------------------------------------------------------
    def build_squares_area(self) -> None:
        """Create the two containers where the squares are positioned.

        Any existing placeholder is removed first; the left and right
        placeholders are then appended to the stimuli area, mirrored around
        the horizontal center.
        """
        for selector in ('.markers_l-placeholder', '.markers_r-placeholder'):
            if element := document.select_one(selector):
                element.remove()

        # The only difference between sides is the direction of the
        # horizontal offset.
        for side, sign in (('l', '-'), ('r', '+')):
            self.stimuli_area <= html.DIV(
                Class=f'markers_{side}-placeholder',
                style={
                    'width': u.dva(7.2),
                    'height': u.dva(13.15),
                    'margin-top': f'calc(50vh - {u.dva(13.15, scale=0.5)})',
                    'margin-left': f'calc(50% - {u.dva(7.2, scale=0.5)} {sign} {u.dva(5.4)})',
                    'background-color': '#f3f3f3',
                    'position': 'absolute',
                    'z-index': 10,
                })
# Instantiate the stimuli delivery only when the module is run as a script.
if __name__ == '__main__':
    VisualWorkingMemory()