Source code for ada.long.ar

import numpy as np
import matplotlib.pyplot as plt
import tqdm
import scipy.signal as ss
import pprint
import csv
from typing import Tuple

from ada.data_containers._base import _ActiData
from ada.data_containers._long import _Long, _LongContainer


[docs] class ARResults(_LongContainer): """A class for storing results of AR model fitting. Do not construct manually! The fit_params gives access to the following fields: - coefficients: of fitted model - noise std: standard deviations of model noise - order: order of the model - peak: position of the main peak near to the 24 hours - preprocessing: whether it was used - period ci: confidence interval of the peak position estimated using bootstrap or None. """ def __init__(self, a: np.ndarray, sigma: float, f: np.ndarray, s: np.ndarray, filter: bool, ci: Tuple[float, float] | None, id: str): # find peak near 24 h s_around_24 = s[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))] f_around_24 = f[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))] peak_pos = 1 / f_around_24[np.argmax(s_around_24)] / 3600 self._fit_params = {'coefficients': a, 'noise std': sigma, 'order': len(a), 'peak': peak_pos, 'preprocessing': filter, 'period ci': ci} self._data = np.empty((2, len(f))) self._data[0] = f self._data[1] = s self._fit_params['id'] = id def __repr__(self): to_print = {key: self._fit_params[key] for key in self._fit_params.keys() - {'coefficients', 'noise std'}} return pprint.pformat(to_print, indent=4)
[docs] def plot(self, out_path: str | None = None): """Plot the power spectrum in the low frequency range together with the position of the main peak. Args: out_path (str | None): Path to save the plot. If None, the plot will be opened in interactive window. Defaults to None. """ plt.figure("Autoregressive model spectrum") plt.semilogx(self._data[0], self._data[1]) plt.vlines([1 / 24 / 3600], [0], [np.max(self._data[1])], colors=['red']) plt.title("order {}, peak in {} h".format(self._fit_params['order'], round(self._fit_params['peak'], 3))) plt.xlabel('Frequency [Hz]') plt.ylabel(r'Power density [V$^2$/Hz]') if out_path is None: plt.show() else: plt.savefig(out_path) plt.close()
@staticmethod def _construct_from_load(data: np.ndarray, fit: dict) -> "ARResults": out = ARResults(np.array([]), 0., np.array([1e-5]), np.array([1]), True, None, '') out._data = data out._fit_params = fit return out
[docs] @staticmethod def load_file(path: str) -> "ARResults": """Load model fitting results from .ada.long file (created via export method). Args: path (str): Path to file. Returns: ARResults: Object containing fitted model. """ data, fit_params = ARResults._generic_load(path) return ARResults._construct_from_load(data, fit_params)
[docs] def export(self, out_path: str): """Save data (both parameters of model and spectrum data) to compressed generic file. Args: out_path (str): Path to the out file. """ to_export = [self._data, self._fit_params] ARResults._generic_export(out_path, to_export)
[docs] def save_csv(self, out_path: str): """Save parameters of fitted model to human-readable csv. Args: out_path (str): Path to the out file. """ with open(out_path, 'w', newline='') as f: writer = csv.writer(f) for key in ['order', 'peak', 'period ci', 'preprocessing']: writer.writerow([key, self._fit_params[key]])
@property def freq(self) -> np.ndarray: """Frequency vector (x-axis of spectrum).""" return self._data[0] @property def spectrum(self) -> np.ndarray: """Power density calculated for given frequencies.""" return self._data[1] @property def id(self) -> str: """ID of a recording to which long estimate was fit.""" return self._fit_params['id']
[docs] class Spectrum(_Long): """A class for fitting an autoregressive (AR) model to the actigraphic data, and estimating circadian rhythm on this basis. Args: order (int | None, optional): Order of the model. If None, it will be selected automatically based on a heuristic (order = number of samples spanning 30 hours in the data). Defaults to None. preprocessing (bool, optional): Whether to filter data before model fitting. Defaults to False. use_bootstrap (bool, optional): Whether to estimate period confidence interval using bootrstrap. Might take some time, especially for shorter epoch lenghts. Defaults to False. """ def __init__(self, order: int | None = None, preprocessing: bool = False, use_bootstrap: bool = False): self._filter = preprocessing self._order = order self._use_bootstrap = use_bootstrap
[docs] def fit(self, data: _ActiData, ch_name: str | None = None): """Fit an AR model to the provided actiraphic data. It is highly recomended to epoch them using Downsampler, as other methods might not work at all with AR. Args: data (_ActiData): Actigraphic data. ch_name (str | None, optional): Channel to which model will be fitted. to_score channel, when None. Defaults to None. Returns: ARResults: Object containing results of the model fitting. """ y = self._get_data(data, ch_name) y = y - np.mean(y) if self._filter: sos = ss.iirdesign(1 / (3600 * 9), 1 / (3600 * 3), 2, 30, ftype='butter', output='sos', fs=data.fs) y = ss.sosfiltfilt(sos, y) if self._order is None: order = int(30 * 3600 * data.fs) else: order = self._order n = 75000 f = np.linspace(1 / (8 * 3600), 1 / (40 * 3600), n) a, sigma = Spectrum._ar_params(y, order) _, s = Spectrum._ar_spectrum(a, sigma, n, data.fs, f) if self._use_bootstrap: p_value = self._bootstrap(y, n, f, order, data.fs) else: p_value = None return ARResults(a, sigma, f, s, self._filter, p_value, data.id)
def _bootstrap(self, y: np.ndarray, n: int, f: np.ndarray, order: int, fs: float): n_iterations = int(order / 10) orders = np.arange(-n_iterations // 2, n_iterations // 2) + order out = np.empty(n_iterations) for i, ord in tqdm.tqdm(enumerate(orders), desc="Calculating confidence intervals using bootstrap", total=n_iterations): a, sigma = Spectrum._ar_params(y, ord) _, s = Spectrum._ar_spectrum(a, sigma, n, fs, f) s_around_24 = s[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))] f_around_24 = f[(f >= 1 / (32 * 3600)) & (f <= 1 / (16 * 3600))] peak_pos = 1 / f_around_24[np.argmax(s_around_24)] / 3600 out[i] = peak_pos high = np.nanpercentile(out, 97.5) low = np.nanpercentile(out, 2.5) return (float(low), float(high)) @staticmethod def _ar_params(x: np.ndarray, p: int): n = len(x) ak = np.correlate(x, x, mode='full') norm_ak = np.hstack((np.arange(1, n + 1, 1), np.arange(n - 1, 0, -1))) ak = ak / norm_ak r = ak[n - 1:] rl = r[1:1 + p] rp = np.zeros((p, p)) for i in range(p): aa = ak[n - 1 - i:n - 1 - i + p] rp[i, :] = aa a = np.linalg.solve(rp, rl) sigma = (ak[n - 1] - np.sum(a * ak[n:n + p])) ** 0.5 return a, sigma @staticmethod def _aic_criterium(x: np.ndarray, p_max: int): n = len(x) aic = np.zeros(p_max) for p in range(1, p_max + 1): _, sigma = Spectrum._ar_params(x, p) aic[p - 1] = 2 * p / n + np.log(sigma) return aic @staticmethod def _generate_ar(a: np.ndarray, sigma: float, n: int): x = np.zeros(n) p = len(a) for i in range(p, n): for j in range(len(a)): x[i] += a[j] * x[i - (j + 1)] x[i] += sigma * np.random.randn() return x @staticmethod def _ar_spectrum(a: np.ndarray, sigma: float, n: int, fs: float, f: np.ndarray | None = None): if f is None: f = np.linspace(0, 0.001, n) z = np.exp(1j * 2 * np.pi * f / fs) A = 1 * np.ones(n) + 1j * np.zeros(n) for i in range(len(a)): A -= a[i] * z ** (-(i + 1)) H = 1. / A sp = H * sigma ** 2 * H.conj() sp = sp / fs sp = sp.real return f, sp