Source code for WORC.plotting.plot_SVR

#!/usr/bin/env python

# Copyright 2016-2019 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np
from sklearn.metrics import r2_score, mean_squared_error
import sys
import WORC.plotting.compute_CI
import pandas as pd
import os
import lifelines as ll
import WORC.processing.label_processing as lp
from scipy.stats import pearsonr, spearmanr
from WORC.classification.metrics import ICC


[docs]def plot_single_SVR(prediction, label_data, label_type, survival=False, show_plots=False, alpha=0.95): if type(prediction) is not pd.core.frame.DataFrame: if os.path.isfile(prediction): prediction = pd.read_hdf(prediction) keys = prediction.keys() SVRs = list() label = keys[0] SVRs = prediction[label]['classifiers'] Y_test = prediction[label]['Y_test'] X_test = prediction[label]['X_test'] Y_train = prediction[label]['X_train'] if survival: # Also extract time to event and if event occurs from label data labels = [[label_type], ['E'], ['T']] else: labels = [[label_type]] if type(label_data) is not dict: if os.path.isfile(label_data): label_data = lp.load_labels(label_data, labels) patient_IDs = label_data['patient_IDs'] labels = label_data['label'] # Initialize scoring metrics r2score = list() MSE = list() coefICC = list() PearsonC = list() PearsonP = list() SpearmanC = list() SpearmanP = list() if survival: cindex = list() coxp = list() coxcoef = list() patient_MSE = dict() for i in range(0, len(Y_test)): test_patient_IDs = prediction[label]['patient_ID_test'][i] # FIXME: Put some wrong patient IDs in test files for num in range(0, len(test_patient_IDs)): if 'features_' in test_patient_IDs[num]: test_patient_IDs[num] = test_patient_IDs[num][9::] if '__tpl.hdf5' in test_patient_IDs[num]: test_patient_IDs[num] = test_patient_IDs[num][0:-10] test_patient_IDs = np.asarray(test_patient_IDs) X_temp = X_test[i] test_indices = list() for i_ID in test_patient_IDs: # FIXME: Error in specific study if i_ID == '112_recurrence-preop': i_ID = '112_recurrence_preop' test_indices.append(np.where(patient_IDs == i_ID)[0][0]) y_truth = [labels[0][k][0] for k in test_indices] if type(SVRs) == list or type(SVRs) == tuple: estimator = SVRs[i] else: estimator = SVRs scaler = estimator.best_scaler try: y_prediction = estimator.predict(scaler.transform(X_temp)) except ValueError: y_prediction = estimator.predict(X_temp) y_truth = np.asarray(y_truth) # if survival: # # Normalize the scores # y_prediction = np.subtract(1.01, np.divide(y_prediction, np.max(y_prediction))) print("Truth: " + y_truth) print("Prediction: " + y_prediction) # Compute error per patient for i_truth, i_predict, i_test_ID in zip(y_truth, y_prediction, test_patient_IDs): if i_test_ID not in patient_MSE.keys(): patient_MSE[i_test_ID] = list() patient_MSE[i_test_ID].append((i_truth - i_predict)**2) # Compute evaluation metrics r2score.append(r2_score(y_truth, y_prediction)) MSE.append(mean_squared_error(y_truth, y_prediction)) coefICC.append(ICC(np.column_stack((y_prediction, y_truth)))) C = pearsonr(y_prediction, y_truth) PearsonC.append(C[0]) PearsonP.append(C[1]) C = spearmanr(y_prediction, y_truth) SpearmanC.append(C.correlation) SpearmanP.append(C.pvalue) if survival: # Extract time to event and event from label data E_truth = np.asarray([labels[1][k][0] for k in test_indices]) T_truth = np.asarray([labels[2][k][0] for k in test_indices]) # Concordance index cindex.append(1 - ll.utils.concordance_index(T_truth, y_prediction, E_truth)) # Fit Cox model using SVR output, time to event and event data = {'predict': y_prediction, 'E': E_truth, 'T': T_truth} data = pd.DataFrame(data=data, index=test_patient_IDs) cph = ll.CoxPHFitter() cph.fit(data, duration_col='T', event_col='E') coxcoef.append(cph.summary['coef']['predict']) coxp.append(cph.summary['p']['predict']) # Compute confidence intervals for given metrics N_1 = float(len(Y_train[0])) N_2 = float(len(Y_test[0])) if len(r2score) == 1: # No confidence intevals, just take the scores stats = dict() stats["r2_score:"] = str(r2score[0]) stats["MSE:"] = str(MSE[0]) stats["ICC:"] = str(coefICC[0]) stats["PearsonC:"] = str(PearsonC[0]) stats["SpearmanC: "] = str(SpearmanC[0]) stats["PearsonP:"] = str(PearsonP[0]) stats["SpearmanP: "] = str(SpearmanP[0]) if survival: stats["Concordance:"] = str(cindex[0]) stats["Cox coef.:"] = str(coxcoef[0]) stats["Cox p:"] = str(coxp[0]) else: # Compute confidence intervals from cross validations stats = dict() stats["r2_score 95%:"] = str(compute_CI.compute_confidence(r2score, N_1, N_2, alpha)) stats["MSE 95%:"] = str(compute_CI.compute_confidence(MSE, N_1, N_2, alpha)) stats["ICC 95%:"] = str(compute_CI.compute_confidence(coefICC, N_1, N_2, alpha)) stats["PearsonC 95%:"] = str(compute_CI.compute_confidence(PearsonC, N_1, N_2, alpha)) stats["SpearmanC 95%: "] = str(compute_CI.compute_confidence(SpearmanC, N_1, N_2, alpha)) stats["PearsonP 95%:"] = str(compute_CI.compute_confidence(PearsonP, N_1, N_2, alpha)) stats["SpearmanP 95%: "] = str(compute_CI.compute_confidence(SpearmanP, N_1, N_2, alpha)) if survival: stats["Concordance 95%:"] = str(compute_CI.compute_confidence(cindex, N_1, N_2, alpha)) stats["Cox coef. 95%:"] = str(compute_CI.compute_confidence(coxcoef, N_1, N_2, alpha)) stats["Cox p 95%:"] = str(compute_CI.compute_confidence(coxp, N_1, N_2, alpha)) for k, v in stats.iteritems(): print(k, v) # Calculate and sort individual patient MSE patient_MSE = {k: np.mean(v) for k, v in patient_MSE.iteritems()} order = np.argsort(patient_MSE.values()) sortedkeys = np.asarray(patient_MSE.keys())[order].tolist() sortedvalues = np.asarray(patient_MSE.values())[order].tolist() patient_MSE = [(k, v) for k, v in zip(sortedkeys, sortedvalues)] for p in patient_MSE: print(p[0], p[1]) stats["Patient_MSE"] = patient_MSE if show_plots: # TODO: Plot metrics, see also plot_SVM pass return stats
[docs]def main(): if len(sys.argv) == 1: prediction = '/media/martijn/DATA/CLM_MICCAI/Results/classification_all_SVR.hdf5' label_data = '/home/martijn/git/RadTools/CLM_MICCAI/pinfo_CLM_MICCAI_test_months.txt' label_type = 'KM' survival = True elif len(sys.argv) != 3: raise IOError("This function accepts two arguments") else: prediction = sys.argv[1] label_data = sys.argv[2] plot_single_SVR(prediction, label_data, label_type, survival)
if __name__ == '__main__': main()