Código fuente para ruidaqan.gui.reader_debug

import pprint
import numpy as np
import time
from pyqtgraph.Qt import QtCore
from .data_io import DataWriter


[documentos] class GetDevicesInfo(): """ Simula información sobre la placa de adquisición de NI dev_names : Lista con todos los dispositivos instalados en el sistema device_info(dev_name) : método para obtener información de un dispositivo específico device_info(dev_name).ai_ranges : Lista con rangos disponibles device_info(dev_name).channel_names : Lista con los nombres de canales disponibles device_info(dev_name).scaling_coeff(ai_channel_name, ai_range): coeficientes para convertir a voltage para un dado canal y un dado rango. Si bien por default son los mismos para todos los canales, se podrían configurar de forma individual. """ def __init__(self): self.dev_names = [f"Dev{i}" for i in range(1, 4)]
[documentos] def device_info(self, dev_name="Dev1"): return self._device_info(dev_name)
class _device_info(): def __init__(self, dev_name): if dev_name == "Dev1": _rgs_lst = [-0.1, 0.1, -1, 1, -2, 2, -5, 5, -10, 10] else: _rgs_lst = [-0.1, 0.1, -1, 1, -2, 2, -4, 4] self.ai_ranges = [] for i in range(0, len(_rgs_lst), 2): self.ai_ranges.append( (_rgs_lst[i], _rgs_lst[i +1])) self.channel_names = [f"{dev_name}/ai{i}" for i in range(12)] self.resolution_ADC = 16 if dev_name=="Dev1" else 12 def scaling_coeff(self, ai_channel_name="Dev1/ai0", ai_range=(-1, 1)): if ai_channel_name == "Dev1/ai0": if ai_range == (-10, 10): _co = [10, 0.2, 0.3] elif ai_range == (-5, 5): _co = [5, 0.2, 0.3] elif ai_range == (-2, 2): _co = [2, 0.2, 0.3] elif ai_range == (-1, 1): _co = [1, 0.2, 0.3] else: _co = [0, -0.1, 0.3] elif ai_channel_name == "Dev2/ai0": _co = [1, 1.2, 1.3] elif ai_channel_name == "Dev1/ai1": _co = [11, 11, 11] elif ai_channel_name == "Dev1/ai2": _co = [21, 21, 21] elif ai_channel_name == "Dev1/ai3": _co = [31, 31, 31] elif ai_channel_name == "Dev1/ai4": _co = [41, 41, 41] elif ai_channel_name == "Dev1/ai5": _co = [51, 51, 51] else: _co = [0, 1, 2] return _co
[documentos] class SignalReader(QtCore.QThread): """ Thread for simulating a signal """ # Signal with read data incoming_data = QtCore.pyqtSignal(object) def __init__(self, sample_rate, sample_size, channels, *args, **kargs): super().__init__() self.reading_type = kargs.get('reading_type', 'float') self.reader = None self.is_running = False self.is_paused = False self.input_channels = channels self.sample_rate = sample_rate self.sample_size = sample_size
[documentos] def run(self): """ Start thread for data simulation. Called by QThread.start() """ # self.writer = DataWriter() self.is_running = True while self.is_running: if not self.is_paused: rng = np.random.default_rng() if self.reading_type == "float": # Simulate voltage data self.input = rng.normal(0, 0.1, size=(len(self.input_channels), self.sample_size) ) # Add different offset for each cchannel for i, _ in enumerate(self.input): self.input[i, :] += i else: # Simulate integer data self.input = rng.poisson(10, size=(len(self.input_channels), self.sample_size) ) # Add different offset for each cchannel for i, _ in enumerate(self.input): self.input[i, :] += i # Canal 0 saturado # self.input[0, :] = 32767 # Emit signal with the data self.incoming_data.emit(self.input) # Write data to file # self.writer.write_data(self.input) # Print data to screen # pprint.pprint(self.input.T) # Wait the time it would have taken to acquiere the real signal _time_acq = self.sample_size / self.sample_rate time.sleep(_time_acq) # Finish data writer # self.writer.close_file() # Returns run and the thread is closed return None
[documentos] def restart(self): print("Restarting the task") self.is_paused = True #self.writer.close_file() self.is_paused = False
# self.writer = DataWriter() if __name__ == "__main__": print("\nRunning demo for SignalReader\n") reader_thread = SignalReader(sample_rate=1000, sample_size=1000, channels=[0, 1, 2], dev_name='Dev1', ) reader_thread.start() input("Press return to stop") reader_thread.is_running = False # Wait for the thread to finish (see QThread class) reader_thread.wait() print("\nTask done")