# -*- coding: utf-8 -*-
"""Module for reading and cutting GeneActive data."""
import numpy as np
from ciso8601 import parse_datetime
import struct
from tqdm import tqdm
import pprint
import csv
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
from typing import Tuple, ClassVar
from ada.data_containers._base import _Raw, _Epoched
[docs]
@dataclass(slots=True, eq=False)
class RawGeneActiv(_Raw):
"""A class for storing and handling raw data generated by GeneActiv."""
_stationary_variance: ClassVar[float] = 0.0004563
_dynamic_range: ClassVar[float] = 8.0
def __repr__(self):
return "Raw metadata:\n{}".format(pprint.pformat(self._metadata, indent=4))
@property
def stationary_variance(self) -> float:
return self._stationary_variance
@property
def dynamic_range(self) -> float:
return self._dynamic_range
@property
def x(self) -> np.ndarray:
return self._data[0]
@property
def y(self) -> np.ndarray:
return self._data[1]
@property
def z(self) -> np.ndarray:
return self._data[2]
@property
def lux(self) -> np.ndarray:
"""Lightmeter values in lux."""
return self._data[3]
@property
def button(self) -> np.ndarray:
"""Button values (1=pressed)."""
return self._data[4]
@property
def temperature(self) -> np.ndarray:
"""Recorded temperature in Celsius degrees."""
return self._data[5]
@property
def timestamp(self) -> np.ndarray:
return self._data[6]
@property
def vlen(self) -> np.ndarray:
return np.abs(np.sqrt(self._data[0] ** 2 + self._data[1] ** 2 + self._data[2] ** 2) - 1)
@property
def to_score(self) -> np.ndarray:
"""Data to be scored by scoring algorithms. Here equal to vlen."""
return self.vlen
@property
def id(self) -> str:
return self.metadata['Subject Code']
[docs]
@staticmethod
def load_file(path: str) -> "RawGeneActiv":
"""Loading file generated by the GeneActiv device.
Args:
path (str): Path to .bin or .csv file.
Raises:
ValueError: Unsupported file format otherwise.
Returns:
RawGeneActiv: Object containing data.
"""
if path[-3:] == 'csv':
data, metadata, fs, channel_names = RawGeneActiv._read_csv(path)
return RawGeneActiv(data, metadata, fs, channel_names)
else:
try:
data, metadata, fs, channel_names = RawGeneActiv._read_bin(path)
return RawGeneActiv(data, metadata, fs, channel_names)
except Exception as e:
print(e)
raise ValueError('Unsupported file format')
[docs]
def export(self, path: str):
"""Export object to .csv file fully resembling the manufacturer .csv format.
Args:
path (str): Path to .csv file to which data will be exported.
"""
if 'Number of Pages' in self._metadata.keys():
self._export_bin(path)
else:
self._export_csv(path)
[docs]
def cut_by_samples(self, start_sample: int, end_sample: int) -> "RawGeneActiv":
"""Create new object with the data cut by given indexes.
Args:
start_sample (int): First sample of output data.
end_sample (int): Sample after the last sample of output data.
Returns:
RawGeneActiv: Object containing the cutted data.
"""
return RawGeneActiv(self._data[:, start_sample:end_sample], self._metadata, self._fs, self._channel_names)
[docs]
def cut_by_timestamp(self, start_ts: float, end_ts: float | None) -> "RawGeneActiv":
"""Create new object with the data cut by given timestamps.
Args:
start_ts (float): Unix timestamp of output data beginning.
end_ts (float | None): Unix timestamp of output data end. If None, last sample of output data will be last sample of input data.
Returns:
RawGeneActiv: Object containing the cutted data.
"""
ts = self._metadata['Start Time']
tz = self._metadata['Time Zone'][3:].strip()
ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
start_sample = np.where(self._data[6, :] >= start_ts - ts)[0][0]
if end_ts is None:
end_ts = ts + float(self._data[6, -1])
try:
end_sample = np.where(self._data[6, :] >= end_ts - ts)[0][0]
except IndexError:
end_sample = len(self.timestamp)
return self.cut_by_samples(start_sample, end_sample)
[docs]
def cut_by_dates(self, start_date: str, end_date: str | None) -> "RawGeneActiv":
"""Create new object with the data cut by given dates.
Args:
start_date (str): ISO-formated date of outputa data beginning.
end_date (str | None): ISO-formated date of output data end. If None, last sample of output data will be last sample of input data.
Returns:
RawGeneActiv: Object containing the cutted data.
"""
start_ts = parse_datetime(start_date).timestamp()
if end_date is not None:
end_ts = parse_datetime(end_date).timestamp()
else:
end_ts = None
return self.cut_by_timestamp(start_ts, end_ts)
@staticmethod
def _convert_timestamp(timestamp: float, metadata: dict, to_unix: bool = False) -> float:
ts = metadata['Start Time']
tz = metadata['Time Zone'][3:].strip()
ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
if to_unix:
return timestamp + ts
else:
return timestamp - ts
@staticmethod
def _export_csv_metadata(metadata, writer):
def write_sensor_data(axis):
writer.writerow(['Sensor type'] + [axis])
writer.writerow(['Range'] + [metadata[axis + ' Range']])
try:
writer.writerow(['Resolution'] + [metadata[axis + ' Resolution']])
writer.writerow(['Units'] + [metadata[axis + ' Units']])
except KeyError:
writer.writerow(['Resolution'])
writer.writerow(['Units'])
if axis + ' Additional information' in metadata.keys():
writer.writerow(['Additional information'] + [metadata[axis + ' Additional information']])
else:
writer.writerow(['Additional information'])
# writer = csv.writer(f, quoting=csv.QUOTE_NONE, quotechar='', escapechar='*') # Some fields have , (which is also a delimiter) in them, so this line adds * before , and then it works
for e in ['Device Type', 'Device Model', 'Device Unique Serial Code', 'Device Firmware Version', 'Calibration Date']:
writer.writerow([e, metadata[e]])
writer.writerow(['Application name & version ', 'ADA v1.0.0'])
for i in range(4):
writer.writerow('')
for e in ['Measurement Frequency', 'Start Time', 'Last measurement', 'Device Location Code', 'Time Zone']:
writer.writerow([e, metadata[e]])
for i in range(5):
writer.writerow('')
for e in ['Subject Code', 'Date of Birth', 'Sex', 'Height', 'Weight', 'Handedness Code', 'Subject Notes']:
writer.writerow([e, metadata[e]])
for i in range(3):
writer.writerow('')
for e in ['Study Centre', 'Study Code', 'Investigator ID', 'Exercise Type', 'Config Operator ID', 'Config Time', 'Config Notes', 'Extract Operator ID', 'Extract Time', 'Extract Notes']:
writer.writerow([e, metadata[e]])
for i in range(10):
writer.writerow('')
for sensor in ['MEMS accelerometer x-axis', 'MEMS accelerometer y-axis', 'MEMS accelerometer z-axis', 'Lux Photodiode 400nm - 1100nm', 'User button event marker', 'Linear active thermistor']:
write_sensor_data(sensor)
for i in range(20):
writer.writerow('')
@staticmethod
def _export_bin_metadata(metadata, writer):
writer.writerow(['Device Type', metadata['Device Type']])
writer.writerow(['Device Model', metadata['Device Model']])
writer.writerow(['Device Unique Serial Code', metadata['Device Unique Serial Code']])
writer.writerow(['Device Firmware Version', metadata['Device Firmware Version']])
writer.writerow(['Calibration Date', metadata['Calibration Date']])
writer.writerow(['Application name & version', 'ADA v1.0.0'])
for i in range(4):
writer.writerow('')
writer.writerow(['Measurement Frequency', metadata['Measurement Frequency']])
writer.writerow(['Start Time', metadata['Start Time']])
writer.writerow(['Last measurement', ''])
writer.writerow(['Device Location Code', ''])
writer.writerow(['Time Zone', metadata['Time Zone']])
for i in range(5):
writer.writerow('')
writer.writerow(['Subject Code', metadata['Subject Code']])
writer.writerow(['Date of Birth', metadata['Date of Birth']])
writer.writerow(['Sex', metadata['Sex']])
writer.writerow(['Height', metadata['Height']])
writer.writerow(['Weight', metadata['Weight']])
writer.writerow(['Handedness Code', metadata['Handedness Code']])
writer.writerow(['Subject Notes', metadata['Subject Notes']])
for i in range(3):
writer.writerow('')
writer.writerow(['Study Centre', metadata['Study Centre']])
writer.writerow(['Study Code', metadata['Study Code']])
writer.writerow(['Investigator ID', metadata['Investigator ID']])
writer.writerow(['Exercise Type', metadata['Exercise Type']])
writer.writerow(['Config Operator ID', metadata['Config Operator ID']])
writer.writerow(['Config Time', metadata['Config Time']])
writer.writerow(['Config Notes', metadata['Config Notes']])
writer.writerow(['Extract Operator ID', metadata['Extract Operator ID']])
writer.writerow(['Extract Time', metadata['Extract Time']])
writer.writerow(['Extract Notes', metadata['Extract Notes']])
for i in range(10):
writer.writerow('')
for e in ['x', 'y', 'z']:
writer.writerow(['Sensor type', 'MEMS accelerometer ' + e + '-axis'])
writer.writerow(['Range', metadata['Accelerometer Range']])
writer.writerow(['Resolution', metadata['Accelerometer Resolution']])
writer.writerow(['Units', metadata['Accelerometer Units']])
writer.writerow(['Additional information'])
writer.writerow(['Sensor type', 'Lux Photodiode 400nm - 1100nm'])
writer.writerow(['Range', metadata['Light Meter Range']])
writer.writerow(['Resolution', metadata['Light Meter Resolution']])
writer.writerow(['Units', metadata['Light Meter Units']])
writer.writerow(['Additional information'])
writer.writerow(['Sensor type', 'User button event marker'])
writer.writerow(['Range', '1 or 0'])
writer.writerow(['Resolution'])
writer.writerow(['Units'])
writer.writerow(['Additional information', '1=pressed'])
writer.writerow(['Sensor type', 'Linear active thermistor'])
writer.writerow(['Range', metadata['Temperature Sensor Range']])
writer.writerow(['Resolution', metadata['Temperature Sensor Resolution']])
writer.writerow(['Units', metadata['Temperature Sensor Units']])
writer.writerow(['Additional information'])
for i in range(20):
writer.writerow('')
def _export_csv(self, path: str):
with open(path, 'w', newline='') as f:
writer = csv.writer(f)
RawGeneActiv._export_csv_metadata(self.metadata, writer)
ts = self._metadata['Start Time']
tz = self._metadata['Time Zone'][3:].strip()
sign = -1 if tz[0] == '-' else 1
if len(tz.split(':')) == 1:
tzinfo = timezone(sign * timedelta(hours=int(tz[1:])))
else:
temp = tz[1:].split(':')
tzinfo = timezone(sign * timedelta(hours=int(temp[0]), minutes=int(temp[1])))
ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
pbar = tqdm(total=self._data.shape[1], position=0, leave=True, desc='Saving to file: {}'.format(path), dynamic_ncols=True)
for i in range(self._data.shape[1]):
row = self._data[:, i]
date = datetime.fromtimestamp(row[6] + ts, tz=tzinfo).strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]
writer.writerow([date] + list(row[:6]))
pbar.update(1)
pbar.close()
def _export_bin(self, file: str):
with open(file, 'w', newline='') as f:
# writer = csv.writer(f, quoting=csv.QUOTE_NONE, quotechar='', escapechar='*') # Some fields have , (which is also a delimiter) in them, so this line adds * before , and then it works
writer = csv.writer(f)
RawGeneActiv._export_bin_metadata(self._metadata, writer)
ts = self._metadata['Start Time']
tz = self._metadata['Time Zone'][3:].strip()
sign = -1 if tz[0] == '-' else 1
hours, minutes = tz[1:].split(':')
tzinfo = timezone(sign * timedelta(hours=int(hours), minutes=int(minutes)))
ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
pbar = tqdm(total=self._data.shape[1], position=0, leave=True, desc='Saving to file: {}'.format(file), dynamic_ncols=True)
for i in range(self._data.shape[1]):
row = self._data[:, i]
date = datetime.fromtimestamp(row[6] + ts, tz=tzinfo).strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]
writer.writerow([date] + list(row[:6]))
pbar.update(1)
pbar.close()
f.close()
@staticmethod
def _read_bin(path: str) -> Tuple[np.ndarray, dict, float, list]:
metadata = {}
with open(path, 'rb') as f:
for i in range(59): # lines contain metadata
line = (f.readline()).decode('utf-8').strip()
if ':' in line:
key, value = line.split(':', 1)
metadata[key] = value.strip().replace('\x00', '')
fs = float(metadata['Measurement Frequency'][:-3])
delta = np.linspace(0, 1 / fs * 299, 300) # delta for time stamp interpolation
NP = int(metadata['Number of Pages']) # number of data blocks
data = np.zeros((7, NP * 300))
channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
tz = metadata['Time Zone'][3:].strip()
TS = parse_datetime(metadata['Start Time'][:-4] + '.' + metadata['Start Time'][-3:] + '000' + tz).timestamp()
c = 0 # counter for data
for i in tqdm(range(NP), position=0, leave=True, desc='Reading from file: {}'.format(path), dynamic_ncols=True):
# useless lines
for k in range(3):
_ = f.readline()
# timestamp of data block
line = f.readline().strip().decode('utf-8')
temp_line_to_parse = line[10:].rsplit(':', 1)[-2] + '.' + line[10:].rsplit(':', 1)[-1]
d = parse_datetime(temp_line_to_parse + tz).timestamp() - TS # timestamp of first sample != timestamp of Start time??? Same problem is in file, its not bug in the code, but some problem with .bin file???
# useless line
_ = f.readline()
# temperature
line = f.readline()[:-2].decode('utf-8')
temperature = float(line.split(':', 1)[1])
# temperature resampling
data[5, c:c + 300] = temperature
# time stamp interpolation
data[6, c:c + 300] = d + delta
# useless lines
for k in range(3):
_ = f.readline()
# data block
line_hex = f.readline().strip()
hexes = struct.unpack("12s " * 300, line_hex)
for e in hexes:
dataChunk = struct.unpack("12s 12s 12s 10s 1s 1s", bytes(bin(int(e, 16))[2:].zfill(48), 'utf-8'))
data[0, c] = int(dataChunk[0], 2) # x
data[1, c] = int(dataChunk[1], 2) # y
data[2, c] = int(dataChunk[2], 2) # z
data[3, c] = int(dataChunk[3], 2) # lux
data[4, c] = int(dataChunk[4], 2) # button
c += 1
# Converting unsigned 12-bit data into 12-bit signed data
data[0, data[0, :] >= 2048] = data[0, data[0, :] >= 2048] - 4096
data[1, data[1, :] >= 2048] = data[1, data[1, :] >= 2048] - 4096
data[2, data[2, :] >= 2048] = data[2, data[2, :] >= 2048] - 4096
# And calculating values in g (or lux) using calibration parameters
data[0, :] = (data[0, :] * 100 - int(metadata['x offset'])) / int(metadata['x gain'])
data[1, :] = (data[1, :] * 100 - int(metadata['y offset'])) / int(metadata['y gain'])
data[2, :] = (data[2, :] * 100 - int(metadata['z offset'])) / int(metadata['z gain'])
data[3, :] = data[3, :] * int(metadata['Lux']) / int(metadata['Volts'])
return data, metadata, fs, channel_names
@staticmethod
def _read_csv(path: str) -> Tuple[np.ndarray, dict, float, list]:
metadata = {}
with open(path) as f:
for i in range(76): # First 100 lines contain metadata, but there are six blocks of 5 lines containing sensor data
row = f.readline()
if row != '\n' and ',' in row:
key, value = row.split(',', 1)
if 'Sensor type' in key:
sensor = value[:-1]
for i in range(4):
row = f.readline()
if ',' in row:
key, value = row.split(',', 1)
key = sensor.strip() + ' ' + key.strip()
metadata[key] = value[:-1].replace('\0', '').strip()
else:
metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip() # if file is written with export method, sometimes there will be unneeded * in some values
channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
tz = metadata['Time Zone'][3:].strip()
TS = parse_datetime(metadata['Start Time'][:-4] + '.' + metadata['Start Time'][-3:] + '000' + tz).timestamp()
data = np.loadtxt(path, skiprows=100, usecols=(1, 2, 3, 4, 5, 6), delimiter=',').T
if data.shape[0] != 6:
raise ValueError("Trying to read epoched data? Use GeneActivMVM instead.")
ts = np.loadtxt(path, skiprows=100, usecols=0, dtype=str, delimiter=',')
ts = np.array([parse_datetime(x[:-4] + '.' + x[-3:] + '000' + tz).timestamp() - TS for x in tqdm(ts, position=0, leave=True, desc='Reading from file: {}'.format(path), dynamic_ncols=True)])
data = np.concatenate((data, ts.reshape(1, -1)), axis=0)
fs = float(metadata['Measurement Frequency'][:-3])
return data, metadata, fs, channel_names
@staticmethod
def _preview_bin_metadata(path: str) -> tuple[dict, list[str]]:
metadata = {}
with open(path, 'rb') as f:
for i in range(59): # lines contain metadata
line = (f.readline()).decode('utf-8').strip()
if ':' in line:
key, value = line.split(':', 1)
metadata[key] = value.strip().replace('\x00', '')
channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
return metadata, channel_names
@staticmethod
def _preview_csv_metadata(path: str) -> tuple[dict, list[str]]:
metadata = {}
with open(path) as f:
for i in range(76): # First 100 lines contain metadata, but there are six blocks of 5 lines containing sensor data
row = f.readline()
if row != '\n' and ',' in row:
key, value = row.split(',', 1)
if 'Sensor type' in key:
sensor = value[:-1]
for i in range(4):
row = f.readline()
if ',' in row:
key, value = row.split(',', 1)
key = sensor.strip() + ' ' + key.strip()
metadata[key] = value[:-1].replace('\0', '').strip()
else:
metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip() # if file is written with export method, sometimes there will be unneeded * in some values
channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
return metadata, channel_names
[docs]
@dataclass(slots=True, eq=False)
class GeneActivMVM(_Epoched):
"""A class for storing and handling epoched data generated by GeneActiv-style MVM method."""
def __repr__(self):
str1 = pprint.pformat(self._metadata, indent=4)
str2 = pprint.pformat(self._epoching_method_metadata, indent=4)
return 'Raw metadata:\n{}\n\nEpoching metadata:\n{}'.format(str1, str2)
@property
def x(self) -> np.ndarray:
"""Mean acceleration (in g) along x axis."""
return self._data[0]
@property
def y(self) -> np.ndarray:
"""Epoch-wise mean acceleration (in g) along y axis."""
return self._data[1]
@property
def z(self) -> np.ndarray:
"""Epoch-wise mean acceleration (in g) along z axis."""
return self._data[2]
@property
def lux(self) -> np.ndarray:
"""Epoch-wise mean lightmeter values in lux."""
return self._data[3]
@property
def button(self) -> np.ndarray:
"""Sum of button values (1=pressed) over epochs."""
return self._data[4]
@property
def temperature(self) -> np.ndarray:
"""Epoch-wise mean recorded temperature in Celsius degrees."""
return self._data[5]
@property
def timestamp(self) -> np.ndarray:
"""Timestamp of the epoch's end."""
return self._data[6]
@property
def mvm(self) -> np.ndarray:
"""Mean vector magnitude calculated as sum of vlens in the epoch."""
return self._data[7]
@property
def to_score(self) -> np.ndarray:
return self.mvm
@property
def x_std(self) -> np.ndarray:
"""Epoch-wise standard deviation along x axis."""
return self._data[8]
@property
def y_std(self) -> np.ndarray:
"""Epoch-wise standard deviation along y axis."""
return self._data[9]
@property
def z_std(self) -> np.ndarray:
"""Epoch-wise standard
deviation along z axis."""
return self._data[10]
@property
def peak_lux(self) -> np.ndarray:
"""Maximum lightmeter value for each epoch (in lux)."""
return self._data[11]
@property
def first_sample_timestamp(self) -> float:
"""Unix timestamp of first sample."""
return RawGeneActiv._convert_timestamp(self.timestamp[0], self._metadata, True)
@property
def id(self) -> str:
return self.metadata['Subject Code']
[docs]
@staticmethod
def load_file(path: str) -> "GeneActivMVM":
"""Loading .csv file with epoched data generated by the GeneActiv software or the MVM method.
Args:
path (str): Path .csv file.
Raises:
ValueError: Unsupported file format otherwise.
Returns:
GeneActivMVM: Object containing data.
"""
metadata = {}
with open(path) as f:
for i in range(76): # First 100 lines contain metadata, but there are six blocks of 5 lines containing sensor data
row = f.readline()
if row != '\n' and ',' in row:
key, value = row.split(',', 1)
if 'Sensor type' in key:
sensor = value[:-1]
for i in range(4):
row = f.readline()
if ',' in row:
key, value = row.split(',', 1)
key = sensor.strip() + ' ' + key.strip()
metadata[key] = value[:-1].replace('\0', '').strip()
else:
metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip() # if file is written with export method, sometimes there will be unneeded * in some values
channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp', 'mvm', 'x_std', 'y_std', 'z_std', 'peak_lux']
tz = metadata['Time Zone'][3:].strip()
TS = parse_datetime(metadata['Start Time'][:-4] + '.' + metadata['Start Time'][-3:] + '000' + tz).timestamp()
data = np.loadtxt(path, skiprows=100, usecols=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), delimiter=',').T
if data.shape[0] != 11:
raise ValueError("Trying to read non-epoched, raw data? Use RawGeneactiv instead.")
ts = np.loadtxt(path, skiprows=100, usecols=0, dtype=str, delimiter=',')
ts = np.array([parse_datetime(x[:-4] + '.' + x[-3:] + '000' + tz).timestamp() - TS for x in tqdm(ts, position=0, leave=True, desc='Reading from file: {}'.format(path), dynamic_ncols=True)])
data = np.concatenate((data[:6], ts.reshape(1, -1), data[6:]), axis=0)
fs = 1 / (ts[1] - ts[0])
epoching_method_metadata = {'epoching method': 'Geneactiv-like epoching', 'epoch length': ts[1] - ts[0], 'to_score channel': 7}
return GeneActivMVM(data, metadata, fs, channel_names, epoching_method_metadata)
[docs]
def export(self, path: str):
"""Export object to .csv file fully resembling the manufacturer .csv format.
Args:
path (str): Path to .csv file to which data will be exported.
"""
with open(path, 'w', newline='') as f:
writer = csv.writer(f)
if 'Number of Pages' in self._metadata.keys():
RawGeneActiv._export_bin_metadata(self._metadata, writer)
else:
RawGeneActiv._export_csv_metadata(self._metadata, writer)
ts = self._metadata['Start Time']
tz = self._metadata['Time Zone'][3:].strip()
sign = -1 if tz[0] == '-' else 1
if len(tz.split(':')) == 1:
tzinfo = timezone(sign * timedelta(hours=int(tz[1:])))
else:
temp = tz[1:].split(':')
tzinfo = timezone(sign * timedelta(hours=int(temp[0]), minutes=int(temp[1])))
ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
for i in tqdm(range(self._data.shape[1]), position=0, leave=True, desc='Saving to file: {}'.format(path), dynamic_ncols=True):
row = self._data[:, i]
date = datetime.fromtimestamp(row[6] + ts, tz=tzinfo).strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]
writer.writerow([date] + list(row[:6]) + list(row[7:]))
[docs]
def cut_by_samples(self, start_sample: int, end_sample: int) -> "GeneActivMVM":
"""Create new object with the data cut by given indexes.
Args:
start_sample (int): First sample of output data.
end_sample (int): Sample after the last sample of output data.
Returns:
GeneActivMVM: Object containing the cutted data.
"""
return GeneActivMVM(self._data[:, start_sample:end_sample], self._metadata, self._fs, self._channel_names, self._epoching_method_metadata)
[docs]
def cut_by_timestamp(self, start_ts: float, end_ts: float | None = None) -> "GeneActivMVM":
"""Create new object with the data cut by given timestamps.
Args:
start_ts (float): Unix timestamp of output data beginning.
end_ts (float | None): Unix timestamp of output data end. If None, last sample of output data will be last sample of input data.
Returns:
GeneActivMVM: Object containing the cutted data.
"""
ts = self._metadata['Start Time']
tz = self._metadata['Time Zone'][3:].strip()
ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
start_sample = np.where(self._data[6, :] >= start_ts - ts)[0][0]
if end_ts is None:
end_ts = ts + float(self._data[6, -1])
try:
end_sample = np.where(self._data[6, :] >= end_ts - ts)[0][0]
except IndexError:
end_sample = len(self.timestamp)
return self.cut_by_samples(start_sample, end_sample)
[docs]
def cut_by_dates(self, start_date: str, end_date: str | None = None) -> "GeneActivMVM":
"""Create new object with the data cut by given dates.
Args:
start_date (str): ISO-formated date of outputa data beginning.
end_date (str | None): ISO-formated date of output data end. If None, last sample of output data will be last sample of input data.
Returns:
GeneActivMVM: Object containing the cutted data.
"""
start_ts = parse_datetime(start_date).timestamp()
if end_date is not None:
end_ts = parse_datetime(end_date).timestamp()
else:
end_ts = None
return self.cut_by_timestamp(start_ts, end_ts)