import numpy as np
import scipy.stats as st
import matplotlib.pyplot as plt
import tqdm
from multiprocess.pool import Pool
from math import ceil
from typing import Tuple, Any
import piecewise_regression as pr
import csv
from ada.data_containers._base import _ActiData
from ada.data_containers._long import _Long, _LongContainer
class DFAResults(_LongContainer):
    """A container for results of fitting a DFA model to actigraphic data. Do not construct manually!

    The following parameters are available in the fit_params:

    - exponent: the exponent of the whole data over all fluctuation range;
    - exponent error: the error of exponent estimation;
    - scale range: range of scales used during the fitting;
    - segments: segments in which the data was divided, or ``None`` when no
      piecewise (crossover) fit is available;
    - trend degree: trend degree used during the detrending prior to fitting model.
    """

    def __init__(self, scales: np.ndarray, fluctuations: np.ndarray, exponent: Tuple[float, float],
                 scale_range: Tuple[float, float], crossovers: Tuple[dict | None, np.ndarray],
                 degree: int, id: str):
        # scale_range arrives in seconds; stored in hours for readability.
        self._fit_params = {'exponent': exponent[0],
                            'exponent error': exponent[1],
                            'scale range': (scale_range[0] / 3600, scale_range[1] / 3600),
                            'segments': crossovers[0],
                            'trend degree': degree}
        self._data = np.empty((3, len(scales)))
        self._data[0] = scales          # scales (hours)
        self._data[1] = fluctuations    # fluctuation per scale
        self._data[2] = crossovers[1]   # reconstruction of the fitted (piecewise) line
        self._fit_params['id'] = id

    def plot(self, out_path: str | None = None):
        """Plot the semi-logarithmic fluctuations plot.

        Args:
            out_path (str | None, optional): Path to save the figure. If None, it will be
                shown in interactive mode. Defaults to None.
        """
        plt.figure("Fluctuations plot")
        plt.semilogx(self._data[0], np.log(self._data[1]))
        plt.semilogx(self._data[0], self._data[2])
        plt.xlabel('Scale [h]')
        plt.ylabel('Fluctuations logarithm')
        # Only draw crossover markers when a piecewise fit exists; with
        # 'segments' == None the previous code raised a TypeError here.
        cross_positions = self.crossovers[0]
        if cross_positions:
            plt.vlines(cross_positions, np.min(self._data[2]), np.max(self._data[2]),
                       colors='red', alpha=0.5)
        if out_path is None:
            plt.show()
        else:
            plt.savefig(out_path)
            plt.close()

    @staticmethod
    def _construct_from_load(data: np.ndarray, fit: dict) -> "DFAResults":
        # Build an empty shell, then overwrite it with the deserialized payload.
        out = DFAResults(np.array([]), np.array([]), (0., 0.), (0., 0.), (None, np.array([])), 1, '')
        out._data = data
        out._fit_params = fit
        return out

    @staticmethod
    def load_file(path: str) -> "DFAResults":
        """Load model fitting results from .ada.long file (created via export method).

        Args:
            path (str): Path to file.

        Returns:
            DFAResults: Object containing fitted model.
        """
        data, fit_params = DFAResults._generic_load(path)
        return DFAResults._construct_from_load(data, fit_params)

    def export(self, out_path: str):
        """Save data (both parameters of model and fluctuations data) to compressed generic file.

        Args:
            out_path (str): Path to the out file.
        """
        to_export = [self._data, self._fit_params]
        DFAResults._generic_export(out_path, to_export)

    def save_csv(self, out_path: str):
        """Save parameters of fitted model to human-readable csv.

        Args:
            out_path (str): Path to the out file.
        """
        with open(out_path, 'w', newline='') as f:
            writer = csv.writer(f)
            for key in ['exponent', 'exponent error', 'scale range', 'trend degree']:
                writer.writerow([key, self._fit_params[key]])
            segments = self._fit_params['segments']
            # The piecewise section only exists when the crossover fit converged;
            # previously a None value here crashed on .keys().
            if segments is not None:
                for _ in range(3):
                    writer.writerow('')
                writer.writerow(['piecewise fit info', ''])
                writer.writerow('')
                for key in segments.keys():
                    writer.writerow([key, ''])
                    for subkey in segments[key]:
                        writer.writerow([subkey, segments[key][subkey]])
                    writer.writerow('')

    @property
    def scales(self) -> np.ndarray:
        """Array of scale for which fluctuations were computed."""
        return self._data[0]

    @property
    def fluctuations(self) -> np.ndarray:
        """Array of obtained fluctuations at different scales."""
        return self._data[1]

    @property
    def piecewise_fit(self) -> np.ndarray:
        """Array with reconstruction of fitted piecewise linear function."""
        return self._data[2]

    @property
    def crossovers(self) -> Tuple[list[float], list[Tuple[float, float]]]:
        """List with positions of crossovers (in hours) and list of their confidence intervals.

        Both lists are empty when no piecewise fit is available ('segments' is None).
        """
        cross: list[float] = []
        ci: list[Tuple[float, float]] = []
        segments = self._fit_params['segments']
        if segments is not None:  # guard: previously iterated None and raised TypeError
            for key in segments:
                if 'crossover' in key:
                    cross.append(segments[key]['crossover'])
                    ci.append(segments[key]['ci'])
        return cross, ci

    @property
    def exponents(self) -> Tuple[list[float], list[float]]:
        """List with exponents and list of their errors.

        Both lists are empty when no piecewise fit is available ('segments' is None).
        """
        expo: list[float] = []
        err: list[float] = []
        segments = self._fit_params['segments']
        if segments is not None:  # guard: previously iterated None and raised TypeError
            for key in segments:
                if 'segment' in key:
                    expo.append(segments[key]['exponent'])
                    err.append(segments[key]['exponent_error'])
        return expo, err

    @property
    def id(self) -> str:
        """ID of a recording to which long estimate was fit."""
        return self._fit_params['id']
class DFA(_Long):
    """A class for estimating Hurst exponent of the data using Detrended Fluctuation Analysis.

    Attributes:
        n_windows (int): Number of scales that will be taken into account during computation. Defaults to 200.
        scale_range (Tuple[float, float]): Range of scales in hours. Defaults to (1 / 6, 48).
        trend_degree (int): Degree of polynomial function describing trend in the integrated data
            (degree of trend in original data is equal to degree of trend in integrated data - 1).
            Defaults to 1.
        n_crossovers (int | None): Number of crossover points expected in the fluctuation graphs,
            assuming that the data is described by n_crossovers + 1 fractals. If None, the optimal
            number of crossovers in the range 0-10 will be found using the Bayesian Information
            Criterion. Defaults to 1.
    """

    def __init__(self, n_windows: int = 200, scale_range: Tuple[float, float] = (1 / 6, 48),
                 trend_degree: int = 1, n_crossovers: int | None = 1):
        self._n_windows = n_windows
        # Scales are given in hours; internally everything is kept in seconds.
        self._scale_range = (scale_range[0] * 3600, scale_range[1] * 3600)
        self._trend_degree = trend_degree
        self._n_crossovers = n_crossovers

    def _calc_fluctuation(self, window: np.ndarray) -> float:
        """Mean squared residual of a linear (degree-1) detrend of one window."""
        x = np.arange(0, len(window), dtype=float)
        a = np.vstack([x, np.ones_like(x)]).T
        # lstsq returns the sum of squared residuals for a full-rank design matrix.
        _, residuals, _, _ = np.linalg.lstsq(a, window, rcond=None)
        return residuals[0] / len(window)

    def _calc_fluctuation_poly(self, window: np.ndarray) -> float:
        """Mean squared residual of a polynomial detrend of degree ``trend_degree``."""
        x = np.arange(0, len(window), dtype=float)
        _, residuals = np.polynomial.polynomial.Polynomial.fit(x, window, self._trend_degree, full=True)
        return float(residuals[0]) / len(window)

    def fit(self, data: _ActiData, ch_name: str | None = None):
        """Fits the specified by the object fractal model to provided data.

        Args:
            data (_ActiData): Actigraphic data to be fitted with cosinor method.
            ch_name (str | None, optional): Channel to which model will be fitted. If None, it will
                be to_score in case of Raw or Epoched data and score in case of ScoredShort data.
                Defaults to None.

        Returns:
            DFAResults: Object containing fitting results.
        """
        y = self._get_data(data, ch_name)
        # DFA operates on the mean-centered cumulative sum ("profile") of the signal.
        integrated = np.cumsum(y - np.mean(y))
        window_lengths = np.geomspace(self._scale_range[0] * data.fs, self._scale_range[1] * data.fs,
                                      self._n_windows, dtype=int)
        fluctuations = np.empty_like(window_lengths, dtype=float)
        fluctuation_func = self._calc_fluctuation if self._trend_degree == 1 else self._calc_fluctuation_poly
        # calculating fluctuations with respect to scale; the pool is closed
        # deterministically now (the original leaked worker processes).
        with Pool() as pool:
            n_workers = len(pool._pool)  # NOTE: private attribute of the Pool implementation
            for i, n in tqdm.tqdm(enumerate(window_lengths), desc="Calculating DFA for different scales",
                                  total=len(window_lengths)):
                # Windows aligned to the start of the profile and to its end, so no
                # samples are discarded by the trimming to a multiple of n.
                trimmed = integrated[:int((len(integrated) // n) * n)]
                trimmed_reversed = integrated[len(integrated) - len(trimmed):]
                n_segments = len(trimmed) // n
                windowed = np.split(trimmed, n_segments)
                windowed_reversed = np.split(trimmed_reversed, n_segments)
                chunksize = ceil(len(windowed) / n_workers)
                temp = np.array(list(pool.imap(fluctuation_func, windowed, chunksize=chunksize)))
                temp_reversed = np.array(list(pool.imap(fluctuation_func, windowed_reversed,
                                                        chunksize=chunksize)))
                fluctuations[i] = np.sqrt((np.mean(temp) + np.mean(temp_reversed)) / 2)
        # fit line to the obtained fluctuations (Hurst exponent estimation)
        lin_fit = st.linregress(np.log(window_lengths), np.log(fluctuations))
        alpha = (lin_fit.slope, lin_fit.stderr)
        # search for scale at which slope changes
        if self._n_crossovers is not None:
            fit = pr.Fit(np.log(window_lengths), np.log(fluctuations), n_breakpoints=self._n_crossovers)
            n_crossovers = self._n_crossovers
            crossovers = fit.get_results()['estimates']
            y_pred: np.ndarray | None = fit.predict(np.log(window_lengths))
        else:
            # Model selection over 0-10 breakpoints; pick the lowest BIC.
            selection = pr.ModelSelection(np.log(window_lengths), np.log(fluctuations), max_breakpoints=10)
            best_idx = np.nanargmin([e['bic'] if e['bic'] is not None else np.nan
                                     for e in selection.model_summaries])
            if best_idx > 0:
                # models[] holds only the >=1-breakpoint fits, hence the -1 offset.
                fit = selection.models[best_idx - 1]
                y_pred = fit.predict(np.log(window_lengths))
            else:
                y_pred = None
            n_crossovers = selection.model_summaries[best_idx]['n_breakpoints']
            crossovers = selection.model_summaries[best_idx]['estimates']
        if y_pred is not None:
            crossovers_out: dict[Any, Any] = {}
            crossovers_tuple: Tuple[dict | None, np.ndarray]
            for i in range(1, n_crossovers + 2):
                temp_1: dict[str, Any] = {'exponent': crossovers[f'alpha{i}']['estimate'],
                                          'exponent_error': crossovers[f'alpha{i}']['se']}
                crossovers_out[f'segment {i}'] = temp_1
            for i in range(1, n_crossovers + 1):
                # Breakpoints live in log-sample space; convert back to hours.
                temp_1 = {'crossover': np.exp(crossovers[f'breakpoint{i}']['estimate']) / data.fs / 3600,
                          'ci': (np.exp(crossovers[f'breakpoint{i}']['confidence_interval'][0]) / data.fs / 3600,
                                 np.exp(crossovers[f'breakpoint{i}']['confidence_interval'][1]) / data.fs / 3600)}
                crossovers_out[f'crossover {i}'] = temp_1
            crossovers_tuple = (crossovers_out, y_pred)
        else:
            if self._n_crossovers is not None:
                print("Crossovers model did not converge. Assuming no crossovers.")
            # Fall back to the plain linear fit reconstruction.
            temp = lin_fit.slope * np.log(window_lengths) + lin_fit.intercept
            crossovers_tuple = (None, temp)
        return DFAResults(window_lengths / data.fs / 3600, fluctuations, alpha, self._scale_range,
                          crossovers_tuple, self._trend_degree, data.id)
class SimpleNonparametricResults(_LongContainer):
    """Container for simple nonparametric metrics. Do not construct manually!

    The following fields are available in fit params (corresponding to the fitted metrics):

    - IS
    - IV
    - M10
    - L5
    """

    def __init__(self, interdaily_stability, intradaily_variability, m10, l5, id):
        self._fit_params = {'IS': interdaily_stability,
                            'IV': intradaily_variability,
                            'M10': m10,
                            'L5': l5,
                            'id': id}

    def plot(self, out_path: str | None = None):
        """There is absolutely nothing to plot when it comes to this metrics."""
        # Intentionally a no-op: the results are just four scalar metrics.
        pass

    def export(self, out_path: str):
        """Save data to compressed generic file.

        Args:
            out_path (str): Path to the out file.
        """
        # No array payload for these metrics; store a None placeholder.
        self._generic_export(out_path, [np.array(None), self._fit_params])

    @staticmethod
    def load_file(path) -> "SimpleNonparametricResults":
        """Load model fitting results from .ada.long file (created via export method).

        Args:
            path (str): Path to file.

        Returns:
            SimpleNonparametricResults: Object containing fitted model.
        """
        loaded_data, loaded_params = SimpleNonparametricResults._generic_load(path)
        return SimpleNonparametricResults._construct_from_load(loaded_data, loaded_params)

    @staticmethod
    def _construct_from_load(data: np.ndarray[Tuple[Any], np.dtype[Any]], fit: dict) -> "SimpleNonparametricResults":
        # The array payload is ignored; everything lives in the fit dict.
        values = (fit[field] for field in ('IS', 'IV', 'M10', 'L5', 'id'))
        return SimpleNonparametricResults(*values)

    def save_csv(self, out_path: str):
        """Save parameters of fitted model to human-readable csv.

        Args:
            out_path (str): Path to the out file.
        """
        rows = [[metric, self._fit_params[metric]] for metric in ('IS', 'IV', 'M10', 'L5')]
        with open(out_path, 'w', newline='') as f:
            csv.writer(f).writerows(rows)

    @property
    def id(self) -> str:
        """ID of a recording to which long estimate was fit."""
        return self._fit_params['id']
class SimpleNonparametric(_Long):
    """A class for estimating basic metrics reflecting 24 hour periodicity in the data. These are:

    - Interdaily stability (IS): reflects how similar days are to each other in 24 hour windows.
      Contained in [0, 1] with higher values meaning higher regularity.
    - Intradaily variability (IV): reflects variability of activity levels during the day;
      higher values mean higher fragmentation (potential circadian disturbances).
    - M10: mean activity in the most active 10 hours during each day.
    - L5: mean activity in the least active 5 hours during each day.

    For details see Witting W, Kwa IH, Eikelenboom P, Mirmiran M, Swaab DF. Alterations in the
    circadian rest-activity rhythm in aging and Alzheimer's disease. Biol Psychiatry.
    1990 Mar 15;27(6):563-72.
    """

    def __init__(self):
        # Averaging-window length in minutes; 60 yields hourly means, which the
        # 24-bin logic in fit() relies on.
        self.window_size = 60

    def fit(self, data: _ActiData, ch_name: str | None = None):
        """Computes basic nonparametric metrics estimating 24 hour periodicity in the data.

        Args:
            data (_ActiData): Input data.
            ch_name (str | None, optional): Channel on which computation will be done.
                If None, default will be used. Defaults to None.

        Raises:
            ValueError: If the recording is shorter than a single averaging window.

        Returns:
            SimpleNonparametricResults: Container with all the results.
        """
        y = self._get_data(data, ch_name)
        # Number of samples per averaging window (fs is in Hz, window_size in minutes).
        window_samples = self.window_size * data.fs * 60
        n_windows = int(len(y) / window_samples)
        if n_windows < 1:
            # Previously this fell through to np.split(..., 0) and died with an
            # opaque ZeroDivisionError.
            raise ValueError("Recording is shorter than a single averaging window; "
                             "cannot compute nonparametric metrics.")
        trimmed = y[:int(n_windows * window_samples)]
        windowed = np.mean(np.array(np.split(trimmed, n_windows)), axis=1)
        # Mean activity profile over the 24 hours of the day (window i of every day).
        hour_means = np.array([np.mean(windowed[i::24]) for i in range(24)])
        # Interdaily stability: variance of the 24h profile over total variance.
        interdaily_stability = np.var(hour_means, ddof=0) / np.var(windowed, ddof=0)
        # NOTE(review): IV is computed on the raw samples here rather than on the
        # hourly means as in Witting et al. -- confirm this is intended.
        intradaily_variability = np.sum(np.diff(y) ** 2) / np.var(y, ddof=0) / (len(y) - 1)
        # L5 / M10: least active 5 and most active 10 hours of the mean daily profile.
        sorted_means = np.sort(hour_means)
        l5 = np.mean(sorted_means[:5])
        m10 = np.mean(sorted_means[-10:])
        return SimpleNonparametricResults(interdaily_stability, intradaily_variability, m10, l5, data.id)
# TODO maybe some day. Not really that urgent.
# http://info.ifpan.edu.pl/ON-2/on26/on26sub/plikipdf/boepj20.pdf
# https://arxiv.org/pdf/1805.08931
# class RescaledRange(_Long):
# def __init__(self, scale_range: Tuple[float, float] = (1 / 6, 48)):
# self._scale_range = scale_range