import numpy as np
import warnings
import pprint
from dataclasses import dataclass
from typing import Callable
from ada.data_containers._base import _ActiData
from ada.data_containers.generic import GenericData
[docs]
class ScoredShort(_ActiData):
"""A class for storing and handling results of scoring by the short (nightly) algorithms."""
# __slots__ = ('_epoching_method_metadata', '_scoring_method_metadata', '_original_format_metadata')
def __init__(self, data: np.ndarray, metadata: dict, fs: float, epoching_method_metadata: dict | None, scoring_method_metadata: dict):
    """Store scored data together with the metadata needed to export/reload it.

    Args:
        data (np.ndarray): 2-D array; row 0 holds sleep/wake scores, row 1 holds
            timestamps (see the `score` and `timestamp` properties).
        metadata (dict): Metadata of the raw recording.
        fs (float): Sampling frequency.
        epoching_method_metadata (dict | None): Epoching method and parameters,
            or None if the data was never epoched.
        scoring_method_metadata (dict): Scoring algorithm and its parameters.
    """
    self._data = data
    self._metadata = metadata
    self._fs = fs
    self._channel_names = ['score', 'timestamp']
    self._epoching_method_metadata = epoching_method_metadata
    self._scoring_method_metadata = scoring_method_metadata
    # Maps the names of exported items to their positions in the list written
    # by `export`; fs and channel names are stored by value, not by position.
    self._original_format_metadata = {'data': 0,
                                      'metadata': 1,
                                      'original_format_metadata': 2,
                                      'scoring_method_metadata': 3,
                                      'fs': self._fs,
                                      'channel_names': self._channel_names}
    # Epoching metadata is only written by `export` when present, as item 4.
    if self._epoching_method_metadata is not None:
        self._original_format_metadata['epoching_method_metadata'] = 4
def __repr__(self):
    """Human-readable summary of the raw, epoching and scoring metadata."""
    raw_meta = pprint.pformat(self._metadata, indent=4)
    epoching_meta = (
        "This data was never epoched."
        if self._epoching_method_metadata is None
        else pprint.pformat(self._epoching_method_metadata, indent=4)
    )
    scoring_meta = pprint.pformat(self._scoring_method_metadata, indent=4)
    return "Raw metadata:\n{}\n\nEpoching metadata:\n{}\n\nScoring metadata:\n{}".format(
        raw_meta, epoching_meta, scoring_meta
    )
@property
def timestamp(self) -> np.ndarray:
    """Vector of per-sample timestamps (row 1 of the data array).

    These are the raw stored timestamps; see `first_sample_timestamp` for a
    device-converted Unix timestamp.
    """
    return self._data[1]
@property
def score(self) -> np.ndarray:
    """Vector containing sleep/wake scoring (1=wake)."""
    return self._data[0]
@property
def epoching_method_metadata(self) -> dict | None:
    """Metadata associated with the epoching method and its parameters. None if data was not epoched."""
    # Warn rather than raise, so callers can still use the None return value
    # to detect "never epoched" data.
    if self._epoching_method_metadata is None:
        warnings.warn("This data was never epoched.")
    return self._epoching_method_metadata
@property
def scoring_method_metadata(self) -> dict:
    """Metadata associated with the scoring algorithm and its parameters."""
    return self._scoring_method_metadata
@property
def first_sample_timestamp(self) -> float:
    """Unix timestamp of first sample."""
    device = GenericData._get_device(self)
    # NOTE(review): the meaning of the trailing `True` flag depends on the
    # device's _convert_timestamp signature — confirm against that helper.
    return device._convert_timestamp(self.timestamp[0], self._metadata, True)
@property
def id(self) -> str:
    """Identifier of this recording (delegates to GenericData._get_id)."""
    return GenericData._get_id(self)
[docs]
def cut_by_samples(self, start_sample: int, end_sample: int) -> "ScoredShort":
    """Create a new object with the data cut by the given indexes.
    Args:
        start_sample (int): First sample of output data.
        end_sample (int): Sample after the last sample of output data.
    Returns:
        ScoredShort: Object containing the cut data.
    """
    cut_data = self._data[:, start_sample:end_sample]
    return ScoredShort(
        cut_data,
        self._metadata,
        self._fs,
        self._epoching_method_metadata,
        self._scoring_method_metadata,
    )
[docs]
def cut_by_timestamp(self, start_ts: float, end_ts: float | None) -> "ScoredShort":
    """Create a new object with the data cut by the given timestamps.
    Args:
        start_ts (float): Unix timestamp of output data beginning.
        end_ts (float | None): Unix timestamp of output data end. If None, the last sample of output data will be the last sample of input data.
    Returns:
        ScoredShort: Object containing the cut data.
    """
    cut_data = GenericData._generic_cut(self, start_ts, end_ts)
    return ScoredShort(
        cut_data,
        self._metadata,
        self._fs,
        self._epoching_method_metadata,
        self._scoring_method_metadata,
    )
[docs]
@staticmethod
def load_file(path: str) -> "ScoredShort":
    """Load a file saved in the generic format provided by this package.
    Args:
        path (str): Path to the .ada file.
    Raises:
        ValueError: If the file contains no scoring metadata (i.e. it is not scored).
    Returns:
        ScoredShort: Object containing data.
    """
    data, metadata, fs, _, epoching_meta, scoring_meta = GenericData._generic_load(path)
    # A file without scoring metadata cannot be represented by this class.
    if scoring_meta is None:
        raise ValueError("Provided file is not scored!")
    return ScoredShort(data, metadata, fs, epoching_meta, scoring_meta)
[docs]
def export(self, path: str):
    """Exports object data to the generic format. All metadata are preserved in the file.
    Args:
        path (str): Path to the .ada file.
    """
    # Item order must match the index map built in __init__
    # (_original_format_metadata); epoching metadata is optional item 4.
    payload = [self._data, self._metadata, self._original_format_metadata, self._scoring_method_metadata]
    if self._epoching_method_metadata is not None:
        payload = payload + [self._epoching_method_metadata]
    GenericData._generic_export(path, payload)
[docs]
def total_sleep_time(self) -> float:
    """Time classified as sleep during the recording.
    Returns:
        float: Total sleep time in minutes.
    """
    # Scores are 1=wake, so the sleep-sample count is total minus sum.
    sleep_samples = len(self.score) - np.sum(self.score)
    return sleep_samples / self._fs / 60
[docs]
def sleep_efficiency(self) -> float:
    """Percentage of points classified as sleep in the recording.
    Returns:
        float: Sleep efficiency.
    """
    n_total = len(self.score)
    # Scores are 1=wake, so sleep points are total minus the sum.
    n_sleep = n_total - np.sum(self.score)
    return n_sleep / n_total * 100
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7191872/ -- a lot of the below is taken from this article
# https://link.springer.com/article/10.1007/s11325-015-1138-6
# Seems like length of the epoch is debatable and ranges from 5 to 20 minutes more or less...
[docs]
def sleep_onset(self, minutes: int = 20) -> float:
    """Time (in minutes from beginning of the recording) after which the first block of N minutes of sleep with at most 1 minute of wake begins.

    Args:
        minutes (int, optional): Number of consecutive minutes of sleep required. Defaults to 20.

    Raises:
        RuntimeError: There are no N consecutive minutes of sleep in the data.

    Returns:
        float: Sleep onset in minutes.
    """
    epoch_length = int(minutes * 60 * self._fs)
    # At most one minute's worth of wake samples is allowed inside a window.
    max_wake_samples = 60 * self._fs
    # Bug fix: "+ 1" so the window ending exactly at the last sample is also
    # examined; the original range skipped the final valid window.
    for i in range(len(self.score) - epoch_length + 1):
        if np.sum(self.score[i:i + epoch_length]) <= max_wake_samples:
            return (self.timestamp[i] - self.timestamp[0]) / 60
    raise RuntimeError(f"No block of {minutes} consecutive minutes of sleep found in the recording.")
[docs]
def sleep_episodes(self) -> int:
    """Number of episodes of continuous sleep, no matter their length
    Returns:
        int: Number of sleep episodes.
    """
    # Every sleep episode either ends with a sleep->wake transition or runs
    # until the end of the recording; count both cases.
    episodes = 0
    previous = self.score[0]
    for current in self.score[1:]:
        if current == 1 and previous == 0:
            episodes += 1
        previous = current
    if self.score[-1] == 0:
        episodes += 1
    return episodes
[docs]
def mean_sleep_episode(self) -> float:
    """Mean length of sleep episode in minutes.

    Returns:
        float: Mean length of sleep episode in minutes; 0.0 when the recording
        contains no sleep episodes.
    """
    n_episodes = self.sleep_episodes()
    # Bug fix: a recording with no sleep used to raise ZeroDivisionError here.
    # Total sleep time is also zero in that case, so 0.0 is the natural value.
    if n_episodes == 0:
        return 0.0
    return self.total_sleep_time() / n_episodes
[docs]
def long_sleep_episodes(self, length: int = 5) -> int:
    """Number of sleep episodes with length greater or equal to the one given in minutes.

    Args:
        length (int, optional): Minutes of sleep required to sleep episode to be counted. Defaults to 5.

    Returns:
        int: Number of long sleep episodes.
    """
    min_samples = int(length * 60 * self._fs)
    count = 0
    run = 0  # length, in samples, of the current uninterrupted sleep run
    for sample in self.score:
        if sample == 0:
            run += 1
        else:
            if run >= min_samples:
                count += 1
            run = 0
    # Bug fix: an episode lasting until the very end of the recording was
    # previously never counted because no wake sample follows it.
    if run >= min_samples:
        count += 1
    return count
[docs]
def sleep_fragmentation_index(self) -> float:
    """Defined as number of wake episodes during time in bed divided by total sleep time.
    Returns:
        float: Sleep fragmentaion index (1/min).
    """
    # The number of wake episodes equals the number of sleep episodes,
    # corrected by whether the recording starts and/or ends awake.
    starts_awake = 1 if self.score[0] == 1 else 0
    ends_awake = 1 if self.score[-1] == 1 else 0
    wake_episodes = self.sleep_episodes() + starts_awake + ends_awake - 1
    return wake_episodes / self.total_sleep_time()
# Wake after sleep onset: https://link.springer.com/article/10.1111/sbr.12103
# NOTE: that reference uses a different definition of sleep onset latency (SOL)
# than the one implemented above; definitions in the literature are inconsistent.
# Strictly, WASO should depend on the chosen sleep onset definition, but most
# published implementations do not make that dependency explicit.
[docs]
def wake_after_sleep_onset(self, minutes: int = 20) -> float:
    """Time during which subject was awake between sleep onset and offset (here defined as last epoch scored as sleep).

    Args:
        minutes (int, optional): Length of the time window, in minutes, used to calculate sleep onset (see sleep_onset for explanation). Defaults to 20.

    Raises:
        RuntimeError: If no sleep onset can be found, or there is no sleep in the recording.

    Returns:
        float: WASO (in minutes).
    """
    # Bug fix: compute the onset first so data without a valid onset raises
    # RuntimeError (which sleep_report handles) instead of crashing with an
    # IndexError from the empty np.where result below.
    sleep_start = int(self.sleep_onset(minutes) * 60 * self._fs)
    sleep_indexes = np.where(self.score == 0)[0]
    if sleep_indexes.size == 0:
        raise RuntimeError("No sleep found in the recording, WASO is undefined.")
    sleep_end = sleep_indexes[-1]  # sleep offset: last sample scored as sleep
    return np.sum(self.score[sleep_start:sleep_end]) / self._fs / 60
[docs]
def sleep_report(self, minutes: int = 20, length: int = 5):
    """Summary sleep report printed to stdout in human-readable format.

    Args:
        minutes (int, optional): Number of consecutive minutes of sleep required when calculating sleep onset. Defaults to 20.
        length (int, optional): Minimal length of long sleep episode in minutes. Defaults to 5.
    """
    print(f"Total sleep time: {round(self.total_sleep_time(), 2)} minutes")
    print(f"Sleep efficiency: {round(self.sleep_efficiency(), 2)}%")
    print(f"Sleep fragmentation index: {round(self.sleep_fragmentation_index(), 2)} [1/min]")
    print(f"Number of sleep episodes: {self.sleep_episodes()}")
    print(f"Number of long (>{length} min) sleep episodes {self.long_sleep_episodes(length)}")
    # Typo fix in the printed message: "lenght" -> "length".
    print(f"Mean length of sleep episode: {round(self.mean_sleep_episode(), 2)}")
    try:
        print(f"Sleep onset (with sleep >{minutes}): {round(self.sleep_onset(minutes), 2)} min")
        print(f"WASO (with sleep > {minutes}): {round(self.wake_after_sleep_onset(minutes), 2)} min")
    except RuntimeError:
        # Grammar fix: the original message dropped the word "minutes".
        print(f"Subject was not sleeping for {minutes} consecutive minutes during the recording. No sleep onset or WASO computed.")
[docs]
@dataclass(slots=True, eq=False)
class PSGScore:
    """A class for storing and handling PSG (polysomnography) staging."""
    # Raw stage labels or — after __post_init__ when collapsing is enabled —
    # a float array of sleep/wake scores (0 = sleep, 1 = wake, NaN = artifact/unknown).
    _psg_stages: list | np.ndarray
    # Unix timestamp of the first epoch's beginning.
    _start_timestamp: float
    # Unix timestamp of the last epoch's beginning.
    _end_timestamp: float
    # Epoch length in seconds.
    _epoch_length: float
    # True: collapse stages with the built-in heuristic; False: keep as given;
    # alternatively a callable mapping one raw stage label to a score.
    _collapse_stages: bool | Callable = True
def _sleep_stage_heuristic(self, a) -> int | None:
a = str(a)
if '1' in a.lower():
return 0
elif '2' in a.lower():
return 0
elif '3' in a.lower():
return 0
elif 'art' in a.lower():
return None
elif 'un' in a.lower():
return None
elif 'r' in a.lower():
return 0
else:
return 1
def __post_init__(self):
if callable(self._collapse_stages):
self._psg_stages = np.array([self._collapse_stages(e) for e in self._psg_stages], dtype=float)
self._collapse_stages = True
elif self._collapse_stages:
self._psg_stages = np.array([self._sleep_stage_heuristic(e) for e in self._psg_stages], dtype=float)
[docs]
def resample(self, new_fs: float) -> "PSGScore":
"""Resample scorings to new epoch length (primarly to synchronize them with fs of acigraphic data). Works when epoch length is longer than sample spacing in actigraphic data.
Args:
new_fs (float): Sampling frequency to which sleep/wake scorings will be resampled.
Returns:
PSGScore: New object with resampled scorings.
"""
n_up = int(self._epoch_length * new_fs)
if n_up != self._epoch_length * new_fs:
raise ValueError("WTF BRO???")
psg_stages = np.repeat(self._psg_stages, n_up)
new_end_ts = self._end_timestamp + self._epoch_length - 1 / new_fs
out = PSGScore(psg_stages, self._start_timestamp, new_end_ts, self._epoch_length / n_up, False)
out._collapse_stages = self._collapse_stages
return out
[docs]
def change_epoch(self, epoch_length: int, wake_percentage: float) -> "PSGScore":
    """Change epoch length from the original saved in tags to some other. Intended mainly to switch between 20, 30 and 60 second epochs and to downsample.

    Args:
        epoch_length (int): Output epoch length in seconds.
        wake_percentage (float): Fraction of points inside a new epoch that must be wake for the new epoch to also be wake.

    Returns:
        PSGScore: New object with new epoch length.
    """
    # NOTE(review): `assert` is stripped under `python -O`; a raise would be
    # safer for input validation.
    assert self._collapse_stages, "This method will not work for full staging. Use on collapsed sleep/wake scores."
    # A shorter-or-equal target epoch is produced by plain resampling
    # (which raises ValueError when the split is not an integer).
    if epoch_length <= self._epoch_length:
        return self.resample(1 / epoch_length)
    fs = 1 / self.epoch_length
    # Normalize to 1-second samples first when epochs are longer than 1 s, so
    # the splitting below operates on an integer number of points per epoch.
    if self._epoch_length > 1:
        psg_stages = self.resample(1).psg_stages
        fs = 1
    else:
        psg_stages = self._psg_stages
    # An output epoch is wake when at least this many of its points are wake.
    wake_threshold = epoch_length * fs * wake_percentage
    n_epochs = int(len(psg_stages) / epoch_length / fs)
    # Trailing points that do not fill a whole output epoch are dropped.
    # NOTE(review): the >= comparison yields a boolean array rather than the
    # 0.0/1.0 floats used elsewhere — confirm downstream code accepts it.
    new_stages = np.sum(np.array(np.split(psg_stages[:int(n_epochs * epoch_length * fs)], n_epochs)), axis=1) >= wake_threshold
    new_last_ts = self._start_timestamp + epoch_length * (n_epochs - 1)
    out = PSGScore(new_stages, self._start_timestamp, new_last_ts, epoch_length, False)
    out._collapse_stages = True
    return out
[docs]
def export(self, path: str):
"""Save psg scorings as a .ada file.
Args:
path (str): Path to the output file.
"""
metadata = {'start_timestamp': self._start_timestamp,
'end_timestamp': self._end_timestamp,
'epoch_length': self._epoch_length,
'collapse_stages': self._collapse_stages}
if not path.endswith('.ada'):
path = path + '.ada'
with open(path, 'wb') as f:
np.savez_compressed(f, self._psg_stages, np.array(metadata))
[docs]
@staticmethod
def load_file(path: str) -> "PSGScore":
"""Load data saved by the export method.
Args:
path (str): Path to the file.
Returns:
PSGScore: Loaded data.
"""
files = np.load(path, allow_pickle=True)
data = files['arr_0']
metadata = files['arr_1'].item()
out = PSGScore(data, metadata['start_timestamp'], metadata['end_timestamp'], metadata['epoch_length'], False)
out._collapse_stages = metadata['collapse_stages']
return out
@property
def psg_stages(self) -> np.ndarray | list:
    """Sleep/wake scorings (or the raw stage labels if stages were not collapsed)."""
    return self._psg_stages
@property
def start_timestamp(self) -> float:
    """Unix timestamp of first epoch beginning."""
    return self._start_timestamp
@property
def end_timestamp(self) -> float:
    """Unix timestamp of last epoch beginning."""
    return self._end_timestamp
@property
def epoch_length(self) -> float:
    """Epoch length in seconds."""
    return self._epoch_length