# Source code for ada.io.geneactiv

# -*- coding: utf-8 -*-
"""Module for reading and cutting GeneActive data."""

import numpy as np
from ciso8601 import parse_datetime
import struct
from tqdm import tqdm
import pprint
import csv
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
from typing import Tuple, ClassVar

from ada.data_containers._base import _Raw, _Epoched


@dataclass(slots=True, eq=False)
class RawGeneActiv(_Raw):
    """A class for storing and handling raw data generated by GeneActiv.

    Channel layout of ``self._data`` (rows): 0=x, 1=y, 2=z, 3=lux, 4=button,
    5=temperature, 6=timestamp (see ``channel_names`` in the readers below).
    Timestamps in row 6 are stored relative to the recording 'Start Time'.
    """

    # Variance of the signal while the device is stationary; used by
    # downstream scoring — presumably device-specific, TODO confirm source.
    _stationary_variance: ClassVar[float] = 0.0004563
    # Accelerometer dynamic range; presumably in g — TODO confirm.
    _dynamic_range: ClassVar[float] = 8.0

    def __repr__(self):
        """Pretty-printed view of the recording metadata."""
        return "Raw metadata:\n{}".format(pprint.pformat(self._metadata, indent=4))

    @property
    def stationary_variance(self) -> float:
        """Stationary-signal variance constant of the device."""
        return self._stationary_variance

    @property
    def dynamic_range(self) -> float:
        """Dynamic range constant of the accelerometer."""
        return self._dynamic_range

    @property
    def x(self) -> np.ndarray:
        """Acceleration along the x axis (in g after bin calibration)."""
        return self._data[0]

    @property
    def y(self) -> np.ndarray:
        """Acceleration along the y axis (in g after bin calibration)."""
        return self._data[1]

    @property
    def z(self) -> np.ndarray:
        """Acceleration along the z axis (in g after bin calibration)."""
        return self._data[2]

    @property
    def lux(self) -> np.ndarray:
        """Lightmeter values in lux."""
        return self._data[3]

    @property
    def button(self) -> np.ndarray:
        """Button values (1=pressed)."""
        return self._data[4]

    @property
    def temperature(self) -> np.ndarray:
        """Recorded temperature in Celsius degrees."""
        return self._data[5]

    @property
    def timestamp(self) -> np.ndarray:
        """Per-sample timestamps, relative to the recording start."""
        return self._data[6]

    @property
    def vlen(self) -> np.ndarray:
        """Absolute deviation of the acceleration vector magnitude from 1 g."""
        return np.abs(np.sqrt(self._data[0] ** 2 + self._data[1] ** 2 + self._data[2] ** 2) - 1)

    @property
    def to_score(self) -> np.ndarray:
        """Data to be scored by scoring algorithms. Here equal to vlen."""
        return self.vlen

    @property
    def id(self) -> str:
        """Subject identifier taken from the 'Subject Code' metadata field."""
        return self.metadata['Subject Code']
[docs] @staticmethod def load_file(path: str) -> "RawGeneActiv": """Loading file generated by the GeneActiv device. Args: path (str): Path to .bin or .csv file. Raises: ValueError: Unsupported file format otherwise. Returns: RawGeneActiv: Object containing data. """ if path[-3:] == 'csv': data, metadata, fs, channel_names = RawGeneActiv._read_csv(path) return RawGeneActiv(data, metadata, fs, channel_names) else: try: data, metadata, fs, channel_names = RawGeneActiv._read_bin(path) return RawGeneActiv(data, metadata, fs, channel_names) except Exception as e: print(e) raise ValueError('Unsupported file format')
[docs] def export(self, path: str): """Export object to .csv file fully resembling the manufacturer .csv format. Args: path (str): Path to .csv file to which data will be exported. """ if 'Number of Pages' in self._metadata.keys(): self._export_bin(path) else: self._export_csv(path)
[docs] def cut_by_samples(self, start_sample: int, end_sample: int) -> "RawGeneActiv": """Create new object with the data cut by given indexes. Args: start_sample (int): First sample of output data. end_sample (int): Sample after the last sample of output data. Returns: RawGeneActiv: Object containing the cutted data. """ return RawGeneActiv(self._data[:, start_sample:end_sample], self._metadata, self._fs, self._channel_names)
[docs] def cut_by_timestamp(self, start_ts: float, end_ts: float | None) -> "RawGeneActiv": """Create new object with the data cut by given timestamps. Args: start_ts (float): Unix timestamp of output data beginning. end_ts (float | None): Unix timestamp of output data end. If None, last sample of output data will be last sample of input data. Returns: RawGeneActiv: Object containing the cutted data. """ ts = self._metadata['Start Time'] tz = self._metadata['Time Zone'][3:].strip() ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp() start_sample = np.where(self._data[6, :] >= start_ts - ts)[0][0] if end_ts is None: end_ts = ts + float(self._data[6, -1]) try: end_sample = np.where(self._data[6, :] >= end_ts - ts)[0][0] except IndexError: end_sample = len(self.timestamp) return self.cut_by_samples(start_sample, end_sample)
[docs] def cut_by_dates(self, start_date: str, end_date: str | None) -> "RawGeneActiv": """Create new object with the data cut by given dates. Args: start_date (str): ISO-formated date of outputa data beginning. end_date (str | None): ISO-formated date of output data end. If None, last sample of output data will be last sample of input data. Returns: RawGeneActiv: Object containing the cutted data. """ start_ts = parse_datetime(start_date).timestamp() if end_date is not None: end_ts = parse_datetime(end_date).timestamp() else: end_ts = None return self.cut_by_timestamp(start_ts, end_ts)
    @staticmethod
    def _convert_timestamp(timestamp: float, metadata: dict, to_unix: bool = False) -> float:
        # Convert between a recording-relative timestamp and a Unix timestamp
        # using 'Start Time' and 'Time Zone' from the metadata.
        ts = metadata['Start Time']
        tz = metadata['Time Zone'][3:].strip()
        # 'Start Time' ends with ':mmm' (milliseconds); rewrite as '.mmm000'
        # so it parses as an ISO-8601 fractional second.
        ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
        if to_unix:
            return timestamp + ts
        else:
            return timestamp - ts

    @staticmethod
    def _export_csv_metadata(metadata, writer):
        # Write the 100-line metadata header for data originally read from a
        # manufacturer .csv file (metadata keys use per-sensor prefixes).
        def write_sensor_data(axis):
            # One 5-row sensor block; missing keys are written as empty rows.
            writer.writerow(['Sensor type'] + [axis])
            writer.writerow(['Range'] + [metadata[axis + ' Range']])
            try:
                writer.writerow(['Resolution'] + [metadata[axis + ' Resolution']])
                writer.writerow(['Units'] + [metadata[axis + ' Units']])
            except KeyError:
                writer.writerow(['Resolution'])
                writer.writerow(['Units'])
            if axis + ' Additional information' in metadata.keys():
                writer.writerow(['Additional information'] + [metadata[axis + ' Additional information']])
            else:
                writer.writerow(['Additional information'])

        # writer = csv.writer(f, quoting=csv.QUOTE_NONE, quotechar='', escapechar='*')
        # Some fields have , (which is also a delimiter) in them, so this line adds * before , and then it works
        for e in ['Device Type', 'Device Model', 'Device Unique Serial Code', 'Device Firmware Version',
                  'Calibration Date']:
            writer.writerow([e, metadata[e]])
        writer.writerow(['Application name & version ', 'ADA v1.0.0'])
        # writer.writerow('') iterates an empty string -> writes a blank row.
        for i in range(4):
            writer.writerow('')
        for e in ['Measurement Frequency', 'Start Time', 'Last measurement', 'Device Location Code', 'Time Zone']:
            writer.writerow([e, metadata[e]])
        for i in range(5):
            writer.writerow('')
        for e in ['Subject Code', 'Date of Birth', 'Sex', 'Height', 'Weight', 'Handedness Code', 'Subject Notes']:
            writer.writerow([e, metadata[e]])
        for i in range(3):
            writer.writerow('')
        for e in ['Study Centre', 'Study Code', 'Investigator ID', 'Exercise Type', 'Config Operator ID',
                  'Config Time', 'Config Notes', 'Extract Operator ID', 'Extract Time', 'Extract Notes']:
            writer.writerow([e, metadata[e]])
        for i in range(10):
            writer.writerow('')
        for sensor in ['MEMS accelerometer x-axis', 'MEMS accelerometer y-axis', 'MEMS accelerometer z-axis',
                       'Lux Photodiode 400nm - 1100nm', 'User button event marker', 'Linear active thermistor']:
            write_sensor_data(sensor)
        for i in range(20):
            writer.writerow('')

    @staticmethod
    def _export_bin_metadata(metadata, writer):
        # Write the 100-line metadata header for data originally read from a
        # .bin file (metadata keys come straight from the .bin header).
        writer.writerow(['Device Type', metadata['Device Type']])
        writer.writerow(['Device Model', metadata['Device Model']])
        writer.writerow(['Device Unique Serial Code', metadata['Device Unique Serial Code']])
        writer.writerow(['Device Firmware Version', metadata['Device Firmware Version']])
        writer.writerow(['Calibration Date', metadata['Calibration Date']])
        writer.writerow(['Application name & version', 'ADA v1.0.0'])
        for i in range(4):
            writer.writerow('')
        writer.writerow(['Measurement Frequency', metadata['Measurement Frequency']])
        writer.writerow(['Start Time', metadata['Start Time']])
        # 'Last measurement' and 'Device Location Code' are not present in the
        # .bin header, so they are written empty.
        writer.writerow(['Last measurement', ''])
        writer.writerow(['Device Location Code', ''])
        writer.writerow(['Time Zone', metadata['Time Zone']])
        for i in range(5):
            writer.writerow('')
        writer.writerow(['Subject Code', metadata['Subject Code']])
        writer.writerow(['Date of Birth', metadata['Date of Birth']])
        writer.writerow(['Sex', metadata['Sex']])
        writer.writerow(['Height', metadata['Height']])
        writer.writerow(['Weight', metadata['Weight']])
        writer.writerow(['Handedness Code', metadata['Handedness Code']])
        writer.writerow(['Subject Notes', metadata['Subject Notes']])
        for i in range(3):
            writer.writerow('')
        writer.writerow(['Study Centre', metadata['Study Centre']])
        writer.writerow(['Study Code', metadata['Study Code']])
        writer.writerow(['Investigator ID', metadata['Investigator ID']])
        writer.writerow(['Exercise Type', metadata['Exercise Type']])
        writer.writerow(['Config Operator ID', metadata['Config Operator ID']])
        writer.writerow(['Config Time', metadata['Config Time']])
        writer.writerow(['Config Notes', metadata['Config Notes']])
        writer.writerow(['Extract Operator ID', metadata['Extract Operator ID']])
        writer.writerow(['Extract Time', metadata['Extract Time']])
        writer.writerow(['Extract Notes', metadata['Extract Notes']])
        for i in range(10):
            writer.writerow('')
        for e in ['x', 'y', 'z']:
            writer.writerow(['Sensor type', 'MEMS accelerometer ' + e + '-axis'])
            writer.writerow(['Range', metadata['Accelerometer Range']])
            writer.writerow(['Resolution', metadata['Accelerometer Resolution']])
            writer.writerow(['Units', metadata['Accelerometer Units']])
            writer.writerow(['Additional information'])
        writer.writerow(['Sensor type', 'Lux Photodiode 400nm - 1100nm'])
        writer.writerow(['Range', metadata['Light Meter Range']])
        writer.writerow(['Resolution', metadata['Light Meter Resolution']])
        writer.writerow(['Units', metadata['Light Meter Units']])
        writer.writerow(['Additional information'])
        writer.writerow(['Sensor type', 'User button event marker'])
        writer.writerow(['Range', '1 or 0'])
        writer.writerow(['Resolution'])
        writer.writerow(['Units'])
        writer.writerow(['Additional information', '1=pressed'])
        writer.writerow(['Sensor type', 'Linear active thermistor'])
        writer.writerow(['Range', metadata['Temperature Sensor Range']])
        writer.writerow(['Resolution', metadata['Temperature Sensor Resolution']])
        writer.writerow(['Units', metadata['Temperature Sensor Units']])
        writer.writerow(['Additional information'])
        for i in range(20):
            writer.writerow('')

    def _export_csv(self, path: str):
        # Export data originally loaded from a .csv file, reproducing the
        # manufacturer layout: 100 metadata lines followed by one row per sample.
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            RawGeneActiv._export_csv_metadata(self.metadata, writer)
            ts = self._metadata['Start Time']
            tz = self._metadata['Time Zone'][3:].strip()
            # Build a tzinfo from the '+HH' or '+HH:MM' offset string.
            sign = -1 if tz[0] == '-' else 1
            if len(tz.split(':')) == 1:
                tzinfo = timezone(sign * timedelta(hours=int(tz[1:])))
            else:
                temp = tz[1:].split(':')
                tzinfo = timezone(sign * timedelta(hours=int(temp[0]), minutes=int(temp[1])))
            ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
            pbar = tqdm(total=self._data.shape[1], position=0, leave=True,
                        desc='Saving to file: {}'.format(path), dynamic_ncols=True)
            for i in range(self._data.shape[1]):
                row = self._data[:, i]
                # row[6] is relative to the start time; ':%f'[:-3] yields
                # milliseconds in the manufacturer 'H:M:S:mmm' style.
                date = datetime.fromtimestamp(row[6] + ts, tz=tzinfo).strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]
                writer.writerow([date] + list(row[:6]))
                pbar.update(1)
            pbar.close()

    def _export_bin(self, file: str):
        # Export data originally loaded from a .bin file in the same
        # manufacturer .csv layout.
        with open(file, 'w', newline='') as f:
            # writer = csv.writer(f, quoting=csv.QUOTE_NONE, quotechar='', escapechar='*')
            # Some fields have , (which is also a delimiter) in them, so this line adds * before , and then it works
            writer = csv.writer(f)
            RawGeneActiv._export_bin_metadata(self._metadata, writer)
            ts = self._metadata['Start Time']
            tz = self._metadata['Time Zone'][3:].strip()
            sign = -1 if tz[0] == '-' else 1
            # .bin 'Time Zone' offsets always carry minutes ('+HH:MM').
            hours, minutes = tz[1:].split(':')
            tzinfo = timezone(sign * timedelta(hours=int(hours), minutes=int(minutes)))
            ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
            pbar = tqdm(total=self._data.shape[1], position=0, leave=True,
                        desc='Saving to file: {}'.format(file), dynamic_ncols=True)
            for i in range(self._data.shape[1]):
                row = self._data[:, i]
                date = datetime.fromtimestamp(row[6] + ts, tz=tzinfo).strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]
                writer.writerow([date] + list(row[:6]))
                pbar.update(1)
            pbar.close()
            f.close()  # NOTE(review): redundant — the with-statement already closes f

    @staticmethod
    def _read_bin(path: str) -> Tuple[np.ndarray, dict, float, list]:
        # Parse a GeneActiv .bin file: a 59-line header followed by NP pages,
        # each page holding 300 samples packed as 12 hex chars per sample.
        metadata = {}
        with open(path, 'rb') as f:
            for i in range(59):  # header lines contain metadata
                line = (f.readline()).decode('utf-8').strip()
                if ':' in line:
                    key, value = line.split(':', 1)
                    metadata[key] = value.strip().replace('\x00', '')
            fs = float(metadata['Measurement Frequency'][:-3])
            delta = np.linspace(0, 1 / fs * 299, 300)  # delta for time stamp interpolation
            NP = int(metadata['Number of Pages'])  # number of data blocks
            data = np.zeros((7, NP * 300))
            channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
            tz = metadata['Time Zone'][3:].strip()
            TS = parse_datetime(metadata['Start Time'][:-4] + '.' + metadata['Start Time'][-3:] + '000' + tz).timestamp()
            c = 0  # running sample counter across pages
            for i in tqdm(range(NP), position=0, leave=True,
                          desc='Reading from file: {}'.format(path), dynamic_ncols=True):
                # unused page-header lines
                for k in range(3):
                    _ = f.readline()
                # timestamp of data block: replace the final ':mmm' with '.mmm'
                # so it parses as fractional seconds
                line = f.readline().strip().decode('utf-8')
                temp_line_to_parse = line[10:].rsplit(':', 1)[-2] + '.' + line[10:].rsplit(':', 1)[-1]
                d = parse_datetime(temp_line_to_parse + tz).timestamp() - TS
                # NOTE(review): timestamp of first sample != timestamp of Start
                # Time — same discrepancy exists in the file itself; appears to
                # be a quirk of the .bin file, not a bug in this code.
                # unused line
                _ = f.readline()
                # temperature (one value per page)
                line = f.readline()[:-2].decode('utf-8')
                temperature = float(line.split(':', 1)[1])
                # temperature resampling: held constant over the page
                data[5, c:c + 300] = temperature
                # time stamp interpolation across the 300 samples of the page
                data[6, c:c + 300] = d + delta
                # unused lines
                for k in range(3):
                    _ = f.readline()
                # data block: 300 samples of 12 hex chars = 48 bits each,
                # split as 12+12+12 bits (x,y,z), 10 bits (lux), 1 bit (button),
                # 1 bit (unused)
                line_hex = f.readline().strip()
                hexes = struct.unpack("12s " * 300, line_hex)
                for e in hexes:
                    dataChunk = struct.unpack("12s 12s 12s 10s 1s 1s",
                                              bytes(bin(int(e, 16))[2:].zfill(48), 'utf-8'))
                    data[0, c] = int(dataChunk[0], 2)  # x
                    data[1, c] = int(dataChunk[1], 2)  # y
                    data[2, c] = int(dataChunk[2], 2)  # z
                    data[3, c] = int(dataChunk[3], 2)  # lux
                    data[4, c] = int(dataChunk[4], 2)  # button
                    c += 1
            # Converting unsigned 12-bit data into 12-bit signed data
            data[0, data[0, :] >= 2048] = data[0, data[0, :] >= 2048] - 4096
            data[1, data[1, :] >= 2048] = data[1, data[1, :] >= 2048] - 4096
            data[2, data[2, :] >= 2048] = data[2, data[2, :] >= 2048] - 4096
            # And calculating values in g (or lux) using calibration parameters
            data[0, :] = (data[0, :] * 100 - int(metadata['x offset'])) / int(metadata['x gain'])
            data[1, :] = (data[1, :] * 100 - int(metadata['y offset'])) / int(metadata['y gain'])
            data[2, :] = (data[2, :] * 100 - int(metadata['z offset'])) / int(metadata['z gain'])
            data[3, :] = data[3, :] * int(metadata['Lux']) / int(metadata['Volts'])
        return data, metadata, fs, channel_names

    @staticmethod
    def _read_csv(path: str) -> Tuple[np.ndarray, dict, float, list]:
        # Parse a manufacturer .csv export: 100 metadata lines then one row per
        # sample (timestamp followed by the six data channels).
        metadata = {}
        with open(path) as f:
            for i in range(76):
                # First 100 lines contain metadata, but there are six blocks of
                # 5 lines containing sensor data (consumed by the inner loop)
                row = f.readline()
                if row != '\n' and ',' in row:
                    key, value = row.split(',', 1)
                    if 'Sensor type' in key:
                        sensor = value[:-1]
                        for i in range(4):
                            row = f.readline()
                            if ',' in row:
                                key, value = row.split(',', 1)
                                # prefix per-sensor keys with the sensor name
                                key = sensor.strip() + ' ' + key.strip()
                                metadata[key] = value[:-1].replace('\0', '').strip()
                    else:
                        # files written by the export method may contain
                        # leftover '*' escape characters in some values
                        metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip()
        channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
        tz = metadata['Time Zone'][3:].strip()
        TS = parse_datetime(metadata['Start Time'][:-4] + '.' + metadata['Start Time'][-3:] + '000' + tz).timestamp()
        data = np.loadtxt(path, skiprows=100, usecols=(1, 2, 3, 4, 5, 6), delimiter=',').T
        if data.shape[0] != 6:
            raise ValueError("Trying to read epoched data? Use GeneActivMVM instead.")
        # Column 0 holds date strings; convert to timestamps relative to TS.
        ts = np.loadtxt(path, skiprows=100, usecols=0, dtype=str, delimiter=',')
        ts = np.array([parse_datetime(x[:-4] + '.' + x[-3:] + '000' + tz).timestamp() - TS
                       for x in tqdm(ts, position=0, leave=True,
                                     desc='Reading from file: {}'.format(path), dynamic_ncols=True)])
        data = np.concatenate((data, ts.reshape(1, -1)), axis=0)
        fs = float(metadata['Measurement Frequency'][:-3])
        return data, metadata, fs, channel_names
[docs] @staticmethod def preview_metadata(path: str) -> tuple[dict, list[str]]: """Preview of metadata without loading the file. Args: path (str): Path to the file Raises: ValueError: Wrong format Returns: tuple[dict, list[str]]: Metadata and channel names. """ if path[-3:] == 'csv': return RawGeneActiv._preview_csv_metadata(path) else: try: return RawGeneActiv._preview_bin_metadata(path) except Exception as e: print(e) raise ValueError('Unsupported file format')
    @staticmethod
    def _preview_bin_metadata(path: str) -> tuple[dict, list[str]]:
        # Read only the 59-line .bin header; no sample data is loaded.
        metadata = {}
        with open(path, 'rb') as f:
            for i in range(59):  # header lines contain metadata
                line = (f.readline()).decode('utf-8').strip()
                if ':' in line:
                    key, value = line.split(':', 1)
                    metadata[key] = value.strip().replace('\x00', '')
        channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
        return metadata, channel_names

    @staticmethod
    def _preview_csv_metadata(path: str) -> tuple[dict, list[str]]:
        # Read only the .csv metadata header; no sample data is loaded.
        metadata = {}
        with open(path) as f:
            for i in range(76):
                # First 100 lines contain metadata, but there are six blocks of
                # 5 lines containing sensor data (consumed by the inner loop)
                row = f.readline()
                if row != '\n' and ',' in row:
                    key, value = row.split(',', 1)
                    if 'Sensor type' in key:
                        sensor = value[:-1]
                        for i in range(4):
                            row = f.readline()
                            if ',' in row:
                                key, value = row.split(',', 1)
                                # prefix per-sensor keys with the sensor name
                                key = sensor.strip() + ' ' + key.strip()
                                metadata[key] = value[:-1].replace('\0', '').strip()
                    else:
                        # files written by the export method may contain
                        # leftover '*' escape characters in some values
                        metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip()
        channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp']
        return metadata, channel_names
@dataclass(slots=True, eq=False)
class GeneActivMVM(_Epoched):
    """A class for storing and handling epoched data generated by GeneActiv-style MVM method.

    Channel layout of ``self._data`` (rows): 0=x, 1=y, 2=z, 3=lux, 4=button,
    5=temperature, 6=timestamp, 7=mvm, 8=x_std, 9=y_std, 10=z_std, 11=peak_lux.
    """

    def __repr__(self):
        """Pretty-printed view of both the raw and the epoching metadata."""
        str1 = pprint.pformat(self._metadata, indent=4)
        str2 = pprint.pformat(self._epoching_method_metadata, indent=4)
        return 'Raw metadata:\n{}\n\nEpoching metadata:\n{}'.format(str1, str2)

    @property
    def x(self) -> np.ndarray:
        """Epoch-wise mean acceleration (in g) along x axis."""
        return self._data[0]

    @property
    def y(self) -> np.ndarray:
        """Epoch-wise mean acceleration (in g) along y axis."""
        return self._data[1]

    @property
    def z(self) -> np.ndarray:
        """Epoch-wise mean acceleration (in g) along z axis."""
        return self._data[2]

    @property
    def lux(self) -> np.ndarray:
        """Epoch-wise mean lightmeter values in lux."""
        return self._data[3]

    @property
    def button(self) -> np.ndarray:
        """Sum of button values (1=pressed) over epochs."""
        return self._data[4]

    @property
    def temperature(self) -> np.ndarray:
        """Epoch-wise mean recorded temperature in Celsius degrees."""
        return self._data[5]

    @property
    def timestamp(self) -> np.ndarray:
        """Timestamp of the epoch's end."""
        return self._data[6]

    @property
    def mvm(self) -> np.ndarray:
        """Mean vector magnitude calculated as sum of vlens in the epoch."""
        return self._data[7]

    @property
    def to_score(self) -> np.ndarray:
        """Data to be scored by scoring algorithms. Here equal to mvm."""
        return self.mvm

    @property
    def x_std(self) -> np.ndarray:
        """Epoch-wise standard deviation along x axis."""
        return self._data[8]

    @property
    def y_std(self) -> np.ndarray:
        """Epoch-wise standard deviation along y axis."""
        return self._data[9]

    @property
    def z_std(self) -> np.ndarray:
        """Epoch-wise standard deviation along z axis."""
        return self._data[10]

    @property
    def peak_lux(self) -> np.ndarray:
        """Maximum lightmeter value for each epoch (in lux)."""
        return self._data[11]

    @property
    def first_sample_timestamp(self) -> float:
        """Unix timestamp of first sample."""
        return RawGeneActiv._convert_timestamp(self.timestamp[0], self._metadata, True)

    @property
    def id(self) -> str:
        """Subject identifier taken from the 'Subject Code' metadata field."""
        return self.metadata['Subject Code']
    @staticmethod
    def load_file(path: str) -> "GeneActivMVM":
        """Loading .csv file with epoched data generated by the GeneActiv software or the MVM method.

        Args:
            path (str): Path to .csv file.

        Raises:
            ValueError: Unsupported file format otherwise.

        Returns:
            GeneActivMVM: Object containing data.
        """
        metadata = {}
        with open(path) as f:
            for i in range(76):
                # First 100 lines contain metadata, but there are six blocks of
                # 5 lines containing sensor data (consumed by the inner loop)
                row = f.readline()
                if row != '\n' and ',' in row:
                    key, value = row.split(',', 1)
                    if 'Sensor type' in key:
                        sensor = value[:-1]
                        for i in range(4):
                            row = f.readline()
                            if ',' in row:
                                key, value = row.split(',', 1)
                                # prefix per-sensor keys with the sensor name
                                key = sensor.strip() + ' ' + key.strip()
                                metadata[key] = value[:-1].replace('\0', '').strip()
                    else:
                        # files written with the export method may contain
                        # leftover '*' escape characters in some values
                        metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip()
        channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp',
                         'mvm', 'x_std', 'y_std', 'z_std', 'peak_lux']
        tz = metadata['Time Zone'][3:].strip()
        TS = parse_datetime(metadata['Start Time'][:-4] + '.' + metadata['Start Time'][-3:] + '000' + tz).timestamp()
        data = np.loadtxt(path, skiprows=100, usecols=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), delimiter=',').T
        if data.shape[0] != 11:
            raise ValueError("Trying to read non-epoched, raw data? Use RawGeneactiv instead.")
        # Column 0 holds date strings; convert to timestamps relative to TS.
        ts = np.loadtxt(path, skiprows=100, usecols=0, dtype=str, delimiter=',')
        ts = np.array([parse_datetime(x[:-4] + '.' + x[-3:] + '000' + tz).timestamp() - TS
                       for x in tqdm(ts, position=0, leave=True,
                                     desc='Reading from file: {}'.format(path), dynamic_ncols=True)])
        # Insert the timestamp row at index 6 to match channel_names order.
        data = np.concatenate((data[:6], ts.reshape(1, -1), data[6:]), axis=0)
        # Epoch rate derived from the spacing of the first two epochs.
        fs = 1 / (ts[1] - ts[0])
        epoching_method_metadata = {'epoching method': 'Geneactiv-like epoching',
                                    'epoch length': ts[1] - ts[0],
                                    'to_score channel': 7}
        return GeneActivMVM(data, metadata, fs, channel_names, epoching_method_metadata)
    @staticmethod
    def preview_metadata(path: str) -> tuple[dict, list[str]]:
        """Preview metadata without loading the whole file.

        Args:
            path (str): Path to a file

        Returns:
            tuple[dict, list[str]]: Metadata and channel names.
        """
        metadata = {}
        with open(path) as f:
            for i in range(76):
                # First 100 lines contain metadata, but there are six blocks of
                # 5 lines containing sensor data (consumed by the inner loop)
                row = f.readline()
                if row != '\n' and ',' in row:
                    key, value = row.split(',', 1)
                    if 'Sensor type' in key:
                        sensor = value[:-1]
                        for i in range(4):
                            row = f.readline()
                            if ',' in row:
                                key, value = row.split(',', 1)
                                # prefix per-sensor keys with the sensor name
                                key = sensor.strip() + ' ' + key.strip()
                                metadata[key] = value[:-1].replace('\0', '').strip()
                    else:
                        # files written with the export method may contain
                        # leftover '*' escape characters in some values
                        metadata[key] = value[:-1].replace('\0', '').replace('*', '').strip()
        channel_names = ['x', 'y', 'z', 'lux', 'button', 'temperature', 'timestamp',
                         'mvm', 'x_std', 'y_std', 'z_std', 'peak_lux']
        return metadata, channel_names
    def export(self, path: str):
        """Export object to .csv file fully resembling the manufacturer .csv format.

        Args:
            path (str): Path to .csv file to which data will be exported.
        """
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            # Data that originated from a .bin file carries 'Number of Pages'.
            if 'Number of Pages' in self._metadata.keys():
                RawGeneActiv._export_bin_metadata(self._metadata, writer)
            else:
                RawGeneActiv._export_csv_metadata(self._metadata, writer)
            ts = self._metadata['Start Time']
            tz = self._metadata['Time Zone'][3:].strip()
            # Build a tzinfo from the '+HH' or '+HH:MM' offset string.
            sign = -1 if tz[0] == '-' else 1
            if len(tz.split(':')) == 1:
                tzinfo = timezone(sign * timedelta(hours=int(tz[1:])))
            else:
                temp = tz[1:].split(':')
                tzinfo = timezone(sign * timedelta(hours=int(temp[0]), minutes=int(temp[1])))
            ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp()
            for i in tqdm(range(self._data.shape[1]), position=0, leave=True,
                          desc='Saving to file: {}'.format(path), dynamic_ncols=True):
                row = self._data[:, i]
                # row[6] is the relative timestamp; skip it in the output and
                # write the six raw-style channels followed by the epoch channels.
                date = datetime.fromtimestamp(row[6] + ts, tz=tzinfo).strftime('%Y-%m-%d %H:%M:%S:%f')[:-3]
                writer.writerow([date] + list(row[:6]) + list(row[7:]))
[docs] def cut_by_samples(self, start_sample: int, end_sample: int) -> "GeneActivMVM": """Create new object with the data cut by given indexes. Args: start_sample (int): First sample of output data. end_sample (int): Sample after the last sample of output data. Returns: GeneActivMVM: Object containing the cutted data. """ return GeneActivMVM(self._data[:, start_sample:end_sample], self._metadata, self._fs, self._channel_names, self._epoching_method_metadata)
[docs] def cut_by_timestamp(self, start_ts: float, end_ts: float | None = None) -> "GeneActivMVM": """Create new object with the data cut by given timestamps. Args: start_ts (float): Unix timestamp of output data beginning. end_ts (float | None): Unix timestamp of output data end. If None, last sample of output data will be last sample of input data. Returns: GeneActivMVM: Object containing the cutted data. """ ts = self._metadata['Start Time'] tz = self._metadata['Time Zone'][3:].strip() ts = parse_datetime(ts[:-4] + '.' + ts[-3:] + '000' + tz).timestamp() start_sample = np.where(self._data[6, :] >= start_ts - ts)[0][0] if end_ts is None: end_ts = ts + float(self._data[6, -1]) try: end_sample = np.where(self._data[6, :] >= end_ts - ts)[0][0] except IndexError: end_sample = len(self.timestamp) return self.cut_by_samples(start_sample, end_sample)
    def cut_by_dates(self, start_date: str, end_date: str | None = None) -> "GeneActivMVM":
        """Create new object with the data cut by given dates.

        Args:
            start_date (str): ISO-formatted date of output data beginning.
            end_date (str | None): ISO-formatted date of output data end. If
                None, last sample of output data will be last sample of input
                data. Defaults to None.

        Returns:
            GeneActivMVM: Object containing the cut data.
        """
        start_ts = parse_datetime(start_date).timestamp()
        if end_date is not None:
            end_ts = parse_datetime(end_date).timestamp()
        else:
            end_ts = None
        return self.cut_by_timestamp(start_ts, end_ts)