#!python
#
#     Copyright (C) 2017–2026 Jose Manuel Martí Martínez
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU Affero General Public License as
#     published by the Free Software Foundation, either version 3 of the
#     License, or (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#     GNU Affero General Public License for more details.
#
#     You should have received a copy of the GNU Affero General Public License
#     along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
Launch tests for the whole Recentrifuge package.

Meaning of retest exit codes:
 Code   Explanation
 ----   ----------------------------------------------
    0   All tests passed
    1   Problem with required dependencies
    2   Failed the version check
    3   Failed the retaxdump test
    4   Failed the remock test
    5   Failed the recentrifuge test
    6   Failed the rextract test
    7   Failed the check of rextract output against standard values
    >7  Failed a multiple comparison between rcf test data and test standard
"""

import argparse
import collections as col
import gzip
import os
import re
import subprocess as sp
import sys
import time
from statistics import mean
from typing import Counter, List, Dict, Any, Tuple, Optional
from warnings import filterwarnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas import ExcelFile
from pandas.testing import assert_frame_equal

from recentrifuge import __version__, __author__, __date__
from recentrifuge import __path__  # type: ignore  # mypy issue #1422
from recentrifuge.config import Filename, Score, Extra, Sample
from recentrifuge.config import LICENSE, TAXDUMP_PATH
from recentrifuge.config import HTML_SUFFIX, XLSX_SUFFIX
from recentrifuge.config import REXTRACT_TEST_SAMPLE, REXTRACT_TEST_FASTQ
from recentrifuge.config import REXTRACT_TEST_TAXID, GZEXT
from recentrifuge.config import STATS_SHEET_NAME, TEST_OUTPUT_DIR, STR_CONTROL
from recentrifuge.config import MOCK_XLSX, MOCK_SHEET, KEY_SHEET, UNASSIGNED
from recentrifuge.config import gray, blue, magenta, green, red, yellow

# # BioPython
try:
    from Bio import SeqIO
except ImportError:
    print(red('ERROR!'), 'Please install BioPython for testing recentrifuge.')
    sys.exit(1)

# Local constants
MHL_DEFAULT = 35
MINTAXA_DEFAULT = 5
EPS = 1e-16  # Local epsilon for floating point comparisons
F_V = '-V'
F_TAX = '-n=' + TAXDUMP_PATH
PATH = os.path.dirname(os.path.realpath(__file__))
PATH_TEST: Filename = Filename(os.path.join(__path__[0], 'test/'))
F_FIL = '-f=' + TEST_OUTPUT_DIR
TEST_PREFIX = 'TEST'
OUT_PRE: Filename = Filename(TEST_PREFIX)
XLSX_FIL: Filename = Filename(TEST_PREFIX + XLSX_SUFFIX)
TEST_PDF_FILE: Filename = Filename(TEST_PREFIX + '.rcf.pdf')
ROC_CTRL_PDF_FILE: Filename = Filename('ROC_CTRL.rcf.pdf')
ROC_MINTAX_PDF_FILE: Filename = Filename('ROC_MINTAX.rcf.pdf')
# # General
F_C = '-c'
F_D = '-d'
F_T = '-t'
# # For rextract
F_CFG = '-f=' + os.path.join(TEST_OUTPUT_DIR, REXTRACT_TEST_SAMPLE)
F_INC = '-i=' + REXTRACT_TEST_TAXID
F_FAQ = '-q=' + os.path.join(TEST_OUTPUT_DIR, REXTRACT_TEST_FASTQ + GZEXT)
REXTRACTED_FASTQ_GZ = Filename(
    os.path.join(TEST_OUTPUT_DIR, REXTRACT_TEST_FASTQ).rstrip('.fastq') +
    '_rxtr_incl' + REXTRACT_TEST_TAXID + '.fastq.gz'
)
# # For rcf
F_OUT = '-o='
F_CTR = '-c=3'
F_MHL = '-y='
F_RND = '-r='
F_MIN = '-m='
F_CTR_MIN = '-w='
F_PKL = '--pickle'
F_THR = '-T='
THREADS_DEFAULT = 0  # Legacy mode: use min(cpu_count, samples)
XLSX_PATH: Filename = Filename(os.path.join(TEST_OUTPUT_DIR, XLSX_FIL))
STND_PATH: Filename = Filename(os.path.join(PATH_TEST, XLSX_FIL))
MOCK_PATH: Filename = Filename(os.path.join(PATH_TEST, MOCK_XLSX))
SAMPLES: List[Sample] = [
    Sample(spl) for spl in ['ctrl1', 'ctrl2', 'ctrl3',
                            'smpl1', 'smpl2', 'smpl3', 'smpl4']
]
SAMPLEX: List[Sample] = SAMPLES + [Sample('smplH')]
STR_CTRL_SP = '_' + STR_CONTROL + '_species'
CTRL_SP: List[Sample] = [
    Sample(spl + STR_CTRL_SP) for spl in ['smpl1', 'smpl2', 'smpl3', 'smpl4']]
DEBUG_SP: Sample = Sample('smplH')


def main():
    """Main entry point to script."""

    def to_native(obj):
        """Convert numpy types to native Python types for cleaner printing"""
        if isinstance(obj, dict):
            return {k: to_native(v) for k, v in obj.items()}
        elif isinstance(obj, (list, tuple)):
            return type(obj)(to_native(v) for v in obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        return obj

    def vprint(*arguments, **kargs) -> None:
        """Print only if verbose/debug mode is enabled"""
        if args.debug:
            print(*[to_native(arg) for arg in arguments], **kargs)

    def configure_parser():
        """Argument Parser Configuration"""
        parser = argparse.ArgumentParser(
            description='Launch Recentrifuge tests',
            epilog=f'%(prog)s  - Release {__version__} - {__date__}' + LICENSE,
            formatter_class=argparse.RawDescriptionHelpFormatter
        )
        parser.add_argument(
            '-d', '--debug',
            action='store_true',
            help='increase output verbosity and perform additional checks'
        )
        parser.add_argument(
            '-i', '--ignore',
            action='store_true',
            help='continue testing even if errors arise'
        )
        parser.add_argument(
            '-l', '--local',
            action='store_true',
            help='test local directory scripts instead of pip installed'
        )
        parser_additional = parser.add_argument_group(
            'additional', 'perform additional tests')
        parser_additional.add_argument(
            '-m', '--mintaxa',
            action='store_true',
            help='perform additional tests for mintaxa dependency'
        )
        parser_additional.add_argument(
            '-r', '--roc',
            action='store_true',
            help='perform additional tests and get ROC figures'
        )
        parser_advanced = parser.add_argument_group(
            'advanced', 'activate advanced or experimental features')
        parser_advanced.add_argument(
            '-s', '--skip',
            action='store_true',
            help='skip the recentrifuge calls and just load the results to '
                 'plot ROC figure by mintaxa (results should be available)'
        )
        parser_advanced.add_argument(
            '--no-strain',
            action='store_true',
            help='set this same flag in rcf to disable strain level'
        )
        parser.add_argument(
            '-y', '--minscore',
            action='store',
            metavar='NUMBER',
            type=lambda txt: Score(float(txt)),
            default=Score(MHL_DEFAULT),
            help=('minimum score/confidence of the classification of a read '
                  f'to pass the quality filter; {MHL_DEFAULT} by default')
        )
        parser.add_argument(
            '-V', '--version',
            action='version',
            version=f'%(prog)s release {__version__} ({__date__})'
        )
        return parser

    def check_debug_strain():
        """Check debugging and strain modes"""
        # Check for debugging mode
        if args.debug:
            print(blue('INFO:'), gray('Debugging mode activated'))
            print(blue('INFO:'), gray('Active parameters:'))
            for key, value in vars(args).items():
                if value:
                    print(gray(f'\t{key} ='), f'{value}')
            print(blue('INFO:'), gray('Relevant paths:'))
            for var in ['TAXDUMP_PATH', 'PATH', 'PATH_TEST',
                        'XLSX_PATH', 'STND_PATH', 'MOCK_PATH']:
                print(gray(f'\t{var} ='), f'{eval(var)}')  # Safe eval of const
        # Check for no-strain mode (strain is now default)
        if args.no_strain:
            print(gray('INFO:'), 'Strain level disabled in rcf')

    def test_version():
        """Test version matching"""
        prog: Filename = Filename(prefix + 'rcf')
        print(magenta(f'\n>>> TESTING VERSION ... '), end='')
        sys.stdout.flush()
        cproc = sp.run([prog, F_V], stdout=sp.PIPE, stderr=sp.STDOUT,
                       universal_newlines=True)
        match = re.search(r'(\d+\.\d+\.\d+[a-z]*\d*)\s', cproc.stdout)
        if match is not None and match.group(1) == __version__:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if match is not None:
                print(blue('INFO:'),
                      f'Installed recentrifuge is v{match.group(1)} '
                      f'while these tests are for version {__version__}')
            if not args.ignore:
                sys.exit(2)

    def test_retaxdump():
        """Test retaxdump"""
        prog: Filename = Filename(prefix + 'retaxdump')
        print(magenta(f'\n>>> TESTING {prog} ...'))
        sys.stdout.flush()
        cproc = sp.run([prog], universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(3)

    def test_remock():
        """Test remock"""
        prog: Filename = Filename(prefix + 'remock')
        print(magenta(f'\n>>> TESTING {prog} ...'))
        argums = [prog, F_TAX, F_RND + str(minscore), F_T, F_D, F_C]
        if args.debug:
            print(argums)
        cproc = sp.run(argums, universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(4)

    def test_rcf(out_prefix: Filename = OUT_PRE,
                 mintaxa: int | None = None):
        """Test recentrifuge"""
        prog: Filename = Filename(prefix + 'rcf')
        print(magenta(f'\n>>> TESTING {prog} WITH MINTAXA {mintaxa} ...'))
        argums = [prog, F_TAX, F_FIL, F_PKL, F_PKL,  # '--pickle' twice
                  F_OUT + os.path.join(TEST_OUTPUT_DIR, out_prefix),
                  F_CTR, F_MHL + str(minscore), F_THR + str(THREADS_DEFAULT),
                  F_D]
        if mintaxa is not None:
            argums.append(F_MIN + str(mintaxa))
            argums.append(F_CTR_MIN + str(mintaxa))
        if args.no_strain:
            argums.append('--no-strain')
        else:
            pass  # Strain is now default, so we don't need to add anything
        if args.debug:
            print(argums)
        cproc = sp.run(argums, universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(5)

    def test_rextract():
        """Test rextract"""
        prog: Filename = Filename(prefix + 'rextract')
        print(magenta(f'\n>>> TESTING {prog} ...'))
        argums = [prog, F_CFG, F_FAQ, F_INC, F_D, F_C]
        if args.debug:
            print(argums)
        cproc = sp.run(argums, universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(6)

    def load_excels(just_excel: Filename | None = None):
        """Load excels from standard and recentrifuge results"""
        nonlocal pd_xlsx, pd_stnd, pd_mock  # type: ignore  # mypy issue #7057
        if just_excel is None:
            print(magenta(f'>>> LOADING TEST RESULTS ... '), end='')
        try:
            if just_excel is None:
                pd_xlsx = pd.ExcelFile(XLSX_PATH,
                                       engine='openpyxl',
                                       )
            else:
                pd_xlsx = pd.ExcelFile(just_excel,
                                       engine='openpyxl',
                                       )
        except OSError:
            if just_excel is None:
                print(red('FAILED!\nERROR!'), 'Results for test not found!')
            else:
                print(red('FAILED!\nERROR!'), f'Problem with {just_excel}.')
                if args.skip:
                    print(blue('HINT:'),
                          'You probably want to rerun without the skip flag')
            raise
        else:
            if just_excel is None:
                print(green('OK!'))
        if just_excel is None:
            print(magenta(f'>>> LOADING STANDARD RESULTS ... '), end='')
            try:
                pd_stnd = pd.ExcelFile(STND_PATH,
                                       engine='openpyxl',
                                       )
            except OSError:
                print(red('FAILED!\nERROR!'), 'Missing standard test results!')
                raise
            else:
                print(green('OK!'))
            print(magenta(f'>>> LOADING MOCK DATA ... '), end='')
            try:
                pd_mock = pd.ExcelFile(MOCK_PATH,
                                       engine='openpyxl',
                                       )
            except OSError:
                print(red('FAILED!\nERROR!'), 'Missing mock xlsx file!')
                raise
            else:
                print(green('OK!'))

    def test_rextract_results():
        """Test rextract results against standard values"""
        prog: Filename = Filename(prefix + 'rextract')
        print(magenta(f'\n>>> CHECKING {prog} RESULTS AGAINST STANDARD ... '),
              end='', flush=True)

        assert pd_mock is not None, "pd_mock must be loaded before calling test_rextract_results"
        df_mock = pd_mock.parse(
            sheet_name=MOCK_SHEET, header=[0],
            index_col=1, skipfooter=1,
            engine='openpyxl',
        )
        cnt_mck: int
        try:
            cnt_mck = df_mock[REXTRACT_TEST_SAMPLE.rstrip('.out')][
                int(REXTRACT_TEST_TAXID)]
        except KeyError:
            cnt_mck = 0

        with gzip.open(REXTRACTED_FASTQ_GZ, 'rt') as fgz:
            record_iterator = SeqIO.parse(fgz, "quickfastq")
            num_seqs: int = len(list(record_iterator))
        if num_seqs == cnt_mck:
            print(blue(f'{cnt_mck}'), 'fastq seqs', green('OK!'))
        else:
            print(red('FAILED!'))
            print('Difference between test results and standard values:')
            print(f'Expected {cnt_mck} FASTQ seqs and got {num_seqs} instead.')
            if not args.ignore:
                sys.exit(7)

    def analyze_robust_contam_removal():
        """Analyze recentrifuge results for robust contamination removal"""
        print(magenta(f'\n>>> ANALYZING ROBUST CONTAMINATION REMOVAL ... '),
              end='')

        # Figure initialization
        filterwarnings("ignore", category=UserWarning, module="matplotlib")
        fig, ax = plt.subplots(
            nrows=2, ncols=1, squeeze=False, sharey='col', sharex='col',
            figsize=(20, 12)
        )
        fig.tight_layout(rect=(0.0, 0.0, 1.0, 1.0), w_pad=0, h_pad=0.1)
        ax_raw = ax[0, 0]
        ax_ctr = ax[1, 0]

        # Load data sheets as pandas DataFrames
        assert pd_stnd is not None, "pd_stnd must be loaded before calling analyze_robust_contam_removal"
        assert pd_xlsx is not None, "pd_xlsx must be loaded before calling analyze_robust_contam_removal"
        assert pd_mock is not None, "pd_mock must be loaded before calling analyze_robust_contam_removal"
        df_stnd = pd_stnd.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',

        )
        df_xlsx = pd_xlsx.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',
        )
        try:
            df_xlsx.columns = df_stnd.columns  # Normalize columns name
        except ValueError as e:
            print(red('FAILED!'))
            print(e)
            print(blue(f'Columns of sheet {Extra.FULL} in test are:'))
            print(df_xlsx.columns)
            print(blue(f'... but columns of {Extra.FULL} in standard are:'))
            print(df_stnd.columns)
            if not args.ignore:
                sys.exit(8)                
        df_key = pd_mock.parse(
            sheet_name=KEY_SHEET, header=[0],
            index_col=0, dtype={'NAME': str, 'COLOR': str, 'HATCH': str,
                                'NATIVES': str, 'RANK': str, 'HIST': bool},
            engine='openpyxl',
        )

        # Data initialization
        numtaxa = len(df_key['HIST'][df_key['HIST']])
        n = len(SAMPLES)
        ind = np.arange(n)  # the x locations for all the samples
        width = 0.90 / numtaxa  # the width of the bars regarding space
        ymax = 1e+5
        ymin = 1e+0

        # Generate bar plot
        rects: List[Any] = []
        for num, (taxid, taxon) in enumerate(df_key.iterrows()):
            if not taxon['HIST']:
                break
            taxid_int = int(str(taxid))
            # Convert empty/whitespace/NaN hatch values to None for matplotlib compatibility
            # Note: Excel data has quotes around values, so empty hatch is '""' not ''
            hatch_raw = taxon["HATCH"]
            hatch_str = str(hatch_raw) if hatch_raw is not None else ''
            # Strip quotes and whitespace, check if result is empty
            hatch_stripped = hatch_str.strip().strip('"').strip("'")
            if pd.isna(hatch_raw) or len(hatch_stripped) == 0:
                hatch_val = None
            else:
                hatch_val = hatch_stripped
            rects.append(
                ax_raw.bar(
                    ind + num * width,
                    [df_xlsx[s][UNASSIGNED][taxid_int] for s in SAMPLES],
                    width, color=[taxon["COLOR"]], hatch=hatch_val,
                    alpha=0.5, label=taxon["NAME"])
            )
            ax_ctr.bar(ind + num * width,
                       [0.1, 0.1, 0.1] +
                       [df_xlsx[s][UNASSIGNED][taxid_int] for s in CTRL_SP],
                       width, color=[taxon["COLOR"]], hatch=hatch_val,
                       alpha=0.5, label=taxon["NAME"])
        # Add mintaxa threshold
        rects.append(ax_raw.axhline(y=MINTAXA_DEFAULT, color='r', lw=1))
        rects.append(ax_ctr.axhline(y=MINTAXA_DEFAULT, color='r', lw=1))
        # Set scale, axis lims, ticks, and grid
        ax_raw.set_yscale("log", nonpositive='clip')
        ax_raw.set_ylim(top=ymax, bottom=ymin)
        xmin = ind[0] - 0.5 + width * (numtaxa - 1) / 2
        xmax = ind[-1] + 0.5 + width * (numtaxa - 1) / 2
        ax_raw.set_xlim(left=xmin, right=xmax)
        ax_raw.set_xticks(ind + width * (numtaxa - 1) / 2)
        ax_raw.set_xticks(ind[:-1] + 0.5 +
                          width * (numtaxa - 1) / 2, minor=True)
        ax_raw.xaxis.grid(True, which='minor', linewidth=5, color='k')
        ax_raw.yaxis.grid(True, alpha=0.5)
        ax_ctr.xaxis.grid(True, which='minor', linewidth=5, color='k')
        ax_ctr.yaxis.grid(True, alpha=0.5)
        ax_ctr.set_xticklabels(SAMPLES, fontdict={'fontsize': 'xx-large'})
        ax_ctr.legend(rects,
                      [pair[1]['NAME'] for pair in df_key.iterrows()
                       if pair[1]['HIST']] +
                      [r'$\mathtt{mintaxa}$'],
                      loc='best', framealpha=0.9,
                      prop={'family': 'sans-serif', 'size': 'medium'})

        # Add inner titles
        ax_raw.text(
            ind[n - 2] + width * (numtaxa - 1) / 2, ymax / 1.3,
            'Species or below in raw samples', size=20,
            ha="center", va="center",
            bbox=dict(
                boxstyle="round", ec=(1., 0.4, 0.4),
                fc=(1., 0.7, 0.7)
            )
        )
        ax_ctr.text(
            ind[n - 2] + width * (numtaxa - 1) / 2, ymax / 1.3,
            'Species after robust contamination removal', size=20,
            ha="center", va="center",
            bbox=dict(
                boxstyle="round", ec=(0.2, 0.8, 0.2),
                fc=(0.5, 0.8, 0.5)
            )
        )

        # Label negative controls
        ax_raw.fill_between([xmin, ind[2] + 0.5 + width * (numtaxa - 1) / 2],
                            ymax, color='c', alpha=0.1)
        ax_raw.text(
            ind[2] + 0.4 + width * (numtaxa - 1) / 2, ymax / 10,
            'negative controls', ha="center", va="center",
            rotation=-90, fontsize='x-large', color='b'
        )
        ax_ctr.fill_between([xmin, ind[2] + 0.5 + width * (numtaxa - 1) / 2],
                            ymax, color='c', alpha=0.1)
        ax_ctr.text(
            ind[2] + 0.4 + width * (numtaxa - 1) / 2, ymax / 10,
            'negative controls', ha="center", va="center",
            rotation=-90, fontsize='x-large', color='b'
        )

        # Save and show figure
        fig.savefig(os.path.join(TEST_OUTPUT_DIR, TEST_PDF_FILE),
                    bbox_inches='tight')
        try:
            fig.show()
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to show plot.')
        try:
            plt.close(fig)
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to close figure.')
        else:
            print(green('OK!'))

    def get_roc(plot_roc: bool = True, silent: bool = False
                ) -> Tuple[Dict[Sample, float], Dict[Sample, float]]:
        """Get ROC parameters and plot ROC"""
        if not silent:
            print(magenta(f'\n>>> GET ROC ... '))
            print(magenta(f'>> GET PARAMETERS... '), end='', flush=True)

        # Load data sheets as pandas DataFrames
        assert pd_stnd is not None, "pd_stnd must be loaded before calling get_roc"
        assert pd_xlsx is not None, "pd_xlsx must be loaded before calling get_roc"
        assert pd_mock is not None, "pd_mock must be loaded before calling get_roc"
        df_stnd = pd_stnd.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',
        )
        df_xlsx = pd_xlsx.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',
        )
        df_mock = pd_mock.parse(
            sheet_name=MOCK_SHEET, header=[0],
            index_col=1, skipfooter=1,
            engine='openpyxl',
        )
        del df_mock['RECENTRIFUGE MOCK']
        df_key = pd_mock.parse(
            sheet_name=KEY_SHEET, header=[0],
            index_col=0, dtype={'NAME': str, 'COLOR': str, 'HATCH': str,
                                'NATIVES': str, 'RANK': str, 'HIST': bool},
            engine='openpyxl',
        )

        # Normalize column names to the standard names (without directory)
        stnd_samples = df_stnd.columns.get_level_values(0)
        rename_cols = {TEST_OUTPUT_DIR + spl: spl for spl in stnd_samples}
        df_xlsx.rename(columns=rename_cols, inplace=True, level=0)

        # Get data
        positives: Counter[Sample] = col.Counter()
        negatives: Counter[Sample] = col.Counter()
        true_pos_raw: Counter[Sample] = col.Counter()
        fake_pos_raw: Counter[Sample] = col.Counter()
        true_pos_ctr: Counter[Sample] = col.Counter()
        fake_pos_ctr: Counter[Sample] = col.Counter()
        cnt_mck: int
        cnt_raw: int
        cnt_ctr: int
        for taxid, taxon in df_key.iterrows():
            if 'species' not in taxon['RANK']:
                continue
            taxid_int = int(str(taxid))
            for spl in SAMPLEX[3:]:
                try:
                    cnt_mck = df_mock[spl][taxid_int]
                except KeyError:
                    cnt_mck = 0
                try:
                    cnt_raw = df_xlsx[spl][UNASSIGNED][taxid_int]
                except KeyError:
                    cnt_raw = 0
                try:
                    cnt_ctr = df_xlsx[spl
                                      + STR_CTRL_SP][UNASSIGNED][taxid_int]
                except KeyError:
                    cnt_ctr = 0
                if spl in taxon['NATIVES']:
                    positives[spl] += cnt_mck
                    true_pos_raw[spl] += cnt_raw
                    true_pos_ctr[spl] += cnt_ctr
                else:
                    negatives[spl] += cnt_mck
                    fake_pos_raw[spl] += cnt_raw
                    fake_pos_ctr[spl] += cnt_ctr
        # Get TPR and FPR
        tpr_raw: Dict[Sample, float] = {}
        fpr_raw: Dict[Sample, float] = {}
        tpr_ctr: Dict[Sample, float] = {}
        fpr_ctr: Dict[Sample, float] = {}
        segs: List[List[Tuple[float, float]]] = []
        for spl in SAMPLEX[3:]:
            # For raw samples
            if true_pos_raw[spl] > positives[spl]:
                fake_pos_raw[spl] += (true_pos_raw[spl] - positives[spl])
                true_pos_raw[spl] = positives[spl]
            tpr_raw[spl] = true_pos_raw[spl] / positives[spl]
            fpr_raw[spl] = fake_pos_raw[spl] / negatives[spl]
            # For CTRL samples
            if true_pos_ctr[spl] > positives[spl]:
                fake_pos_ctr[spl] += (true_pos_ctr[spl] - positives[spl])
                true_pos_ctr[spl] = positives[spl]
            tpr_ctr[spl] = true_pos_ctr[spl] / positives[spl]
            fpr_ctr[spl] = fake_pos_ctr[spl] / negatives[spl]
            segs.append([(fpr_raw[spl], tpr_raw[spl]),
                         (fpr_ctr[spl], tpr_ctr[spl])])
        if not silent:
            print(green('OK!'), flush=True)
        vprint('TPR (raw samples) =', tpr_raw)
        vprint('FPR (raw samples) =', fpr_raw)
        vprint('TPR (CTRL samples) =', tpr_ctr)
        vprint('FPR (CTRL samples) =', fpr_ctr)
        vprint(f'Coordinates (for {DEBUG_SP}) =',
               segs[SAMPLEX.index(DEBUG_SP)-3])  # 3 = num of ctrl samples
        vprint(f'P (for {DEBUG_SP}) =', positives[DEBUG_SP])
        vprint(f'N (for {DEBUG_SP}) =', negatives[DEBUG_SP])
        vprint('TP (...raw) =', true_pos_raw[DEBUG_SP])
        vprint('FP (...raw) =', fake_pos_raw[DEBUG_SP])
        vprint('TP (...CTRL) =', true_pos_ctr[DEBUG_SP])
        vprint('FP (...CTRL) =', fake_pos_ctr[DEBUG_SP])

        if plot_roc:
            if not silent:
                print(magenta(f'>> PLOT ROC... '), end='', flush=True)
            # Figure initialization
            filterwarnings("ignore", category=UserWarning, module="matplotlib")
            fig, ax = plt.subplots(figsize=(6, 6))
            ax.set_xlim(-0.02, 1.02)
            ax.set_ylim(-0.02, 1.02)
            ax.set_aspect('equal')
            ax.set_xlabel('FPR | (1 - specificity)')
            ax.set_ylabel('TPR | Sensitivity | Recall')
            ax.set_title('Evolution from raw samples to CTRL_species samples')

            # Generate arrow plot
            arrows: List[Any] = []
            for spl, color in zip(SAMPLEX[3:], ['r', 'g', 'b', 'm', 'c']):
                arrows.append(
                    ax.arrow(fpr_raw[spl], tpr_raw[spl],
                             fpr_ctr[spl] - fpr_raw[spl],
                             tpr_ctr[spl] - tpr_raw[spl],
                             shape='full', lw=1, length_includes_head=True,
                             head_width=0.02, head_length=0.01, color=color)
                )
            arrows.append(
                ax.arrow(1.2, 1.2, -1.4, -1.4, lw=1, linestyle=':',
                         head_width=0, head_length=0, color='0.8', alpha=0.5))
            ax.legend(arrows,
                      SAMPLEX[3:] + ['ROC plot diagonal'], loc='lower right',
                      prop={'family': 'sans-serif', 'size': 'medium'},
                      framealpha=0.9)
            # Save and show figure
            fig.savefig(os.path.join(TEST_OUTPUT_DIR, ROC_CTRL_PDF_FILE),
                        bbox_inches='tight')
            try:
                fig.show()
            except KeyboardInterrupt:
                print(gray(' User'), yellow('interrupted!'))
                raise
            except Exception:
                print(yellow('WARNING!'), 'Unable to show plot.')
            try:
                plt.close(fig)
            except KeyboardInterrupt:
                print(gray(' User'), yellow('interrupted!'))
                raise
            except Exception:
                print(yellow('WARNING!'), 'Unable to close figure.')
            else:
                if not silent:
                    print(green('OK!'), flush=True)

        # Return ROC values
        return tpr_ctr, fpr_ctr

    def get_mintaxa_roc():
        """Get ROC parameters function of mintaxa and plot ROC"""
        print(magenta(f'\n>>> GET MINTAXA ROC ... '), end='', flush=True)
        tpr_mintax: Dict[int, Dict[Sample, float]] = {}
        fpr_mintax: Dict[int, Dict[Sample, float]] = {}
        vals = list(range(11)) + [
            12, 15, 25, 50, 100, 150, 200, 250, 400, 500, 800, 1000, 1500,
            2000, 2500, 4000, 5000, 8000, 10000, 15000, 20000, 40000, 80000]
        html_name = [Filename(TEST_PREFIX + '_mintaxa' + str(v) + HTML_SUFFIX)
                     for v in vals]
        for mintax, html in zip(vals, html_name):
            print(magenta(f' {mintax},'), end='', flush=True)
            if not args.skip:
                test_rcf(out_prefix=html, mintaxa=mintax)
            xlsx_test_filename: Filename = Filename(
                TEST_PREFIX + '_mintaxa' + str(mintax) + XLSX_SUFFIX)
            load_excels(just_excel=Filename(
                os.path.join(TEST_OUTPUT_DIR, xlsx_test_filename)))
            (tpr_ctr, fpr_ctr) = get_roc(plot_roc=False, silent=True)
            tpr_mintax[mintax] = tpr_ctr
            fpr_mintax[mintax] = fpr_ctr
        vprint('\t All TPR =', tpr_mintax)
        vprint('\t All FPR =', fpr_mintax)

        # Figure initialization
        filterwarnings("ignore", category=UserWarning, module="matplotlib")
        fig, ax = plt.subplots(figsize=(6, 6))
        ax.set_xlim(-0.02, 1.02)
        ax.set_ylim(-0.02, 1.02)
        ax.set_aspect('equal')
        ax.set_xlabel('FPR | (1 - specificity)')
        ax.set_ylabel('TPR | Sensitivity | Recall')
        ax.set_title('Evolution of ROC with mintaxa variation')
        rocs: List[Any] = []
        for spl, color in zip(SAMPLEX[3:], ['r', 'g', 'b', 'm', 'c']):
            x_fpr = [fpr_mintax[mintax][spl] for mintax in vals]
            y_tpr = [tpr_mintax[mintax][spl] for mintax in vals]
            rocs.append(  # Append element 0 of axis.plot
                ax.plot(x_fpr, y_tpr, color=color, linestyle='-', marker='',
                        linewidth=2, label=spl)[0])
        x_fpr_avg = [mean(fpr_mintax[mintax].values()) for mintax in vals]
        y_tpr_avg = [mean(tpr_mintax[mintax].values()) for mintax in vals]
        rocs.append(  # Append element 0 of axis.plot
            ax.plot(x_fpr_avg, y_tpr_avg, color='orange',
                    linestyle=':', marker='', linewidth=3, label='MEAN')[0])
        rocs.append(
            ax.arrow(1.2, 1.2, -1.4, -1.4, lw=1, linestyle=':',
                     head_width=0, head_length=0, color='0.8',
                     alpha=0.5, label='ROC plot diagonal'))
        ax.legend(rocs, SAMPLEX[3:] + ['Mean of samples', 'ROC plot diagonal'],
                  prop={'family': 'sans-serif', 'size': 'medium'},
                  framealpha=0.9, loc='lower right')
        # Annotate ROC by mintaxa
        vals_show = [0, 2, 3, 4, 5, 15, 25, 200]
        for num, mintax in enumerate(vals_show):
            x_pos = mean(fpr_mintax[mintax].values())
            y_pos = mean(tpr_mintax[mintax].values())
            ax.annotate(str(mintax), xy=(x_pos, y_pos),
                        xytext=(x_pos+0.4+0.01*num,
                                y_pos-0.06*(len(vals_show)-num/2)),
                        ha="center", va="center", color='0.2',
                        fontsize='small', fontfamily='monospace',
                        arrowprops=dict(color='0.2', shrink=0.02,
                                        width=0, headwidth=2,
                                        linestyle='-', linewidth=0.1)
                        )
        vals_show = [500, 1000, 2000, 4000, 5000, 8000, 20000, 40000]
        for num, mintax in enumerate(vals_show):
            x_pos = mean(fpr_mintax[mintax].values())
            y_pos = mean(tpr_mintax[mintax].values())
            ax.annotate(str(mintax), xy=(x_pos, y_pos),
                        xytext=(x_pos+0.12+0.04*(len(vals_show)-num),
                                y_pos), ha="center", va="center", color='0.2',
                        fontsize='small', fontfamily='monospace',
                        arrowprops=dict(color='0.2', shrink=0.02,
                                        width=0, headwidth=2,
                                        linestyle='-', linewidth=0.1)
                        )

    # Save and show figure
        fig.savefig(os.path.join(TEST_OUTPUT_DIR, ROC_MINTAX_PDF_FILE),
                    bbox_inches='tight')
        try:
            fig.show()
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to show plot.')
        try:
            plt.close(fig)
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to close figure.')
        else:
            print(green(' OK!'), flush=True)

    def test_rcf_results():
        """Test recentrifuge results against standard"""
        prog: Filename = Filename(prefix + 'rcf')
        print(magenta(f'\n>>> COMPARING {prog} RESULTS WITH STANDARD:'))

        assert pd_stnd is not None, "pd_stnd must be loaded before calling test_rcf_results"
        assert pd_xlsx is not None, "pd_xlsx must be loaded before calling test_rcf_results"
        for sheet in [STATS_SHEET_NAME, str(Extra.FULL)]:
            print(magenta(f'>> TEST FOR {sheet}... '), end='')
            df_stnd = pd_stnd.parse(
                sheet_name=sheet, header=[0, 1],
                skiprows=None, index_col=0,
                engine='openpyxl',
            )
            df_xlsx = pd_xlsx.parse(
                sheet_name=sheet, header=[0, 1],
                skiprows=None, index_col=0,
                engine='openpyxl',
            )
            try:
                df_xlsx.columns = df_stnd.columns  # Normalize columns name
            except ValueError as e:
                print(red('FAILED!'))
                print(e)
                print(blue('Columns of sheet {sheet} in test are:'))
                print(df_xlsx.columns)
                print(blue('... but columns of {sheet} in standard are:'))
                print(df_stnd.columns)
                if not args.ignore:
                    sys.exit(8)                
            try:
                assert_frame_equal(df_xlsx, df_stnd, check_exact=False)
            except AssertionError as e:
                print(red('FAILED!'))
                print(e)
                df_mask = df_xlsx.ne(df_stnd)
                print(blue('Test results are:'))
                df_drop = df_xlsx[df_mask].dropna(axis=0, how='all')
                df_drop.dropna(axis=1, how='all', inplace=True)
                print(df_drop)
                print(blue('... but standard results are:'))
                df_drop = df_stnd[df_mask].dropna(axis=0, how='all')
                df_drop.dropna(axis=1, how='all', inplace=True)
                print(df_drop)
                if not args.ignore:
                    sys.exit(9)
            else:
                print(green('OK!'))

    # Timing initialization
    start_time: float = time.time()

    # Program header
    print(f'\n=-= {sys.argv[0]} =-= v{__version__} - {__date__}'
          f' =-= by {__author__} =-=\n')
    sys.stdout.flush()

    # Parse arguments
    argparser = configure_parser()
    args = argparser.parse_args()
    minscore: Score = args.minscore
    prefix: Filename = Filename('')
    if args.local:
        prefix = Filename('./')

    check_debug_strain()
    test_version()
    test_retaxdump()
    test_remock()
    test_rcf()
    test_rextract()

    pd_xlsx: Optional[ExcelFile] = None
    pd_stnd: Optional[ExcelFile] = None
    pd_mock: Optional[ExcelFile] = None

    load_excels()
    test_rextract_results()
    if args.no_strain:
        print(blue('INFO:'),
              gray('Skipping further tests due to the --no-strain option!'))
    else:
        analyze_robust_contam_removal()
        if args.roc:
            get_roc(plot_roc=True)
        if args.mintaxa:
            get_mintaxa_roc()

        load_excels()
        test_rcf_results()

    # Timing results
    print(gray('Total test time:'), time.strftime(
        "%H:%M:%S", time.gmtime(time.time() - start_time)))


if __name__ == '__main__':
    main()
