"""Source code for ada.long.nonparametric."""

import csv
import os
from math import ceil
from typing import Any, Tuple

import matplotlib.pyplot as plt
import numpy as np
import piecewise_regression as pr
import scipy.stats as st
import tqdm
from multiprocess.pool import Pool

from ada.data_containers._base import _ActiData
from ada.data_containers._long import _Long, _LongContainer


class DFAResults(_LongContainer):
    """A container for results of fitting DFA model to the actigraphic data.

    Do not construct manually!

    The following parameters are available in the fit_params:

    - exponent: the exponent of the whole data over all fluctuation range;
    - exponent error: the error of exponent estimation;
    - scale range: range of scales used during the fitting;
    - segments: segments in which the data was divided (None when the
      piecewise fit did not converge / no crossovers were fitted);
    - trend degree: trend degree used during the detrending prior to fitting model.
    """

    def __init__(self, scales: np.ndarray, fluctuations: np.ndarray,
                 exponent: Tuple[float, float], scale_range: Tuple[float, float],
                 crossovers: Tuple[dict | None, np.ndarray], degree: int, id: str):
        self._fit_params = {'exponent': exponent[0],
                            'exponent error': exponent[1],
                            # received in seconds, stored/exposed in hours
                            'scale range': (scale_range[0] / 3600, scale_range[1] / 3600),
                            'segments': crossovers[0],
                            'trend degree': degree}
        self._data = np.empty((3, len(scales)))
        self._data[0] = scales
        self._data[1] = fluctuations
        self._data[2] = crossovers[1]  # fitted lines
        self._fit_params['id'] = id

    def plot(self, out_path: str | None = None):
        """Plot the semi-logarithmic fluctuations plot.

        Args:
            out_path (str | None, optional): Path to save the figure. If None,
                it will be shown in interactive mode. Defaults to None.
        """
        plt.figure("Fluctuations plot")
        plt.semilogx(self._data[0], np.log(self._data[1]))
        plt.semilogx(self._data[0], self._data[2])
        plt.xlabel('Scale [h]')
        plt.ylabel('Fluctuations logarithm')
        # FIX: only draw crossover markers when a piecewise fit is available;
        # 'segments' is None when the fit did not converge, and iterating it
        # previously raised.
        if self._fit_params['segments'] is not None:
            plt.vlines(self.crossovers[0], np.min(self._data[2]),
                       np.max(self._data[2]), colors='red', alpha=0.5)
        if out_path is None:
            plt.show()
        else:
            plt.savefig(out_path)
            plt.close()

    @staticmethod
    def _construct_from_load(data: np.ndarray, fit: dict) -> "DFAResults":
        # Build an empty shell and overwrite its payload with loaded content.
        out = DFAResults(np.array([]), np.array([]), (0., 0.), (0., 0.),
                         (None, np.array([])), 1, '')
        out._data = data
        out._fit_params = fit
        return out

    @staticmethod
    def load_file(path: str) -> "DFAResults":
        """Load model fitting results from .ada.long file (created via export method).

        Args:
            path (str): Path to file.

        Returns:
            DFAResults: Object containing fitted model.
        """
        data, fit_params = DFAResults._generic_load(path)
        return DFAResults._construct_from_load(data, fit_params)

    def export(self, out_path: str):
        """Save data (both parameters of model and fluctuations data) to compressed generic file.

        Args:
            out_path (str): Path to the out file.
        """
        to_export = [self._data, self._fit_params]
        DFAResults._generic_export(out_path, to_export)

    def save_csv(self, out_path: str):
        """Save parameters of fitted model to human-readable csv.

        Args:
            out_path (str): Path to the out file.
        """
        with open(out_path, 'w', newline='') as f:
            writer = csv.writer(f)
            for key in ['exponent', 'exponent error', 'scale range', 'trend degree']:
                writer.writerow([key, self._fit_params[key]])
            # FIX: the piecewise-fit section exists only when the fit converged;
            # 'segments' is None otherwise and must not be iterated.
            if self._fit_params['segments'] is None:
                return
            for _ in range(3):
                writer.writerow('')
            writer.writerow(['piecewise fit info', ''])
            writer.writerow('')
            for key in self._fit_params['segments'].keys():
                writer.writerow([key, ''])
                for subkey in self._fit_params['segments'][key]:
                    writer.writerow([subkey, self._fit_params['segments'][key][subkey]])
                writer.writerow('')

    @property
    def scales(self) -> np.ndarray:
        """Array of scale for which fluctuations were computed."""
        return self._data[0]

    @property
    def fluctuations(self) -> np.ndarray:
        """Array of obtained fluctuations at different scales."""
        return self._data[1]

    @property
    def piecewise_fit(self) -> np.ndarray:
        """Array with reconstruction of fitted piecewise linear function."""
        return self._data[2]

    @property
    def crossovers(self) -> Tuple[list[float], list[Tuple[float, float]]]:
        """List with positions of crossovers (in hours) and list of their confidence intervals.

        Both lists are empty when no piecewise fit is available.
        """
        cross = []
        ci = []
        segments = self._fit_params['segments']
        # FIX: guard against segments being None (non-converged fit).
        if segments is not None:
            for key in segments:
                if 'crossover' in key:
                    cross.append(segments[key]['crossover'])
                    ci.append(segments[key]['ci'])
        return cross, ci

    @property
    def exponents(self) -> Tuple[list[float], list[float]]:
        """List with exponents and list of their errors.

        Both lists are empty when no piecewise fit is available.
        """
        expo = []
        err = []
        segments = self._fit_params['segments']
        # FIX: guard against segments being None (non-converged fit).
        if segments is not None:
            for key in segments:
                if 'segment' in key:
                    expo.append(segments[key]['exponent'])
                    err.append(segments[key]['exponent_error'])
        return expo, err

    @property
    def id(self) -> str:
        """ID of a recording to which long estimate was fit."""
        return self._fit_params['id']
class DFA(_Long):
    """A class for estimating Hurst exponent of the data using Detrended Fluctuation Analysis.

    Attributes:
        n_windows (int): Number of scales that will be taken into account during computation.
            Defaults to 200.
        scale_range (Tuple[float, float]): Range of scales in hours. Defaults to (1 / 6, 48).
        trend_degree (int): Degree of polynomial function describing trend in the integrated
            data (degree of trend in original data is equal to degree of trend in integrated
            data - 1). Defaults to 1.
        n_crossovers (int | None): Number of crossover points expected in the fluctuation
            graphs, assuming that the data is described by n_crossovers + 1 fractals. If None,
            optimal number of crossovers in the range 0-10 will be found using the Bayesian
            Information Criterion. Defaults to 1.
    """

    def __init__(self, n_windows: int = 200, scale_range: Tuple[float, float] = (1 / 6, 48),
                 trend_degree: int = 1, n_crossovers: int | None = 1):
        self._n_windows = n_windows
        # hours -> seconds; converted back to hours when results are built
        self._scale_range = (scale_range[0] * 3600, scale_range[1] * 3600)
        self._trend_degree = trend_degree
        self._n_crossovers = n_crossovers

    def _calc_fluctuation(self, window: np.ndarray) -> float:
        # Fast path for linear detrending: residuals[0] from lstsq is the sum
        # of squared residuals, so this returns the mean squared fluctuation.
        x = np.arange(0, len(window), dtype=float)
        a = np.vstack([x, np.ones_like(x)]).T
        _, residuals, _, _ = np.linalg.lstsq(a, window, rcond=None)
        return residuals[0] / len(window)

    def _calc_fluctuation_poly(self, window: np.ndarray) -> float:
        # General polynomial detrending of arbitrary degree.
        x = np.arange(0, len(window), dtype=float)
        _, residuals = np.polynomial.polynomial.Polynomial.fit(
            x, window, self._trend_degree, full=True)
        return float(residuals[0]) / len(window)

    def fit(self, data: _ActiData, ch_name: str | None = None):
        """Fits the specified by the object fractal model to provided data.

        Args:
            data (_ActiData): Actigraphic data to be fitted with cosinor method.
            ch_name (str | None, optional): Channel to which model will be fitted.
                If None, it will be to_score in case of Raw or Epoched data and score
                in case of ScoredShort data. Defaults to None.

        Returns:
            DFAResults: Object containing fitting results.
        """
        y = self._get_data(data, ch_name)
        integrated = np.cumsum(y - np.mean(y))
        window_lengths = np.geomspace(self._scale_range[0] * data.fs,
                                      self._scale_range[1] * data.fs,
                                      self._n_windows, dtype=int)
        fluctuations = np.empty_like(window_lengths, dtype=float)
        fluctuation_func = (self._calc_fluctuation if self._trend_degree == 1
                            else self._calc_fluctuation_poly)

        # Calculating fluctuations with respect to scale.
        # FIX: use the pool as a context manager so worker processes are
        # terminated even on error (previously the pool leaked); use the
        # public os.cpu_count() (the pool's default size) instead of the
        # private pool._pool attribute.
        n_workers = os.cpu_count() or 1
        with Pool() as pool:
            for i, n in tqdm.tqdm(enumerate(window_lengths),
                                  desc="Calculating DFA for different scales",
                                  total=len(window_lengths)):
                # Trim so the signal splits evenly; also process the signal
                # anchored at the end to use all samples.
                trimmed = integrated[:int((len(integrated) // n) * n)]
                trimmed_reversed = integrated[len(integrated) - len(trimmed):]
                n_segments = len(trimmed) // n
                windowed = np.split(trimmed, n_segments)
                windowed_reversed = np.split(trimmed_reversed, n_segments)
                chunksize = ceil(len(windowed) / n_workers)
                temp = np.array(list(pool.imap(fluctuation_func, windowed,
                                               chunksize=chunksize)))
                temp_reversed = np.array(list(pool.imap(fluctuation_func, windowed_reversed,
                                                        chunksize=chunksize)))
                fluctuations[i] = np.sqrt((np.mean(temp) + np.mean(temp_reversed)) / 2)

        # Fit line to the obtained fluctuations (Hurst exponent estimation).
        lin_fit = st.linregress(np.log(window_lengths), np.log(fluctuations))
        alpha = (lin_fit.slope, lin_fit.stderr)

        # Search for scale at which slope changes.
        if self._n_crossovers is not None:
            fit = pr.Fit(np.log(window_lengths), np.log(fluctuations),
                         n_breakpoints=self._n_crossovers)
            n_crossovers = self._n_crossovers
            crossovers = fit.get_results()['estimates']
            y_pred: np.ndarray | None = fit.predict(np.log(window_lengths))
        else:
            selection = pr.ModelSelection(np.log(window_lengths), np.log(fluctuations),
                                          max_breakpoints=10)
            best_idx = np.nanargmin([e['bic'] if e['bic'] is not None else np.nan
                                     for e in selection.model_summaries])
            if best_idx > 0:
                # models[] holds only fits with >= 1 breakpoint, hence the -1.
                fit = selection.models[best_idx - 1]
                y_pred = fit.predict(np.log(window_lengths))
            else:
                y_pred = None
            n_crossovers = selection.model_summaries[best_idx]['n_breakpoints']
            crossovers = selection.model_summaries[best_idx]['estimates']

        if y_pred is not None:
            crossovers_out: dict[Any, Any] = {}
            crossovers_tupple: Tuple[dict | None, np.ndarray]
            for i in range(1, n_crossovers + 2):
                temp_1: dict[str, Any] = {'exponent': crossovers[f'alpha{i}']['estimate'],
                                          'exponent_error': crossovers[f'alpha{i}']['se']}
                crossovers_out[f'segment {i}'] = temp_1
            for i in range(1, n_crossovers + 1):
                # breakpoints live in log(sample) space; convert to hours
                temp_1 = {'crossover': np.exp(crossovers[f'breakpoint{i}']['estimate'])
                          / data.fs / 3600,
                          'ci': (np.exp(crossovers[f'breakpoint{i}']['confidence_interval'][0])
                                 / data.fs / 3600,
                                 np.exp(crossovers[f'breakpoint{i}']['confidence_interval'][1])
                                 / data.fs / 3600)}
                crossovers_out[f'crossover {i}'] = temp_1
            crossovers_tupple = (crossovers_out, y_pred)
        else:
            if self._n_crossovers is not None:
                print("Crossovers model did not converge. Assuming no crossovers.")
            # Fall back to the single straight-line fit.
            temp = lin_fit.slope * np.log(window_lengths) + lin_fit.intercept
            crossovers_tupple = (None, temp)

        return DFAResults(window_lengths / data.fs / 3600, fluctuations, alpha,
                          self._scale_range, crossovers_tupple, self._trend_degree, data.id)
class SimpleNonparametricResults(_LongContainer):
    """Container for simple nonparametric metrics. Do not construct manually!

    Following fields are available in fit params (corresponding to the fitted metrics):

    - IS
    - IV
    - M10
    - L5
    """

    def __init__(self, interdaily_stability, intradaily_variability, m10, l5, id):
        # All metrics live in the generic fit-params mapping shared with the base class.
        self._fit_params = {
            'IS': interdaily_stability,
            'IV': intradaily_variability,
            'M10': m10,
            'L5': l5,
            'id': id,
        }

    def plot(self, out_path: str | None = None):
        """There is absolutely nothing to plot when it comes to this metrics."""
        pass

    def export(self, out_path: str):
        """Save data to compressed generic file.

        Args:
            out_path (str): Path to the out file.
        """
        # No array payload for these metrics -- store a placeholder alongside params.
        self._generic_export(out_path, [np.array(None), self._fit_params])

    @staticmethod
    def load_file(path) -> "SimpleNonparametricResults":
        """Load model fitting results from .ada.long file (created via export method).

        Args:
            path (str): Path to file.

        Returns:
            SimpleNonparametricResults: Object containing fitted model.
        """
        data, fit_params = SimpleNonparametricResults._generic_load(path)
        return SimpleNonparametricResults._construct_from_load(data, fit_params)

    @staticmethod
    def _construct_from_load(data: np.ndarray[Tuple[Any], np.dtype[Any]],
                             fit: dict) -> "SimpleNonparametricResults":
        # The data array carries nothing of interest; rebuild from params only.
        return SimpleNonparametricResults(fit['IS'], fit['IV'], fit['M10'],
                                          fit['L5'], fit['id'])

    def save_csv(self, out_path: str):
        """Save parameters of fitted model to human-readable csv.

        Args:
            out_path (str): Path to the out file.
        """
        rows = [[name, self._fit_params[name]] for name in ('IS', 'IV', 'M10', 'L5')]
        with open(out_path, 'w', newline='') as f:
            csv.writer(f).writerows(rows)

    @property
    def id(self) -> str:
        """ID of a recording to which long estimate was fit."""
        return self._fit_params['id']
class SimpleNonparametric(_Long):
    """A class for estimating basic metrics reflecting 24 hour periodicity in the data.

    These are:

    - Interdaily stability (IS): reflects how similar days are to each other in 24 hour
      windows. Contained in [0, 1] with higher values meaning higher regularity.
    - Intradaily variability (IV): reflects variability of activity levels during the day;
      higher values mean higher fragmentation (potential circadian disturbances).
    - M10: mean activity in the most active 10 hours during each day.
    - L5: mean activity in the least active 5 hours during each day.

    For details see Witting W, Kwa IH, Eikelenboom P, Mirmiran M, Swaab DF. Alterations in
    the circadian rest-activity rhythm in aging and Alzheimer's disease. Biol Psychiatry.
    1990 Mar 15;27(6):563-72.

    Attributes:
        window_size (int): Length of the averaging window in minutes. Defaults to 60
            (hourly daily profile, as in the classical formulation).
    """

    def __init__(self):
        self.window_size = 60  # averaging window length, in minutes

    def fit(self, data: _ActiData, ch_name: str | None = None):
        """Computes basic nonparametric metrics estimating 24 hour periodicity in the data.

        Args:
            data (_ActiData): Input data.
            ch_name (str | None, optional): Channel on which computation will be done.
                If None, default will be used. Defaults to None.

        Returns:
            SimpleNonparametricResults: Container with all the results.
        """
        y = self._get_data(data, ch_name)

        # Average the signal in non-overlapping windows of window_size minutes.
        samples_per_window = int(self.window_size * 60 * data.fs)
        n_windows = len(y) // samples_per_window
        trimmed = y[:n_windows * samples_per_window]
        windowed = np.mean(np.array(np.split(trimmed, n_windows)), axis=1)

        # FIX: derive the per-day window count from window_size instead of the
        # hard-coded 24, so IS/M10/L5 stay consistent if window_size changes
        # (identical to the previous behavior at the default of 60 minutes).
        windows_per_day = int(24 * 60 // self.window_size)

        # Interdaily stability: variance of the mean daily profile over the
        # variance of all windows.
        profile = [np.mean(windowed[i::windows_per_day]) for i in range(windows_per_day)]
        interdaily_stability = np.var(profile, ddof=0) / np.var(windowed, ddof=0)

        # Intradaily variability, computed on the raw samples.
        # NOTE(review): the classical IV is computed on the hourly means rather
        # than raw samples -- confirm this is intended.
        intradaily_variability = np.sum(np.diff(y) ** 2) / np.var(y, ddof=0) / (len(y) - 1)

        # L5 and M10: least active 5 hours / most active 10 hours of the sorted
        # mean daily profile.
        n_l5 = int(5 * 60 // self.window_size)
        n_m10 = int(10 * 60 // self.window_size)
        profile.sort()
        l5 = np.mean(profile[:n_l5])
        m10 = np.mean(profile[-n_m10:])

        return SimpleNonparametricResults(interdaily_stability, intradaily_variability,
                                          m10, l5, data.id)
# TODO maybe some day. Not really that urgent. # http://info.ifpan.edu.pl/ON-2/on26/on26sub/plikipdf/boepj20.pdf # https://arxiv.org/pdf/1805.08931 # class RescaledRange(_Long): # def __init__(self, scale_range: Tuple[float, float] = (1 / 6, 48)): # self._scale_range = scale_range