from pygt3x.reader import FileReader
from typing import ClassVar
import numpy as np
import pprint
from dataclasses import fields, dataclass
from ciso8601 import parse_datetime
import os
import csv
from datetime import datetime, timezone, timedelta
import tqdm
import pandas as pd
import re
from ada.data_containers._base import _Raw
[docs]
@dataclass(slots=True, eq=False)
class RawActiGraph(_Raw):
    """Container for raw tri-axial accelerometer data recorded by ActiGraph devices.

    Samples are kept in ``_data`` as a ``(4, n_samples)`` array whose rows are
    ``timestamp``, ``x``, ``y`` and ``z``. Timestamps are stored as seconds
    elapsed since the first sample of the loaded file; the Unix time of that
    first sample is kept in ``metadata['first_sample']``.
    """

    # Device constants exposed through the properties below.
    # NOTE(review): units are not stated anywhere in this file - confirm.
    _stationary_variance: ClassVar[float] = 0.0004563
    _dynamic_range: ClassVar[float] = 8.0

    def __repr__(self):
        return "Raw metadata:\n{}".format(pprint.pformat(self._metadata, indent=4))

    @property
    def stationary_variance(self) -> float:
        """Class-level stationary variance constant."""
        return self._stationary_variance

    @property
    def dynamic_range(self) -> float:
        """Class-level dynamic range constant."""
        return self._dynamic_range

    @property
    def x(self) -> np.ndarray:
        """Acceleration along the x axis (row 1 of the data array)."""
        return self._data[1]

    @property
    def y(self) -> np.ndarray:
        """Acceleration along the y axis (row 2 of the data array)."""
        return self._data[2]

    @property
    def z(self) -> np.ndarray:
        """Acceleration along the z axis (row 3 of the data array)."""
        return self._data[3]

    @property
    def timestamp(self) -> np.ndarray:
        """Sample times relative to ``metadata['first_sample']`` (row 0)."""
        return self._data[0]

    @property
    def vlen(self) -> np.ndarray:
        """Absolute deviation of the acceleration vector length from 1.

        Computed from the three acceleration rows (x, y, z); the timestamp row
        is deliberately not part of the norm.
        """
        magnitude = np.sqrt(self.x ** 2 + self.y ** 2 + self.z ** 2)
        return np.abs(magnitude - 1)

    @property
    def to_score(self) -> np.ndarray:
        """Data to be scored by scoring algorithms. Here equal to vlen."""
        return self.vlen

    @property
    def id(self) -> str:
        """Subject name stored in the metadata."""
        return self.metadata['subject_name']

    @property
    def first_sample_timestamp(self) -> float:
        """Unix timestamp of the first sample of the loaded file."""
        return self._metadata['first_sample']

    @staticmethod
    def _sn_to_type(serial_number):
        """Map a device serial number to a device type name.

        Args:
            serial_number (str): Serial number as found in the file header.

        Returns:
            str: Device type, or ``'???'`` when the prefix is not recognised.
        """
        known_prefixes = (
            ('NEO', 'GT3XPlus'),
            ('CLE', 'wGT3XPlus'),
            ('MOS0', 'wGT3X-BT'),
            ('MOS2', 'wGT3X-BT'),
            ('MRA', 'ActiSleepPlus'),
            # Four-character prefixes must be matched against four characters;
            # comparing a three-character slice with them can never succeed.
            ('MOS3', 'wActiSleepPlus'),
            ('MOS4', 'wActiSleep-BT'),
            ('TAS', 'GT9X Link'),
        )
        for prefix, device_type in known_prefixes:
            if serial_number.startswith(prefix):
                return device_type
        return '???'

    @staticmethod
    def _format_utc(timestamp: float) -> str:
        """Format a Unix timestamp as ``dd.MM.yyyy HH:MM:SS.fff`` in UTC."""
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        # %f yields microseconds; dropping three digits leaves milliseconds.
        return moment.strftime("%d.%m.%Y %H:%M:%S.%f")[:-3]

    @staticmethod
    def _parse_csv_date(row: str) -> float:
        """Convert a ``dd.MM.yyyy HH:MM:SS<sep>fff`` CSV timestamp to Unix seconds."""
        parts = row.split(' ')
        date = '-'.join(parts[0].split('.')[::-1])
        # Any non-word separator is accepted, so both '.' and ',' work as the
        # fractional-second delimiter.
        clock = re.split(r'\W+', parts[1])
        time = f"{':'.join(clock[:-1])}.{clock[-1]}"
        # csv does not have TZ info, so we have to assume UTC to be consistent.
        return parse_datetime(f"{date}T{time}+0000").timestamp()

    @staticmethod
    def _read_csv_metadata(table: pd.DataFrame) -> dict:
        """Build the metadata dict from the header rows of an ActiLife-style CSV.

        Args:
            table (pd.DataFrame): File content read without a header row, so
                rows 0-10 hold the textual header and row 11 the first sample.

        Returns:
            dict: Metadata with all dates expressed as Unix timestamps. Note
            that ``stop_date`` is taken from the last row of ``table``.
        """
        banner = table.iat[0, 0].split(' ')
        serial_number = table.iat[1, 0].split(':')[-1].strip()
        first_sample = RawActiGraph._parse_csv_date(table.iat[11, 0])
        download = f"{table.iat[6, 0].split(' ')[-1]} {table.iat[5, 0].split(' ')[-1]}.000"
        return {
            'sample_rate': float(banner[banner.index('Hz') - 1]),
            'firmware': banner[banner.index('Firmware') + 1][1:],
            'serial_number': serial_number,
            'subject_name': '',
            'device_type': RawActiGraph._sn_to_type(serial_number),
            'battery_voltage': float(table.iat[8, 0].split(' ')[3]),
            'start_date': first_sample,
            'stop_date': RawActiGraph._parse_csv_date(table.iat[-1, 0]),
            'download_date': RawActiGraph._parse_csv_date(download),
            'first_sample': first_sample,
        }

    @staticmethod
    def _load_bin(path: str) -> "RawActiGraph":
        """Load a .gt3x file through ``pygt3x.reader.FileReader``."""
        print(f"Reading from file: {path}")
        with FileReader(path) as reader:
            metadata = {
                field.name: getattr(reader.info, field.name)
                for field in fields(reader.info)
            }
            data = np.empty((4, reader.acceleration.shape[0]))
            acc = reader.to_pandas()
            data[1:4] = acc.to_numpy().T[:3]
            data[0] = acc.index.to_numpy()
        # Keep the absolute time of the first sample and store offsets only.
        metadata['first_sample'] = float(data[0, 0])
        data[0] = data[0] - data[0, 0]
        channel_names = ['timestamp', 'x', 'y', 'z']
        print(f"Done reading from file: {path}")
        return RawActiGraph(data, metadata, float(metadata['sample_rate']), channel_names)

    @staticmethod
    def _load_csv(path: str) -> "RawActiGraph":
        """Load an ActiLife-style .csv file (11 header rows, then samples)."""
        columns = ['Timestamp', 'Accelerometer X', 'Accelerometer Y', 'Accelerometer Z']
        temp_data = pd.read_csv(path, names=columns)
        metadata = RawActiGraph._read_csv_metadata(temp_data)
        fs = metadata['sample_rate']
        tqdm.tqdm.pandas(desc=f"Reading from file: {path}")
        temp_data.loc[11:, 'Timestamp'] = (
            temp_data.loc[11:, 'Timestamp'].progress_apply(RawActiGraph._parse_csv_date)
        )
        # Decimal commas are normalised once, directly on the frame being
        # converted, instead of being retried in an exception handler.
        samples = temp_data.iloc[11:].replace(',', '.', regex=True)
        data = samples.to_numpy(dtype=float).T
        data[0] = data[0] - data[0, 0]
        return RawActiGraph(data, metadata, fs, ['timestamp', 'x', 'y', 'z'])

    @staticmethod
    def load_file(path: str) -> "RawActiGraph":
        """Loading file generated by the ActiGraph gt3x-compliant device.

        Args:
            path (str): Path to .gt3x or .csv file. The extension is matched
                case-insensitively.

        Raises:
            ValueError: Unsupported file format, or a .csv file whose serial
                number does not belong to a known device type.

        Returns:
            RawActiGraph: Object containing data.
        """
        extension = os.path.basename(path).split('.')[-1].lower()
        if extension == 'gt3x':
            return RawActiGraph._load_bin(path)
        if extension == 'csv':
            loaded = RawActiGraph._load_csv(path)
            if RawActiGraph._sn_to_type(loaded.metadata['serial_number']) != '???':
                return loaded
        raise ValueError("Unsupported file format, use .gt3x or .csv files only.")

    def export(self, path: str):
        """Export data to .csv file resembling the ones generated by ActiLife.

        Args:
            path (str): Path to the output file.
        """
        # csv does not have TZ info, so we have to assume UTC to be consistent.
        start_date = datetime.fromtimestamp(self.first_sample_timestamp, tz=timezone.utc)
        # NOTE(review): this expects metadata['download_date'] to be a Unix
        # timestamp, as produced by the CSV loader - confirm for .gt3x input.
        download_date = datetime.fromtimestamp(self.metadata['download_date'], tz=timezone.utc)

        # Split the sampling period into proper hh:mm:ss fields.
        hours, remainder = divmod(int(1 / self.fs), 3600)
        minutes, seconds = divmod(remainder, 60)

        header = [
            # TODO replace version with auto version after merge
            f"------------ Data File Created By ADA v1.0.0 Firmware v{self.metadata['firmware']} date format dd.MM.yyyy at {self.fs} Hz Filter Normal -----------",
            f"Serial Number: {self.metadata['serial_number']}",
            f"Start Time {start_date.hour:02}:{start_date.minute:02}:{start_date.second:02}",
            f"Start Date {start_date.day:02}.{start_date.month:02}.{start_date.year}",
            f"Epoch Period (hh:mm:ss) {hours:02}:{minutes:02}:{seconds:02}",
            f"Download Time {download_date.hour:02}:{download_date.minute:02}:{download_date.second:02}",
            f"Download Date {download_date.day:02}.{download_date.month:02}.{download_date.year}",
            "Current Memory Address: 0",
            f"Current Battery Voltage: {self.metadata['battery_voltage']} Mode = NA",
            "--------------------------------------------------",
        ]

        # newline='' lets the csv module control line endings itself; without
        # it every row is followed by an empty line on Windows.
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            for line in header:
                writer.writerow([line])
            writer.writerow(['Timestamp', 'Accelerometer X', 'Accelerometer Y', 'Accelerometer Z'])

            # Convert all offsets to Unix time in one vectorised step.
            unix_times = self._convert_timestamp(self.timestamp, self._metadata, True)
            rows = zip(unix_times, self.x, self.y, self.z)
            progress = tqdm.tqdm(rows, total=self._data.shape[1], desc=f'Saving to file: {path}')
            for unix_time, x, y, z in progress:
                writer.writerow([self._format_utc(unix_time), x, y, z])

    @staticmethod
    def _convert_timestamp(timestamp: float, metadata: dict, to_unix: bool = False) -> float:
        """Convert between stored offsets and Unix time.

        Args:
            timestamp (float): Value to convert.
            metadata (dict): Metadata holding the ``'first_sample'`` Unix time.
            to_unix (bool): If True convert offset -> Unix time, otherwise
                Unix time -> offset.

        Returns:
            float: Converted value.
        """
        if to_unix:
            return timestamp + metadata['first_sample']
        return timestamp - metadata['first_sample']

    def cut_by_samples(self, start_sample: int, end_sample: int) -> "RawActiGraph":
        """Create new object with the data cut by given indexes.

        Args:
            start_sample (int): First sample of output data.
            end_sample (int): Sample after the last sample of output data.

        Returns:
            RawActiGraph: Object containing the cut data. Metadata is passed
            on unchanged, so timestamps stay relative to the original first
            sample.
        """
        # Samples run along axis 1; axis 0 holds the four channels.
        return RawActiGraph(
            self._data[:, start_sample:end_sample],
            self._metadata,
            self._fs,
            self._channel_names,
        )

    def cut_by_timestamp(self, start_ts: float, end_ts: float | None) -> "RawActiGraph":
        """Create new object with the data cut by given timestamps.

        Args:
            start_ts (float): Unix timestamp of output data beginning.
            end_ts (float | None): Unix timestamp of output data end. If None, last sample of output data will be last sample of input data.

        Raises:
            IndexError: ``start_ts`` is later than every sample.

        Returns:
            RawActiGraph: Object containing the cut data.
        """
        origin = self.metadata['first_sample']
        offsets = self._data[0, :]
        n_samples = offsets.shape[0]

        at_or_after_start = np.flatnonzero(offsets >= start_ts - origin)
        if at_or_after_start.size == 0:
            raise IndexError("start_ts is later than the last sample of the recording.")
        start_sample = int(at_or_after_start[0])

        if end_ts is None:
            # Keep everything up to and including the final sample.
            end_sample = n_samples
        else:
            at_or_after_end = np.flatnonzero(offsets >= end_ts - origin)
            end_sample = int(at_or_after_end[0]) if at_or_after_end.size else n_samples
        return self.cut_by_samples(start_sample, end_sample)

    def cut_by_dates(self, start_date: str, end_date: str | None) -> "RawActiGraph":
        """Create new object with the data cut by given dates.

        Args:
            start_date (str): ISO-formatted date of output data beginning.
            end_date (str | None): ISO-formatted date of output data end. If None, last sample of output data will be last sample of input data.

        Returns:
            RawActiGraph: Object containing the cut data.
        """
        start_ts = parse_datetime(start_date).timestamp()
        end_ts = None if end_date is None else parse_datetime(end_date).timestamp()
        return self.cut_by_timestamp(start_ts, end_ts)

    @staticmethod
    def _preview_csv_metadata(path: str) -> tuple[dict, list[str]]:
        """Read metadata from the first 20 rows of a .csv file.

        Dates are returned as ``dd.MM.yyyy HH:MM:SS.fff`` strings in UTC.
        Because only 20 rows are read, ``stop_date`` reflects the last row of
        that preview rather than the end of the recording.
        """
        columns = ['Timestamp', 'Accelerometer X', 'Accelerometer Y', 'Accelerometer Z']
        preview = pd.read_csv(path, names=columns, nrows=20)
        metadata = RawActiGraph._read_csv_metadata(preview)
        # The timestamps were parsed as UTC, so they are rendered in UTC too;
        # formatting in local time would shift the displayed wall-clock values.
        for key in ('start_date', 'stop_date', 'download_date', 'first_sample'):
            metadata[key] = RawActiGraph._format_utc(metadata[key])
        return metadata, ['timestamp', 'x', 'y', 'z']

    @staticmethod
    def _preview_bin_metadata(path: str) -> tuple[dict, list[str]]:
        """Read metadata from a .gt3x file, loading only the first 50 rows."""
        with FileReader(path, num_rows=50) as reader:
            metadata = {
                field.name: getattr(reader.info, field.name)
                for field in fields(reader.info)
            }
        epoch = datetime(1, 1, 1, tzinfo=timezone.utc)
        for key in ['start_date', 'stop_date']:
            # Stored value // 10 is treated as microseconds since 0001-01-01.
            moment = epoch + timedelta(microseconds=metadata[key] // 10)
            metadata[key] = moment.strftime("%d.%m.%Y %H:%M:%S.%f")[:-3]
        return metadata, ['timestamp', 'x', 'y', 'z']