import csv
import os
import tqdm
import itertools
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from datetime import datetime, timedelta
from typing import TextIO
from ada.data_containers._base import _Raw, _Epoched
from ada.io.mesa import Mesa
from ada.data_containers.scored import ScoredShort
from ada.data_containers._long import _LongContainer
from ada.long.cosinor import CosinorResults
from ada.long.nonparametric import DFAResults, SimpleNonparametricResults
from ada.long.ar import ARResults
from ada.io.file_manager import FileManager
[docs]
def summary_sleep_report(data: list[ScoredShort], path: str | TextIO, minutes: int = 20, length: int = 5):
"""Create a summary of sleep metrics for all recordings as a comma-separated CSV. Sleep onset and WASO are None if there were no sleep long enough to calculate them.
Args:
data (list[ScoredShort]): List with objects containing actigraphic sleep/wake scoring.
path (str | TextIO): Path to the output .csv file or an output buffer.
minutes (int, optional): Time of continous sleep (in minutes) needed for a sleep onset estimate. Defaults to 20.
length (int, optional): Length in minutes of sleep episode to be counted as long. Defaults to 5.
"""
if isinstance(path, str):
if os.path.basename(path).split('.')[-1] != 'csv':
path = path + '.csv'
f = open(path, 'w', newline='')
writer = csv.writer(f)
disable = False
else:
writer = csv.writer(path)
disable = True
writer.writerow(['ID', 'Scoring algorithm', 'Total sleep time [min]', 'Sleep efficiency', 'Sleep fragmentation index [1/min]', 'Sleep episodes', 'Long sleep episodes', 'Mean lenght of sleep episode [min]', 'Sleep onset [min]', 'WASO [min]'])
for scored in tqdm.tqdm(data, desc='Calculating sleep metrics', disable=disable):
score: list[float | int | None] = [scored.total_sleep_time(), scored.sleep_efficiency(), scored.sleep_fragmentation_index(), scored.sleep_episodes(), scored.long_sleep_episodes(length), scored.mean_sleep_episode()]
try:
temp: list[float | int | None] = [scored.sleep_onset(minutes), scored.wake_after_sleep_onset(minutes)]
except RuntimeError:
temp = [None, None]
score = score + temp
writer.writerow([scored.id, scored.scoring_method_metadata['algorithm']] + score)
if isinstance(path, str):
f.close()
def _sumary_cosinor(data: list[CosinorResults], writer, disable):
writer.writerow(['ID', 'Algorithm', 'Period [h]', 'Period p-value', 'Amplitude', 'Phase', 'Constant'])
for fit in tqdm.tqdm(data, desc="Writing summary of long metrics", disable=disable):
n_components = len([e for e in fit.fit_params.keys() if ('base' in e) or ('harmonic' in e)])
method = f"{fit.fit_params['method']} cosinor with {n_components} components" if fit.fit_params['method'] in ['Linear', 'Nonlinear'] else f"{fit.fit_params['method']} cosinor"
writer.writerow([fit.id, method, fit.fit_params['base freq']['period'], fit.fit_params['f-test'], fit.fit_params['amplitude'], fit.fit_params['base freq']['phase'], fit.fit_params['constant']])
def _summary_dfa(data: list[DFAResults], writer, disable):
n = len(data[0].crossovers[0])
writer.writerow(['ID', 'Algorithm', 'Exponent', 'Exponent error'] + list(itertools.chain.from_iterable(zip([f"Crossover {i}" for i in range(1, n + 1)], [f"CI {i}" for i in range(1, n + 1)]))))
for fit in tqdm.tqdm(data, desc="Writing summary of long metrics", disable=disable):
row = [fit.id, 'DFA', fit.fit_params['exponent'], fit.fit_params['exponent error']]
for i in range(0, n):
row = row + [fit.crossovers[0][i], fit.crossovers[1][i]]
writer.writerow(row)
def _summary_ar(data: list[ARResults], writer, disable):
writer.writerow(['ID', 'Algorithm', 'Order', 'Period [h]', 'Period CI'])
for fit in tqdm.tqdm(data, desc="Writing summary of long metrics", disable=disable):
writer.writerow([fit.id, 'AR Spectrum', fit.fit_params['order'], fit.fit_params['peak'], fit.fit_params['period ci']])
def _summary_snp(data: list[SimpleNonparametricResults], writer, disable):
writer.writerow(['ID', 'Algorithm', 'Interdaily Stability', 'Intradaily Variability', 'M10', 'L5'])
for fit in tqdm.tqdm(data, desc="Writing summary of long metrics", disable=disable):
writer.writerow([fit.id, 'Simple nonparametric', fit.fit_params['IS'], fit.fit_params['IV'], fit.fit_params['M10'], fit.fit_params['L5']])
[docs]
def summary_long_report(data: list[_LongContainer], path: str | TextIO):
"""Create a summary of parameters of the long model for all recordings as a comma-separated CSV.
Args:
data (list[_LongContainer]): List with objects containing parameters of fitted long model.
path (str | TextIO): Path to the output .csv file or an output buffer.
"""
if isinstance(path, str):
if os.path.basename(path).split('.')[-1] != 'csv':
path = path + '.csv'
f = open(path, 'w', newline='')
writer = csv.writer(f)
disable = False
else:
writer = csv.writer(path)
disable = True
if np.all([isinstance(e, CosinorResults) for e in data]):
_sumary_cosinor(data, writer, disable) # type: ignore[arg-type]
elif np.all([isinstance(e, DFAResults) for e in data]):
_summary_dfa(data, writer, disable) # type: ignore[arg-type]
elif np.all([isinstance(e, ARResults) for e in data]):
_summary_ar(data, writer, disable) # type: ignore[arg-type]
elif np.all([isinstance(e, SimpleNonparametricResults) for e in data]):
_summary_snp(data, writer, disable) # type: ignore[arg-type]
else:
raise RuntimeError("Unsupported method or list contains results of different methods. Use only one long estimator.")
if isinstance(path, str):
f.close()
[docs]
def plot(acti_data: _Raw | _Epoched, acti_score: ScoredShort | None = None, cosinor_fit: CosinorResults | None = None, timestamp: bool = True, window_name: str | None = None):
"""Plot actgigraphic data aggregating information from multiple processing stages.
Args:
acti_data (_Raw | _Epoched): Data to be plotted.
acti_score (ScoredShort | None, optional): Sleep/wake score for the plotted data. Not plotted if None. Defaults to None.
cosinor_fit (CosinorResults | None, optional): Cosinor model fitted to the data. Not plotted if None. Defaults to None.
timestamp (bool, optional): If True, x axis will be labeled using relative timestamp in hours. If False, the absolute dates will be used. Defaults to True.
window_name (str | None, optional): Name of the window in which interactive figure will be displayed. Default (figure number) if None. Defaults to None.
Returns:
matplotlib.figure.Figure: Object with the plot, which can be saved or displayed.
"""
fig = plt.figure(figsize=(12, 6)) if window_name is None else plt.figure(window_name, figsize=(12, 6))
ax = fig.add_subplot(1, 1, 1)
ax.plot(acti_data.timestamp / 3600, acti_data.to_score, color='black', alpha=1)
ax.set_xlim(acti_data.timestamp[0] / 3600, acti_data.timestamp[-1] / 3600)
ax.set_ylim(bottom=0)
ax.set_ylabel('Activity level')
ax.grid(which='major', alpha=0.5)
ax.grid(which='minor', alpha=0.3)
if timestamp:
ax.set_xlabel('Time [h]')
else:
if isinstance(acti_data, Mesa):
ax.set_xlabel('Time [h]')
print('MESA data are stripped of dates, using timestamp instead.')
else:
start_date = datetime.fromtimestamp(acti_data.first_sample_timestamp)
first_day = datetime.fromtimestamp(acti_data.first_sample_timestamp + 24 * 3600)
if start_date.hour >= 16:
first_tick = '00:00'
elif start_date.hour < 8:
first_tick = '08:00'
else:
first_tick = '16:00'
# this shit means labels can be off by 1 sample, but I think I don't care?
# just don't use pathologically long epochs like 1 hour and everything will be fine.
start_seconds = int(first_tick.split(':')[0]) * 3600 + int(first_tick.split(':')[1]) * 60 if first_tick != '00:00' else 24 * 3600
sample_diff = int((start_seconds - start_date.hour * 3600 - start_date.minute * 60 - start_date.second - start_date.microsecond * 1e-6) * acti_data.fs)
ticks = np.arange(acti_data.timestamp[sample_diff] / 3600, acti_data.timestamp[-1] / 3600, 8)
minor_ticks = np.arange(acti_data.timestamp[sample_diff] / 3600, acti_data.timestamp[-1] / 3600, 1)
pre_ticks = np.arange(acti_data.timestamp[sample_diff] / 3600, 0, -1)
minor_ticks = np.concatenate((pre_ticks, minor_ticks))
labels = (np.arange(0, 8 * len(ticks), 8) + int(first_tick.split(':')[0])) % 24
days = [first_day + timedelta(days=i) for i in range(len(np.argwhere(labels == 0)))]
ax.set_xticks(ticks, [f"{e:02}:00" if e != 0 else f"{e:02}:00\n{days.pop(0).strftime('%Y-%m-%d')}" for e in labels], rotation=60)
ax.set_xticks(minor_ticks, minor=True)
ax.set_xlabel('Time of the day')
if acti_score is not None:
ax.pcolorfast(ax.get_xlim(), ax.get_ylim(), acti_score.score[np.newaxis], cmap=ListedColormap(['blue', 'white']), alpha=0.3)
if cosinor_fit is not None:
func = cosinor_fit._get_func()
ax.plot(acti_data.timestamp / 3600, func(acti_data.timestamp), color='red', alpha=1) # this is a bit misleading if preprocessing was True...
fig.tight_layout()
return fig
[docs]
def daily_profile(acti_data: list[_Raw | _Epoched] | _Raw | _Epoched, wlen: int | None = 15):
"""Daily profile averaging activity from multiple days over a 24 hours. Can plot multiple subjects. Plotting epoched data (rather than Raw) is recommended.
Args:
acti_data (list[_Raw | _Epoched] | _Raw | _Epoched): Data to be plotted.
wlen (int | None, optional): Length of a window (in minutes) in which data will be smoothed by moving average. No smoothing if None. Defaults to 15.
Returns:
matplotlib.figure.Figure: Object with the plot, which can be saved or displayed.
"""
if not isinstance(acti_data, list):
acti_data = [acti_data]
fig = plt.figure("Daily profile", figsize=(12, 6))
ax = fig.add_subplot(1, 1, 1)
for acti in acti_data:
start_date = datetime.fromtimestamp(acti.first_sample_timestamp)
shift = int((start_date.hour * 3600 + start_date.minute * 60 + start_date.second + start_date.microsecond * 1e-6) * acti.fs)
# n_days = int(len(acti) / 24 / 3600 / acti.fs)
# data = acti.to_score[:int(len(acti.to_score) / n_days) * n_days]
# print(data.reshape(n_days, -1).shape)
# data = np.mean(data.reshape(n_days, -1), axis=0)
data = np.array([np.mean(acti.to_score[i::int(24 * 3600 * acti.fs)]) for i in range(int(24 * 3600 * acti.fs))])
if wlen is not None:
data = np.convolve(data, np.ones(int(wlen * 60 * acti.fs)) / int(wlen * 60 * acti.fs), mode='same')
data = np.roll(data, shift)
time = np.arange(0, len(data)) / 3600 / acti.fs
ax.plot(time, data, label=acti.id)
ticks = np.arange(0, 24, 1)
labels = [f"{e:02}:00" for e in range(24)]
ax.grid(alpha=0.3)
ax.set_xticks(ticks, labels, rotation=60)
ax.set_xlabel('Time of the day')
ax.set_ylabel('Mean activity')
ax.set_xlim(ticks[0], ticks[-1] + ticks[1])
ax.set_ylim(bottom=0)
if len(acti_data) > 1:
ax.legend()
fig.tight_layout()
return fig
# metadata_summary('/home/lairmaster/Git/P61_left wrist_038144_2022-05-26 14-34-44.ada')