Source code for oilftir.astm_utils

# -*- coding: utf-8 -*-
"""
astm_utils.py
==================
FTIR Spectrum Processing Utilities for Lubricating Oil Analysis
Based on ASTM E2412 standard methodology.

Provides:
  - Spectrum loading (CSV/TXT; optional PerkinElmer .sp via specio)
  - Baseline correction and spectral normalisation
  - Net area calculation for ASTM-defined absorption bands
  - Hit Quality Index (HQI) library search
  - General-purpose plotting

Dependencies
------------
  numpy, pandas, scipy, matplotlib, pybaselines
  specio (optional – only needed for .sp files)

Usage example
-------------
  from oilftir.astm_utils import load_spectrum, astm_areas

  df, meta = load_spectrum("sample.csv")
  results = astm_areas(df)
  print(results)

References
----------
  ASTM E2412-23  Standard Practice for Condition Monitoring of Used Lubricants
                 by Trend Analysis Using Fourier Transform Infrared (FT-IR)
                 Spectrometry
"""

from __future__ import annotations
import re
import struct
import time
import concurrent.futures
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pybaselines import Baseline

# ---------------------------------------------------------------------------
# Optional dependency: specio (PerkinElmer .sp files)
# ---------------------------------------------------------------------------
try:
    from specio import specread
    import specio.plugins.sp as _sp_plugin

    def _decode_5104_smart(data):
        """
        Replacement decoder for specio's _decode_5104.
        Searches the binary blob for text segments rather than relying on
        fixed offsets, which are instrument-firmware–dependent.
        """
        text_items = []
        idx = 0
        while idx < len(data) - 2:
            try:
                possible_size = struct.unpack('<h', data[idx:idx + 2])[0]
            except Exception:
                break
            if 0 < possible_size < 500:
                start_str = idx + 2
                end_str = start_str + possible_size
                if end_str <= len(data):
                    chunk = data[start_str:end_str]
                    try:
                        decoded = chunk.decode('latin1')
                        if any(c.isalnum() for c in decoded):
                            text_items.append(decoded)
                            idx = end_str
                            next_tag = data.find(b'#u', idx)
                            if next_tag != -1:
                                idx = next_tag + 2
                                continue
                    except Exception:
                        pass
            idx += 1

        keys = ['description', 'axis_x', 'axis_y', 'date', 'time', 'sample_info']
        return {k: v for k, v in zip(keys, text_items) if v}

    _sp_plugin._decode_5104 = _decode_5104_smart
    _SPECIO_AVAILABLE = True

except ImportError:
    _SPECIO_AVAILABLE = False


# ===========================================================================
# DATA LOADING
# ===========================================================================

def _clean_binary_metadata(meta_dict: dict) -> dict:
    """
    Remove binary artefacts from a metadata dictionary, keeping only
    human-readable strings.

    Parameters
    ----------
    meta_dict : dict
        Raw metadata as returned by specio.

    Returns
    -------
    dict
        Cleaned metadata; non-string values are kept unchanged.
    """
    pattern = r'[a-zA-Z0-9\s:,\.\(\)-]{3,}'
    cleaned = {}
    for key, val in meta_dict.items():
        if not isinstance(val, str):
            cleaned[key] = val
            continue
        matches = re.findall(pattern, val)
        if not matches:
            cleaned[key] = "N/A" if len(val) > 10 else val
        else:
            text = " | ".join(m.strip() for m in matches)
            text = re.sub(r'\s+', ' ', text).strip()
            cleaned[key] = text
    return cleaned


[docs] def load_spectrum(path: str, magnitude: str = 'A') -> tuple[pd.DataFrame, dict]: """ Load an FTIR spectrum from a file. Supported formats ----------------- * CSV / TXT – two-column file (wavenumber, amplitude); delimiter is auto-detected (comma or whitespace). * .sp – PerkinElmer binary format (requires the *specio* package). Parameters ---------- path : str Path to the spectrum file. magnitude : {'A', 'T'} Indicates whether amplitudes in the file are Absorbance ('A') or Transmittance ('T', expressed as a fraction 0–1 or percentage 0–100). Returns ------- df : pd.DataFrame Columns: ``cm-1`` (wavenumber), ``A`` (absorbance), ``T`` (transmittance 0–1). meta : dict Metadata extracted from the file (empty dict for plain text files). Raises ------ ValueError If *magnitude* is not 'A' or 'T'. ImportError If a .sp file is requested but *specio* is not installed. """ if magnitude not in ('A', 'T'): raise ValueError("'magnitude' must be 'A' (absorbance) or 'T' (transmittance).") ext = Path(path).suffix.lower() # --- PerkinElmer .sp --- if ext == '.sp': if not _SPECIO_AVAILABLE: raise ImportError( "The 'specio' package is required to read .sp files. " "Install it with: pip install specio" ) spectra = specread(path) meta = _clean_binary_metadata(spectra.meta) df = pd.DataFrame({'cm-1': spectra.wavelength}) if magnitude == 'A': df['A'] = spectra.amplitudes df['T'] = 10 ** (-df['A'].values) else: raw = np.array(spectra.amplitudes) # Accept both fractional (0-1) and percentage (0-100) transmittance df['T'] = raw / 100.0 if raw.max() > 1.5 else raw df['A'] = -np.log10(np.where(df['T'] > 0, df['T'], np.nan)) return df, meta # --- Plain text (CSV / space-delimited) --- with open(path) as fh: first_line = fh.readline() delimiter = ',' if ',' in first_line else None data = np.loadtxt(path, delimiter=delimiter, comments='#') meta = {} df = pd.DataFrame({'cm-1': data[:, 0]}) if magnitude == 'A': df['A'] = data[:, 1] df['T'] = 10 ** (-df['A'].values) else: raw = data[:, 1] df['T'] = raw / 100.0 if raw.max() > 1.5 else raw df['A'] = -np.log10(np.where(df['T'] > 0, df['T'], np.nan)) return df, meta
# =========================================================================== # PRE-PROCESSING # ===========================================================================
[docs] def remove_baseline(df: pd.DataFrame, units: str = 'A', poly_order: int = 5) -> pd.DataFrame: """ Subtract a polynomial baseline from the spectrum using the modified polynomial method (ModPoly). Parameters ---------- df : pd.DataFrame Spectrum DataFrame with columns ``cm-1`` and *units*. units : str Column to correct ('A' or 'T'). poly_order : int Degree of the fitting polynomial (default 5). Returns ------- pd.DataFrame Copy of *df* with the baseline subtracted in-place. """ baseline_fitter = Baseline(x_data=df['cm-1']) baseline, _ = baseline_fitter.modpoly( df[units], poly_order=poly_order, tol=1e-3, max_iter=150 ) df = df.copy() df[units] = df[units] - baseline return df
[docs] def scale_reference(df_sample: pd.DataFrame, df_ref: pd.DataFrame, units: str = 'A') -> pd.DataFrame: """ Scale a reference spectrum to best match a sample spectrum using least-squares projection (scalar factor *k*). The scaling factor is computed as: k = (sample · reference) / (reference · reference) Parameters ---------- df_sample : pd.DataFrame Sample spectrum. df_ref : pd.DataFrame Reference spectrum to be scaled. units : str Column used for comparison. Returns ------- pd.DataFrame Scaled copy of *df_ref*. """ vec_sample = df_sample[units].values vec_ref = df_ref[units].values norm_sq = np.dot(vec_ref, vec_ref) k = np.dot(vec_sample, vec_ref) / norm_sq if norm_sq != 0 else 1.0 df_scaled = df_ref.copy() df_scaled[units] = df_scaled[units] * k return df_scaled
[docs] def subtract_reference(df_sample: pd.DataFrame, df_ref: pd.DataFrame, wavenumber_range: tuple[float, float], units: str = 'A') -> tuple[np.ndarray, float, np.ndarray]: """ Scale and subtract a reference spectrum from a sample spectrum over a specified wavenumber range. The reference is first interpolated onto the sample's wavenumber axis, then scaled using the least-squares projection within *wavenumber_range*. Parameters ---------- df_sample : pd.DataFrame Sample spectrum. df_ref : pd.DataFrame Reference spectrum (may have a different wavenumber axis). wavenumber_range : (float, float) ``(k_min, k_max)`` – region used to compute the scaling factor. units : str Column to use ('A' or 'T'). Returns ------- y_diff : np.ndarray Difference spectrum (sample − scaled reference). k : float Scaling factor applied to the reference. y_ref_scaled : np.ndarray Scaled reference interpolated onto the sample axis. """ k_min, k_max = wavenumber_range x_sample = df_sample['cm-1'].values y_sample = df_sample[units].values x_ref = df_ref['cm-1'].values y_ref = df_ref[units].values # Sort reference in ascending wavenumber order sort_idx = np.argsort(x_ref) x_ref_s, y_ref_s = x_ref[sort_idx], y_ref[sort_idx] y_ref_interp = np.interp(x_sample, x_ref_s, y_ref_s, left=0, right=0) mask = (x_sample >= k_min) & (x_sample <= k_max) if mask.sum() == 0: mask = np.ones_like(x_sample, dtype=bool) v_s, v_r = y_sample[mask], y_ref_interp[mask] norm_sq = np.dot(v_r, v_r) k = np.dot(v_s, v_r) / norm_sq if norm_sq != 0 else 1.0 y_ref_scaled = y_ref_interp * k return y_sample - y_ref_scaled, k, y_ref_scaled
# =========================================================================== # ASTM E2412 AREA CALCULATIONS # =========================================================================== # All functions receive a DataFrame with columns 'cm-1' and 'A'. # Net area is computed by trapezoidal integration after subtracting a # two-point linear baseline constructed from local absorbance minima. # # Band definitions follow ASTM E2412 Table 1. # =========================================================================== def _net_area(df: pd.DataFrame, baseline_region_1: tuple[float, float], baseline_region_2: tuple[float, float], measurement_region: tuple[float, float]) -> float: """ Internal helper: compute the net absorbance area of a band. A straight-line baseline is drawn between the absorbance minima found in *baseline_region_1* and *baseline_region_2*. The net area is the integral of (absorbance − baseline) over *measurement_region*. Returns ------- float Net area (≥ 0). Returns 0.0 if any region is absent from *df*. """ def _min_point(region): lo, hi = region mask = (df['cm-1'] >= lo) & (df['cm-1'] <= hi) if mask.sum() == 0: return None, None idx = df.loc[mask, 'A'].idxmin() return df.loc[idx, 'cm-1'], df.loc[idx, 'A'] x1, y1 = _min_point(baseline_region_1) x2, y2 = _min_point(baseline_region_2) if x1 is None or x2 is None: return 0.0 m = (y2 - y1) / (x2 - x1) if (x2 - x1) != 0 else 0.0 b = y1 - m * x1 lo, hi = measurement_region zone = df.loc[(df['cm-1'] >= lo) & (df['cm-1'] <= hi)].copy() if zone.empty: return 0.0 net = zone['A'] - (m * zone['cm-1'] + b) area = float(-np.trapezoid(net, zone['cm-1'])) return max(0.0, area)
[docs] def area_oxidation(df: pd.DataFrame) -> float: """ Oxidation band area (1800–1670 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 550–650 cm⁻¹ and 1900–2200 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(550, 650), baseline_region_2=(1900, 2200), measurement_region=(1670, 1800))
[docs] def area_nitration(df: pd.DataFrame) -> float: """ Nitration band area (1600–1650 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 550–650 cm⁻¹ and 1900–2200 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(550, 650), baseline_region_2=(1900, 2200), measurement_region=(1600, 1650))
[docs] def area_sulfation(df: pd.DataFrame) -> float: """ Sulfation band area (1120–1180 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 550–650 cm⁻¹ and 1900–2200 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(550, 650), baseline_region_2=(1900, 2200), measurement_region=(1120, 1180))
[docs] def area_water(df: pd.DataFrame) -> float: """ Water contamination band area (3150–3500 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 1900–2200 cm⁻¹ and 3680–4000 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(1900, 2200), baseline_region_2=(3680, 4000), measurement_region=(3150, 3500))
[docs] def area_glycol(df: pd.DataFrame) -> float: """ Ethylene glycol (coolant) band area (1030–1100 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 1010–1030 cm⁻¹ and 1100–1130 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(1010, 1030), baseline_region_2=(1100, 1130), measurement_region=(1030, 1100))
[docs] def area_fuel_petrol(df: pd.DataFrame) -> float: """ Petrol (gasoline) fuel dilution band area (745–755 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 730–750 cm⁻¹ and 760–780 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(730, 750), baseline_region_2=(760, 780), measurement_region=(745, 755))
[docs] def area_fuel_diesel(df: pd.DataFrame) -> float: """ Diesel fuel dilution band area (805–815 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 795–805 cm⁻¹ and 825–835 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). Notes ----- The raw integrated area is returned. Convert to a concentration using a laboratory-specific calibration curve. """ return _net_area(df, baseline_region_1=(795, 805), baseline_region_2=(825, 835), measurement_region=(805, 815))
[docs] def area_antiwear_zddp(df: pd.DataFrame) -> float: """ ZDDP antiwear additive band area (960–1025 cm⁻¹) – ASTM E2412. Baseline anchored at the minima in 550–650 cm⁻¹ and 1900–2200 cm⁻¹. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Net integrated area (absorbance·cm⁻¹). """ return _net_area(df, baseline_region_1=(550, 650), baseline_region_2=(1900, 2200), measurement_region=(960, 1025))
[docs] def soot_load(df: pd.DataFrame) -> float: """ Soot (carbon black) load index – ASTM E2412. The absorbance at the carbon-black scattering reference point (2000 cm⁻¹) is multiplied by 100 to yield a dimensionless index. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- float Soot index (dimensionless, ≥ 0). """ if df.empty: return 0.0 idx_2000 = int(np.argmin(np.abs(df['cm-1'].values - 2000))) value = float(df.iloc[idx_2000]['A']) * 100 return max(0.0, value)
[docs] def astm_areas(df: pd.DataFrame) -> dict: """ Compute all standard ASTM E2412 band areas in a single call. Parameters ---------- df : pd.DataFrame Spectrum with columns ``cm-1`` and ``A``. Returns ------- dict Keys: ``oxidation``, ``nitration``, ``sulfation``, ``water``, ``glycol``, ``fuel_petrol``, ``fuel_diesel``, ``antiwear_zddp``, ``soot``. """ return { 'oxidation': area_oxidation(df), 'nitration': area_nitration(df), 'sulfation': area_sulfation(df), 'water': area_water(df), 'glycol': area_glycol(df), 'fuel_petrol': area_fuel_petrol(df), 'fuel_diesel': area_fuel_diesel(df), 'antiwear_zddp': area_antiwear_zddp(df), 'soot': soot_load(df), }
# =========================================================================== # CONCENTRATION ESTIMATION # ===========================================================================
[docs] def area_to_concentration(area: float, calibration_areas: np.ndarray, calibration_concentrations: np.ndarray, poly_order: int = 3) -> float: """ Convert a band area to a concentration using a polynomial calibration curve. Build and apply your own calibration curve by providing known (area, concentration) pairs measured on your instrument. Parameters ---------- area : float Measured band area to convert. calibration_areas : array-like Band areas of the calibration standards. calibration_concentrations : array-like Corresponding concentrations (same units as desired output). poly_order : int Degree of the fitting polynomial (default 3). Returns ------- float Estimated concentration (≥ 0). Example ------- >>> # Example: water content calibration >>> cal_areas = np.array([0.0, 70.0, 87.0, 120.0, 144.0]) >>> cal_conc = np.array([0.0, 2.0, 4.0, 5.0, 10.0]) # % v/v >>> conc = area_to_concentration(measured_area, cal_areas, cal_conc) """ cal_a = np.asarray(calibration_areas, dtype=float) cal_c = np.asarray(calibration_concentrations, dtype=float) coeff = np.polyfit(cal_a, cal_c, poly_order) poly = np.poly1d(coeff) return max(0.0, float(poly(area)))
# =========================================================================== # HIT QUALITY INDEX (HQI) LIBRARY SEARCH # ===========================================================================
[docs] def hqi(x_query: np.ndarray, y_query: np.ndarray, x_ref: np.ndarray, y_ref: np.ndarray) -> float: """ Compute the Hit Quality Index (HQI) between two spectra. HQI is defined as the squared cosine similarity (0–100 %): HQI = 100 × (A·B)² / (‖A‖² · ‖B‖²) The reference is interpolated onto the query's wavenumber axis using the overlapping region only. Parameters ---------- x_query, y_query : np.ndarray Query spectrum (wavenumber, absorbance). x_ref, y_ref : np.ndarray Reference spectrum. Returns ------- float HQI score in percent (0–100). """ x_q = x_query[np.argsort(x_query)] y_q = y_query[np.argsort(x_query)] x_r = x_ref[np.argsort(x_ref)] y_r = y_ref[np.argsort(x_ref)] lo = max(x_q[0], x_r[0]) hi = min(x_q[-1], x_r[-1]) if lo >= hi: return 0.0 mask = (x_q >= lo) & (x_q <= hi) y_q_c = y_q[mask] y_r_c = np.interp(x_q[mask], x_r, y_r) dot = np.dot(y_q_c, y_r_c) norm_q = np.dot(y_q_c, y_q_c) norm_r = np.dot(y_r_c, y_r_c) if norm_q == 0 or norm_r == 0: return 0.0 return float((dot ** 2) / (norm_q * norm_r) * 100)
def _hqi_worker(filepath: Path, x_query: np.ndarray, y_query: np.ndarray) -> dict: """Process a single candidate file for HQI comparison (worker function).""" try: ext = filepath.suffix.lower() if ext == '.sp': data, _ = load_spectrum(str(filepath), 'A') data = remove_baseline(data, 'A') x_c, y_c = data['cm-1'].values, data['A'].values else: with open(filepath) as fh: first = fh.readline() delim = ',' if ',' in first else None arr = np.loadtxt(filepath, delimiter=delim, comments='#') x_c, y_c = arr[:, 0], arr[:, 1] score = hqi(x_query, y_query, x_c, y_c) return {'file': str(filepath), 'hqi': score} except Exception as exc: return {'file': str(filepath), 'hqi': -1.0, 'error': str(exc)} # =========================================================================== # PLOTTING # ===========================================================================
[docs] def plot_spectra(data_dict: dict, title: str = "FTIR Spectroscopy", xlim: tuple = (4000, 450), ylim_A: tuple = (0, 0.4), ylim_T: tuple = (0, 1), save_path: str | None = None) -> tuple: """ Plot one or more FTIR spectra (absorbance and transmittance panels). Parameters ---------- data_dict : dict ``{'Label': {'df': df, 'color': 'black', 'alpha': 1.0, 'linewidth': 2}}`` Only ``'df'`` is required per entry. title : str Figure title. xlim : (float, float) Wavenumber axis limits (default: 4000–450 cm⁻¹). ylim_A : (float, float) Absorbance axis limits. ylim_T : (float, float) Transmittance axis limits. save_path : str or None If given, the figure is saved to this path (PNG/PDF/SVG). Returns ------- fig, axes : matplotlib Figure and Axes array """ fig, axes = plt.subplots(ncols=2, figsize=(18, 7), dpi=150) for label, cfg in data_dict.items(): df = cfg['df'] kw = dict( linewidth=cfg.get('linewidth', 1.5), label=label, color=cfg.get('color', None), alpha=cfg.get('alpha', 1.0), ) axes[0].plot(df['cm-1'], df['A'], **kw) if 'T' in df.columns: axes[1].plot(df['cm-1'], df['T'], **kw) fig.suptitle(title, fontsize=16, fontweight='bold') for ax in axes: ax.set_xlim(*xlim) ax.set_xlabel(r'Wavenumber [cm$^{-1}$]', fontsize=12) ax.legend(fontsize=11) ax.grid(True, linestyle='--', alpha=0.4) axes[0].set_ylim(*ylim_A) axes[0].set_ylabel('Absorbance', fontsize=12) axes[1].set_ylim(*ylim_T) axes[1].set_ylabel('Transmittance', fontsize=12) fig.tight_layout() if save_path: fig.savefig(save_path, dpi=200, bbox_inches='tight') plt.show() return fig, axes