Código fuente para ruidaqan.script.data_fercin3_io

#! /usr/bin/env python3

import pandas as pd
import numpy as np
from numpy.polynomial.polynomial import polyval
import tables as tb
import os
import sys
import pickle
from scipy import signal
from uncertainties import ufloat, unumpy, correlated_values
import importlib.resources
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
try:
    from .fit_models import (ajuste_apsd, ajuste_cpsd, ajuste_lineal,
                               weighted_average)
except ImportError:
    from fit_models import (ajuste_apsd, ajuste_cpsd, ajuste_lineal,
                               weighted_average)


[documentos] class HDF5FileProcessing(): """ Procesamiento de los archivos en formato HDF5 generados en la adquisición del programa. Permite calcular los espectros y graficarlos. Parameters ---------- file_name : str Nombre del archivo en formato hdf5 """ def __init__(self, file_name): self.file_name = file_name self._read_file(self.file_name) self.TRANSFERS_FOLDER = str(importlib.resources.files("ruidaqan") / "transferencias") def _read_file(self, file_name): with tb.open_file(file_name, mode="r") as f: # ---- Atributos del archivo aa = f.root._v_attrs self.date = aa.Fecha self.hour = aa.Hora self.reactor = aa.Reactor self.nucleo = aa.Nucleo self.serie = aa.Serie self.n_det = aa.N_detectores self.cip = aa.CIp_adquirido self.comentaario = aa.Comentario self.nivel = aa.Nivel self.medicion = aa.Medicion self.comentario_medicion = aa.Comentario_medicion self.BW = aa.BW self.dt = 1 / (2 * self.BW) self.n_puntos = aa.N_puntos self.historias = aa.Historias self.resolucion_DAC = aa.Resolucion_DAC self.AC_volts, self.DC_volts = [], [] self.AC_rangos, self.DC_rangos = [], [] self.AC_nombres, self.DC_nombres = [], [] self.DC_volt_avg_sf = [] self.If, self.I = [], [] self.sensibilidades, self.AC_gains, self.offsets = [], [], [] for i in range(self.n_det): g = f.get_node(f"/Cadena_{i + 1}") # Uso una incerteza del 1% en las sensibilidades # Es la incerteza de la resistencia que usa el conversor # corriente-voltaje (ver manual K330) # TODO: la incerteza guardarla en el archivo sensibilidad = ufloat(g._v_attrs.Sensibilidad, g._v_attrs.Sensibilidad * 0.01) self.sensibilidades.append(sensibilidad) AC_gan = g._v_attrs.AC_gains # Pongo a mano una incerteza para la ganancia del 0.5% # (es la incerteza que se le asigna en el FERCIN5) # TODO: la incerteza guardarla en el archivo self.AC_gains.append(ufloat(AC_gan, AC_gan * 0.005)) offset = g._v_attrs.Offset self.offsets.append(offset) fondo = ufloat(*g._v_attrs.Fondo) self.If.append(fondo) AC_int = g.AC.read() self.AC_volts.append(polyval(AC_int, g.AC.attrs.Coeficientes)) self.AC_rangos.append(g.AC.attrs.Rango) self.AC_nombres.append(g.AC.attrs.Nombre) DC_int = g.DC.read() DC_volt = polyval(DC_int, g.DC.attrs.Coeficientes) self.DC_volts.append(DC_volt) self.DC_rangos.append(g.DC.attrs.Rango) self.DC_nombres.append(g.DC.attrs.Nombre) DC_avg = ufloat(np.mean(DC_volt), np.std(DC_volt)) self.I.append((DC_avg - offset) * sensibilidad - fondo) # Voltaje DC promedio sin fondo DC_volt_m_sf = DC_avg - offset - fondo / sensibilidad self.DC_volt_avg_sf.append(DC_volt_m_sf) # Para CIp g = f.get_node("/Cadena_Ip") self.If.append(ufloat(*g._v_attrs.Fondo)) if self.cip: sensibilidad = ufloat(g._v_attrs.Sensibilidad, 0.0) self.sensibilidades.append(sensibilidad) offset = g._v_attrs.Offset self.offsets.append(offset) DC_int = g.DC.read() DC_volt = polyval(DC_int, g.DC.attrs.Coeficientes) self.DC_volts.append(DC_volt) self.DC_rangos.append(g.DC.attrs.Rango) self.DC_nombres.append(g.DC.attrs.Nombre) DC_avg = ufloat(np.mean(DC_volt), np.std(DC_volt)) # Recordar que no se resta fondo a Ip self.Ip = (DC_avg - offset) * sensibilidad # Voltaje DC promedio (no se resta fondo para Ip) DC_volt_m_sf = DC_avg - offset # Por comodidad lo pongo en la lista que ya estaba usando # a pesar de que Ip tiene el fondo self.DC_volt_avg_sf.append(DC_volt_m_sf) else: # Se leen valores ingreados de forma manual Ip_DC_mag = g.DC_mag.read() Ip_DC_std = g.DC_std.read() # Se calcula promedio pesado mean_Ip, std_Ip = weighted_average(Ip_DC_mag, 1 / Ip_DC_std**2) self.Ip = ufloat(mean_Ip, std_Ip)
[documentos] def get_temporal_signals(self): """ Calcula las AC en voltaje. Convierte dependiendo ADC """ # Calculo la corriente DC temporal # Esto sólo se usa para graficar, no se incluyen las incertezas # (ver que se usa .n para las sensibilidades) I_DC = [] for i, dc in enumerate(self.DC_volts): I_DC.append((dc - self.offsets[i]) * self.sensibilidades[i].n) # Vector temporal time_vec = np.arange(0, len(self.AC_volts[0])) * self.dt return time_vec, self.AC_volts, I_DC
[documentos] def get_spectrums(self, N_fft=None, N_overlap=None): """ Espectros sin normalizar ni corregidos con función transferencia Son los valores que se guardan en los archivos .PSD con fercin3/5 Se estima el espectro con el estimador de Welch utilizando N_fft puntos y N_overlap puntos de solapamiento. Returns ------- self.spectrums: lists of numpy arrays Lista con los espectros [APSD1, APSD2, CPSD] self.f: numpy array Vector con las frecuencias asociada a los espectros """ if (N_fft is None) or (N_overlap is None): raise ValueError("Faltan especificar N_fft o N_overlap") self.N_fft = N_fft self.N_overlap = N_overlap spectrums = [] for AC in self.AC_volts: # Estimación de las APSD f_spectrums, _spec = signal.welch(AC, fs=1/self.dt, noverlap=N_overlap, nperseg=N_fft) spectrums.append(_spec) if self.n_det == 2: # Estimación de la CPSD _, _spec = signal.csd(self.AC_volts[0], self.AC_volts[1], fs=1/self.dt, noverlap=N_overlap, nperseg=N_fft) spectrums.append(_spec) self.spectrums = spectrums # Se convierten self.f = np.asarray(f_spectrums) return self.spectrums, self.f
[documentos] def get_normalized_spectrums(self, reactor_name, N_fft=None, N_overlap=None, freqs_to_remove=[50, 100, 150], freqs_to_remove_tr=[50, 100, 150]): """ Calcula los espectros normalizados con el método de Welch. Se les quita las frencuencia de línea y sus dos armónicos. Están corregidos por la función transferencia. Si no se especifica N_fft y N_overlap se usan los espectros estimados con .get_spectrum(). Si se especifican, se vuelven a calcular. Parameters ---------- N_fft: int Puntos para calcular la FFT en cada bloque N_overlap: int Cantidad de puntos para el solapamiento de bloques reactor_name: string Identificación del reactor (para leer transferencias) freqs_to_remove: list of floats Frecuencias que serán eliminadas del espectro medido freqs_to_remove: list of floats Frecuencias que serán eliminadas de las transferencias Returns ------- self.spectrums: listr of numpy arrays Lista con los espectros [NPSD1, NPSD1, NCPSD12] Los NPSD son reales, la NCPSD es compleja self.f: numpy array Frecuencias asociadas a los espectros """ # Si no se calcularon los espectros previamente, se los calcula if not hasattr(self, 'spectrums'): self.get_spectrums(N_fft, N_overlap) else: # Si se especifican los valores if N_fft or N_overlap: self.get_spectrums(N_fft, N_overlap) # Nombres de las funciones transferencia que voy a leer _tr_names = self._get_transfer_name(reactor_name) norm_spec = [] _trans = [] for i, _tr_name in enumerate(_tr_names): # Lectura de las funciones transferencia try: H2_dB, ReH, ImH = self._get_transfer_function(_tr_name, freqs_to_remove_tr) except IOError: _tr_name = os.path.join('..', _tr_name) H2_dB, ReH, ImH = self._get_transfer_function(_tr_name, freqs_to_remove_tr) # Módulo cuadrado de la función transferencia H2 = 10**(H2_dB / 10) _trans.append([H2, ReH, ImH]) print(f"Se lee la función transferencia de {_tr_name}") # Espectro APSD APSD = self.spectrums[i] # En algunos casos del RA1 no coinciden los tamaños entre APSD y # la función tansferencia len_s, len_h = len(APSD), len(H2) if len_s != len_h: _min = min(len_s, len_h) f = f[:_min] APSD = APSD[:_min] H2 = H2[:_min] print(80*'!') msg = "Advertencia: el tamaño de la función transferencia\n" msg += "no coincide con el tamaño del espectro medido \n" msg += "Se toma el mínimo entre ambos: {}" print(msg.format(_min)) print(80*'!') # Corrección por función transferencia APSD_tr = APSD / H2 # Normalizo NAPSD_tr = APSD_tr / self.DC_volt_avg_sf[i]**2 # Quito frecuencias de línea NAPSD_tr = self._remove_utility_freq( self.f, NAPSD_tr, freqs_to_remove) # Los espectros tienen incertezas debido a la propagación, pero las # descarto por ahora: NAPSD_tr = unumpy.nominal_values(NAPSD_tr) norm_spec.append(NAPSD_tr) if len(_tr_names) == 2: # --- Corrección para la CPSD # Espectros sin corregir APSD1 = self.spectrums[0] APSD2 = self.spectrums[1] ReCPSD = np.real(self.spectrums[2]) ImCPSD = np.imag(self.spectrums[2]) # Funciones transferencia leidas anteriormente H2_1, ReH_1, ImH_1 = _trans[0] H2_2, ReH_2, ImH_2 = _trans[1] # Corrección de la CPSD por función transferencia _c1 = ReH_1 * ReH_2 + ImH_1 * ImH_2 _c2 = ImH_1 * ReH_2 - ReH_1 * ImH_2 ReCPSD_tr = (_c1 * ReCPSD - _c2 * ImCPSD) / (H2_1 * H2_2) ImCPSD_tr = (_c1 * ImCPSD + _c2 * ReCPSD) / (H2_1 * H2_2) # Normalizo con valor medio fac_norm = (self.DC_volt_avg_sf[0] * self.DC_volt_avg_sf[1]) ReNCPSD_tr = ReCPSD_tr / fac_norm ImNCPSD_tr = ImCPSD_tr / fac_norm # Quito frecuencias de línea ReNCPSD_tr = self._remove_utility_freq(self.f, ReNCPSD_tr, freqs_to_remove) ImNCPSD_tr = self._remove_utility_freq(self.f, ImNCPSD_tr, freqs_to_remove) # Los espectros tienen incertezas debido a la propagación, pero las # descarto por ahora: ReNCPSD_tr = unumpy.nominal_values(ReNCPSD_tr) ImNCPSD_tr = unumpy.nominal_values(ImNCPSD_tr) # Guardo el espectro complejo norm_spec.append(ReNCPSD_tr + 1j * ImNCPSD_tr) self.norm_spectrums = norm_spec return self.norm_spectrums, self.f
def _get_transfer_name(self, reactor_name): """ Get names of transfer functions """ #FOLDER_NAME = "transferencias" if reactor_name: sub_folder = reactor_name else: sub_folder = "" path_file = [] for i, K in enumerate(self.AC_gains): K_over_100 = np.round(K.n / 100) if i !=3: file_name = "TR{:.0f}{:.0f}A{}.PSD".format( K_over_100, self.BW, i+1) path_file.append(os.path.join(self.TRANSFERS_FOLDER, sub_folder, file_name)) return path_file def _get_transfer_function(self, file_name, freqs_to_remove=[50, 100, 150]): """ Función para leer archivos .PSD con las funciones de transferencia producidas por el programa TRA.EXE. Se quitan también las frecuencias de línea. Parameters ---------- file_name : string Nombre del archivo con extensión PSD de la función transferencia Returns ------- H2_dB : numpy array Módulo cuadrado de la función transferencia. Expresado en dB 10*log10(H2) ReH : numpy array Parte real de la función transferencia ImH : numpy array Parte imaginaria de la función transferencia """ # TODO: Se podría poner en los comentarios de los archivos la # identificación del reactor (al menos). Para evitar confusiones. # Preguntar. SKIP_HEADER = 12 # Lectura de los datos # Frecuencia [Hz] Re(CPSD) Im(CPSD) APSD1[dB] APSD2[dB] _data = np.loadtxt(file_name, skiprows=SKIP_HEADER) # Frecuencia [Hz] freq = _data[:, 0] # Re(CPSD) _ReCPSD = _data[:, 1] # Im(CPSD) _ImCPSD = _data[:, 2] # APSD1 [dB] _APSD1_dB = _data[:, 3] # APSD2 [dB] _APSD2_dB = _data[:, 4] # Módulo cuadrado de H(f) H2_dB = _APSD2_dB - _APSD1_dB # Parte real e imaginaria de H(f) _APSD1 = 10**(_APSD1_dB / 10) ReH = _ReCPSD / _APSD1 ImH = _ImCPSD / _APSD1 # Se quitan frecuencias de línea H2_dB = self._remove_utility_freq(freq, H2_dB, freqs_to_remove) ReH = self._remove_utility_freq(freq, ReH, freqs_to_remove) ImH = self._remove_utility_freq(freq, ImH, freqs_to_remove) return H2_dB, ReH, ImH def _remove_utility_freq(self, frequency, spectrum, freqs_to_remove): """ Removes the utility frequencies from the spectrum Values of 'spectrum' at those freqs (+/- 1Hz) are replaced with those of its neighbours (Angel's approach). Parameters ---------- frequency : numpy array Vector with frequencies spectrum : numpy array Vector with the spectrum (should be same size of frequency) freqs_to_remove : double or array Frequency/ies to be removed Returns ------- new_spectrum : numpy array Spectrum without the freqs_to_remove """ # Force list if not isinstance(freqs_to_remove, list): freqs_to_remove = [freqs_to_remove] new_spectrum = np.copy(spectrum) for freqs in freqs_to_remove: indx_to_remove = (frequency > freqs-1) & (frequency < freqs + 1) # Closest value to (freqs - 1Hz) value_to_assign = new_spectrum[frequency <= freqs-1][-1] new_spectrum[indx_to_remove] = value_to_assign # print("Aviso: se quitó la frecuencia {} Hz".format(freqs)) return new_spectrum
[documentos] def plot_spectrums(self, normalizados=True, *args, **kargs): if normalizados: if self.norm_spectrums is None: raise NameError("Faltan estimar los espectros normalizados") _datas = self.norm_spectrums _labels = ["NPSD1", "NPSD2", "Re(NCPSD12)"] _units = "[1 / Hz]" else: if self.spectrums is None: raise NameError("Faltan estimar los espectros") _datas = self.spectrums _labels = ["PSD1", "PSD2", "Re(CPSD12)"] _units = "[V$^2$ / Hz]" figs, ax = plt.subplots(1, 1) for i, data in enumerate(_datas): if i == 2: data = np.real(data) ax.plot(self.f, data, '.-', label=_labels[i]) ax.set_xlabel("Frecuencia [Hz]") ax.set_ylabel("Espectro " + _units) ax.legend() ax.set_title(f"Archivo leido: {self.file_name}") figs.tight_layout() plt.show()
[documentos] def fit_spectrums(self, intervals, **kargs): """ Ajuste de los espectros intervals: list of tuples Lista con los límitesde los intervalos para el ajuste [(f_min, f_max), (f_min, f_max), ...] en el orden [NPSD1, NPSD2, CPSD12] kargs: plot: bool True si se quiere graficar. False por default """ if self.norm_spectrums is None: raise NameError("Faltan estimar los espectros normalizados") plot = kargs.get('plot', False) _ttle = ["NPSD1", "NPSD2", "Re(NCPSD12)"] _ylab = [i + " [1 / Hz]" for i in _ttle] self.fit_results = [] self.fitted_data = [] for i, (spe, inter) in enumerate(zip(self.norm_spectrums, intervals)): ind = (self.f >= inter[0]) & (self.f <= inter[1]) if i != 2: _dat = spe[ind] _res = ajuste_apsd(self.f[ind], _dat) else: _dat = np.real(spe[ind]) _res = ajuste_cpsd(self.f[ind], _dat) self.fit_results.append(_res) self.fitted_data.append([self.f[ind], _dat]) if plot: best_fit = _dat + _res.residual fig, (ax0, ax1) = plt.subplots(2, sharex=True, gridspec_kw={'height_ratios': [3,1]},) ax0.errorbar(self.f, np.real(spe), fmt='k.', elinewidth=2, label='Medición') ax0.plot(self.f[ind], best_fit, 'r', zorder=3, label='fit') ax0.set_ylabel(_ylab[i]) ax1.plot(self.f[ind], _res.residual, 'k.') ax1.set_xlabel('f [Hz]') ax1.set_ylabel(r'Residuos [1/Hz]') ax0.set_title(f"Ajuste de {_ttle[i]}") fig.subplots_adjust(hspace=0.1) if plot: plt.show()
[documentos] def calcula_parametros_cineticos(self, reactor, **kwargs): """ Calcula parámetros cinéticos en base al ajuste espectral Parameters ---------- reactor: reactor class Clase importada del archivo "constantes_reactores.py" Returns ------- self.parametros_cineticos: dic Diccionario con los parámetros cinéticos estimados. Los keys son 'alfa', 'potencia', 'fq' y 'efi_over_beta2'. Los últimos dos son None en caso de ajustar una NCPSD """ if self.fit_results is None: raise NameError("Faltan hacer el ajuste de los espectros") # Constantes cinéticas DIVEN = reactor.FACTOR_DIVEN BETA = reactor.BETA_EFECTIVO FORMA = reactor.FACTOR_FORMA_L EFISION = reactor.ENERGIA_FISION BENNETT = reactor.FACTOR_BENNETT self.parametros_cineticos = [] # Asocio con sus incertezas for i, res in enumerate(self.fit_results): # Valores estimados par_val = [res.params[s].value for s in res.var_names] if i != 2: # Cálculo con NAPSD if "covar" in dir(res): # Se obtiene la matríz de covarianza alfa, amplitud, constante = \ correlated_values(par_val, res.covar, tags=res.var_names) else: # No se obtiene matriz de covarianza alfa, amplitud, constante = par_val print( "No se pudo estimar la matríz de covaarianza del ajuste") fq = constante * self.I[i] / 2 efi_over_beta2 = amplitud * BENNETT / (1 - BETA) / FORMA \ / DIVEN / constante else: # Calculo con NCPSD if "covar" in dir(res): # Se obtiene la matríz de covarianza alfa, amplitud = correlated_values(par_val, res.covar, tags=res.var_names) else: # No se obtiene matriz de covarianza alfa, amplitud = par_val print( "No se pudo estimar la matríz de covaarianza del ajuste") constante = None fq = None efi_over_beta2 = None # Cálculo de la potencia potencia = 2 * EFISION * DIVEN * (1 - BETA) * FORMA / BETA**2 \ / amplitud # Salida _pars = { "amplitud": amplitud, "constante": constante, "alfa": alfa, "potencia": potencia, "fq": fq, "efi_over_beta2": efi_over_beta2, } self.parametros_cineticos.append(_pars)
[documentos] class RUIFileProcessing(): """ Procesamiento de los archivos .RUI generados por el programa FERCIN3 Permite calcular los espectros, normalizarlos, graficarlos y obtener los parámetros cinéticos a partir del ajuste. Parameters ---------- file_name : str Nombre del archivo en formato hdf5 resolution: int Resolución del ADC con que se adquirió. Si no se especifica, se la trata de estimar en base a los valores registrados (elige entre 12 y 16 bits). """ def __init__(self, file_name, resolution=None): self.file_name = file_name self._read_file(self.file_name) self.TRANSFERS_FOLDER = str(importlib.resources.files("ruidaqan") / "transferencias") if resolution is None: self.ADC_resolution = self._get_resolution() else: self.ADC_resolution = resolution self.norm_spectrums = None self.N_fft = None self.N_overlap = None self.fit_results = None def _read_file(self, file_name): with open(file_name, 'r') as f: # Date self.date = f.readline().rstrip('\n') # Hour self.hour = f.readline().rstrip('\n') # Number of AC data self.num_datos = int(float(f.readline().rstrip('\n'))) tmp = [float(it) for it in f.readline().split()] # dt self.dt = tmp[0] # Band width self.BW = 1 / (2 * self.dt) # Ip current and error [A] self.Ip = ufloat(tmp[1], tmp[2]) # Number of detectores self.n_det = int(f.readline().strip('\n')) # Acquisition death time [s] self.death_time = float(f.readline().strip('\n')) # Rango/s de la adquisición [V] self.rango = [f.readline().split()[0]] if self.n_det == 2: self.rango.append(f.readline().split()[0]) # ---- Detectors # Comment self.comment = [f.readline().strip('\n')] tmp = [float(it) for it in f.readline().split()] # DC voltage [V] self.DC_volts = [ufloat(tmp[0], tmp[1])] # Background current [A] self.If = [ufloat(tmp[2], tmp[3])] tmp = [float(it) for it in f.readline().split()] # Impedance amplifier self.Z = [ufloat(tmp[0], tmp[1])] tmp = [float(it) for it in f.readline().split()] # AC gain self.K = [ufloat(tmp[0], tmp[1])] if self.n_det == 2: self.comment.append(f.readline().strip('\n')) tmp = [float(it) for it in f.readline().split()] # DC voltage [V] self.DC_volts.append(ufloat(tmp[0], tmp[1])) # Background current [A] self.If.append(ufloat(tmp[2], tmp[3])) tmp = [float(it) for it in f.readline().split()] # Impedance amplifier self.Z.append(ufloat(tmp[0], tmp[1])) tmp = [float(it) for it in f.readline().split()] # AC gain self.K.append(ufloat(tmp[0], tmp[1])) # AC data in int16 representation if self.n_det == 2: skip_hdr = 16 rui_data = np.loadtxt(file_name, skiprows=skip_hdr) # AC data from CIC are alternated self.AC_int = [rui_data[0:][::2], rui_data[1:][::2]] else: skip_hdr = 12 rui_data = np.loadtxt(file_name, skiprows=skip_hdr) self.AC_int = [rui_data[0:]] # Calculo las corrientes DC de cada cadena self.I = [] for i in range(self.n_det): self.I.append(self.DC_volts[i] / self.Z[i] - self.If[i]) def _get_resolution(self): """ Guess between 12 and 16 bits resolution """ for dat in self.AC_int: if any(dat > 2**12): return 16 else: print( "Resolución estimada de la ADC: 12 bits (podría estar mal)") return 12
[documentos] def get_temporal_signals(self): """ Calcula las AC en voltaje. Convierte dependiendo ADC """ AC_volts = [] if self.ADC_resolution == 12: # PCL818 for i in range(self.n_det): rango = float(self.rango[i]) AC_volts.append( (self.AC_int[i] - 2**11) * 2 * rango / 2**12 ) elif self.ADC_resolution == 16: for i in range(self.n_det): AC_volts.append(self.raw_to_volt_NI( self.AC_int[i] - 2**15, self.rango[i] )) else: raise ValueError(f"No está contemplada una resolución de " + f"{self.ADC_resolution} para la ADC") # Vector temporal time_vec = np.arange(0, len(AC_volts[0])) * self.dt return time_vec, AC_volts, None
[documentos] def get_spectrums(self, N_fft=None, N_overlap=None): """ Espectros sin normalizar ni corregidos con función transferencia Son los valores que se guardan en los archivos .PSD con fercin3/5 Los valores temporales del .RUI se transforman a voltaje, y se estima el espectro con el estimador de Welch utilizando N_fft puntos y N_overlap puntos de solapamiento. Returns: self.spectrums: lists of numpy arrays Lista con los espectros [APSD1, APSD2, CPSD] self.f: numpy array Vector con las frecuencias asociada a los espectros """ if (N_fft is None) or (N_overlap is None): raise ValueError("Faltan especificar N_fft o N_overlap") self.N_fft = N_fft self.N_overlap = N_overlap VC_volts = [] for i in range(self.n_det): VC_volts.append(self.DC_volts[i] - self.If[i] * self.Z[i]) _, AC_volts, _ = self.get_temporal_signals() spectrums = [] for AC in AC_volts: # Estimación de las APSD f_spectrums, _spec = signal.welch(AC, fs=1/self.dt, noverlap=N_overlap, nperseg=N_fft) spectrums.append(_spec) if self.n_det == 2: # Estimación de la CPSD _, _spec = signal.csd(AC_volts[0], AC_volts[1], fs=1/self.dt, noverlap=N_overlap, nperseg=N_fft) spectrums.append(_spec) self.spectrums = spectrums # Se convierten self.f = np.asarray(f_spectrums) return self.spectrums, self.f
[documentos] def get_normalized_spectrums(self, reactor_name, N_fft=None, N_overlap=None, freqs_to_remove=[50, 100, 150], freqs_to_remove_tr=[50, 100, 150]): """ Calcula los espectros normalizados con el método de Welch. Se les quita las frencuencia de línea y sus dos armónicos. Están corregidos por la función transferencia. Si no se especifica N_fft y N_overlap se usan los espectros estimados con .get_spectrum(). Si se especifican, se vuelven a calcular. Parameters ---------- N_fft: int Puntos para calcular la FFT en cada bloque N_overlap: int Cantidad de puntos para el solapamiento de bloques reactor_name: string Identificación del reactor (para leer transferencias) freqs_to_remove: list of floats Frecuencias que serán eliminadas del espectro medido freqs_to_remove: list of floats Frecuencias que serán eliminadas de las transferencias Returns ------- self.spectrums: listr of numpy arrays Lista con los espectros [NPSD1, NPSD1, NCPSD12] Los NPSD son reales, la NCPSD es compleja self.f: numpy array Frecuencias asociadas a los espectros """ # Si no se calcularon los espectros previamente, se los calcula if not hasattr(self, 'spectrums'): self.get_spectrums(N_fft, N_overlap) else: # Si se especifican los valores if N_fft or N_overlap: self.get_spectrums(N_fft, N_overlap) # Nombres de las funciones transferencia que voy a leer _tr_names = self._get_transfer_name(reactor_name) norm_spec = [] _trans = [] for i, _tr_name in enumerate(_tr_names): # Lectura de las funciones transferencia try: H2_dB, ReH, ImH = self._get_transfer_function(_tr_name, freqs_to_remove_tr) except IOError: _tr_name = os.path.join('..', _tr_name) H2_dB, ReH, ImH = self._get_transfer_function(_tr_name, freqs_to_remove_tr) # Módulo cuadrado de la función transferencia H2 = 10**(H2_dB / 10) _trans.append([H2, ReH, ImH]) print(f"Se lee la función transferencia de {_tr_name}") # Espectro APSD APSD = self.spectrums[i] # En algunos casos del RA1 no coinciden los tamaños entre APSD y # la función tansferencia len_s, len_h = len(APSD), len(H2) if len_s != len_h: _min = min(len_s, len_h) f = f[:_min] APSD = APSD[:_min] H2 = H2[:_min] print(80*'!') msg = "Advertencia: el tamaño de la función transferencia\n" msg += "no coincide con el tamaño del espectro medido \n" msg += "Se toma el mínimo entre ambos: {}" print(msg.format(_min)) print(80*'!') # Corrección por función transferencia APSD_tr = APSD / H2 # Normalizo V_sin_fondo = self.DC_volts[i] - self.If[i] * self.Z[i] NAPSD_tr = APSD_tr / V_sin_fondo**2 # Quito frecuencias de línea NAPSD_tr = self._remove_utility_freq( self.f, NAPSD_tr, freqs_to_remove) # Los espectros tienen incertezas debido a la propagación, pero las # descarto por ahora: NAPSD_tr = unumpy.nominal_values(NAPSD_tr) norm_spec.append(NAPSD_tr) if len(_tr_names) == 2: # --- Corrección para la CPSD # Espectros sin corregir APSD1 = self.spectrums[0] APSD2 = self.spectrums[1] ReCPSD = np.real(self.spectrums[2]) ImCPSD = np.imag(self.spectrums[2]) # Funciones transferencia leidas anteriormente H2_1, ReH_1, ImH_1 = _trans[0] H2_2, ReH_2, ImH_2 = _trans[1] # Corrección de la CPSD por función transferencia _c1 = ReH_1 * ReH_2 + ImH_1 * ImH_2 _c2 = ImH_1 * ReH_2 - ReH_1 * ImH_2 ReCPSD_tr = (_c1 * ReCPSD - _c2 * ImCPSD) / (H2_1 * H2_2) ImCPSD_tr = (_c1 * ImCPSD + _c2 * ReCPSD) / (H2_1 * H2_2) # Normalizo con valor medio # Resto el fondo al voltaje medio V1_sf = self.DC_volts[0] - self.If[0] * self.Z[0] V2_sf = self.DC_volts[1] - self.If[1] * self.Z[1] ReNCPSD_tr = ReCPSD_tr / (V1_sf * V2_sf) ImNCPSD_tr = ImCPSD_tr / (V1_sf * V2_sf) # Quito frecuencias de línea ReNCPSD_tr = self._remove_utility_freq(self.f, ReNCPSD_tr, freqs_to_remove) ImNCPSD_tr = self._remove_utility_freq(self.f, ImNCPSD_tr, freqs_to_remove) # Los espectros tienen incertezas debido a la propagación, pero las # descarto por ahora: ReNCPSD_tr = unumpy.nominal_values(ReNCPSD_tr) ImNCPSD_tr = unumpy.nominal_values(ImNCPSD_tr) # Guardo el espectro complejo norm_spec.append(ReNCPSD_tr + 1j * ImNCPSD_tr) self.norm_spectrums = norm_spec return self.norm_spectrums, self.f
def _get_transfer_name(self, reactor_name): """ Get names of transfer functions """ # FOLDER_NAME = "transferencias" if reactor_name: sub_folder = reactor_name else: sub_folder = "" path_file = [] for i, K in enumerate(self.K): K_over_100 = np.round(K.n / 100) if i !=3: file_name = "TR{:.0f}{:.0f}A{}.PSD".format( K_over_100, self.BW, i+1) path_file.append(os.path.join(self.TRANSFERS_FOLDER, sub_folder, file_name)) return path_file def _get_transfer_function(self, file_name, freqs_to_remove=[50, 100, 150]): """ Función para leer archivos .PSD con las funciones de transferencia producidas por el programa TRA.EXE. Se quitan también las frecuencias de línea. Parameters ---------- file_name : string Nombre del archivo con extensión PSD de la función transferencia Returns ------- H2_dB : numpy array Módulo cuadrado de la función transferencia. Expresado en dB 10*log10(H2) ReH : numpy array Parte real de la función transferencia ImH : numpy array Parte imaginaria de la función transferencia """ # TODO: Se podría poner en los comentarios de los archivos la # identificación del reactor (al menos). Para evitar confusiones. # Preguntar. SKIP_HEADER = 12 # Lectura de los datos # Frecuencia [Hz] Re(CPSD) Im(CPSD) APSD1[dB] APSD2[dB] _data = np.loadtxt(file_name, skiprows=SKIP_HEADER) # Frecuencia [Hz] freq = _data[:, 0] # Re(CPSD) _ReCPSD = _data[:, 1] # Im(CPSD) _ImCPSD = _data[:, 2] # APSD1 [dB] _APSD1_dB = _data[:, 3] # APSD2 [dB] _APSD2_dB = _data[:, 4] # Módulo cuadrado de H(f) H2_dB = _APSD2_dB - _APSD1_dB # Parte real e imaginaria de H(f) _APSD1 = 10**(_APSD1_dB / 10) ReH = _ReCPSD / _APSD1 ImH = _ImCPSD / _APSD1 # Se quitan frecuencias de línea H2_dB = self._remove_utility_freq(freq, H2_dB, freqs_to_remove) ReH = self._remove_utility_freq(freq, ReH, freqs_to_remove) ImH = self._remove_utility_freq(freq, ImH, freqs_to_remove) return H2_dB, ReH, ImH def _remove_utility_freq(self, frequency, spectrum, freqs_to_remove): """ Removes the utility frequencies from the spectrum Values of 'spectrum' at those freqs (+/- 1Hz) are replaced with those of its neighbours (Angel's approach). Parameters ---------- frequency : numpy array Vector with frequencies spectrum : numpy array Vector with the spectrum (should be same size of frequency) freqs_to_remove : double or array Frequency/ies to be removed Returns ------- new_spectrum : numpy array Spectrum without the freqs_to_remove """ # Force list if not isinstance(freqs_to_remove, list): freqs_to_remove = [freqs_to_remove] new_spectrum = np.copy(spectrum) for freqs in freqs_to_remove: indx_to_remove = (frequency > freqs-1) & (frequency < freqs + 1) # Closest value to (freqs - 1Hz) value_to_assign = new_spectrum[frequency <= freqs-1][-1] new_spectrum[indx_to_remove] = value_to_assign # print("Aviso: se quitó la frecuencia {} Hz".format(freqs)) return new_spectrum
[documentos] def plot_spectrums(self, normalizados=True, *args, **kargs): if normalizados: if self.norm_spectrums is None: raise NameError("Faltan estimar los espectros normalizados") _datas = self.norm_spectrums _labels = ["NPSD1", "NPSD2", "Re(NCPSD12)"] _units = "[1 / Hz]" else: if self.spectrums is None: raise NameError("Faltan estimar los espectros") _datas = self.spectrums _labels = ["PSD1", "PSD2", "Re(CPSD12)"] _units = "[V$^2$ / Hz]" figs, ax = plt.subplots(1, 1) for i, data in enumerate(_datas): if i == 2: data = np.real(data) ax.plot(self.f, data, '.-', label=_labels[i]) ax.set_xlabel("Frecuencia [Hz]") ax.set_ylabel("Espectro " + _units) ax.legend() ax.set_title(f"Archivo leido: {self.file_name}") figs.tight_layout() plt.show()
[documentos] def fit_spectrums(self, intervals, **kargs): """ Ajuste de los espectros intervals: list of tuples Lista con los límitesde los intervalos para el ajuste [(f_min, f_max), (f_min, f_max), ...] en el orden [NPSD1, NPSD2, CPSD12] kargs: plot: bool True si se quiere graficar. False por default """ if self.norm_spectrums is None: raise NameError("Faltan estimar los espectros normalizados") plot = kargs.get('plot', False) _ttle = ["NPSD1", "NPSD2", "Re(NCPSD12)"] _ylab = [i + " [1 / Hz]" for i in _ttle] self.fit_results = [] self.fitted_data = [] for i, (spe, inter) in enumerate(zip(self.norm_spectrums, intervals)): ind = (self.f >= inter[0]) & (self.f <= inter[1]) if i != 2: _dat = spe[ind] _res = ajuste_apsd(self.f[ind], _dat) else: _dat = np.real(spe[ind]) _res = ajuste_cpsd(self.f[ind], _dat) self.fit_results.append(_res) self.fitted_data.append([self.f[ind], _dat]) if plot: best_fit = _dat + _res.residual fig, (ax0, ax1) = plt.subplots(2, sharex=True, gridspec_kw={'height_ratios': [3,1]},) ax0.errorbar(self.f, np.real(spe), fmt='k.', elinewidth=2, label='Medición') ax0.plot(self.f[ind], best_fit, 'r', zorder=3, label='fit') ax0.set_ylabel(_ylab[i]) ax1.plot(self.f[ind], _res.residual, 'k.') ax1.set_xlabel('f [Hz]') ax1.set_ylabel(r'Residuos [1/Hz]') ax0.set_title(f"Ajuste de {_ttle[i]}") fig.subplots_adjust(hspace=0.1) if plot: plt.show()
[documentos] def calcula_parametros_cineticos(self, reactor, **kwargs): """ Calcula parámetros cinéticos en base al ajuste espectral Parameters ---------- reactor: reactor class Clase importada del archivo "constantes_reactores.py" Returns ------- self.parametros_cineticos: dic Diccionario con los parámetros cinéticos estimados. Los keys son 'alfa', 'potencia', 'fq' y 'efi_over_beta2'. Los últimos dos son None en caso de ajustar una NCPSD """ if self.fit_results is None: raise NameError("Faltan hacer el ajuste de los espectros") # Constantes cinéticas DIVEN = reactor.FACTOR_DIVEN BETA = reactor.BETA_EFECTIVO FORMA = reactor.FACTOR_FORMA_L EFISION = reactor.ENERGIA_FISION BENNETT = reactor.FACTOR_BENNETT self.parametros_cineticos = [] # Asocio con sus incertezas for i, res in enumerate(self.fit_results): # Valores estimados par_val = [res.params[s].value for s in res.var_names] if i != 2: # Cálculo con NAPSD if "covar" in dir(res): # Se obtiene la matríz de covarianza alfa, amplitud, constante = \ correlated_values(par_val, res.covar, tags=res.var_names) else: # No se obtiene matriz de covarianza alfa, amplitud, constante = par_val print( "No se pudo estimar la matríz de covaarianza del ajuste") fq = constante * self.I[i] / 2 efi_over_beta2 = amplitud * BENNETT / (1 - BETA) / FORMA \ / DIVEN / constante else: # Calculo con NCPSD if "covar" in dir(res): # Se obtiene la matríz de covarianza alfa, amplitud = correlated_values(par_val, res.covar, tags=res.var_names) else: # No se obtiene matriz de covarianza alfa, amplitud = par_val print( "No se pudo estimar la matríz de covaarianza del ajuste") constante = None fq = None efi_over_beta2 = None # Cálculo de la potencia potencia = 2 * EFISION * DIVEN * (1 - BETA) * FORMA / BETA**2 \ / amplitud # Salida _pars = { "amplitud": amplitud, "constante": constante, "alfa": alfa, "potencia": potencia, "fq": fq, "efi_over_beta2": efi_over_beta2, } self.parametros_cineticos.append(_pars)
[documentos] def raw_to_volt_NI(self, raw_data, volt_range): """ Values for converting integer values into volt for the NI 6353 board The conversion is performed according to: f(x) = a[0]x^0 + a[1]x^1 + a[2]*x^2 + a[3]*x^3 where 'x' is the raw data measured, 'a' are the coefficients given by the board specifications and f(x) is the data in Volts units. With the NI 6353 (PCI and USB?) the range is alwas bipolar (why?). The data in the .RUI file is converted to unipolar by adding 32768. So this last value should be substracted before calling this function. See Anexo 1 of IN-EN-GRYCN-FER-012_Rev0 for more info. Parameters ---------- raw_data : array Raw data measured by the board volt_range : string Range used for the measurement ('10', '5', '2', '1', '.5', '.2' and '.1') Returns ------- volt_data : array Voltage converted """ try: # Se trata de leer el archivo con los coeficientes # Archivo generado en el momento de la adquisición (con programa # nuevo) data_folder = os.path.dirname(self.file_name) name_c = self.file_name.split('-')[0] + \ "-coeficientes_int_to_volt.pkl" coef_file = os.path.join(data_folder, name_c) with open(coef_file, 'rb') as f: conversion = pickle.load(f) print("Se usan coenficientes de conversión a voltaje del", f"archivo {coef_file}") except IOError: # Se usan unos valores de la NI USB-6353. # Debido a que si no se guardaron en el momento de la adquisición, # los .RUI no tienen esta información. conversion = { .1: [-3.482209127662006e-05, 3.2165366451149876e-06, -2.6971282517362403e-16, 7.29540470181873e-20], .2: [-6.136419891439307e-05, 6.442667885061415e-06, -5.402301756997927e-16, 1.4612570838363108e-19], .5: [-0.0001385744440899197, 1.6111197802991186e-05, -1.3509551283910506e-15, 3.6541697226543377e-19], 1.0: [-0.00026948936905494587, 3.222353064123045e-05, -2.702005431685175e-15, 7.308596881874746e-19], 2.0: [-0.0005310680905440859, 6.443202206542382e-05, -5.4027497959046236e-15, 1.4613782729249057e-18], 5.0: [-0.0013130513105791529, 0.00016106960701858363, -1.3505998392576855e-14, 3.653208708652728e-18], 10.0: [-0.0026160260933094583, 0.0003222353136881715, -2.702005492694728e-14, 7.308597046898186e-18], } print("Se usan coenficientes de conversión a voltaje genéricos") _volt_float = float(volt_range) if _volt_float not in conversion.keys(): raise KeyError(f"The range {volt_range} not available for the", f"NI6353 board") else: volt_data = 0.0 for k, c in enumerate(conversion[_volt_float]): volt_data += c * raw_data**k return volt_data
[documentos] class FitResults(): """ Entre otras cosas, esta clase selecciona qué mediciones se utilizan para calcular ciertos parámetros (alfas, fq, etc). Por ejemplo, aquí se especifica que los fq se calculen sólo con las mediciones de 200Hz. """ def __init__(self, file_name_pkl=None): if file_name_pkl: self.file_name_pkl = file_name_pkl try: with open(self.file_name_pkl, "rb") as f: self.data = pickle.load(f) except IOError: raise IOError( f"FileResults no pudo abrir el archivo {self.file_name_pkl}") #self.data = np.asarray(self.data) columns = ["Nivel", "Medición", "BW", "Espectro", "Ip", "Ii", "Alfa", "Potencia", "fq", "efi/b2"] # Trabajo con una DataFrame de panda df = pd.DataFrame(self.data, columns=columns) if len(df.Espectro.unique()) == 3: self.n_det = 2 else: self.n_det = 1 # Alfas calculados con detector 1 a 40Hz self.alfa_1 = df[df["Espectro"]=="NPSD1"]["Alfa"] # Alfas calculados con detector 2 a 40Hz self.alfa_2 = df[df["Espectro"]=="NPSD2"]["Alfa"] # Alfas calculados con CPSD a 40Hz self.alfa = df[df["Espectro"]=="NCPSD"]["Alfa"] self.alfas = [self.alfa_1, self.alfa_2, self.alfa] # Poetncias calculadas con detector 1 self.pote_1 = df[df["Espectro"]=="NPSD1"]["Potencia"] # Poetncias calculadas con detector 2 self.pote_2 = df[df["Espectro"]=="NPSD2"]["Potencia"] # Poetncias calculadas con CPSD self.pote = df[df["Espectro"]=="NCPSD"]["Potencia"] self.potencias = [self.pote_1, self.pote_2, self.pote] if self.n_det == 2: self.pote_bylevel = df[df["Espectro"]=="NCPSD"].groupby( "Nivel")["Potencia"].apply(unumpy.nominal_values).apply(np.mean) self.alfa_bylevel = df[df["Espectro"]=="NCPSD" ].groupby("Nivel")["Alfa"] elif self.n_det == 1: self.pote_bylevel = df[df["Espectro"]=="NPSD1"].groupby( "Nivel")["Potencia"].apply(unumpy.nominal_values).apply(np.mean) self.alfa_bylevel = df[df["Espectro"]=="NPSD1" ].groupby("Nivel")["Alfa"] # Fq fq_1 = df[(df["Espectro"]=="NPSD1") & (df["BW"]==200)]["fq"] fq_2 = df[(df["Espectro"]=="NPSD2") & (df["BW"]==200)]["fq"] self.fqs = [fq_1, fq_2] self.I_fq1 = df[(df["Espectro"]=="NPSD1") & (df["BW"]==200)]["Ii"] self.I_fq2 = df[(df["Espectro"]=="NPSD2") & (df["BW"]==200)]["Ii"] self.fq_1_bylevel = df[(df["Espectro"]=="NPSD1") & (df["BW"]==200)].groupby("Nivel")["fq"] self.fq_2_bylevel = df[(df["Espectro"]=="NPSD2") & (df["BW"]==200)].groupby("Nivel")["fq"] # Eficiencia dividio beta cuadrado efi_b2_1 = df[(df["Espectro"]=="NPSD1") & (df["BW"]==200)]["efi/b2"] efi_b2_2 = df[(df["Espectro"]=="NPSD2") & (df["BW"]==200)]["efi/b2"] self.efi_b2s = [efi_b2_1, efi_b2_2] self.efi_1_bylevel = df[(df["Espectro"]=="NPSD1") & (df["BW"]==200)].groupby("Nivel")["efi/b2"] self.efi_2_bylevel = df[(df["Espectro"]=="NPSD2") & (df["BW"]==200)].groupby("Nivel")["efi/b2"] # Corrientes self.I1 = df[df["Espectro"]=="NPSD1"]["Ii"] self.I2 = df[df["Espectro"]=="NPSD2"]["Ii"] self.Ip = df[df["Espectro"]=="NPSD1"]["Ip"] self.Is = [self.I1, self.I2, self.Ip]
[documentos] def simulate_data_S19(self): """ Datos medidos en el RA-3 con dos detectores (serie 19) """ dat = [] dat.append([1, 1, 200, "NPSD1", '', '', '', '', '', '']) dat.append([1, 1, 200, "NPSD2", '', '', '', '', '', '']) dat.append([1, 1, 200, "NCPSD", '', None, '', '', None, None]) dat.append([1, 2, 40, "NPSD1", '', '', '', '', '', '']) dat.append([1, 2, 40, "NPSD2", '', '', '', '', '', '']) dat.append([1, 2, 40, "NCPSD", '', None, '', '', None, None]) dat.append([1, 3, 40, "NPSD1", '', '', '', '', '', '']) dat.append([1, 3, 40, "NPSD2", '', '', '', '', '', '']) dat.append([1, 3, 40, "NCPSD", '', None, '', '', None, None]) dat.append([2, 1, 200, "NPSD1", '', '', '', '', '', '']) dat.append([2, 1, 200, "NPSD2", '', '', '', '', '', '']) dat.append([2, 1, 200, "NCPSD", '', None, '', '', None, None]) dat.append([2, 2, 40, "NPSD1", '', '', '', '', '', '']) dat.append([2, 2, 40, "NPSD2", '', '', '', '', '', '']) dat.append([2, 2, 40, "NCPSD", '', None, '', '', None, None]) dat.append([2, 3, 40, "NPSD1", '', '', '', '', '', '']) dat.append([2, 3, 40, "NPSD2", '', '', '', '', '', '']) dat.append([2, 3, 40, "NCPSD", '', None, '', '', None, None]) dat.append([3, 1, 200, "NPSD1", '', '', '', '', '', '']) dat.append([3, 1, 200, "NPSD2", '', '', '', '', '', '']) dat.append([3, 1, 200, "NCPSD", '', None, '', '', None, None]) dat.append([3, 2, 40, "NPSD1", '', '', '', '', '', '']) dat.append([3, 2, 40, "NPSD2", '', '', '', '', '', '']) dat.append([3, 2, 40, "NCPSD", '', None, '', '', None, None]) dat.append([3, 3, 40, "NPSD1", '', '', '', '', '', '']) dat.append([3, 3, 40, "NPSD2", '', '', '', '', '', '']) dat.append([3, 3, 40, "NCPSD", '', None, '', '', None, None]) self.data = dat columns = ["Nivel", "Medición", "BW", "Espectro", "Ip", "Ii", "Alfa", "Potencia", "fq", "efi/b2"] df = pd.DataFrame(dat, columns=columns) I1, _, I1_s = np.loadtxt(os.path.join("test","I1-S19.EST"), skiprows=5, delimiter=',', unpack=True) I2, _, I2_s = np.loadtxt(os.path.join("test","I2-S19.EST"), skiprows=5, delimiter=',', unpack=True) Ip, _, Ip_s = np.loadtxt(os.path.join("test","IP-S19.EST"), skiprows=5, delimiter=',', unpack=True) pot, _, pot_s = np.loadtxt(os.path.join("test","POTE-S19.EST"), skiprows=5, delimiter=',', unpack=True) pot1, pot1_s = np.loadtxt(os.path.join("test","P1I1-S19.LIN"), skiprows=3, delimiter=',', unpack=True) pot1 = pot1[1::2] pot1_s = pot1_s[1::2] pot2, pot2_s = np.loadtxt(os.path.join("test","P2I1-S19.LIN"), skiprows=3, delimiter=',', unpack=True) pot2 = pot2[1::2] pot2_s = pot2_s[1::2] alfa, _, alfa_s = np.loadtxt(os.path.join("test","ALFA-S19.EST"), skiprows=5, delimiter=',', unpack=True) alfa1, _, alfa1_s = np.loadtxt(os.path.join("test","ALF1-S19.EST"), skiprows=5, delimiter=',', unpack=True) alfa2, _, alfa2_s = np.loadtxt(os.path.join("test","ALF2-S19.EST"), skiprows=5, delimiter=',', unpack=True) fq1, _, fq1_s = np.loadtxt(os.path.join("test","FQ1-S19.EST"), skiprows=5, delimiter=',', unpack=True) fq1 = fq1 * 1E-15 fq1_s = fq1_s * 1E-15 fq2, _, fq2_s = np.loadtxt(os.path.join("test","FQ2-S19.EST"), skiprows=5, delimiter=',', unpack=True) fq2 = fq2 * 1E-15 fq2_s = fq2_s * 1E-15 efi1, _, efi1_s = np.loadtxt(os.path.join("test","EF1B-S19.EST"), skiprows=5, delimiter=',', unpack=True) efi2, _, efi2_s = np.loadtxt(os.path.join("test","EF2B-S19.EST"), skiprows=5, delimiter=',', unpack=True) uI1 = [] for n, s in zip(I1, I1_s): uI1.append(ufloat(n, s)) uI2 = [] for n, s in zip(I2, I2_s): uI2.append(ufloat(n, s)) uIp = [] for n, s in zip(Ip, Ip_s): uIp.append(ufloat(n, s)) upot = [] for n, s in zip(pot, pot_s): upot.append(ufloat(n, s)) upot1 = [] for n, s in zip(pot1, pot1_s): upot1.append(ufloat(n, s)) upot2 = [] for n, s in zip(pot2, pot2_s): upot2.append(ufloat(n, s)) ualfa = [] for n, s in zip(alfa, alfa_s): ualfa.append(ufloat(n, s)) ualfa1 = [] for n, s in zip(alfa1, alfa1_s): ualfa1.append(ufloat(n, s)) ualfa2 = [] for n, s in zip(alfa2, alfa2_s): ualfa2.append(ufloat(n, s)) ufq1 = [] for n, s in zip(fq1, fq1_s): ufq1.append(ufloat(n, s)) ufq2 = [] for n, s in zip(fq2, fq2_s): ufq2.append(ufloat(n, s)) uefi1 = [] for n, s in zip(efi1, efi1_s): uefi1.append(ufloat(n, s)) uefi2 = [] for n, s in zip(efi2, efi2_s): uefi2.append(ufloat(n, s)) df.loc[df["Espectro"]=="NPSD1", "Ii"] = uI1 df.loc[df["Espectro"]=="NPSD2", "Ii"] = uI2 df.loc[df["Espectro"]=="NPSD1", "Ip"] = uIp df.loc[df["Espectro"]=="NPSD2", "Ip"] = uIp df.loc[df["Espectro"]=="NCPSD", "Ip"] = uIp df.loc[df["Espectro"]=="NCPSD", "Potencia"] = upot df.loc[df["Espectro"]=="NPSD1", "Potencia"] = upot1 df.loc[df["Espectro"]=="NPSD2", "Potencia"] = upot2 df.loc[(df["Espectro"]=="NCPSD") & (df["BW"]==40), "Alfa"] = ualfa df.loc[(df["Espectro"]=="NPSD1") & (df["BW"]==40), "Alfa"] = ualfa1 df.loc[(df["Espectro"]=="NPSD2") & (df["BW"]==40), "Alfa"] = ualfa2 df.loc[(df["Espectro"]=="NPSD1") & (df["BW"]==200), "fq"] = ufq1 df.loc[(df["Espectro"]=="NPSD2") & (df["BW"]==200), "fq"] = ufq2 df.loc[(df["Espectro"]=="NPSD1") & (df["BW"]==200), "efi/b2"] = uefi1 df.loc[(df["Espectro"]=="NPSD2") & (df["BW"]==200), "efi/b2"] = uefi2 df.replace("", np.nan, inplace=True) self.data = df with open("S19-fit_results.pkl", "wb") as f: pickle.dump(df.to_numpy(), f)
[documentos] def fit_p_vs_I(self, limites=None, plot=True, *args, **kargs): """ Se hacen todos los ajustes lineales de corrientes y potencias p vs I1, p vs I2, p vs Ip """ if not hasattr(self, '_I_mag'): self._I_mag, self._I_std = self._split_pandas_ufloat(self.Is) if not hasattr(self, '_pot_mag'): self._pots_mag, self._pots_std = \ self._split_pandas_ufloat(self.potencias) # Para saber qué potencia usar para el ajuste # Si existe, tomo la calculada con NCPSD, de lo contrario con NPSD1 ind_spec = 2 if self._pots_mag[2].size > 0 else 0 # Selecciono la potencia para el ajuste self._pot_mag = self._pots_mag[ind_spec] self._pot_std = self._pots_std[ind_spec] if limites is None: limites = [] for curr in self._I_mag: if curr.size > 0: limites.append([np.min(curr), np.max(curr)]) else: limites.append([]) labels = ["I$_1$ [A]", "I$_2$ [A]", "I$_p$ [A]"] # Ajustes lineales fited_p_vs_I = [] for k, (cu, cu_s) in enumerate(zip(self._I_mag, self._I_std)): # Ajustes if cu.size > 0: # Límites del ajuste I1, I2 = limites[k] ind = (cu >= I1) & (cu <= I2) # Ajuste de p vs I a, b, r_2, cov, y_fit = ajuste_lineal( cu[ind], self._pot_mag[ind], self._pot_std[ind]) fited_p_vs_I.append([a, b, r_2, cov, y_fit]) if plot: fig, ax = plt.subplots(1, 1) ax.errorbar(cu, self._pot_mag, xerr=cu_s, yerr=self._pot_std, fmt='ok') ax.plot(cu[ind], y_fit, 'r-') ax.set_xscale('log') ax.set_yscale('log') ax.set_xlabel(labels[k]) ax.set_ylabel("p [W]") # En el gráfico str_a = f"a = ${a:.2uL}$ W/A\n" str_b = f"b = ${b:.2uL}$ W/A\n" str_r2 = f"r$^2$ = ${r_2:1.5f}$" bbox_props = dict(boxstyle='Round', fc='w') ax.annotate(str_a + str_b + str_r2, xy=(0.4, 0.15), bbox=bbox_props, size=12, xycoords='axes fraction') fig.tight_layout() else: # Para respetar el orden [f1, f2, fp] fited_p_vs_I.append([[]]) # Graficos self.fited_p_vs_I = np.asarray(fited_p_vs_I, dtype=object) plt.show()
[documentos] def fit_Ip_vs_I(self, limites=None, plot=True, *args, **kargs): """ Se hacen todos los ajustes lineales entre corrientes Ip vs I1, Ip vs I2 """ if not hasattr(self, "_I_mag"): self._I_mag, self._I_std = self._split_pandas_ufloat(self.Is) if limites is None: limites = [] for curr in self._I_mag[:-1]: if curr.size > 0: limites.append([np.min(curr), np.max(curr)]) else: limites.append([]) labels = ["I$_1$ [A]", "I$_2$ [A]", "I$_p$ [A]"] # Ajustes lineales fit_Ip_vs_I = [] for k, (cu, cu_s) in enumerate(zip(self._I_mag[:-1], self._I_std[:-1])): # Ajustes if cu.size > 0: # Límites del ajuste I1, I2 = limites[k] ind = (cu >= I1) & (cu <= I2) # Ajuste de p vs I a, b, r_2, cov, y_fit = ajuste_lineal( cu[ind], self._I_mag[2][ind], self._I_std[2][ind]) fit_Ip_vs_I.append([a, b, r_2, cov, y_fit]) if plot: fig, ax = plt.subplots(1, 1) ax.errorbar(cu, self._I_mag[2], xerr=cu_s, yerr=self._I_std[2], fmt='ok') ax.plot(cu[ind], y_fit, 'r-') ax.set_xscale('log') ax.set_yscale('log') ax.set_xlabel(labels[k]) ax.set_ylabel("I$_p$ [A]") # En el gráfico str_a = f"a = ${a:.2uL}$ \n" str_b = f"b = ${b:.2uL}$ \n" str_r2 = f"r$^2$ = ${r_2:1.5f}$" bbox_props = dict(boxstyle='Round', fc='w') ax.annotate(str_a + str_b + str_r2, xy=(0.4, 0.15), bbox=bbox_props, size=12, xycoords='axes fraction') fig.tight_layout() else: # Para respetar el orden fit_Ip_vs_I.append([[]]) # Graficos self.fit_Ip_vs_I = np.asarray(fit_Ip_vs_I, dtype=object) plt.show()
def _split_pandas_ufloat(self, panda_serie): """ Convierte una serie de panda con datos ufloat a dos arrays de numpy Uno es la magnitud y la otra su incerteza """ mag, std = [], [] for it in panda_serie: mag.append(unumpy.nominal_values(it.to_numpy())) std.append(unumpy.std_devs(it.to_numpy())) return mag, std
[documentos] def average_parameters(self, plot=True): """ Hace los promedios pesados de a, fq y e/b2 self.añfa_prom y self.alfa_std: floats self.fqs_av: list of touples (mag, std) self.eb2_av: list of touples (mag, std) """ # ---- Alfas _alfas_mag, _alfas_std = self._split_pandas_ufloat(self.alfas) # Para saber qué alfas usar para el promedio # Si existen, tomo la calculada con NCPSD, de lo contrario con NPSD1 ind_spec = 2 if _alfas_mag[2].size > 0 else 0 # Selecciono la potencia para el ajuste self._alfa_mag = _alfas_mag[ind_spec] self._alfa_std = _alfas_std[ind_spec] self.alfa_prom, self.alfa_std = \ weighted_average(self._alfa_mag, 1.0 / self._alfa_std**2) # ----- fqs self._fqs_mag, self._fqs_std = self._split_pandas_ufloat(self.fqs) self.fqs_av = [] for fq, fq_s in zip(self._fqs_mag, self._fqs_std): self.fqs_av.append(weighted_average(fq, 1 / fq_s**2)) # ----- e/b2 self._eb2_mag, self._eb2_std = self._split_pandas_ufloat(self.efi_b2s) self.eb2_av = [] for eb2, eb2_s in zip(self._eb2_mag, self._eb2_std): self.eb2_av.append(weighted_average(eb2, 1 / eb2_s**2)) if plot: # alfas alfa_lvl = self.alfa_bylevel.apply(unumpy.nominal_values) alfa_lvl_s = self.alfa_bylevel.apply(unumpy.std_devs) fig, ax = plt.subplots(1, 1) for p, a, a_s in zip(self.pote_bylevel, alfa_lvl, alfa_lvl_s): ax.errorbar(p * np.ones_like(a), a, yerr=a_s, fmt='ok', label="Con NCPSD a 40Hz") # valor medio x = np.linspace(10, 10e3, 20) y = np.ones_like(x) * self.alfa_prom y_inf = np.ones_like(x) * (self.alfa_prom - self.alfa_std) y_sup = np.ones_like(x) * (self.alfa_prom + self.alfa_std) ax.plot(x, y, 'r-') ax.plot(x, y_inf, 'r--') ax.plot(x, y_sup, 'r--') handles, labels = ax.get_legend_handles_labels() ax.legend([handles[0]], [labels[0]]) ax.set_xscale('log') ax.set_xlabel('Potencia [W]') ax.set_ylabel("$\\alpha$ [1/s]") ax.set_xlim(10, 10e3) a_uf = ufloat(self.alfa_prom, self.alfa_std) ax.set_title(f"$\\langle\\alpha\\rangle$ = {a_uf} 1/s") # fq1 fq1 = self.fqs[0].apply(unumpy.nominal_values) fq1_s = self.fqs[0].apply(unumpy.std_devs) I_fq1 = self.I_fq1.apply(unumpy.nominal_values) fig, ax = plt.subplots(1, 1) ax.errorbar(I_fq1, fq1, yerr=fq1_s, fmt='ok', label="Detector 1") hand, lab = ax.get_legend_handles_labels() hand1, lab1 = hand[0], lab[0] uf1 = ufloat(*self.fqs_av[0]) str_title1 = f"$\\langle f_{{q1}} \\rangle$ = {uf1} C" ax.set_title(str_title1) ax.legend([hand1], [lab1]) ax.set_xscale('log') ax.set_xlabel('I$_1$ [A]') ax.set_ylabel("f$_q$ [C]") # fq2 fq2 = self.fqs[1].apply(unumpy.nominal_values) fq2_s = self.fqs[1].apply(unumpy.std_devs) I_fq2 = self.I_fq2.apply(unumpy.nominal_values) if fq2.size > 0: fig, ax = plt.subplots(1, 1) ax.errorbar(I_fq2, fq2, yerr=fq2_s, fmt='ok', label="Detector 2") hand, lab = ax.get_legend_handles_labels() hand2, lab2 = hand[-1], lab[-1] ax.legend([hand1, hand2], [lab1, lab2]) uf2 = ufloat(*self.fqs_av[1]) str_title2 = f"$\\langle f_{{q2}} \\rangle$ = {uf2} C" ax.set_title(str_title1 + '\t' + str_title2) ax.set_xscale('log') ax.set_xlabel('I$_2$ [A]') ax.set_ylabel("f$_q$ [C]") # efi/beta2 1 efi1 = self.efi_1_bylevel.apply(unumpy.nominal_values) efi1_s = self.efi_1_bylevel.apply(unumpy.std_devs) fig, ax = plt.subplots(1, 1) for p, ef1, ef1_s in zip(self.pote_bylevel, efi1, efi1_s): ax.errorbar(p * np.ones_like(ef1), ef1, yerr=ef1_s, fmt='ok', label="Detector 1") hand, lab = ax.get_legend_handles_labels() hand1, lab1 = hand[0], lab[0] ef1 = ufloat(*self.eb2_av[0]) str_title1 = f"$\\langle \\epsilon_1/\\beta^2 \\rangle$ = {ef1}" # efi/beta2 2 efi2 = self.efi_2_bylevel.apply(unumpy.nominal_values) efi2_s = self.efi_2_bylevel.apply(unumpy.std_devs) if fq2.size > 0: for p, ef2, ef2_s in zip(self.pote_bylevel, efi2, efi2_s): ax.errorbar(p * np.ones_like(ef2), ef2, yerr=ef2_s, fmt='ob', label="Detector 2") hand, lab = ax.get_legend_handles_labels() hand2, lab2 = hand[-1], lab[-1] ax.legend([hand1, hand2], [lab1, lab2]) ef2 = ufloat(*self.eb2_av[1]) str_title2 = f"$\\langle \\epsilon_2/\\beta^2 \\rangle$ = {ef2}" ax.set_title(str_title1 + '\t' + str_title2) else: ax.legend([hand1], [lab1]) ax.set_title(str_title1) ax.set_xscale('log') ax.set_xlabel('Potencia [W]') ax.set_ylabel(r"$\epsilon/\beta^2$") ax.set_xlim(10, 10e3) plt.show()
if __name__ == "__main__": sys.path.append(parent) ########################################################################## # Prueba de clase HDF5FileProcessing ########################################################################## if False: from constantes.constantes_reactores import RA3 as REACTOR file_name = "S01-N1-1.h5" folder = os.path.join(parent, "gui") file_path = os.path.join(folder, file_name) rui = HDF5FileProcessing(file_path) print(f"Fecha: {rui.date}") print(f"Hora: {rui.hour}") print(f"Resolucion: {rui.resolucion_DAC}") print(f"Sensibilidades: {rui.sensibilidades}") print(f"DC {rui.DC_volts}") print(f"DC rangos {rui.DC_rangos}") print(f"Offsets {rui.offsets}") print(f"AC gains {rui.AC_gains}") print(f"Corrientes fondo {rui.If}") print(f"Corrientes DC {rui.I}") print(f"Corrientes Ip {rui.Ip}") # quit() # Espectros sin normalizar spect, fs = rui.get_spectrums(512, 0) # Espectros normalizados y corregidos por función transferencia norm_spect, fs = rui.get_normalized_spectrums("RA3", freqs_to_remove=[50, 100, 150]) # Graficación de los espectros # rui.plot_spectrums(normalizados=True) # Intervalos para el ajuste if rui.BW == 200: intervalos = 2 * [(2.3, 178.9)] + [(2.3, 93)] elif rui.BW == 40: intervalos = 3 * [(2.8, 35.8)] rui.fit_spectrums(intervalos, plot=True) rui.calcula_parametros_cineticos(REACTOR) print(80*'-') for param in rui.parametros_cineticos: for key, val in param.items(): if val: print(key, f"{val:.3e}") else: print(key, f"{val}") print(80*'-') ########################################################################## # Prueba de clase FitResults ########################################################################## if False: #filename = "S98-fit_results.pkl" # Dos detectores RA-3 filename = "S19-fit_results.pkl" # Un detector RA-1 fit_results = FitResults(filename) fit_results.fit_p_vs_I() for fps in fit_results.fited_p_vs_I: print(fps[0]) fit_results.fit_Ip_vs_I(plot=True) fit_results.average_parameters(plot=True) ########################################################################## # Prueba de clase RUIFile ########################################################################## if True: from constantes.constantes_reactores import RA3 as REACTOR file_name = "S31-N3-1.RUI" folder = os.path.join(parent, "datos") file_path = os.path.join(folder, file_name) rui = RUIFileProcessing(file_path) print(f"Rangos utilizados: {rui.rango}") print(f"Resolución del ADC: {rui.ADC_resolution}") print(f"Corrientes: {rui.I}") rui.get_temporal_signals() # Espectros sin normalizar spect, fs = rui.get_spectrums(512, 0) # Espectros normalizados y corregidos por función transferencia norm_spect, fs = rui.get_normalized_spectrums("RA1", freqs_to_remove=[50, 100, 150]) # Graficación de los espectros # rui.plot_spectrums(normalizados=True) # Intervalos para el ajuste rui.BW = round(rui.BW) if rui.BW == 200: intervalos = 2 * [(2.3, 178.9)] + [(2.3, 93)] elif rui.BW == 40: intervalos = 3 * [(2.8, 35.8)] rui.fit_spectrums(intervalos, plot=True) rui.calcula_parametros_cineticos(REACTOR) print(80*'-') for param in rui.parametros_cineticos: for key, val in param.items(): print(key, val) print(80*'-')