Código fuente para ruidaqan.gui.reader

import pprint
import nidaqmx
import numpy as np
from nidaqmx.constants import AcquisitionType
from nidaqmx.stream_readers import ( AnalogMultiChannelReader,
                                     AnalogUnscaledReader,
                                   )
from pyqtgraph.Qt import QtCore
from .data_io import DataWriter


class GetDevicesInfo():
    """
    Query information about the NI acquisition boards installed in the system.

    Attributes
    ----------
    dev_names : list
        Names of all devices reported by the local NI-DAQmx system.

    Usage
    -----
    device_info(dev_name) : information object for one specific device
    device_info(dev_name).ai_ranges : list of available ranges
    device_info(dev_name).channel_names : list of available channel names
    device_info(dev_name).scaling_coeff(ai_channel_name, ai_range) :
        coefficients used to convert to voltage for a given channel and a
        given range. By default they are the same for every channel, but
        they could be configured individually.
    """

    def __init__(self):
        self.dev_names = nidaqmx.system.System.local().devices.device_names

    def device_info(self, dev_name="Dev1"):
        """Return a `_device_info` object describing device *dev_name*."""
        return self._device_info(dev_name)

    class _device_info():
        """Analog-input ranges, channel names and ADC resolution of a device."""

        def __init__(self, dev_name):
            board = nidaqmx.system.device.Device(dev_name)
            # nidaqmx hands the ranges back as one flat list; regroup it
            # into (min, max) tuples: [(-10, 10), (-5, 5), ...]
            flat = board.ai_voltage_rngs
            self.ai_ranges = [
                (flat[i], flat[i + 1]) for i in range(0, len(flat), 2)
            ]
            self.channel_names = board.ai_physical_chans.channel_names
            self.resolution_ADC = int(self._resolution_ADC(dev_name))

        def scaling_coeff(self, ai_channel_name="Dev1/ai0",
                          ai_range=(-10, 10)):
            """
            Return the device scaling coefficients of one channel.

            A temporary task is created with *ai_channel_name* configured
            for the (min, max) voltage span given in *ai_range*, and the
            channel's ``ai_dev_scaling_coeff`` is returned.
            """
            with nidaqmx.Task() as probe_task:
                channel = probe_task.ai_channels.add_ai_voltage_chan(
                    ai_channel_name,
                    min_val=ai_range[0],
                    max_val=ai_range[1],
                )
                return channel.ai_dev_scaling_coeff

        def _resolution_ADC(self, dev_name):
            """Return ``ai_resolution`` as reported for channel ai0."""
            with nidaqmx.Task() as probe_task:
                channel = probe_task.ai_channels.add_ai_voltage_chan(
                    dev_name + '/ai0')
                return channel.ai_resolution
class SignalReader(QtCore.QThread):
    """
    Thread for capturing input signals through the DAQ.

    TODO: Add reset device.

    Parameters
    ----------
    sample_rate : float
        Sampling frequency used for the task's sample clock.
    sample_size : int
        Number of samples per channel read on every iteration (one
        "history"). It is used as a dimension of the read buffer.
    channels : list
        Channels to measure; each entry is passed to
        ``add_ai_voltage_chan``.
    ranges : list of tuples
        (v_min, v_max) for each channel to measure.
    kargs :
        reading_type : str, "float" (default) or "int16"
            Data type that is going to be read.
        terminal_cfg : str, "RSE" (default) or "NRSE"
            Connection type for the channels (the same one is used for
            all of them).
        max_histories : int or None (default)
            Number of histories to acquire before the thread stops on its
            own. ``None`` means acquire until ``is_running`` is cleared.

    Raises
    ------
    KeyError
        If ``terminal_cfg`` is not one of the supported names.
    ValueError
        If ``reading_type`` is not one of the supported names.
    """

    # Signal with read data
    incoming_data = QtCore.pyqtSignal(object)

    def __init__(self, sample_rate, sample_size, channels, ranges, **kargs):
        super().__init__()
        self.reading_type = kargs.get('reading_type', 'float')
        ter_cfg_val = kargs.get('terminal_cfg', 'RSE')
        # Number of histories to acquire
        self.max_histories = kargs.get('max_histories', None)
        # Counter of acquired histories
        self.num_histories = 0

        ter_cfg_dic = {
            'RSE': nidaqmx.constants.TerminalConfiguration.RSE,
            'NRSE': nidaqmx.constants.TerminalConfiguration.NRSE,
        }
        self.terminal_config = ter_cfg_dic[ter_cfg_val]

        # Both are populated by create_task(); None means "no live task".
        self.task = None
        self.reader = None
        self.is_running = False
        self.is_paused = False

        self.input_channels = channels
        self.input_ranges = ranges
        self.sample_rate = sample_rate
        self.sample_size = sample_size

        # Buffer that receives the actual data from the DAQ
        if self.reading_type == "float":
            self.input = np.empty(shape=(len(channels), self.sample_size),
                                  dtype=np.float64)
        elif self.reading_type == "int16":
            self.input = np.empty(shape=(len(channels), self.sample_size),
                                  dtype=np.int16)
        else:
            msg = "Tipo de lectura no reconocida. Opciones: 'float', 'int16' "
            raise ValueError(msg)

    def run(self):
        """
        Acquisition loop. Called by QThread.start().

        Creates the task and then repeatedly reads ``sample_size`` samples
        per channel, emitting each history through ``incoming_data``. The
        loop ends when ``is_running`` is cleared, when ``max_histories``
        histories were acquired, or when a read fails. The task is always
        closed before returning.
        """
        self.is_running = True
        if not self.create_task():
            # Nothing to read from and nothing to close.
            self.is_running = False
            return None

        try:
            while self.is_running:
                if self.is_paused:
                    # Yield the CPU instead of busy-spinning while paused.
                    self.msleep(10)
                    continue
                try:
                    # Read acquired data. Both readers wait indefinitely
                    # (timeout=-1) so long histories do not hit the
                    # library's default read timeout.
                    if self.reading_type == "float":
                        self.reader.read_many_sample(
                            data=self.input,
                            number_of_samples_per_channel=self.sample_size,
                            timeout=-1,
                        )
                    elif self.reading_type == "int16":
                        self.reader.read_int16(
                            data=self.input,
                            number_of_samples_per_channel=self.sample_size,
                            timeout=-1,
                        )
                    self.num_histories += 1
                    # Emit a copy: self.input is reused by the next read,
                    # so a receiver holding the original buffer could see
                    # it being overwritten.
                    self.incoming_data.emit(self.input.copy())
                    print("Total data acquired:"
                          f"{self.task.in_stream.total_samp_per_chan_acquired}")
                except Exception as e:
                    print("Error with read_many_sample")
                    print(e)
                    break
                if self.num_histories == self.max_histories:
                    # Acquisition ends once all histories are completed
                    break
        finally:
            # Stops acquisition
            self._close_task()
            self.is_running = False
        # Returns run and the thread is closed
        return None

    def create_task(self):
        """
        Create, configure and start the acquisition task and its reader.

        Returns
        -------
        bool
            True if the task was started and ``self.reader`` is ready;
            False if the task or one of its channels could not be created
            (a message is printed and no task is left open).

        Errors raised while configuring the timing or starting the task
        are propagated after the task is closed.
        """
        self.reader = None
        try:
            task = nidaqmx.Task("Reader Task")
        except OSError:
            print("DAQ is not connected, task could not be created")
            return False

        try:
            for ch, rg in zip(self.input_channels, self.input_ranges):
                chan_args = {
                    'min_val': rg[0],
                    'max_val': rg[1],
                    'terminal_config': self.terminal_config,
                }
                task.ai_channels.add_ai_voltage_chan(ch, **chan_args)
        except Exception as e:
            print("DAQ is not connected, channel could not be added")
            print(e)
            task.close()
            return False

        try:
            # Timing definition
            task.timing.cfg_samp_clk_timing(
                rate=self.sample_rate,
                sample_mode=AcquisitionType.CONTINUOUS,
            )
            task.start()  # Start acquisition
        except Exception:
            # Do not leak the half-configured task.
            task.close()
            raise

        self.task = task
        if self.reading_type == "float":
            self.reader = AnalogMultiChannelReader(task.in_stream)
        elif self.reading_type == "int16":
            self.reader = AnalogUnscaledReader(task.in_stream)
        return True

    def restart(self):
        """
        Stop and close the current task and create a fresh one.

        The acquisition loop is paused while the task is replaced. If the
        new task cannot be created, ``is_running`` is cleared so the
        acquisition thread finishes instead of reading from nothing.

        NOTE(review): this method is not synchronised with run(); if it is
        called from another thread while a read is in progress, that read
        will presumably fail and end the loop - confirm against callers.
        """
        print("Restarting the task")
        # Closing
        self.is_paused = True
        if self.task is not None:
            self.task.stop()
        self._close_task()
        # Starting
        if not self.create_task():
            self.is_running = False
        self.is_paused = False

    def _close_task(self):
        """Close the current task, if any, and forget it."""
        task, self.task = self.task, None
        if task is not None:
            task.close()
if __name__ == "__main__":
    # Small demo: acquire from three analog inputs until return is pressed.
    print("\nRunning demo for SignalReader\n")
    dev_name = 'Dev1'
    # SignalReader forwards each channel to add_ai_voltage_chan, so full
    # physical channel names are used, plus one (v_min, v_max) range per
    # channel (the `ranges` argument is required).
    demo_channels = [f"{dev_name}/ai{i}" for i in range(3)]
    demo_ranges = [(-10, 10)] * len(demo_channels)
    reader_thread = SignalReader(sample_rate=1000,
                                 sample_size=1000,
                                 channels=demo_channels,
                                 ranges=demo_ranges,
                                 )
    reader_thread.start()
    input("Press return to stop")
    reader_thread.is_running = False
    # Wait for the thread to finish (see QThread class)
    reader_thread.wait()
    print("\nTask done")