#!/usr/bin/env python3
#
#     Copyright (C) 2017–2024 Jose Manuel Martí Martínez
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU Affero General Public License as
#     published by the Free Software Foundation, either version 3 of the
#     License, or (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#     GNU Affero General Public License for more details.
#
#     You should have received a copy of the GNU Affero General Public License
#     along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
Launch tests for the whole Recentrifuge package.

Meaning of retest exit codes:
 Code   Explanation
 ----   ----------------------------------------------
    0   All tests passed
    1   Problem with required dependencies
    2   Failed the version check
    3   Failed the retaxdump test
    4   Failed the remock test
    5   Failed the recentrifuge test
    6   Failed the rextract test
    7   Failed the check of rextract output against standard values
    >7  Failed a multiple comparison between rcf test data and test standard
"""

import argparse
import collections as col
import gzip
import os
import re
import subprocess as sp
import sys
import time
from distutils.version import StrictVersion
from statistics import mean
from typing import Counter, List, Dict, Any, Tuple
from warnings import filterwarnings

import matplotlib.pyplot as plt
import numpy as np

from recentrifuge import __version__, __author__, __date__
from recentrifuge import __path__  # type: ignore  # mypy issue #1422
from recentrifuge.config import Filename, Score, Extra, Sample
from recentrifuge.config import LICENSE, TAXDUMP_PATH
from recentrifuge.config import HTML_SUFFIX, XLSX_SUFFIX
from recentrifuge.config import REXTRACT_TEST_SAMPLE, REXTRACT_TEST_FASTQ
from recentrifuge.config import REXTRACT_TEST_TAXID, GZEXT
from recentrifuge.config import STATS_SHEET_NAME, TEST_OUTPUT_DIR, STR_CONTROL
from recentrifuge.config import MOCK_XLSX, MOCK_SHEET, KEY_SHEET, UNASSIGNED
from recentrifuge.config import gray, blue, magenta, green, red, yellow

# Check for general optional dependencies but required for retest
# # pandas (to read Excel with mock layout)
try:
    import pandas as pd

    if StrictVersion(pd.__version__) < StrictVersion('0.23.2'):
        print(red('ERROR!'), 'Please upgrade pandas to v0.23.2 or higher for '
                             'testing recentrifuge.')
        sys.exit(1)
except ImportError:
    pd = None
    print(red('ERROR!'), 'Please install pandas (v0.23.2 or higher) for '
                         'testing recentrifuge.')
    sys.exit(1)
else:
    from pandas.testing import assert_frame_equal

# # BioPython
try:
    from Bio import SeqIO
except ImportError:
    print(red('ERROR!'), 'Please install BioPython for testing recentrifuge.')
    sys.exit(1)

# Local constants
MHL_DEFAULT = 35
MINTAXA_DEFAULT = 5
EPS = 1e-16  # Local epsilon for floating point comparisons
F_V = '-V'
F_TAX = '-n=' + TAXDUMP_PATH
PATH = os.path.dirname(os.path.realpath(__file__))
PATH_TEST: Filename = Filename(os.path.join(__path__[0], 'test/'))
F_FIL = '-f=' + TEST_OUTPUT_DIR
TEST_PREFIX = 'TEST'
OUT_PRE: Filename = Filename(TEST_PREFIX)
XLSX_FIL: Filename = Filename(TEST_PREFIX + XLSX_SUFFIX)
TEST_PDF_FILE: Filename = Filename(TEST_PREFIX + '.rcf.pdf')
ROC_CTRL_PDF_FILE: Filename = Filename('ROC_CTRL.rcf.pdf')
ROC_MINTAX_PDF_FILE: Filename = Filename('ROC_MINTAX.rcf.pdf')
# # General
F_C = '-c'
F_D = '-d'
F_T = '-t'
# # For rextract
F_CFG = '-f=' + os.path.join(TEST_OUTPUT_DIR, REXTRACT_TEST_SAMPLE)
F_INC = '-i=' + REXTRACT_TEST_TAXID
F_FAQ = '-q=' + os.path.join(TEST_OUTPUT_DIR, REXTRACT_TEST_FASTQ + GZEXT)
REXTRACTED_FASTQ_GZ = Filename(
    os.path.join(TEST_OUTPUT_DIR, REXTRACT_TEST_FASTQ).rstrip('.fastq') +
    '_rxtr_incl' + REXTRACT_TEST_TAXID + '.fastq.gz'
)
# # For rcf
F_OUT = '-o='
F_CTR = '-c=3'
F_MHL = '-y='
F_RND = '-r='
F_MIN = '-m='
F_CTR_MIN = '-w='
F_PKL = '--pickle'
XLSX_PATH: Filename = Filename(os.path.join(TEST_OUTPUT_DIR, XLSX_FIL))
STND_PATH: Filename = Filename(os.path.join(PATH_TEST, XLSX_FIL))
MOCK_PATH: Filename = Filename(os.path.join(PATH_TEST, MOCK_XLSX))
SAMPLES: List[Sample] = [
    Sample(spl) for spl in ['ctrl1', 'ctrl2', 'ctrl3',
                            'smpl1', 'smpl2', 'smpl3', 'smpl4']
]
SAMPLEX: List[Sample] = SAMPLES + [Sample('smplH')]
STR_CTRL_SP = '_' + STR_CONTROL + '_species'
CTRL_SP: List[Sample] = [
    Sample(spl + STR_CTRL_SP) for spl in ['smpl1', 'smpl2', 'smpl3', 'smpl4']]
DEBUG_SP: Sample = Sample('smplH')


def main():
    """Main entry point to script."""

    def vprint(*arguments, **kargs) -> None:
        """Print only if verbose/debug mode is enabled"""
        if args.debug:
            print(*arguments, **kargs)

    def configure_parser():
        """Argument Parser Configuration"""
        parser = argparse.ArgumentParser(
            description='Launch Recentrifuge tests',
            epilog=f'%(prog)s  - Release {__version__} - {__date__}' + LICENSE,
            formatter_class=argparse.RawDescriptionHelpFormatter
        )
        parser.add_argument(
            '-d', '--debug',
            action='store_true',
            help='increase output verbosity and perform additional checks'
        )
        parser.add_argument(
            '-i', '--ignore',
            action='store_true',
            help='continue testing even if errors arise'
        )
        parser.add_argument(
            '-l', '--local',
            action='store_true',
            help='test local directory scripts instead of pip installed'
        )
        parser_additional = parser.add_argument_group(
            'additional', 'perform additional tests')
        parser_additional.add_argument(
            '-m', '--mintaxa',
            action='store_true',
            help='perform additional tests for mintaxa dependency'
        )
        parser_additional.add_argument(
            '-r', '--roc',
            action='store_true',
            help='perform additional tests and get ROC figures'
        )
        parser_advanced = parser.add_argument_group(
            'advanced', 'activate advanced or experimental features')
        parser_advanced.add_argument(
            '-s', '--skip',
            action='store_true',
            help='skip the recentrifuge calls and just load the results to '
                 'plot ROC figure by mintaxa (results should be available)'
        )
        parser_advanced.add_argument(
            '--strain',
            action='store_true',
            help='set this same flag in rcf [experimental feature]'
        )
        parser.add_argument(
            '-y', '--minscore',
            action='store',
            metavar='NUMBER',
            type=lambda txt: Score(float(txt)),
            default=Score(MHL_DEFAULT),
            help=('minimum score/confidence of the classification of a read '
                  f'to pass the quality filter; {MHL_DEFAULT} by default')
        )
        parser.add_argument(
            '-V', '--version',
            action='version',
            version=f'%(prog)s release {__version__} ({__date__})'
        )
        return parser

    def check_debug_strain():
        """Check debugging and strain modes"""
        # Check for debugging mode
        if args.debug:
            print(blue('INFO:'), gray('Debugging mode activated'))
            print(blue('INFO:'), gray('Active parameters:'))
            for key, value in vars(args).items():
                if value:
                    print(gray(f'\t{key} ='), f'{value}')
            print(blue('INFO:'), gray('Relevant paths:'))
            for var in ['TAXDUMP_PATH', 'PATH', 'PATH_TEST',
                        'XLSX_PATH', 'STND_PATH', 'MOCK_PATH']:
                print(gray(f'\t{var} ='), f'{eval(var)}')  # Safe eval of const
        # Check for strain experimental mode
        if args.strain:
            print(yellow('CAUTION!'), '--strain experimental mode activated!')

    def test_version():
        """Test version matching"""
        prog: Filename = Filename(prefix + 'rcf')
        print(magenta(f'\n>>> TESTING VERSION ... '), end='')
        sys.stdout.flush()
        cproc = sp.run([prog, F_V], stdout=sp.PIPE, stderr=sp.STDOUT,
                       universal_newlines=True)
        match = re.search(r'(\d+\.\d+\.\d+[a-z]*\d*)\s', cproc.stdout)
        if match is not None and match.group(1) == __version__:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if match is not None:
                print(blue('INFO:'),
                      f'Installed recentrifuge is v{match.group(1)} '
                      f'while these tests are for version {__version__}')
            if not args.ignore:
                sys.exit(2)

    def test_retaxdump():
        """Test retaxdump"""
        prog: Filename = Filename(prefix + 'retaxdump')
        print(magenta(f'\n>>> TESTING {prog} ...'))
        sys.stdout.flush()
        cproc = sp.run([prog], universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(3)

    def test_remock():
        """Test remock"""
        prog: Filename = Filename(prefix + 'remock')
        print(magenta(f'\n>>> TESTING {prog} ...'))
        argums = [prog, F_TAX, F_RND + str(minscore), F_T, F_D, F_C]
        if args.debug:
            print(argums)
        cproc = sp.run(argums, universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(4)

    def test_rcf(out_prefix: Filename = OUT_PRE,
                 mintaxa: int = None):
        """Test recentrifuge"""
        prog: Filename = Filename(prefix + 'rcf')
        print(magenta(f'\n>>> TESTING {prog} WITH MINTAXA {mintaxa} ...'))
        argums = [prog, F_TAX, F_FIL, F_PKL, F_PKL,  # '--pickle' twice
                  F_OUT + os.path.join(TEST_OUTPUT_DIR, out_prefix),
                  F_CTR, F_MHL + str(minscore), F_D]
        if mintaxa is not None:
            argums.append(F_MIN + str(mintaxa))
            argums.append(F_CTR_MIN + str(mintaxa))
        if args.strain:
            argums.append('--strain')
        if args.debug:
            print(argums)
        cproc = sp.run(argums, universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(5)

    def test_rextract():
        """Test rextract"""
        prog: Filename = Filename(prefix + 'rextract')
        print(magenta(f'\n>>> TESTING {prog} ...'))
        argums = [prog, F_CFG, F_FAQ, F_INC, F_D, F_C]
        if args.debug:
            print(argums)
        cproc = sp.run(argums, universal_newlines=True)
        print(magenta(f'\n<<< END {prog} TEST: '), end='')
        if cproc.returncode == 0:
            print(green('OK!'))
        else:
            print(red('FAILED!'))
            if not args.ignore:
                sys.exit(6)

    def load_excels(just_excel: Filename = None):
        """Load excels from standard and recentrifuge results"""
        nonlocal pd_xlsx, pd_stnd, pd_mock  # type: ignore  # mypy issue #7057
        if just_excel is None:
            print(magenta(f'>>> LOADING TEST RESULTS ... '), end='')
        try:
            if just_excel is None:
                pd_xlsx = pd.ExcelFile(XLSX_PATH,
                                       engine='openpyxl',
                                       )
            else:
                pd_xlsx = pd.ExcelFile(just_excel,
                                       engine='openpyxl',
                                       )
        except OSError:
            if just_excel is None:
                print(red('FAILED!\nERROR!'), 'Results for test not found!')
            else:
                print(red('FAILED!\nERROR!'), f'Problem with {just_excel}.')
                if args.skip:
                    print(blue('HINT:'),
                          'You probably want to rerun without the skip flag')
            raise
        else:
            if just_excel is None:
                print(green('OK!'))
        if just_excel is None:
            print(magenta(f'>>> LOADING STANDARD RESULTS ... '), end='')
            try:
                pd_stnd = pd.ExcelFile(STND_PATH,
                                       engine='openpyxl',
                                       )
            except OSError:
                print(red('FAILED!\nERROR!'), 'Missing standard test results!')
                raise
            else:
                print(green('OK!'))
            print(magenta(f'>>> LOADING MOCK DATA ... '), end='')
            try:
                pd_mock = pd.ExcelFile(MOCK_PATH,
                                       engine='openpyxl',
                                       )
            except OSError:
                print(red('FAILED!\nERROR!'), 'Missing mock xlsx file!')
                raise
            else:
                print(green('OK!'))

    def test_rextract_results():
        """Test rextract results against standard values"""
        prog: Filename = Filename(prefix + 'rextract')
        print(magenta(f'\n>>> CHECKING {prog} RESULTS AGAINST STANDARD ... '),
              end='', flush=True)

        df_mock = pd_mock.parse(
            sheet_name=MOCK_SHEET, header=[0],
            index_col=1, skipfooter=1,
            engine='openpyxl',
        )
        cnt_mck: int
        try:
            cnt_mck = df_mock[REXTRACT_TEST_SAMPLE.rstrip('.out')][
                int(REXTRACT_TEST_TAXID)]
        except KeyError:
            cnt_mck = 0

        with gzip.open(REXTRACTED_FASTQ_GZ, 'rt') as fgz:
            record_iterator = SeqIO.parse(fgz, "quickfastq")
            num_seqs: int = len(list(record_iterator))
        if num_seqs == cnt_mck:
            print(blue(f'{cnt_mck}'), 'fastq seqs', green('OK!'))
        else:
            print(red('FAILED!'))
            print('Difference between test results and standard values:')
            print(f'Expected {cnt_mck} FASTQ seqs and got {num_seqs} instead.')
            if not args.ignore:
                sys.exit(7)

    def analyze_robust_contam_removal():
        """Analyze recentrifuge results for robust contamination removal"""
        print(magenta(f'\n>>> ANALYZING ROBUST CONTAMINATION REMOVAL ... '),
              end='')

        # Figure initialization
        filterwarnings("ignore", category=UserWarning, module="matplotlib")
        fig, ax = plt.subplots(
            nrows=2, ncols=1, squeeze=False, sharey='col', sharex='col',
            figsize=(20, 12)
        )
        fig.set_tight_layout({'rect': [0.0, 0.0, 1.0, 1.0],
                              'w_pad': 0, 'h_pad': 0.1})
        ax_raw = ax[0, 0]
        ax_ctr = ax[1, 0]

        # Load data sheets as pandas DataFrames
        df_stnd = pd_stnd.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',

        )
        df_xlsx = pd_xlsx.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',
        )
        df_xlsx.columns = df_stnd.columns  # Normalize columns name
        df_key = pd_mock.parse(
            sheet_name=KEY_SHEET, header=[0],
            index_col=0, dtype={'NAME': bytearray, 'COLOR': str, 'HATCH': str,
                                'NATIVES': list, 'RANK': str, 'HIST': bool},
            engine='openpyxl',
        )

        # Data initialization
        numtaxa = len(df_key['HIST'][df_key['HIST']])
        n = len(SAMPLES)
        ind = np.arange(n)  # the x locations for all the samples
        width = 0.90 / numtaxa  # the width of the bars regarding space
        ymax = 1e+5
        ymin = 1e+0

        # Generate bar plot
        rects: List[Any] = []
        for num, (taxid, taxon) in enumerate(df_key.iterrows()):
            if not taxon['HIST']:
                break
            rects.append(
                ax_raw.bar(
                    ind + num * width,
                    [df_xlsx[s][UNASSIGNED][int(taxid)] for s in SAMPLES],
                    width, color=[taxon["COLOR"]], hatch=taxon["HATCH"],
                    alpha=0.5, label=taxon["NAME"])
            )
            ax_ctr.bar(ind + num * width,
                       [0.1, 0.1, 0.1] +
                       [df_xlsx[s][UNASSIGNED][int(taxid)] for s in CTRL_SP],
                       width, color=[taxon["COLOR"]], hatch=taxon["HATCH"],
                       alpha=0.5, label=taxon["NAME"])
        # Add mintaxa threshold
        rects.append(ax_raw.axhline(y=MINTAXA_DEFAULT, color='r', lw=1))
        rects.append(ax_ctr.axhline(y=MINTAXA_DEFAULT, color='r', lw=1))
        # Set scale, axis lims, ticks, and grid
        ax_raw.set_yscale("log", nonpositive='clip')
        ax_raw.set_ylim(top=ymax, bottom=ymin)
        xmin = ind[0] - 0.5 + width * (numtaxa - 1) / 2
        xmax = ind[-1] + 0.5 + width * (numtaxa - 1) / 2
        ax_raw.set_xlim(left=xmin, right=xmax)
        ax_raw.set_xticks(ind + width * (numtaxa - 1) / 2)
        ax_raw.set_xticks(ind[:-1] + 0.5 +
                          width * (numtaxa - 1) / 2, minor=True)
        ax_raw.xaxis.grid(True, which='minor', linewidth=5, color='k')
        ax_raw.yaxis.grid(True, alpha=0.5)
        ax_ctr.xaxis.grid(True, which='minor', linewidth=5, color='k')
        ax_ctr.yaxis.grid(True, alpha=0.5)
        ax_ctr.set_xticklabels(SAMPLES, fontdict={'fontsize': 'xx-large'})
        ax_ctr.legend(rects,
                      [pair[1]['NAME'] for pair in df_key.iterrows()
                       if pair[1]['HIST']] +
                      [r'$\mathtt{mintaxa}$'],
                      loc='best', framealpha=0.9,
                      prop={'family': 'sans-serif', 'size': 'medium'})

        # Add inner titles
        ax_raw.text(
            ind[n - 2] + width * (numtaxa - 1) / 2, ymax / 1.3,
            'Species or below in raw samples', size=20,
            ha="center", va="center",
            bbox=dict(
                boxstyle="round", ec=(1., 0.4, 0.4),
                fc=(1., 0.7, 0.7)
            )
        )
        ax_ctr.text(
            ind[n - 2] + width * (numtaxa - 1) / 2, ymax / 1.3,
            'Species after robust contamination removal', size=20,
            ha="center", va="center",
            bbox=dict(
                boxstyle="round", ec=(0.2, 0.8, 0.2),
                fc=(0.5, 0.8, 0.5)
            )
        )

        # Label negative controls
        ax_raw.fill_between([xmin, ind[2] + 0.5 + width * (numtaxa - 1) / 2],
                            ymax, color='c', alpha=0.1)
        ax_raw.text(
            ind[2] + 0.4 + width * (numtaxa - 1) / 2, ymax / 10,
            'negative controls', ha="center", va="center",
            rotation=-90, fontsize='x-large', color='b'
        )
        ax_ctr.fill_between([xmin, ind[2] + 0.5 + width * (numtaxa - 1) / 2],
                            ymax, color='c', alpha=0.1)
        ax_ctr.text(
            ind[2] + 0.4 + width * (numtaxa - 1) / 2, ymax / 10,
            'negative controls', ha="center", va="center",
            rotation=-90, fontsize='x-large', color='b'
        )

        # Save and show figure
        fig.savefig(os.path.join(TEST_OUTPUT_DIR, TEST_PDF_FILE),
                    bbox_inches='tight')
        try:
            fig.show()
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to show plot.')
        try:
            plt.close(fig)
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to close figure.')
        else:
            print(green('OK!'))

    def get_roc(plot_roc: bool = True, silent: bool = False
                ) -> Tuple[Dict[Sample, float], Dict[Sample, float]]:
        """Get ROC parameters and plot ROC"""
        if not silent:
            print(magenta(f'\n>>> GET ROC ... '))
            print(magenta(f'>> GET PARAMETERS... '), end='', flush=True)

        # Load data sheets as pandas DataFrames
        df_stnd = pd_stnd.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',
        )
        df_xlsx = pd_xlsx.parse(
            sheet_name=str(Extra.FULL), header=[0, 1],
            skiprows=None, index_col=0,
            engine='openpyxl',
        )
        df_mock = pd_mock.parse(
            sheet_name=MOCK_SHEET, header=[0],
            index_col=1, skipfooter=1,
            engine='openpyxl',
        )
        del df_mock['RECENTRIFUGE MOCK']
        df_key = pd_mock.parse(
            sheet_name=KEY_SHEET, header=[0],
            index_col=0, dtype={'NAME': bytearray, 'COLOR': str, 'HATCH': str,
                                'NATIVES': list, 'RANK': str, 'HIST': bool},
            engine='openpyxl',
        )

        # Normalize column names to the standard names (without directory)
        stnd_samples = df_stnd.columns.get_level_values(0)
        rename_cols = {TEST_OUTPUT_DIR + spl: spl for spl in stnd_samples}
        df_xlsx.rename(columns=rename_cols, inplace=True, level=0)

        # Get data
        positives: Counter[Sample] = col.Counter()
        negatives: Counter[Sample] = col.Counter()
        true_pos_raw: Counter[Sample] = col.Counter()
        fake_pos_raw: Counter[Sample] = col.Counter()
        true_pos_ctr: Counter[Sample] = col.Counter()
        fake_pos_ctr: Counter[Sample] = col.Counter()
        cnt_mck: int
        cnt_raw: int
        cnt_ctr: int
        for taxid, taxon in df_key.iterrows():
            if 'species' not in taxon['RANK']:
                continue
            for spl in SAMPLEX[3:]:
                try:
                    cnt_mck = df_mock[spl][int(taxid)]
                except KeyError:
                    cnt_mck = 0
                try:
                    cnt_raw = df_xlsx[spl][UNASSIGNED][int(taxid)]
                except KeyError:
                    cnt_raw = 0
                try:
                    cnt_ctr = df_xlsx[spl
                                      + STR_CTRL_SP][UNASSIGNED][int(taxid)]
                except KeyError:
                    cnt_ctr = 0
                if spl in taxon['NATIVES']:
                    positives[spl] += cnt_mck
                    true_pos_raw[spl] += cnt_raw
                    true_pos_ctr[spl] += cnt_ctr
                else:
                    negatives[spl] += cnt_mck
                    fake_pos_raw[spl] += cnt_raw
                    fake_pos_ctr[spl] += cnt_ctr
        # Get TPR and FPR
        tpr_raw: Dict[Sample, float] = {}
        fpr_raw: Dict[Sample, float] = {}
        tpr_ctr: Dict[Sample, float] = {}
        fpr_ctr: Dict[Sample, float] = {}
        segs: List[List[Tuple[float, float]]] = []
        for spl in SAMPLEX[3:]:
            # For raw samples
            if true_pos_raw[spl] > positives[spl]:
                fake_pos_raw[spl] += (true_pos_raw[spl] - positives[spl])
                true_pos_raw[spl] = positives[spl]
            tpr_raw[spl] = true_pos_raw[spl] / positives[spl]
            fpr_raw[spl] = fake_pos_raw[spl] / negatives[spl]
            # For CTRL samples
            if true_pos_ctr[spl] > positives[spl]:
                fake_pos_ctr[spl] += (true_pos_ctr[spl] - positives[spl])
                true_pos_ctr[spl] = positives[spl]
            tpr_ctr[spl] = true_pos_ctr[spl] / positives[spl]
            fpr_ctr[spl] = fake_pos_ctr[spl] / negatives[spl]
            segs.append([(fpr_raw[spl], tpr_raw[spl]),
                         (fpr_ctr[spl], tpr_ctr[spl])])
        if not silent:
            print(green('OK!'), flush=True)
        vprint('TPR (raw samples) =', tpr_raw)
        vprint('FPR (raw samples) =', fpr_raw)
        vprint('TPR (CTRL samples) =', tpr_ctr)
        vprint('FPR (CTRL samples) =', fpr_ctr)
        vprint(f'Coordinates (for {DEBUG_SP}) =',
               segs[SAMPLEX.index(DEBUG_SP)-3])  # 3 = num of ctrl samples
        vprint(f'P (for {DEBUG_SP}) =', positives[DEBUG_SP])
        vprint(f'N (for {DEBUG_SP}) =', negatives[DEBUG_SP])
        vprint('TP (...raw) =', true_pos_raw[DEBUG_SP])
        vprint('FP (...raw) =', fake_pos_raw[DEBUG_SP])
        vprint('TP (...CTRL) =', true_pos_ctr[DEBUG_SP])
        vprint('FP (...CTRL) =', fake_pos_ctr[DEBUG_SP])

        if plot_roc:
            if not silent:
                print(magenta(f'>> PLOT ROC... '), end='', flush=True)
            # Figure initialization
            filterwarnings("ignore", category=UserWarning, module="matplotlib")
            fig, ax = plt.subplots(figsize=(6, 6))
            ax.set_xlim(-0.02, 1.02)
            ax.set_ylim(-0.02, 1.02)
            ax.set_aspect('equal')
            ax.set_xlabel('FPR | (1 - specificity)')
            ax.set_ylabel('TPR | Sensitivity | Recall')
            ax.set_title('Evolution from raw samples to CTRL_species samples')

            # Generate arrow plot
            arrows: List[Any] = []
            for spl, color in zip(SAMPLEX[3:], ['r', 'g', 'b', 'm', 'c']):
                arrows.append(
                    ax.arrow(fpr_raw[spl], tpr_raw[spl],
                             fpr_ctr[spl] - fpr_raw[spl],
                             tpr_ctr[spl] - tpr_raw[spl],
                             shape='full', lw=1, length_includes_head=True,
                             head_width=0.02, head_length=0.01, color=color)
                )
            arrows.append(
                ax.arrow(1.2, 1.2, -1.4, -1.4, lw=1, linestyle=':',
                         head_width=0, head_length=0, color='0.8', alpha=0.5))
            ax.legend(arrows,
                      SAMPLEX[3:] + ['ROC plot diagonal'], loc='lower right',
                      prop={'family': 'sans-serif', 'size': 'medium'},
                      framealpha=0.9)
            # Save and show figure
            fig.savefig(os.path.join(TEST_OUTPUT_DIR, ROC_CTRL_PDF_FILE),
                        bbox_inches='tight')
            try:
                fig.show()
            except KeyboardInterrupt:
                print(gray(' User'), yellow('interrupted!'))
                raise
            except Exception:
                print(yellow('WARNING!'), 'Unable to show plot.')
            try:
                plt.close(fig)
            except KeyboardInterrupt:
                print(gray(' User'), yellow('interrupted!'))
                raise
            except Exception:
                print(yellow('WARNING!'), 'Unable to close figure.')
            else:
                if not silent:
                    print(green('OK!'), flush=True)

        # Return ROC values
        return tpr_ctr, fpr_ctr

    def get_mintaxa_roc():
        """Get ROC parameters function of mintaxa and plot ROC"""
        print(magenta(f'\n>>> GET MINTAXA ROC ... '), end='', flush=True)
        tpr_mintax: Dict[int, Dict[Sample, float]] = {}
        fpr_mintax: Dict[int, Dict[Sample, float]] = {}
        vals = list(range(11)) + [
            12, 15, 25, 50, 100, 150, 200, 250, 400, 500, 800, 1000, 1500,
            2000, 2500, 4000, 5000, 8000, 10000, 15000, 20000, 40000, 80000]
        html_name = [Filename(TEST_PREFIX + '_mintaxa' + str(v) + HTML_SUFFIX)
                     for v in vals]
        for mintax, html in zip(vals, html_name):
            print(magenta(f' {mintax},'), end='', flush=True)
            if not args.skip:
                test_rcf(out_prefix=html, mintaxa=mintax)
            xlsx_test_filename: Filename = Filename(
                TEST_PREFIX + '_mintaxa' + str(mintax) + XLSX_SUFFIX)
            load_excels(just_excel=Filename(
                os.path.join(TEST_OUTPUT_DIR, xlsx_test_filename)))
            (tpr_ctr, fpr_ctr) = get_roc(plot_roc=False, silent=True)
            tpr_mintax[mintax] = tpr_ctr
            fpr_mintax[mintax] = fpr_ctr
        vprint('\t All TPR =', tpr_mintax)
        vprint('\t All FPR =', fpr_mintax)

        # Figure initialization
        filterwarnings("ignore", category=UserWarning, module="matplotlib")
        fig, ax = plt.subplots(figsize=(6, 6))
        ax.set_xlim(-0.02, 1.02)
        ax.set_ylim(-0.02, 1.02)
        ax.set_aspect('equal')
        ax.set_xlabel('FPR | (1 - specificity)')
        ax.set_ylabel('TPR | Sensitivity | Recall')
        ax.set_title('Evolution of ROC with mintaxa variation')
        rocs: List[Any] = []
        for spl, color in zip(SAMPLEX[3:], ['r', 'g', 'b', 'm', 'c']):
            x_fpr = [fpr_mintax[mintax][spl] for mintax in vals]
            y_tpr = [tpr_mintax[mintax][spl] for mintax in vals]
            rocs.append(  # Append element 0 of axis.plot
                ax.plot(x_fpr, y_tpr, color=color, linestyle='-', marker='',
                        linewidth=2, label=spl)[0])
        x_fpr_avg = [mean(fpr_mintax[mintax].values()) for mintax in vals]
        y_tpr_avg = [mean(tpr_mintax[mintax].values()) for mintax in vals]
        rocs.append(  # Append element 0 of axis.plot
            ax.plot(x_fpr_avg, y_tpr_avg, color='orange',
                    linestyle=':', marker='', linewidth=3, label='MEAN')[0])
        rocs.append(
            ax.arrow(1.2, 1.2, -1.4, -1.4, lw=1, linestyle=':',
                     head_width=0, head_length=0, color='0.8',
                     alpha=0.5, label='ROC plot diagonal'))
        ax.legend(rocs, SAMPLEX[3:] + ['Mean of samples', 'ROC plot diagonal'],
                  prop={'family': 'sans-serif', 'size': 'medium'},
                  framealpha=0.9, loc='lower right')
        # Annotate ROC by mintaxa
        vals_show = [0, 2, 3, 4, 5, 15, 25, 200]
        for num, mintax in enumerate(vals_show):
            x_pos = mean(fpr_mintax[mintax].values())
            y_pos = mean(tpr_mintax[mintax].values())
            ax.annotate(str(mintax), xy=(x_pos, y_pos),
                        xytext=(x_pos+0.4+0.01*num,
                                y_pos-0.06*(len(vals_show)-num/2)),
                        ha="center", va="center", color='0.2',
                        fontsize='small', fontfamily='monospace',
                        arrowprops=dict(color='0.2', shrink=0.02,
                                        width=0, headwidth=2,
                                        linestyle='-', linewidth=0.1)
                        )
        vals_show = [500, 1000, 2000, 4000, 5000, 8000, 20000, 40000]
        for num, mintax in enumerate(vals_show):
            x_pos = mean(fpr_mintax[mintax].values())
            y_pos = mean(tpr_mintax[mintax].values())
            ax.annotate(str(mintax), xy=(x_pos, y_pos),
                        xytext=(x_pos+0.12+0.04*(len(vals_show)-num),
                                y_pos), ha="center", va="center", color='0.2',
                        fontsize='small', fontfamily='monospace',
                        arrowprops=dict(color='0.2', shrink=0.02,
                                        width=0, headwidth=2,
                                        linestyle='-', linewidth=0.1)
                        )

    # Save and show figure
        fig.savefig(os.path.join(TEST_OUTPUT_DIR, ROC_MINTAX_PDF_FILE),
                    bbox_inches='tight')
        try:
            fig.show()
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to show plot.')
        try:
            plt.close(fig)
        except KeyboardInterrupt:
            print(gray(' User'), yellow('interrupted!'))
            raise
        except Exception:
            print(yellow('WARNING!'), 'Unable to close figure.')
        else:
            print(green(' OK!'), flush=True)

    def test_rcf_results():
        """Test recentrifuge results against standard"""
        prog: Filename = Filename(prefix + 'rcf')
        print(magenta(f'\n>>> COMPARING {prog} RESULTS WITH STANDARD:'))

        for sheet in [STATS_SHEET_NAME, str(Extra.FULL)]:
            print(magenta(f'>> TEST FOR {sheet}... '), end='')
            df_stnd = pd_stnd.parse(
                sheet_name=sheet, header=[0, 1],
                skiprows=None, index_col=0,
                engine='openpyxl',
            )
            df_xlsx = pd_xlsx.parse(
                sheet_name=sheet, header=[0, 1],
                skiprows=None, index_col=0,
                engine='openpyxl',
            )
            df_xlsx.columns = df_stnd.columns  # Normalize columns name
            try:
                assert_frame_equal(df_xlsx, df_stnd, check_exact=False)
            except AssertionError as e:
                print(red('FAILED!'))
                print(e)
                df_mask = df_xlsx.ne(df_stnd)
                print(blue('Test results are:'))
                df_drop = df_xlsx[df_mask].dropna(axis=0, how='all')
                df_drop.dropna(axis=1, how='all', inplace=True)
                print(df_drop)
                print(blue('... but standard results are:'))
                df_drop = df_stnd[df_mask].dropna(axis=0, how='all')
                df_drop.dropna(axis=1, how='all', inplace=True)
                print(df_drop)
                if not args.ignore:
                    sys.exit(8)
            else:
                print(green('OK!'))

    # Timing initialization
    start_time: float = time.time()

    # Program header
    print(f'\n=-= {sys.argv[0]} =-= v{__version__} - {__date__}'
          f' =-= by {__author__} =-=\n')
    sys.stdout.flush()

    # Parse arguments
    argparser = configure_parser()
    args = argparser.parse_args()
    minscore: Score = args.minscore
    prefix: Filename = Filename('')
    if args.local:
        prefix = Filename('./')

    check_debug_strain()
    test_version()
    test_retaxdump()
    test_remock()
    test_rcf()
    test_rextract()

    pd_xlsx: pd.ExcelFile = None
    pd_stnd: pd.ExcelFile = None
    pd_mock: pd.ExcelFile = None

    load_excels()
    test_rextract_results()
    if args.strain:
        print(blue('INFO:'),
              gray('Skipping further tests due to the --strain option!'))
    else:
        analyze_robust_contam_removal()
        if args.roc:
            get_roc(plot_roc=True)
        if args.mintaxa:
            get_mintaxa_roc()

        load_excels()
        test_rcf_results()

    # Timing results
    print(gray('Total test time:'), time.strftime(
        "%H:%M:%S", time.gmtime(time.time() - start_time)))


if __name__ == '__main__':
    main()
