# Source code for ada.io.actigraph

from pygt3x.reader import FileReader
from typing import ClassVar
import numpy as np
import pprint
from dataclasses import fields, dataclass
from ciso8601 import parse_datetime
import os
import csv
from datetime import datetime, timezone, timedelta
import tqdm
import pandas as pd
import re

from ada.data_containers._base import _Raw


@dataclass(slots=True, eq=False)
class RawActiGraph(_Raw):
    """A class for storing and handling raw data generated by Actigraph devices.

    The sample matrix ``_data`` is laid out as ``(4, n_samples)`` with rows
    ``timestamp, x, y, z``. Timestamps are stored relative to the first
    sample; the absolute Unix time of that first sample is kept in
    ``metadata['first_sample']``.
    """

    _stationary_variance: ClassVar[float] = 0.0004563
    _dynamic_range: ClassVar[float] = 8.0

    def __repr__(self):
        return "Raw metadata:\n{}".format(pprint.pformat(self._metadata, indent=4))

    @property
    def stationary_variance(self) -> float:
        """Class-level stationary variance constant."""
        return self._stationary_variance

    @property
    def dynamic_range(self) -> float:
        """Class-level dynamic range constant."""
        return self._dynamic_range

    @property
    def x(self) -> np.ndarray:
        """Acceleration along the x axis (row 1 of the data matrix)."""
        return self._data[1]

    @property
    def y(self) -> np.ndarray:
        """Acceleration along the y axis (row 2 of the data matrix)."""
        return self._data[2]

    @property
    def z(self) -> np.ndarray:
        """Acceleration along the z axis (row 3 of the data matrix)."""
        return self._data[3]

    @property
    def timestamp(self) -> np.ndarray:
        """Timestamps relative to the first sample (row 0 of the data matrix)."""
        return self._data[0]

    @property
    def vlen(self) -> np.ndarray:
        """Absolute deviation of the (x, y, z) vector length from 1.

        NOTE(review): subtracting 1 presumably removes gravity from data
        expressed in g - confirm the unit against the loaders.
        """
        # The norm must be built from the three acceleration rows (1..3);
        # row 0 holds timestamps and must not take part in it.
        norm = np.sqrt(self.x ** 2 + self.y ** 2 + self.z ** 2)
        return np.abs(norm - 1)

    @property
    def to_score(self) -> np.ndarray:
        """Data to be scored by scoring algorithms. Here equal to vlen."""
        return self.vlen

    @property
    def id(self) -> str:
        """Subject name stored in the metadata."""
        return self.metadata['subject_name']

    @property
    def first_sample_timestamp(self) -> float:
        """Unix timestamp of the first sample."""
        return self._metadata['first_sample']

    @staticmethod
    def _sn_to_type(serial_number):
        """Map a device serial number to a device type name.

        Args:
            serial_number (str): Device serial number.

        Returns:
            str: Device type, or ``'???'`` when the prefix is not recognised.
        """
        # Prefixes are compared at their full length: the four-character
        # 'MOS3' / 'MOS4' prefixes can never equal a three-character slice.
        prefix_to_type = (
            ('NEO', 'GT3XPlus'),
            ('CLE', 'wGT3XPlus'),
            ('MOS0', 'wGT3X-BT'),
            ('MOS2', 'wGT3X-BT'),
            ('MRA', 'ActiSleepPlus'),
            ('MOS3', 'wActiSleepPlus'),
            ('MOS4', 'wActiSleep-BT'),
            ('TAS', 'GT9X Link'),
        )
        for prefix, device_type in prefix_to_type:
            if serial_number.startswith(prefix):
                return device_type
        return '???'

    @staticmethod
    def _csv_date_to_unix(row):
        """Convert a ``dd.MM.yyyy HH:MM:SS<sep>mmm`` string to a Unix timestamp.

        Args:
            row (str): Date and time separated by a single space.

        Returns:
            float: Unix timestamp.
        """
        date_part, time_part = row.split(' ')[0], row.split(' ')[1]
        iso_date = '-'.join(date_part.split('.')[::-1])
        # Splitting on any non-word character accepts both '.' and ',' as
        # the separator in front of the fractional seconds.
        pieces = re.split(r'\W+', time_part)
        iso_time = f"{':'.join(pieces[:-1])}.{pieces[-1]}"
        # csv does not have TZ info, so we have to assume UTC to be consistent.
        return parse_datetime(f"{iso_date}T{iso_time}+0000").timestamp()

    @staticmethod
    def _format_unix_utc(unix_ts):
        """Format a Unix timestamp as ``dd.MM.yyyy HH:MM:SS.mmm`` in UTC."""
        return datetime.fromtimestamp(unix_ts, tz=timezone.utc).strftime("%d.%m.%Y %H:%M:%S.%f")[:-3]

    @staticmethod
    def _parse_csv_header(frame):
        """Build the metadata dict from the header rows of a csv file.

        Rows 0-10 of the frame are header lines and row 11 is the first data
        row. ``stop_date`` is taken from the last row of ``frame``, so it is
        only the true end of the recording when the whole file was read.

        Args:
            frame (pd.DataFrame): Frame read with the four-column csv layout.

        Returns:
            dict: Metadata with all dates expressed as Unix timestamps.
        """
        to_unix = RawActiGraph._csv_date_to_unix
        title = frame.iat[0, 0].split(' ')
        serial_number = frame.iat[1, 0].split(':')[-1].strip()
        download_time = frame.iat[5, 0].split(' ')[-1]
        download_day = frame.iat[6, 0].split(' ')[-1]
        return {
            'sample_rate': float(title[title.index('Hz') - 1]),
            # Drop the leading character of the firmware token (e.g. 'v1.2').
            'firmware': title[title.index('Firmware') + 1][1:],
            'serial_number': serial_number,
            'subject_name': '',
            'device_type': RawActiGraph._sn_to_type(serial_number),
            'battery_voltage': float(frame.iat[8, 0].split(' ')[3]),
            'start_date': to_unix(frame.iat[11, 0]),
            'stop_date': to_unix(frame.iat[-1, 0]),
            'download_date': to_unix(f"{download_day} {download_time}.000"),
            'first_sample': to_unix(frame.iat[11, 0]),
        }

    @staticmethod
    def _load_bin(path: str) -> "RawActiGraph":
        """Load a .gt3x file.

        Args:
            path (str): Path to the .gt3x file.

        Returns:
            RawActiGraph: Object containing data.
        """
        print(f"Reading from file: {path}")
        with FileReader(path) as reader:
            metadata = {key.name: getattr(reader.info, key.name) for key in fields(reader.info)}
            data = np.empty((4, reader.acceleration.shape[0]))
            acc = reader.to_pandas()
            data[1:4] = acc.to_numpy().T[:3]
            data[0] = acc.index.to_numpy()
            # Keep the absolute start and store timestamps relative to it.
            metadata['first_sample'] = float(data[0, 0])
            data[0] = data[0] - data[0, 0]
            channel_names = ['timestamp', 'x', 'y', 'z']
        print(f"Done reading from file: {path}")
        return RawActiGraph(data, metadata, float(metadata['sample_rate']), channel_names)

    @staticmethod
    def _load_csv(path: str) -> "RawActiGraph":
        """Load a csv file laid out like the ones written by ``export``.

        Args:
            path (str): Path to the .csv file.

        Returns:
            RawActiGraph: Object containing data.
        """
        temp_data = pd.read_csv(path, names=['Timestamp', 'Accelerometer X', 'Accelerometer Y', 'Accelerometer Z'])
        metadata = RawActiGraph._parse_csv_header(temp_data)
        fs = metadata['sample_rate']

        tqdm.tqdm.pandas(desc=f"Reading from file: {path}")
        temp_data.loc[11:, 'Timestamp'] = temp_data.loc[11:, 'Timestamp'].progress_apply(
            RawActiGraph._csv_date_to_unix)
        # Accept ',' as the decimal separator in the sample columns.
        samples = temp_data.iloc[11:].replace(',', '.', regex=True)
        data = samples.to_numpy(dtype=float).T
        data[0] = data[0] - data[0, 0]
        return RawActiGraph(data, metadata, fs, ['timestamp', 'x', 'y', 'z'])

    @staticmethod
    def load_file(path: str) -> "RawActiGraph":
        """Loading file generated by the ActiGraph gt3x-compliant device.

        Args:
            path (str): Path to .gt3x or .csv file.

        Raises:
            ValueError: Unsupported file extension, or a csv file whose
                serial number does not map to a known device type.

        Returns:
            RawActiGraph: Object containing data.
        """
        extension = os.path.basename(path).split('.')[-1]
        if extension == 'gt3x':
            return RawActiGraph._load_bin(path)
        if extension == 'csv':
            loaded = RawActiGraph._load_csv(path)
            if RawActiGraph._sn_to_type(loaded.metadata['serial_number']) != '???':
                return loaded
        raise ValueError("Unsupported file format, use .gt3x or .csv files only.")

    def export(self, path: str):
        """Export data to .csv file resembling the ones generated by ActiLife.

        Args:
            path (str): Path to the output file.
        """
        # csv does not have TZ info, so we have to assume UTC to be consistent.
        start_date = datetime.fromtimestamp(self.first_sample_timestamp, tz=timezone.utc)
        download_date = datetime.fromtimestamp(self.metadata['download_date'], tz=timezone.utc)
        period = 1 / self.fs
        # newline='' lets the csv writer control line endings itself, as the
        # csv module requires; otherwise some platforms emit doubled endings.
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            # TODO replace version with auto version after merge
            writer.writerow([f"------------ Data File Created By ADA v1.0.0 Firmware v{self.metadata['firmware']} date format dd.MM.yyyy at {self.fs} Hz Filter Normal -----------"])
            writer.writerow([f"Serial Number: {self.metadata['serial_number']}"])
            writer.writerow([f"Start Time {start_date.hour:02}:{start_date.minute:02}:{start_date.second:02}"])
            writer.writerow([f"Start Date {start_date.day:02}.{start_date.month:02}.{start_date.year}"])
            writer.writerow([f"Epoch Period (hh:mm:ss) {int(period / 3600):02}:{int(period / 60):02}:{int(period):02}"])
            writer.writerow([f"Download Time {download_date.hour:02}:{download_date.minute:02}:{download_date.second:02}"])
            writer.writerow([f"Download Date {download_date.day:02}.{download_date.month:02}.{download_date.year}"])
            writer.writerow(["Current Memory Address: 0"])
            writer.writerow([f"Current Battery Voltage: {self.metadata['battery_voltage']} Mode = NA"])
            writer.writerow(["--------------------------------------------------"])
            writer.writerow(['Timestamp', 'Accelerometer X', 'Accelerometer Y', 'Accelerometer Z'])
            samples = zip(self.timestamp, self.x, self.y, self.z)
            for rel_ts, acc_x, acc_y, acc_z in tqdm.tqdm(samples, total=self._data.shape[1],
                                                         desc=f'Saving to file: {path}'):
                unix_ts = self._convert_timestamp(rel_ts, self._metadata, True)
                stamp = datetime.fromtimestamp(unix_ts, timezone.utc).strftime("%d.%m.%Y %H:%M:%S.%f")[:-3]
                writer.writerow([stamp, acc_x, acc_y, acc_z])

    @staticmethod
    def _convert_timestamp(timestamp: float, metadata: dict, to_unix: bool = False) -> float:
        """Convert between relative and Unix timestamps.

        Args:
            timestamp (float): Timestamp to convert.
            metadata (dict): Metadata holding ``'first_sample'``.
            to_unix (bool): If True, relative -> Unix; otherwise Unix -> relative.

        Returns:
            float: Converted timestamp.
        """
        offset = metadata['first_sample']
        return timestamp + offset if to_unix else timestamp - offset

    def cut_by_samples(self, start_sample: int, end_sample: int) -> "RawActiGraph":
        """Create new object with the data cut by given indexes.

        Args:
            start_sample (int): First sample of output data.
            end_sample (int): Sample after the last sample of output data.

        Returns:
            RawActiGraph: Object containing the cut data.
        """
        # Samples run along axis 1; axis 0 holds the channels and is kept whole.
        return RawActiGraph(self._data[:, start_sample:end_sample], self._metadata, self._fs, self._channel_names)

    def cut_by_timestamp(self, start_ts: float, end_ts: float | None) -> "RawActiGraph":
        """Create new object with the data cut by given timestamps.

        Args:
            start_ts (float): Unix timestamp of output data beginning.
            end_ts (float | None): Unix timestamp of output data end. If None,
                last sample of output data will be last sample of input data.

        Raises:
            IndexError: If no sample lies at or after ``start_ts``.

        Returns:
            RawActiGraph: Object containing the cut data.
        """
        first_sample = self.metadata['first_sample']
        rel_time = self._data[0, :]
        n_samples = rel_time.shape[0]
        start_sample = np.where(rel_time >= start_ts - first_sample)[0][0]
        if end_ts is None:
            # The end index is exclusive, so keeping the last sample means
            # cutting at the total number of samples.
            end_sample = n_samples
        else:
            at_or_after_end = np.where(rel_time >= end_ts - first_sample)[0]
            end_sample = at_or_after_end[0] if at_or_after_end.size else n_samples
        return self.cut_by_samples(start_sample, end_sample)

    def cut_by_dates(self, start_date: str, end_date: str | None) -> "RawActiGraph":
        """Create new object with the data cut by given dates.

        Args:
            start_date (str): ISO-formatted date of output data beginning.
            end_date (str | None): ISO-formatted date of output data end. If
                None, last sample of output data will be last sample of
                input data.

        Returns:
            RawActiGraph: Object containing the cut data.
        """
        start_ts = parse_datetime(start_date).timestamp()
        end_ts = parse_datetime(end_date).timestamp() if end_date is not None else None
        return self.cut_by_timestamp(start_ts, end_ts)

    @staticmethod
    def _preview_csv_metadata(path: str) -> tuple[dict, list[str]]:
        """Read csv metadata from the first 20 rows of the file.

        Dates are returned as ``dd.MM.yyyy HH:MM:SS.mmm`` strings in UTC.
        Because only 20 rows are read, ``stop_date`` is the timestamp of the
        last previewed row, not the end of the recording.

        Args:
            path (str): Path to the .csv file.

        Returns:
            tuple[dict, list[str]]: Metadata and channel names.
        """
        temp_data = pd.read_csv(path, names=['Timestamp', 'Accelerometer X', 'Accelerometer Y', 'Accelerometer Z'],
                                nrows=20)
        metadata = RawActiGraph._parse_csv_header(temp_data)
        # The csv dates were parsed as UTC, so they are formatted as UTC too
        # rather than in the machine's local time zone.
        for key in ('start_date', 'stop_date', 'download_date', 'first_sample'):
            metadata[key] = RawActiGraph._format_unix_utc(metadata[key])
        return metadata, ['timestamp', 'x', 'y', 'z']

    @staticmethod
    def _preview_bin_metadata(path: str) -> tuple[dict, list[str]]:
        """Read .gt3x metadata while asking the reader for only 50 rows.

        Args:
            path (str): Path to the .gt3x file.

        Returns:
            tuple[dict, list[str]]: Metadata and channel names.
        """
        with FileReader(path, num_rows=50) as reader:
            metadata = {key.name: getattr(reader.info, key.name) for key in fields(reader.info)}
        epoch = datetime(1, 1, 1, tzinfo=timezone.utc)
        for key in ['start_date', 'stop_date']:
            # The stored value is divided by 10 to get microseconds counted
            # from 0001-01-01 UTC.
            moment = epoch + timedelta(microseconds=metadata[key] // 10)
            metadata[key] = moment.strftime("%d.%m.%Y %H:%M:%S.%f")[:-3]
        return metadata, ['timestamp', 'x', 'y', 'z']

    @staticmethod
    def preview_metadata(path: str) -> tuple[dict, list[str]]:
        """Preview of metadata without loading the file.

        Args:
            path (str): Path to the file

        Raises:
            ValueError: Wrong format

        Returns:
            tuple[dict, list[str]]: Metadata and channel names.
        """
        extension = os.path.basename(path).split('.')[-1]
        if extension == 'gt3x':
            return RawActiGraph._preview_bin_metadata(path)
        if extension == 'csv':
            return RawActiGraph._preview_csv_metadata(path)
        raise ValueError("Unsupported file format, use .gt3x or .csv files only.")