"""Source code for ada.data_containers.scored."""

import numpy as np
import warnings
import pprint
from dataclasses import dataclass
from typing import Callable

from ada.data_containers._base import _ActiData
from ada.data_containers.generic import GenericData


class ScoredShort(_ActiData):
    """A class for storing and handling results of scoring by the short (nightly) algorithms.

    The data is a 2 x n_samples array: row 0 holds the sleep/wake score
    (1 = wake, 0 = sleep), row 1 holds the per-sample timestamps.
    """

    def __init__(self, data: np.ndarray, metadata: dict, fs: float,
                 epoching_method_metadata: dict | None,
                 scoring_method_metadata: dict):
        """Store scored data together with all metadata needed to export/reload it.

        Args:
            data (np.ndarray): 2 x n_samples array (row 0: scores, row 1: timestamps).
            metadata (dict): Metadata of the raw recording.
            fs (float): Sampling frequency in Hz.
            epoching_method_metadata (dict | None): Epoching method metadata,
                or None if the data was never epoched.
            scoring_method_metadata (dict): Scoring algorithm metadata.
        """
        self._data = data
        self._metadata = metadata
        self._fs = fs
        self._channel_names = ['score', 'timestamp']
        self._epoching_method_metadata = epoching_method_metadata
        self._scoring_method_metadata = scoring_method_metadata
        # Positions of each field in the list written by export() — must stay
        # in sync with the order used there.
        self._original_format_metadata = {'data': 0,
                                          'metadata': 1,
                                          'original_format_metadata': 2,
                                          'scoring_method_metadata': 3,
                                          'fs': self._fs,
                                          'channel_names': self._channel_names}
        if self._epoching_method_metadata is not None:
            self._original_format_metadata['epoching_method_metadata'] = 4

    def __repr__(self):
        raw_str = pprint.pformat(self._metadata, indent=4)
        if self._epoching_method_metadata is None:
            epoching_str = "This data was never epoched."
        else:
            epoching_str = pprint.pformat(self._epoching_method_metadata, indent=4)
        scoring_str = pprint.pformat(self._scoring_method_metadata, indent=4)
        return "Raw metadata:\n{}\n\nEpoching metadata:\n{}\n\nScoring metadata:\n{}".format(
            raw_str, epoching_str, scoring_str)

    @property
    def timestamp(self) -> np.ndarray:
        """Vector of per-sample timestamps."""
        return self._data[1]

    @property
    def score(self) -> np.ndarray:
        """Vector containing sleep/wake scoring (1=wake)."""
        return self._data[0]

    @property
    def epoching_method_metadata(self) -> dict | None:
        """Metadata associated with the epoching method and its parameters.

        None if data was not epoched."""
        if self._epoching_method_metadata is None:
            warnings.warn("This data was never epoched.")
        return self._epoching_method_metadata

    @property
    def scoring_method_metadata(self) -> dict:
        """Metadata associated with the scoring algorithm and its parameters."""
        return self._scoring_method_metadata

    @property
    def first_sample_timestamp(self) -> float:
        """Unix timestamp of first sample."""
        device = GenericData._get_device(self)
        return device._convert_timestamp(self.timestamp[0], self._metadata, True)

    @property
    def id(self) -> str:
        """Recording identifier, derived from the metadata by GenericData."""
        return GenericData._get_id(self)

    def cut_by_samples(self, start_sample: int, end_sample: int) -> "ScoredShort":
        """Create new object with the data cut by given indexes.

        Args:
            start_sample (int): First sample of output data.
            end_sample (int): Sample after the last sample of output data.

        Returns:
            ScoredShort: Object containing the cut data.
        """
        return ScoredShort(self._data[:, start_sample:end_sample], self._metadata,
                           self._fs, self._epoching_method_metadata,
                           self._scoring_method_metadata)

    def cut_by_timestamp(self, start_ts: float, end_ts: float | None) -> "ScoredShort":
        """Create new object with the data cut by given timestamps.

        Args:
            start_ts (float): Unix timestamp of output data beginning.
            end_ts (float | None): Unix timestamp of output data end. If None,
                last sample of output data will be last sample of input data.

        Returns:
            ScoredShort: Object containing the cut data.
        """
        data_cut = GenericData._generic_cut(self, start_ts, end_ts)
        return ScoredShort(data_cut, self._metadata, self._fs,
                           self._epoching_method_metadata,
                           self._scoring_method_metadata)

    @staticmethod
    def load_file(path: str) -> "ScoredShort":
        """Loading file saved in the generic format provided by this package.

        Args:
            path (str): Path to the .ada file.

        Raises:
            ValueError: If the file does not contain scoring metadata.

        Returns:
            ScoredShort: Object containing data.
        """
        data, metadata, fs, _, epoching_method_metadata, scoring_method_metadata = \
            GenericData._generic_load(path)
        if scoring_method_metadata is None:
            raise ValueError("Provided file is not scored!")
        return ScoredShort(data, metadata, fs, epoching_method_metadata,
                           scoring_method_metadata)

    def export(self, path: str):
        """Exports object data to the generic format. All metadata are preserved in the file.

        Args:
            path (str): Path to the .ada file.
        """
        # Order must match the indices recorded in _original_format_metadata.
        to_export = [self._data, self._metadata, self._original_format_metadata,
                     self._scoring_method_metadata]
        if self._epoching_method_metadata is not None:
            to_export.append(self._epoching_method_metadata)
        GenericData._generic_export(path, to_export)

    def total_sleep_time(self) -> float:
        """Time classified as sleep during the recording.

        Returns:
            float: Total sleep time in minutes.
        """
        # score == 0 means sleep, so sleep samples = total - sum(score).
        return (len(self.score) - np.sum(self.score)) / self._fs / 60

    def sleep_efficiency(self) -> float:
        """Percentage of points classified as sleep in the recording.

        Returns:
            float: Sleep efficiency.
        """
        return (len(self.score) - np.sum(self.score)) / len(self.score) * 100

    # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7191872/ -- much of the
    # below follows this article.
    # https://link.springer.com/article/10.1007/s11325-015-1138-6
    # Epoch length is debatable in the literature; it ranges from roughly
    # 5 to 20 minutes.

    def sleep_onset(self, minutes: int = 20) -> float:
        """Time (in minutes from beginning of the recording) after which first
        block of N minutes of sleep with at most 1 minute of wake begins.

        Args:
            minutes (int, optional): Number of consecutive minutes of sleep
                required. Defaults to 20.

        Raises:
            RuntimeError: There is no consecutive N minutes of sleep in the data.

        Returns:
            float: Sleep onset.
        """
        epoch_length = int(minutes * 60 * self._fs)
        # + 1 so that the window ending exactly at the last sample is also checked.
        for i in range(len(self.score) - epoch_length + 1):
            subepoch = self.score[i:i + epoch_length]
            if np.sum(subepoch) <= 60 * self._fs:  # at most 1 minute of wake
                return (self.timestamp[i] - self.timestamp[0]) / 60
        raise RuntimeError(
            "No {} consecutive minutes of sleep found in the data.".format(minutes))

    def sleep_episodes(self) -> int:
        """Number of episodes of continuous sleep, no matter their length

        Returns:
            int: Number of sleep episodes.
        """
        # Count sleep->wake transitions (each closes one sleep episode) ...
        sleep_times = 0
        for i in range(1, len(self.score)):
            if self.score[i] == 1 and self.score[i - 1] == 0:
                sleep_times += 1
        # ... plus one if the recording ends while asleep.
        if self.score[-1] == 0:
            sleep_times += 1
        return sleep_times

    def mean_sleep_episode(self) -> float:
        """Mean length of sleep episode in minutes.

        Returns:
            float: Mean length of sleep episode in minutes.
        """
        # NOTE: raises ZeroDivisionError if there are no sleep episodes.
        return self.total_sleep_time() / self.sleep_episodes()

    def long_sleep_episodes(self, length: int = 5) -> int:
        """Number of sleep episodes with length greater or equal to the one given in minutes.

        Args:
            length (int, optional): Minutes of sleep required to sleep episode
                to be counted. Defaults to 5.

        Returns:
            int: Number of long sleep episodes.
        """
        sleep_points = int(length * 60 * self._fs)
        out = 0
        sleep = 0
        for e in self.score:
            if e == 0:
                sleep += 1
            else:
                if sleep >= sleep_points:
                    out += 1
                sleep = 0
        # The loop only closes episodes at a wake sample; also count an
        # episode that runs to the very end of the recording (consistent
        # with sleep_episodes()).
        if sleep >= sleep_points:
            out += 1
        return out

    def sleep_fragmentation_index(self) -> float:
        """Defined as number of wake episodes during time in bed divided by total sleep time.

        Returns:
            float: Sleep fragmentation index (1/min).
        """
        # Wake episodes = sleep episodes - 1, plus one for each recording
        # edge (start/end) that is wake.
        n = -1
        if self.score[0] == 1:
            n += 1
        if self.score[-1] == 1:
            n += 1
        return (self.sleep_episodes() + n) / self.total_sleep_time()

    # Wake after sleep onset: https://link.springer.com/article/10.1111/sbr.12103
    # NOTE: that article uses a different definition of sleep onset; WASO
    # should arguably depend on the sleep-onset definition, but most
    # implementations do not do that.

    def wake_after_sleep_onset(self, minutes: int = 20) -> float:
        """Time during which subject was awake between sleep onset and offset
        (here defined as last epoch scored as sleep).

        Args:
            minutes (int, optional): Length of the time window, in minutes,
                used to calculate sleep onset (see sleep_onset for
                explanation). Defaults to 20.

        Returns:
            float: WASO (in minutes).
        """
        sleep_end = np.where(self.score == 0)[0][-1]
        # Convert the onset (minutes) back to a sample index; assumes
        # uniformly spaced samples.
        sleep_start = int(self.sleep_onset(minutes) * 60 * self._fs)
        return np.sum(self.score[sleep_start:sleep_end]) / self._fs / 60

    def sleep_report(self, minutes: int = 20, length: int = 5):
        """Summary sleep report in stdout and in human-readable format.

        Args:
            minutes (int, optional): Number of consecutive minutes of sleep
                required when calculating sleep onset. Defaults to 20.
            length (int, optional): Minimal length of long sleep episode in
                minutes. Defaults to 5.
        """
        print(f"Total sleep time: {round(self.total_sleep_time(), 2)} minutes")
        print(f"Sleep efficiency: {round(self.sleep_efficiency(), 2)}%")
        print(f"Sleep fragmentation index: {round(self.sleep_fragmentation_index(), 2)} [1/min]")
        print(f"Number of sleep episodes: {self.sleep_episodes()}")
        print(f"Number of long (>{length} min) sleep episodes {self.long_sleep_episodes(length)}")
        print(f"Mean length of sleep episode: {round(self.mean_sleep_episode(), 2)}")
        try:
            print(f"Sleep onset (with sleep >{minutes}): {round(self.sleep_onset(minutes), 2)} min")
            print(f"WASO (with sleep > {minutes}): {round(self.wake_after_sleep_onset(minutes), 2)} min")
        except RuntimeError:
            print(f"Subject was not sleeping for {minutes} consecutive minutes during the recording. No sleep onset or WASO computed.")
[docs] @dataclass(slots=True, eq=False) class PSGScore: """A class for storing and handling PSG staging.""" _psg_stages: list | np.ndarray _start_timestamp: float _end_timestamp: float _epoch_length: float _collapse_stages: bool | Callable = True def _sleep_stage_heuristic(self, a) -> int | None: a = str(a) if '1' in a.lower(): return 0 elif '2' in a.lower(): return 0 elif '3' in a.lower(): return 0 elif 'art' in a.lower(): return None elif 'un' in a.lower(): return None elif 'r' in a.lower(): return 0 else: return 1 def __post_init__(self): if callable(self._collapse_stages): self._psg_stages = np.array([self._collapse_stages(e) for e in self._psg_stages], dtype=float) self._collapse_stages = True elif self._collapse_stages: self._psg_stages = np.array([self._sleep_stage_heuristic(e) for e in self._psg_stages], dtype=float)
[docs] def resample(self, new_fs: float) -> "PSGScore": """Resample scorings to new epoch length (primarly to synchronize them with fs of acigraphic data). Works when epoch length is longer than sample spacing in actigraphic data. Args: new_fs (float): Sampling frequency to which sleep/wake scorings will be resampled. Returns: PSGScore: New object with resampled scorings. """ n_up = int(self._epoch_length * new_fs) if n_up != self._epoch_length * new_fs: raise ValueError("WTF BRO???") psg_stages = np.repeat(self._psg_stages, n_up) new_end_ts = self._end_timestamp + self._epoch_length - 1 / new_fs out = PSGScore(psg_stages, self._start_timestamp, new_end_ts, self._epoch_length / n_up, False) out._collapse_stages = self._collapse_stages return out
[docs] def change_epoch(self, epoch_length: int, wake_percentage: float) -> "PSGScore": """Change epoch length from the original saved in tags to some other. Intended mainly to switch between 20, 30 and 60 second epochs and to downsample. Args: epoch_length (int): Output epoch length in seconds. wake_percentage (float): Parameter describing how many points inside a new epoch must be wake, so the new epoch will also be wake. Returns: PSGScore: New object with new epoch length. """ assert self._collapse_stages, "This method will not work for full staging. Use on collapsed sleep/wake scores." if epoch_length <= self._epoch_length: # This condition might be stupid return self.resample(1 / epoch_length) fs = 1 / self.epoch_length if self._epoch_length > 1: psg_stages = self.resample(1).psg_stages fs = 1 else: psg_stages = self._psg_stages wake_threshold = epoch_length * fs * wake_percentage n_epochs = int(len(psg_stages) / epoch_length / fs) new_stages = np.sum(np.array(np.split(psg_stages[:int(n_epochs * epoch_length * fs)], n_epochs)), axis=1) >= wake_threshold new_last_ts = self._start_timestamp + epoch_length * (n_epochs - 1) out = PSGScore(new_stages, self._start_timestamp, new_last_ts, epoch_length, False) out._collapse_stages = True return out
[docs] def export(self, path: str): """Save psg scorings as a .ada file. Args: path (str): Path to the output file. """ metadata = {'start_timestamp': self._start_timestamp, 'end_timestamp': self._end_timestamp, 'epoch_length': self._epoch_length, 'collapse_stages': self._collapse_stages} if not path.endswith('.ada'): path = path + '.ada' with open(path, 'wb') as f: np.savez_compressed(f, self._psg_stages, np.array(metadata))
[docs] @staticmethod def load_file(path: str) -> "PSGScore": """Load data saved by the export method. Args: path (str): Path to the file. Returns: PSGScore: Loaded data. """ files = np.load(path, allow_pickle=True) data = files['arr_0'] metadata = files['arr_1'].item() out = PSGScore(data, metadata['start_timestamp'], metadata['end_timestamp'], metadata['epoch_length'], False) out._collapse_stages = metadata['collapse_stages'] return out
@property def psg_stages(self) -> np.ndarray | list: """Sleep/wake scorings.""" return self._psg_stages @property def start_timestamp(self) -> float: """Unix timestamp of first epoch beginning.""" return self._start_timestamp @property def end_timestamp(self) -> float: """Unix timestamp of last epoch beginning.""" return self._end_timestamp @property def epoch_length(self) -> float: """Epoch length in seconds.""" return self._epoch_length