import numpy as np
import matplotlib.pyplot as plt
import tqdm
import scipy.signal as ss
import pprint
import csv
from typing import Tuple
from ada.data_containers._base import _ActiData
from ada.data_containers._long import _Long, _LongContainer
[docs]
class ARResults(_LongContainer):
"""A class for storing results of AR model fitting. Do not construct manually! The fit_params gives access to the following fields:
- coefficients: of fitted model
- noise std: standard deviations of model noise
- order: order of the model
- peak: position of the main peak near to the 24 hours
- preprocessing: whether it was used
- period ci: confidence interval of the peak position estimated using bootstrap or None.
"""
def __init__(self, a: np.ndarray, sigma: float, f: np.ndarray, s: np.ndarray, filter: bool, ci: Tuple[float, float] | None, id: str):
# find peak near 24 h
s_around_24 = s[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))]
f_around_24 = f[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))]
peak_pos = 1 / f_around_24[np.argmax(s_around_24)] / 3600
self._fit_params = {'coefficients': a,
'noise std': sigma,
'order': len(a),
'peak': peak_pos,
'preprocessing': filter,
'period ci': ci}
self._data = np.empty((2, len(f)))
self._data[0] = f
self._data[1] = s
self._fit_params['id'] = id
def __repr__(self):
to_print = {key: self._fit_params[key] for key in self._fit_params.keys() - {'coefficients', 'noise std'}}
return pprint.pformat(to_print, indent=4)
[docs]
def plot(self, out_path: str | None = None):
"""Plot the power spectrum in the low frequency range together with the position of the main peak.
Args:
out_path (str | None): Path to save the plot. If None, the plot will be opened in interactive window. Defaults to None.
"""
plt.figure("Autoregressive model spectrum")
plt.semilogx(self._data[0], self._data[1])
plt.vlines([1 / 24 / 3600], [0], [np.max(self._data[1])], colors=['red'])
plt.title("order {}, peak in {} h".format(self._fit_params['order'], round(self._fit_params['peak'], 3)))
plt.xlabel('Frequency [Hz]')
plt.ylabel(r'Power density [V$^2$/Hz]')
if out_path is None:
plt.show()
else:
plt.savefig(out_path)
plt.close()
@staticmethod
def _construct_from_load(data: np.ndarray, fit: dict) -> "ARResults":
out = ARResults(np.array([]), 0., np.array([1e-5]), np.array([1]), True, None, '')
out._data = data
out._fit_params = fit
return out
[docs]
@staticmethod
def load_file(path: str) -> "ARResults":
"""Load model fitting results from .ada.long file (created via export method).
Args:
path (str): Path to file.
Returns:
ARResults: Object containing fitted model.
"""
data, fit_params = ARResults._generic_load(path)
return ARResults._construct_from_load(data, fit_params)
[docs]
def export(self, out_path: str):
"""Save data (both parameters of model and spectrum data) to compressed generic file.
Args:
out_path (str): Path to the out file.
"""
to_export = [self._data, self._fit_params]
ARResults._generic_export(out_path, to_export)
[docs]
def save_csv(self, out_path: str):
"""Save parameters of fitted model to human-readable csv.
Args:
out_path (str): Path to the out file.
"""
with open(out_path, 'w', newline='') as f:
writer = csv.writer(f)
for key in ['order', 'peak', 'period ci', 'preprocessing']:
writer.writerow([key, self._fit_params[key]])
@property
def freq(self) -> np.ndarray:
"""Frequency vector (x-axis of spectrum)."""
return self._data[0]
@property
def spectrum(self) -> np.ndarray:
"""Power density calculated for given frequencies."""
return self._data[1]
@property
def id(self) -> str:
"""ID of a recording to which long estimate was fit."""
return self._fit_params['id']
[docs]
class Spectrum(_Long):
"""A class for fitting an autoregressive (AR) model to the actigraphic data, and estimating circadian rhythm on this basis.
Args:
order (int | None, optional): Order of the model. If None, it will be selected automatically based on a heuristic (order = number of samples spanning 30 hours in the data). Defaults to None.
preprocessing (bool, optional): Whether to filter data before model fitting. Defaults to False.
use_bootstrap (bool, optional): Whether to estimate period confidence interval using bootrstrap. Might take some time, especially for shorter epoch lenghts. Defaults to False.
"""
def __init__(self, order: int | None = None, preprocessing: bool = False, use_bootstrap: bool = False):
self._filter = preprocessing
self._order = order
self._use_bootstrap = use_bootstrap
[docs]
def fit(self, data: _ActiData, ch_name: str | None = None):
"""Fit an AR model to the provided actiraphic data. It is highly recomended to epoch them using Downsampler, as other methods might not work at all with AR.
Args:
data (_ActiData): Actigraphic data.
ch_name (str | None, optional): Channel to which model will be fitted. to_score channel, when None. Defaults to None.
Returns:
ARResults: Object containing results of the model fitting.
"""
y = self._get_data(data, ch_name)
y = y - np.mean(y)
if self._filter:
sos = ss.iirdesign(1 / (3600 * 9), 1 / (3600 * 3), 2, 30, ftype='butter', output='sos', fs=data.fs)
y = ss.sosfiltfilt(sos, y)
if self._order is None:
order = int(30 * 3600 * data.fs)
else:
order = self._order
n = 75000
f = np.linspace(1 / (8 * 3600), 1 / (40 * 3600), n)
a, sigma = Spectrum._ar_params(y, order)
_, s = Spectrum._ar_spectrum(a, sigma, n, data.fs, f)
if self._use_bootstrap:
p_value = self._bootstrap(y, n, f, order, data.fs)
else:
p_value = None
return ARResults(a, sigma, f, s, self._filter, p_value, data.id)
def _bootstrap(self, y: np.ndarray, n: int, f: np.ndarray, order: int, fs: float):
n_iterations = int(order / 10)
orders = np.arange(-n_iterations // 2, n_iterations // 2) + order
out = np.empty(n_iterations)
for i, ord in tqdm.tqdm(enumerate(orders), desc="Calculating confidence intervals using bootstrap", total=n_iterations):
a, sigma = Spectrum._ar_params(y, ord)
_, s = Spectrum._ar_spectrum(a, sigma, n, fs, f)
s_around_24 = s[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))]
f_around_24 = f[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))]
peak_pos = 1 / f_around_24[np.argmax(s_around_24)] / 3600
out[i] = peak_pos
high = np.nanpercentile(out, 97.5)
low = np.nanpercentile(out, 2.5)
return (float(low), float(high))
@staticmethod
def _ar_params(x: np.ndarray, p: int):
n = len(x)
ak = np.correlate(x, x, mode='full')
norm_ak = np.hstack((np.arange(1, n + 1, 1), np.arange(n - 1, 0, -1)))
ak = ak / norm_ak
r = ak[n - 1:]
rl = r[1:1 + p]
rp = np.zeros((p, p))
for i in range(p):
aa = ak[n - 1 - i:n - 1 - i + p]
rp[i, :] = aa
a = np.linalg.solve(rp, rl)
sigma = (ak[n - 1] - np.sum(a * ak[n:n + p])) ** 0.5
return a, sigma
@staticmethod
def _aic_criterium(x: np.ndarray, p_max: int):
n = len(x)
aic = np.zeros(p_max)
for p in range(1, p_max + 1):
_, sigma = Spectrum._ar_params(x, p)
aic[p - 1] = 2 * p / n + np.log(sigma)
return aic
@staticmethod
def _generate_ar(a: np.ndarray, sigma: float, n: int):
x = np.zeros(n)
p = len(a)
for i in range(p, n):
for j in range(len(a)):
x[i] += a[j] * x[i - (j + 1)]
x[i] += sigma * np.random.randn()
return x
@staticmethod
def _ar_spectrum(a: np.ndarray, sigma: float, n: int, fs: float, f: np.ndarray | None = None):
if f is None:
f = np.linspace(0, 0.001, n)
z = np.exp(1j * 2 * np.pi * f / fs)
A = 1 * np.ones(n) + 1j * np.zeros(n)
for i in range(len(a)):
A -= a[i] * z ** (-(i + 1))
H = 1. / A
sp = H * sigma ** 2 * H.conj()
sp = sp / fs
sp = sp.real
return f, sp