import numpy as np
import pprint
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from abc import abstractmethod
import scipy.signal as ss
import scipy.stats as st
import tqdm
from typing import Tuple
import csv
import sys
import inspect
from ada.data_containers._base import _ActiData, _Raw, _Epoched
from ada.data_containers.scored import ScoredShort
from ada.data_containers.generic import GenericData
from ada.data_containers._long import _Long, _LongContainer
[docs]
class CosinorResults(_LongContainer):
"""A container for storing results of cosinor model fitting. Do not construct manually! Fit params gives access to the following:
- base freq, n harmonic indexed from 1: dicts containing keys: period, amplitude, phase
- amplitude: summary amplitude of fitted line
- constant: MESOR
- f-test: pvalue from F-test
- bootstrap: pvalue from bootstrap or None
- method: used model
- preprocessing: whether it was used
- channel: to which the model was fit
"""
def __init__(self, sine_params: np.ndarray, period: float, amplitude: float, pvals: Tuple[float, float | None], method: str, ch: str | None, preprocessing: bool, id: str, generalized_params: dict | None = None):
keys = ['base freq']
for i in range(1, (len(sine_params) - 1) // 2):
keys.append(f"{i} harmonic")
self._fit_params = {}
for i, key in enumerate(keys):
self._fit_params[key] = {'period': period / 3600 / (i + 1),
'amplitude': sine_params[i],
'phase': sine_params[i + len(keys)]}
if generalized_params is not None:
for subkey in generalized_params.keys():
self._fit_params[key][subkey] = generalized_params[subkey]
self._fit_params['amplitude'] = amplitude
self._fit_params['constant'] = sine_params[-1]
self._fit_params['f-test'] = pvals[0]
self._fit_params['bootstrap'] = pvals[1]
self._fit_params['method'] = method
self._fit_params['preprocessing'] = preprocessing
self._fit_params['channel'] = ch if ch is not None else 'default'
self._fit_params['id'] = id
def __repr__(self):
return pprint.pformat(self._fit_params, indent=4, sort_dicts=False)
@staticmethod
def _construct_from_load(data: np.ndarray, fit: dict) -> "CosinorResults":
out = CosinorResults(np.arange(3), 0., 0., (0., 0.), '', None, True, '')
out._fit_params = fit
return out
@property
def id(self) -> str:
"""ID of a recording to which long estimate was fit."""
return self.fit_params['id']
[docs]
@staticmethod
def load_file(path: str) -> "CosinorResults":
"""Load model fitting results from .ada.long file (created via export method).
Args:
path (str): Path to file.
Returns:
CosinorResults: Object containing fitted model.
"""
data, fit_params = CosinorResults._generic_load(path)
return CosinorResults._construct_from_load(data, fit_params)
[docs]
def export(self, out_path: str):
"""Save data (parameters of the model) to compressed generic file.
Args:
out_path (str): Path to the out file.
"""
to_export = [np.array([None]), self._fit_params]
CosinorResults._generic_export(out_path, to_export)
[docs]
def save_csv(self, out_path: str):
"""Save parameters of fitted model to human-readable csv.
Args:
out_path (str): Path to the out file.
"""
def sinebased(self, writer):
for key in ['amplitude', 'constant', 'f-test', 'bootstrap', 'method', 'preprocessing', 'channel']:
writer.writerow([key, self._fit_params[key]])
for _ in range(3):
writer.writerow('')
writer.writerow(['fitted sines', ''])
writer.writerow('')
keys = [e for e in self._fit_params.keys() if ('base' in e) or ('harmonic' in e)]
for key in keys:
writer.writerow([key, ''])
for subkey in self._fit_params[key].keys():
writer.writerow([subkey, self._fit_params[key][subkey]])
writer.writerow('')
def generalized(self, writer):
for key in ['amplitude', 'constant', 'f-test', 'bootstrap', 'method', 'preprocessing', 'channel']:
writer.writerow([key, self._fit_params[key]])
for _ in range(3):
writer.writerow('')
writer.writerow(['fitted function', ''])
writer.writerow('')
for key in self._fit_params['base freq'].keys():
writer.writerow([key, self._fit_params['base freq'][key]])
with open(out_path, 'w', newline='') as f:
writer = csv.writer(f)
if self.fit_params['method'] in ['Linear', 'Nonlinear']:
sinebased(self, writer)
else:
generalized(self, writer)
# TODO this override should be fixed in a more sensible way, honestly no idea how exactly now
[docs]
def plot(self, out_path: str | None, data: _ActiData): # type: ignore[override]
"""Plot the provided data together with fitted model.
Args:
out_path (str | None): Path to save the plot. If None, the plot will be opened in interactive window. Defaults to None.
data (_ActiData): Data to which the model was fit.
"""
name_to_method = dict([e for e in inspect.getmembers(sys.modules[__name__], lambda member: inspect.isclass(member) and member.__module__ == __name__) if e[0][0] != '_'])
amps = [e['amplitude'] for e in self._fit_params.values() if isinstance(e, dict)]
phases = [e['phase'] for e in self._fit_params.values() if isinstance(e, dict)]
const = self._fit_params['constant']
period = self._fit_params['base freq']['period'] * 3600
if self._fit_params['bootstrap'] is not None:
color = 'red' if self._fit_params['bootstrap'] <= 0.05 else 'gray'
else:
color = 'red' if self._fit_params['f-test'] <= 0.05 else 'gray'
ch = None if self._fit_params['channel'] == 'default' else self._fit_params['channel']
algorithm = name_to_method[self._fit_params['method']]
x, y = algorithm._get_data(data, ch)
if self._fit_params['preprocessing']:
y = algorithm._preprocess(y, data.fs)
if algorithm in (Linear, Nonlinear):
func = Linear._fitting_func(x, amps, phases, const, period)
else:
specific_keys = [e for e in self._fit_params['base freq'].keys() if e not in ['amplitude', 'phase', 'period']]
specific_params = [self._fit_params['base freq'][key] for key in specific_keys]
func = algorithm._fitting_func(x, amps[0], phases[0], const, period, *specific_params)
plt.figure(figsize=(20, 10))
plt.plot(x / 3600, y)
plt.plot(x / 3600, func, color=color)
plt.xlabel('Timestamp [h]')
if out_path is None:
plt.show()
else:
plt.savefig(out_path)
plt.close()
def _get_func(self):
name_to_method = dict([e for e in inspect.getmembers(sys.modules[__name__], lambda member: inspect.isclass(member) and member.__module__ == __name__) if e[0][0] != '_'])
amps = [e['amplitude'] for e in self._fit_params.values() if isinstance(e, dict)]
phases = [e['phase'] for e in self._fit_params.values() if isinstance(e, dict)]
const = self._fit_params['constant']
period = self._fit_params['base freq']['period'] * 3600
algorithm = name_to_method[self._fit_params['method']]
if algorithm in (Linear, Nonlinear):
func = lambda x: Linear._fitting_func(x, amps, phases, const, period)
else:
specific_keys = [e for e in self._fit_params['base freq'].keys() if e not in ['amplitude', 'phase', 'period']]
specific_params = [self._fit_params['base freq'][key] for key in specific_keys]
func = lambda x: algorithm._fitting_func(x, amps[0], phases[0], const, period, *specific_params)
return func
class _Cosinor(_Long):
def __init__(self, test_periods: list[float] | float = 24, preprocessing: bool = False, use_bootstrap: bool = False):
self._preprocessing = preprocessing
self._use_bootstrap = use_bootstrap
if not isinstance(test_periods, list):
test_periods = [test_periods]
self._test_periods = [e * 3600 for e in test_periods]
@staticmethod
def _get_data(data: _ActiData, ch_name: str | None) -> Tuple[np.ndarray, np.ndarray]: # type: ignore[override]
if ch_name is None:
if isinstance(data, (_Raw, _Epoched, GenericData)):
return data.timestamp, data.to_score
elif isinstance(data, ScoredShort):
return data.timestamp, data.score
else:
raise ValueError("Invalid data type")
else:
return data.timestamp, data.data[data.channel_names.index(ch_name)]
@staticmethod
def _preprocess(data: np.ndarray, fs: float) -> np.ndarray:
sos = ss.iirdesign(1 / (3600 * 9), 1 / (3600 * 3), 2, 30, ftype='butter', output='sos', fs=fs)
out = ss.sosfiltfilt(sos, data - np.mean(data))
return out
@abstractmethod
def fit(self, data: _ActiData, ch_name: str | None = None) -> list[CosinorResults]:
pass
@staticmethod
@abstractmethod
def _fitting_func(x: np.ndarray, *args) -> np.ndarray:
pass
@property
def test_periods(self) -> list[float]:
"""List of periods, that will be fitted to the data."""
return [e / 3600 for e in self._test_periods]
@property
def preprocessing(self) -> bool:
"""Whether preprocessing is applied."""
return self._preprocessing
@property
def use_bootstrap(self) -> bool:
"""Whether bootstrap method is used to compute p-values of amplitudes."""
return self._use_bootstrap
[docs]
class Linear(_Cosinor):
"""A class for fitting linear combination of harmonic sines of known base period to the data using Ordinary Least Squares.
Attributes:
test_periods (list[float] | float): Periods in hours to be fitted to the data. Defaults to 24.
preprocessing (bool): If true, data will be detrended and low-pass filtered before applying cosinor. Defaults to False.
use_bootstrap (bool): If true, significance of fitted amplitude will be assessed using bootstrap. Might take some time for high sampling frequencies. Defaults to False.
n_components (int): Number of harmonics, that will be present in the fitted curve. Defaults to 1 (no harmonics, only base frequency).
"""
def __init__(self, test_periods: list[float] | float = 24, preprocessing: bool = False, use_bootstrap: bool = False, n_components: int = 1):
self._n_components = n_components
super().__init__(test_periods, preprocessing, use_bootstrap)
[docs]
def fit(self, data: _ActiData, ch_name: str | None = None) -> list[CosinorResults]:
"""Fits the specified by the object curves to provided data.
Args:
data (_ActiData): Actigraphic data to be fitted with cosinor method.
ch_name (str | None, optional): Channel to which curve will be fitted. If None, it will be to_score in case of Raw or Epoched data and score in case of ScoredShort data. Defaults to None.
Returns:
list[CosinorResults]: List of objects containing fitting results. Each element corresponds to one period.
"""
x, y = self._get_data(data, ch_name)
if self._preprocessing:
y = self._preprocess(y, data.fs)
out_params = np.empty((len(self._test_periods), 2 * self._n_components + 1))
out_results = []
sines = np.empty((2 * self._n_components, len(x)))
for i, period in enumerate(self._test_periods):
for component in range(self._n_components):
sines[component, :] = np.sin(2 * np.pi * x / (period / (component + 1)))
sines[component + self._n_components, :] = np.cos(2 * np.pi * x / (period / (component + 1)))
func = np.vstack([sines, np.ones_like(x)]).T
params, rss, _, _ = np.linalg.lstsq(func, y, rcond=None)
out_params[i, :] = self._unravel_params(params)
fit = Linear._fitting_func(x, out_params[i, :self._n_components], out_params[i, self._n_components:-1], out_params[i, -1], period)
amplitude = np.abs(np.max(fit) - np.min(fit)) / 2
mss = np.sum((fit - np.mean(y)) ** 2)
f_stat = (mss / (2 * self._n_components)) / (rss / (len(fit) - (2 * self._n_components + 1)))
p_val_f = st.f.sf(f_stat, (2 * self._n_components), len(fit) - (2 * self._n_components + 1))
if self._use_bootstrap:
p_val_bootsrap = self._bootstrap(period, x, y, amplitude)
else:
p_val_bootsrap = None
out_results.append(CosinorResults(out_params[i], period, amplitude, (p_val_f[0], p_val_bootsrap), type(self).__name__, ch_name, self._preprocessing, data.id))
return out_results
# TODO do I want to check constant fit with this method?
def _bootstrap(self, period: float, x: np.ndarray, y: np.ndarray, amplitude: float, n: int = 5000) -> float:
out_params = np.empty((n, 2))
sines = np.empty((2 * self._n_components, len(x)))
for component in range(self._n_components):
sines[component, :] = np.sin(2 * np.pi * x / (period / (component + 1)))
sines[component + self._n_components, :] = np.cos(2 * np.pi * x / (period / (component + 1)))
func = np.vstack([sines, np.ones_like(x)]).T
# noise = np.copy(y)
for i in tqdm.tqdm(range(n), desc="Calculating p-value using boostrap"):
noise = np.random.normal(np.mean(y), np.std(y), len(y))
# np.random.shuffle(noise)
params, _, _, _ = np.linalg.lstsq(func, noise, rcond=None)
params = self._unravel_params(params)
fit = Linear._fitting_func(x, params[:self._n_components], params[self._n_components:-1], params[-1], period)
out_params[i, 0] = np.abs(np.max(fit) - np.min(fit)) / 2 # amplitude
out_params[i, 1] = params[-1] # constant
p = np.flip(1 - np.logspace(0, -16, 20000))
low = st.scoreatpercentile(out_params[:, 0], (1 - p) * 50)
high = st.scoreatpercentile(out_params[:, 0], (1 + p) * 50)
return 1 - float(p[np.nonzero((low >= amplitude) | (amplitude >= high))[0][0]])
def _unravel_params(self, params: np.ndarray) -> np.ndarray:
out = np.empty(2 * self._n_components + 1) # (n_amplitudes + n_phases + 1 for const)
for i in range(self._n_components):
amp1 = params[2 * i] # sine amplitude
amp2 = params[2 * i + 1] # cosine amplitude
amplitude = (amp1 ** 2 + amp2 ** 2) ** .5
if amp1 > 0 and amp2 > 0:
phase = -np.arctan(abs(amp1 / amp2))
elif amp1 > 0 and amp2 < 0:
phase = -np.pi + np.arctan(abs(amp1 / amp2))
elif amp1 < 0 and amp2 > 0:
phase = -2 * np.pi + np.arctan(abs(amp1 / amp2))
elif amp1 < 0 and amp2 < 0:
phase = -np.pi - np.arctan(abs(amp1 / amp2))
# phase should be contained in [-pi, pi]
phase %= 2 * np.pi
if phase > np.pi:
phase -= 2 * np.pi
elif phase < -np.pi:
phase += 2 * np.pi
out[i] = amplitude
out[i + self._n_components] = phase + np.pi / 2 # I messed up sines and cosines? + pi / 2 works like a charm...
out[-1] = params[-1]
return out
@staticmethod
def _fitting_func(x: np.ndarray, amps: np.ndarray | list, phases: np.ndarray | list, const: float, period: float) -> np.ndarray:
out = np.zeros_like(x)
for i, amp in enumerate(amps):
out += amp * np.sin(x * 2 * np.pi / (period / (i + 1)) + phases[i])
return out + const
@property
def n_components(self) -> int:
"""Number of harmonic components fitted to the data."""
return self._n_components
[docs]
class Nonlinear(_Cosinor):
"""A class for fitting linear combination of harmonic sines of unknown base period to the data using nonlinear methods.
Attributes:
test_periods (Tuple[float, float]): Lower and upper bounds for fitting periods, in hours. Defaults to (16, 32).
preprocessing (bool): If true, data will be detrended and low-pass filtered before applying cosinor. Defaults to False.
use_bootstrap (bool): If true, significance of fitted amplitude will be assessed using bootstrap. Might take some time for high sampling frequencies. Defaults to False.
n_components (int): Number of harmonics, that will be present in the fitted curve. Defaults to 1 (no harmonics, only base frequency).
"""
def __init__(self, test_periods: Tuple[float, float] = (16, 32), preprocessing: bool = False, use_bootstrap: bool = False, n_components: int = 1):
self._preprocessing = preprocessing
self._use_bootstrap = use_bootstrap
self._n_components = n_components
self._test_periods = [3600 * test_periods[0], 3600 * test_periods[1]]
[docs]
def fit(self, data: _ActiData, ch_name: str | None = None) -> list[CosinorResults]:
"""Fits the specified by the object curves to provided data.
Args:
data (_ActiData): Actigraphic data to be fitted with cosinor method.
ch_name (str | None, optional): Channel to which curve will be fitted. If None, it will be to_score in case of Raw or Epoched data and score in case of ScoredShort data. Defaults to None.
Returns:
list[CosinorResults]: List of objects containing fitting results. It containes only 1 element.
"""
x, y = self._get_data(data, ch_name)
if self._preprocessing:
y = self._preprocess(y, data.fs)
lower_bound = [self._test_periods[0]] + [-np.inf] * (self._n_components * 2 + 1)
upper_bound = [self._test_periods[1]] + [np.inf] * (self._n_components * 2 + 1)
fitting_func = lambda x, period, const, *args: Nonlinear._fitting_func(x, period, const, self._n_components, *args)
params, _ = curve_fit(fitting_func, x, y, bounds=(lower_bound, upper_bound))
period = params[0]
params_unraveled = self._unravel_params(params)
fit = Nonlinear._fitting_func(x, params_unraveled[-1], period, self._n_components, *params_unraveled[:-1])
amplitude = np.abs(np.max(fit) - np.min(fit)) / 2
mss = np.sum((fit - np.mean(y)) ** 2)
rss = np.sum((fit - y) ** 2)
f_stat = (mss / (2 * self._n_components + 1)) / (rss / (len(fit) - (2 * self._n_components + 2)))
p_val_f = st.f.sf(f_stat, (2 * self._n_components + 1), len(fit) - (2 * self._n_components + 2))
if self._use_bootstrap:
p_val_bootsrap = self._bootstrap(x, y, amplitude)
else:
p_val_bootsrap = None
if isinstance(p_val_f, float):
p_val_f = [p_val_f]
return [CosinorResults(params_unraveled, period, amplitude, (p_val_f[0], p_val_bootsrap), type(self).__name__, ch_name, self._preprocessing, data.id)]
def _unravel_params(self, params: np.ndarray) -> np.ndarray:
out = np.empty(2 * self._n_components + 1) # (n_amplitudes + n_phases + 1 for const)
out[:self._n_components] = params[2:2 + self._n_components] # amplitudes
out[-1] = params[1] # constant
phases = params[2 + self._n_components:]
# phase should be contained in [-pi, pi]
def phase_helper(phase):
phase %= 2 * np.pi
if phase > np.pi:
phase -= 2 * np.pi
elif phase < -np.pi:
phase += 2 * np.pi
return phase
out[self._n_components:2 * self._n_components] = [phase_helper(e) for e in phases]
return out
# TODO do I want to test period and constant here as well?
def _bootstrap(self, x: np.ndarray, y: np.ndarray, amplitude: float, n: int = 5000) -> float:
out_params = np.empty((n, 3))
# noise = np.copy(y)
fitting_func = lambda x, period, const, *args: Nonlinear._fitting_func(x, period, const, self._n_components, *args)
lower_bound = [self._test_periods[0]] + [-np.inf] * (self._n_components * 2 + 1)
upper_bound = [self._test_periods[1]] + [np.inf] * (self._n_components * 2 + 1)
for i in tqdm.tqdm(range(n), desc="Calculating p-value using bootstrap"):
# np.random.shuffle(noise)
noise = np.random.normal(np.mean(y), np.std(y), len(y))
params, _ = curve_fit(fitting_func, x, noise, bounds=(lower_bound, upper_bound))
out_params[i, 2] = params[0] # period
params = self._unravel_params(params)
out_params[i, 1] = params[-1] # const
fit = Nonlinear._fitting_func(x, out_params[i, 2], out_params[i, 1], self._n_components, *params[:-1])
out_params[i, 0] = np.abs(np.max(fit) - np.min(fit)) / 2 # amplitude
p = np.flip(1 - np.logspace(0, -16, 20000))
low = st.scoreatpercentile(out_params[:, 0], (1 - p) * 50)
high = st.scoreatpercentile(out_params[:, 0], (1 + p) * 50)
return 1 - float(p[np.nonzero((low >= amplitude) | (amplitude >= high))[0][0]])
@staticmethod
def _fitting_func(x: np.ndarray, period: float, const: float, n_components: int, *args) -> np.ndarray:
amps = args[:n_components]
phases = args[n_components:]
out = np.zeros_like(x)
for i, amp in enumerate(amps):
out += amp * np.sin(x * 2 * np.pi / (period / (i + 1)) + phases[i])
return out + const
@property
def n_components(self) -> int:
"""Number of harmonic components fitted to the data."""
return self._n_components
class _Generalized(_Cosinor):
def __init__(self, test_periods: Tuple[float, float] = (16, 32), preprocessing: bool = False, use_bootstrap: bool = False):
self._preprocessing = preprocessing
self._use_bootstrap = use_bootstrap
self._test_periods = [3600 * test_periods[0], 3600 * test_periods[1]]
self._lower_bound: list[float | int] = []
self._upper_bound: list[float | int] = []
self._specific_params: dict[str, float] = {'0': 0.}
def fit(self, data: _ActiData, ch_name: str | None = None) -> list[CosinorResults]:
"""Fits the specified by the object curve to provided data.
Args:
data (_ActiData): Actigraphic data to be fitted with cosinor method.
ch_name (str | None, optional): Channel to which curve will be fitted. If None, it will be to_score in case of Raw or Epoched data and score in case of ScoredShort data. Defaults to None.
Returns:
list[CosinorResults]: List of objects containing fitting results. It containes only 1 element.
"""
x, y = self._get_data(data, ch_name)
if self._preprocessing:
y = self._preprocess(y, data.fs)
params, _ = curve_fit(type(self)._fitting_func, x, y, bounds=(self._lower_bound, self._upper_bound))
period = params[3]
specific_params = params[np.where(params == period)[0][0] + 1:] # depends on selected method
params_unraveled = self._unravel_params(params)
fit = type(self)._fitting_func(x, params_unraveled[0], params_unraveled[1], params_unraveled[2], period, *specific_params)
amplitude = params_unraveled[0]
mss = np.sum((fit - np.mean(y)) ** 2)
rss = np.sum((fit - y) ** 2)
n_params = len(inspect.signature(self._fitting_func).parameters) - 1
f_stat = (mss / (n_params - 1)) / (rss / (len(fit) - n_params)) # I'm now rather sure it works for generalized models as well
p_val_f = st.f.sf(f_stat, 4, len(fit) - 5) # here is some papaer about F-statistic in these models: https://www.researchgate.net/profile/Matthew-Marler/post/Has_anyone_worked_on_circadian_rhythm_of_recovery_after_fatigue/attachment/59d62fbfc49f478072ea024c/AS%3A273592158228484%401442240861910/download/2466.PDF
if self._use_bootstrap:
p_val_bootsrap = self._bootstrap(x, y, amplitude)
else:
p_val_bootsrap = None
if isinstance(p_val_f, float):
p_val_f = [p_val_f]
for i, key in enumerate(self._specific_params.keys()):
self._specific_params[key] = specific_params[i]
return [CosinorResults(params_unraveled, period, amplitude, (p_val_f[0], p_val_bootsrap), type(self).__name__, ch_name, self._preprocessing, data.id, self._specific_params)]
def _unravel_params(self, params: np.ndarray) -> np.ndarray:
out = np.empty(3) # amplitude, phase, const
out[0] = params[0]
out[2] = params[2]
# phase should be contained in [-pi, pi]
phase = params[1]
phase %= 2 * np.pi
if phase > np.pi:
phase -= 2 * np.pi
elif phase < -np.pi:
phase += 2 * np.pi
out[1] = phase
return out
# TODO do I want to test period, ratio and constant here as well?
def _bootstrap(self, x: np.ndarray, y: np.ndarray, amplitude: float, n: int = 5000) -> float:
out_params = np.empty((n, 4))
# noise = np.copy(y)
for i in tqdm.tqdm(range(n), desc="Calculating p-value using bootstrap"):
# np.random.shuffle(noise)
try: # well I guess this bootstraping method just dont work well for nonlinear, and this solution is bad. Gotta think about something else or disable it
noise = np.random.normal(np.mean(y), np.std(y), len(y))
params, _ = curve_fit(type(self)._fitting_func, x, noise, bounds=(self._lower_bound, self._upper_bound))
out_params[i, 3] = params[-1] # ratio
out_params[i, 2] = params[-2] # period
params = self._unravel_params(params)
out_params[i, 1] = params[-1] # const
out_params[i, 0] = params[0] # amplitude
except RuntimeError:
return 1
p = np.flip(1 - np.logspace(0, -16, 20000))
low = st.scoreatpercentile(out_params[:, 0], (1 - p) * 50)
high = st.scoreatpercentile(out_params[:, 0], (1 + p) * 50)
return 1 - float(p[np.nonzero((low >= amplitude) | (amplitude >= high))[0][0]])
@staticmethod
@abstractmethod
def _fitting_func(x: np.ndarray, amp: float, phase: float, const: float, period: float, *args) -> np.ndarray:
pass
[docs]
class Saw(_Generalized):
"""A class for fitting saw wave, possibly assymetric, of unknown base period to the data using nonlinear methods.
Attributes:
test_periods (Tuple[float, float]): Lower and upper bounds for fitting periods, in hours. Defaults to (16, 32).
preprocessing (bool): If true, data will be detrended and low-pass filtered before applying cosinor. Defaults to False.
use_bootstrap (bool): If true, significance of fitted amplitude will be assessed using bootstrap. Might take some time for high sampling frequencies. Defaults to False.
"""
def __init__(self, test_periods: Tuple[float, float] = (16, 32), preprocessing: bool = False, use_bootstrap: bool = False):
super().__init__(test_periods, preprocessing, use_bootstrap)
self._lower_bound = [-np.inf] * 3 + [self._test_periods[0]] + [0]
self._upper_bound = [np.inf] * 3 + [self._test_periods[1]] + [1]
self._specific_params = {'ratio': 0.}
@staticmethod
def _fitting_func(x: np.ndarray, amp: float, phase: float, const: float, period: float, ratio: float) -> np.ndarray:
return amp * ss.sawtooth(2 * np.pi * x / period + phase, ratio) + const
[docs]
class Antilogistic(_Generalized):
"""A class for fitting anti-logistic transform of a sine wave, with asymetric and unknown base period using nonlinear methods.
Attributes:
test_periods (Tuple[float, float]): Lower and upper bounds for fitting periods, in hours. Defaults to (16, 32).
preprocessing (bool): If true, data will be detrended and low-pass filtered before applying cosinor. Defaults to False.
use_bootstrap (bool): If true, significance of fitted amplitude will be assessed using bootstrap. Might take some time for high sampling frequencies. Defaults to False.
"""
def __init__(self, test_periods: Tuple[float, float] = (16., 32.), preprocessing: bool = False, use_bootstrap: bool = False):
super().__init__(test_periods, preprocessing, use_bootstrap)
self._lower_bound = [-np.inf] * 3 + [self._test_periods[0]] + [-1, 0]
self._upper_bound = [np.inf] * 3 + [self._test_periods[1]] + [1, np.inf]
self._specific_params = {'alpha': 0., 'beta': 0.}
@staticmethod
def _fitting_func(x: np.ndarray, amp: float, phase: float, const: float, period: float, alpha: float, beta: float) -> np.ndarray:
temp = beta * (np.cos(2 * np.pi * x / period + phase) - alpha)
return amp * np.exp(temp) / (1 + np.exp(temp)) + const
[docs]
class Arctangent(_Generalized):
"""A class for fitting arctangent transform of a sine wave, with asymetric and unknown base period using nonlinear methods.
Attributes:
test_periods (Tuple[float, float]): Lower and upper bounds for fitting periods, in hours. Defaults to (16, 32).
preprocessing (bool): If true, data will be detrended and low-pass filtered before applying cosinor. Defaults to False.
use_bootstrap (bool): If true, significance of fitted amplitude will be assessed using bootstrap. Might take some time for high sampling frequencies. Defaults to False.
"""
def __init__(self, test_periods: Tuple[float, float] = (16, 32), preprocessing: bool = False, use_bootstrap: bool = False):
super().__init__(test_periods, preprocessing, use_bootstrap)
self._lower_bound = [-np.inf] * 3 + [self._test_periods[0]] + [-1, 0]
self._upper_bound = [np.inf] * 3 + [self._test_periods[1]] + [1, np.inf]
self._specific_params = {'alpha': 0., 'beta': 0.}
@staticmethod
def _fitting_func(x: np.ndarray, amp: float, phase: float, const: float, period: float, alpha: float, beta: float) -> np.ndarray:
temp = beta * (np.cos(2 * np.pi * x / period + phase) - alpha)
return amp * np.arctan(temp) / np.pi + const + 0.5
[docs]
class Hill(_Generalized):
"""A class for fitting Hill transform of a sine wave, with asymetric and unknown base period using nonlinear methods.
Attributes:
test_periods (Tuple[float, float]): Lower and upper bounds for fitting periods, in hours. Defaults to (16, 32).
preprocessing (bool): If true, data will be detrended and low-pass filtered before applying cosinor. Defaults to False.
use_bootstrap (bool): If true, significance of fitted amplitude will be assessed using bootstrap. Might take some time for high sampling frequencies. Defaults to False.
"""
def __init__(self, test_periods: Tuple[float, float] = (16, 32), preprocessing: bool = False, use_bootstrap: bool = False):
super().__init__(test_periods, preprocessing, use_bootstrap)
self._lower_bound = [-np.inf] * 3 + [self._test_periods[0]] + [0, 0]
self._upper_bound = [np.inf] * 3 + [self._test_periods[1]] + [1, np.inf]
self._specific_params = {'m': 0., 'gamma': 0.}
@staticmethod
def _fitting_func(x: np.ndarray, amp: float, phase: float, const: float, period: float, m: float, gamma: float) -> np.ndarray:
temp = np.cos(2 * np.pi * x / period + phase)
return amp * (1 + temp) ** gamma / (m ** gamma + (1 + temp) ** gamma) + const