Source code for MIset

import pandas as pd
import numpy as np
from joblib import Parallel,delayed
import warnings
from collections import Counter


class MIset:
    """Mutual-information based feature selection for a binary target.

    Invoke the methods of this class to perform feature selection on your
    dataset. Call :meth:`fit` first, then read the results through
    :meth:`top_features`, :meth:`feature_scores` or
    :meth:`feature_selection_order`.

    :param max_features: Maximum count of important features to be returned
        by the feature selection method, defaults to 1.
    :type max_features: int
    :param variant: Which feature selection method must be used, defaults to
        'jmim'. The following options are available:

        * **'jmim'** : **'Joint Mutual Information Maximization'** method as
          described in this
          `paper <https://doi.org/10.1016/j.eswa.2015.07.007>`__.
        * **'njmim'** : **'Normalized Joint Mutual Information
          Maximization'** method as described in this
          `paper <https://doi.org/10.1016/j.eswa.2015.07.007>`__.
        * **'jomic'** : **'Joint Mutual Information with Class Relevance'**
          method as described in this
          `paper <https://doi.org/10.1016/j.jcmds.2023.100075>`__.
    :type variant: str
    :param verbose: Whether to print a message each time a feature is
        selected, defaults to False.
    :type verbose: bool, optional
    :param n_jobs: The number of jobs handed to 'joblib' while computing the
        scores. Passing -1 means using all processors. Defaults to None.
    :type n_jobs: int, optional
    """

    class _entropy_calc:
        """Private nested class housing the entropy and mutual information
        calculations. All quantities are computed from empirical frequencies
        of the observed (discrete) values, using base-2 logarithms.
        """

        @staticmethod
        def _entropyFromCounts(counts, total_elements):
            """Computes the entropy of an empirical distribution.

            :param counts: Occurrence count of every distinct outcome
            :type counts: iterable of int
            :param total_elements: Total number of observations
            :type total_elements: int
            :return: Entropy in bits, rounded to 8 decimal places
            :rtype: float
            """
            # Rounding to 8 decimals keeps differences of entropies (used by
            # every mutual information score) free of float noise.
            return np.round(
                -1 * sum([
                    (value / total_elements) * np.log2(value / total_elements)
                    for value in counts
                ]),
                8,
            )

        @staticmethod
        def marginalEntropy(arr):
            """Calculates the marginal entropy (H(X)) of a variable X

            :param arr: Values of a variable
            :type arr: pandas series
            :return: Marginal Entropy of a variable X
            :rtype: float
            """
            return MIset._entropy_calc._entropyFromCounts(
                Counter(arr).values(), len(arr)
            )

        @staticmethod
        def jointEntropy(x_arr, y_arr):
            """Calculates the joint entropy (H(X,Y)) of variables X and Y

            :param x_arr: Values of a variable X
            :type x_arr: pandas series
            :param y_arr: Values of a variable Y
            :type y_arr: pandas series
            :return: Joint Entropy of variables X and Y
            :rtype: float
            """
            # Pair the observations row-wise; each distinct pair is one outcome
            return MIset._entropy_calc._entropyFromCounts(
                Counter(zip(x_arr, y_arr)).values(), len(x_arr)
            )

        @staticmethod
        def conditionalEntropy(x_arr, y_arr):
            """Calculates the conditional entropy (H(X|Y)) of X given Y

            :param x_arr: Values of a variable X
            :type x_arr: pandas series
            :param y_arr: Values of a variable Y (the conditioning variable)
            :type y_arr: pandas series
            :return: Conditional Entropy of X given Y
            :rtype: float
            """
            # H(X|Y) = H(X,Y) - H(Y)
            return (
                MIset._entropy_calc.jointEntropy(x_arr, y_arr)
                - MIset._entropy_calc.marginalEntropy(y_arr)
            )

        @staticmethod
        def tripleJointEntropy(x_arr, y_arr, c_arr):
            """Calculates the joint entropy (H(X,Y,C)) of three variables
            X, Y and C

            :param x_arr: Values of a variable X
            :type x_arr: pandas series
            :param y_arr: Values of a variable Y
            :type y_arr: pandas series
            :param c_arr: Values of a variable C
            :type c_arr: pandas series
            :return: Joint Entropy of variables X, Y and C
            :rtype: float
            """
            # Group the observations row-wise into (x, y, c) triples
            return MIset._entropy_calc._entropyFromCounts(
                Counter(zip(x_arr, y_arr, c_arr)).values(), len(x_arr)
            )

        @staticmethod
        def mutualInformationScore(x_arr, y_arr):
            """Calculates the mutual information (I(X;Y)) of variables X and Y

            :param x_arr: Values of a variable X
            :type x_arr: pandas series
            :param y_arr: Values of a variable Y
            :type y_arr: pandas series
            :return: Mutual Information score of X and Y
            :rtype: float
            """
            # I(X;Y) = H(X) + H(Y) - H(X,Y)
            return (
                MIset._entropy_calc.marginalEntropy(x_arr)
                + MIset._entropy_calc.marginalEntropy(y_arr)
                - MIset._entropy_calc.jointEntropy(x_arr, y_arr)
            )

        @staticmethod
        def jointMutualInformationScore(x_arr, y_arr, c_arr):
            """Calculates the joint mutual information (I(X,Y;C)) of
            variables X, Y and C

            :param x_arr: Values of a variable X
            :type x_arr: pandas series
            :param y_arr: Values of a variable Y
            :type y_arr: pandas series
            :param c_arr: Values of a variable C
            :type c_arr: pandas series
            :return: Joint Mutual Information of (X,Y) with C
            :rtype: float
            """
            # I(X;C|Y) = H(X|Y) - H(X|C,Y)
            #          = H(X,Y) - H(Y) - (H(X,Y,C) - H(C,Y))
            mi_score_conditional = (
                MIset._entropy_calc.jointEntropy(x_arr, y_arr)
                - MIset._entropy_calc.marginalEntropy(y_arr)
                - (
                    MIset._entropy_calc.tripleJointEntropy(x_arr, y_arr, c_arr)
                    - MIset._entropy_calc.jointEntropy(c_arr, y_arr)
                )
            )
            # Chain rule: I(X,Y;C) = I(X;C|Y) + I(Y;C)
            return mi_score_conditional + MIset._entropy_calc.mutualInformationScore(y_arr, c_arr)

        @staticmethod
        def interactionInformation(x_arr, y_arr, c_arr):
            """Calculates the interaction information (I(X;Y;C)) of
            variables X, Y and C

            :param x_arr: Values of a variable X
            :type x_arr: pandas series
            :param y_arr: Values of a variable Y
            :type y_arr: pandas series
            :param c_arr: Values of a variable C
            :type c_arr: pandas series
            :return: Interaction Information of variables X, Y and C
            :rtype: float
            """
            # I(X;Y;C) = I(X,Y;C) - I(X;C) - I(Y;C)
            return (
                MIset._entropy_calc.jointMutualInformationScore(x_arr, y_arr, c_arr)
                - MIset._entropy_calc.mutualInformationScore(x_arr, c_arr)
                - MIset._entropy_calc.mutualInformationScore(y_arr, c_arr)
            )

    class _core_scores:
        """Private nested class housing the per-feature scoring functions
        that are dispatched through joblib.
        """

        @staticmethod
        def computeP1InnerLoopScores(variant, x_arr, y_arr, c_arr):
            """Calculates the 'Joint Mutual Information Maximization' score
            or the 'Normalized Joint Mutual Information Maximization' score
            of a candidate feature against one selected feature

            :param variant: Either 'jmim' or 'njmim'
            :type variant: str
            :param x_arr: Values of the candidate feature X
            :type x_arr: pandas series
            :param y_arr: Values of an already selected feature Y
            :type y_arr: pandas series
            :param c_arr: Values of the class variable C
            :type c_arr: pandas series
            :raises ValueError: If ``variant`` is neither 'jmim' nor 'njmim'
            :return: Feature selection score
            :rtype: float
            """
            joint_mi = MIset._entropy_calc.jointMutualInformationScore(x_arr, y_arr, c_arr)
            if variant == 'jmim':
                return joint_mi
            if variant == 'njmim':
                # Symmetric relevance: I(X,Y;C) normalised by H(X,Y,C)
                return np.round(
                    joint_mi / MIset._entropy_calc.tripleJointEntropy(x_arr, y_arr, c_arr),
                    8,
                )
            # Unreachable through fit(), which validates the variant; fail
            # loudly rather than feed a sentinel score into the selection.
            raise ValueError(f"Unsupported variant for this score: {variant!r}")

        @staticmethod
        def computeP2InnerLoopScores(x_arr, y_arr, c_arr):
            """Calculates the two components of the 'Joint Mutual Information
            with Class Relevance' score of a candidate feature against one
            selected feature

            :param x_arr: Values of the candidate feature X
            :type x_arr: pandas series
            :param y_arr: Values of an already selected feature Y
            :type y_arr: pandas series
            :param c_arr: Values of the class variable C
            :type c_arr: pandas series
            :return: Tuple ``(I(X;Y), I(Y,X;C))``
            :rtype: tuple
            """
            mi_score = MIset._entropy_calc.mutualInformationScore(x_arr, y_arr)
            jmi_score = MIset._entropy_calc.jointMutualInformationScore(y_arr, x_arr, c_arr)
            return mi_score, jmi_score

    class _misc:
        """Private nested class housing miscellaneous helpers."""

        @staticmethod
        def uniqueArrayIdentifier(col_name, arr):
            """Returns ``col_name`` when ``arr`` holds exactly one unique
            value, otherwise None

            :param col_name: Name of the column being inspected
            :type col_name: str
            :param arr: Values of the column
            :type arr: pandas series
            :return: ``col_name`` for a constant column, else None
            """
            return col_name if arr.nunique() == 1 else None

    def __init__(self, max_features=1, variant='jmim', verbose=False, n_jobs=None):
        """Constructor of the class MIset. Parameters are stored as given;
        they are validated when :meth:`fit` is called.
        """
        self.max_features = max_features
        self.variant = variant
        self.verbose = verbose
        self.n_jobs = n_jobs

    def _commit_feature(self, feature_name, score, feature_list,
                        selected_feature_list, selected_feature_score_dict):
        """Moves ``feature_name`` from the candidate list to the selected
        list, records its score and reports progress when verbose."""
        selected_feature_list.append(feature_name)
        selected_feature_score_dict[feature_name] = score
        feature_list.remove(feature_name)
        if self.verbose:
            print(f"No.{len(selected_feature_list)} feature, '{feature_name}' added.")

    def _select_first_feature(self, df, feature_list, c_arr, feature_index_dict):
        """Selects the candidate with the highest I(X;C) and returns the
        freshly created ``(selected_feature_list, selected_feature_score_dict)``.
        ``feature_list`` is updated in place."""
        first_iteration_dict = {
            candidate_feature: MIset._entropy_calc.mutualInformationScore(
                df.iloc[:, feature_index_dict[candidate_feature]], c_arr
            )
            for candidate_feature in feature_list
        }
        # max() keeps the first candidate on ties, i.e. input order wins
        best_feature = max(first_iteration_dict, key=first_iteration_dict.get)
        selected_feature_list = []
        selected_feature_score_dict = {}
        self._commit_feature(
            best_feature,
            first_iteration_dict[best_feature],
            feature_list,
            selected_feature_list,
            selected_feature_score_dict,
        )
        return selected_feature_list, selected_feature_score_dict

    def _paper1FS(self, df, feature_list, class_feature_name, feature_index_dict):
        """Private method which implements the 'Joint Mutual Information
        Maximization' or the 'Normalized Joint Mutual Information
        Maximization' feature selection method (chosen by ``self.variant``).

        Results are stored in ``self.selected_feature_list`` and
        ``self.selected_feature_score_dict``.

        :param df: DataFrame containing the data
        :type df: Pandas DataFrame
        :param feature_list: List of column names of the DataFrame on which
            the feature selection algorithm is to be run
        :type feature_list: list
        :param class_feature_name: The column name containing the target
            variable
        :type class_feature_name: str
        :param feature_index_dict: A dictionary where the key is the column
            name and the value is the index number of the column in the
            DataFrame
        :type feature_index_dict: dict
        :return: Returns None
        :rtype: None
        """
        # Work on a private copy so the caller's list is never mutated
        feature_list = list(feature_list)
        c_arr = df.iloc[:, feature_index_dict[class_feature_name]]

        # First iteration: the feature with the maximum MI with the class
        selected_feature_list, selected_feature_score_dict = self._select_first_feature(
            df, feature_list, c_arr, feature_index_dict
        )

        # Subsequent iterations: stop at max_features or when candidates run out
        while len(selected_feature_list) < self.max_features and feature_list:
            relevancy_dict = {}
            for candidate_feature in feature_list:
                x_arr = df.iloc[:, feature_index_dict[candidate_feature]]
                # Joint MI ('jmim') or symmetric relevance ('njmim') of the
                # candidate against every already selected feature
                candidate_scores = Parallel(n_jobs=self.n_jobs)(
                    delayed(MIset._core_scores.computeP1InnerLoopScores)(
                        self.variant,
                        x_arr,
                        df.iloc[:, feature_index_dict[selected_feature]],  # This is y_arr
                        c_arr,
                    )
                    for selected_feature in selected_feature_list
                )
                # Keep the worst case against the selected set
                relevancy_dict[candidate_feature] = min(candidate_scores)

            # "Maximum of the minimum" criterion
            max_relevancy_feature_name = max(relevancy_dict, key=relevancy_dict.get)
            self._commit_feature(
                max_relevancy_feature_name,
                relevancy_dict[max_relevancy_feature_name],
                feature_list,
                selected_feature_list,
                selected_feature_score_dict,
            )

        self.selected_feature_list = selected_feature_list
        self.selected_feature_score_dict = selected_feature_score_dict
        return

    def _paper2FS(self, df, feature_list, class_feature_name, feature_index_dict):
        """Private method which implements the 'Joint Mutual Information
        with Class Relevance' feature selection method.

        Results are stored in ``self.selected_feature_list`` and
        ``self.selected_feature_score_dict``.

        :param df: DataFrame containing the data
        :type df: Pandas DataFrame
        :param feature_list: List of column names of the DataFrame on which
            the feature selection algorithm is to be run
        :type feature_list: list
        :param class_feature_name: The column name containing the target
            variable
        :type class_feature_name: str
        :param feature_index_dict: A dictionary where the key is the column
            name and the value is the index number of the column in the
            DataFrame
        :type feature_index_dict: dict
        :return: Returns None
        :rtype: None
        """
        # Work on a private copy so the caller's list is never mutated
        feature_list = list(feature_list)
        c_arr = df.iloc[:, feature_index_dict[class_feature_name]]

        # First iteration: the feature with the maximum MI with the class
        selected_feature_list, selected_feature_score_dict = self._select_first_feature(
            df, feature_list, c_arr, feature_index_dict
        )

        # Subsequent iterations: stop at max_features or when candidates run out
        while len(selected_feature_list) < self.max_features and feature_list:
            relevant_score_dict = {}
            selected_count = len(selected_feature_list)
            for candidate_feature in feature_list:
                x_arr = df.iloc[:, feature_index_dict[candidate_feature]]
                # One (mutual information, joint mutual information) tuple
                # per already selected feature
                score_results = Parallel(n_jobs=self.n_jobs)(
                    delayed(MIset._core_scores.computeP2InnerLoopScores)(
                        x_arr,
                        df.iloc[:, feature_index_dict[selected_feature]],  # This is y_arr
                        c_arr,
                    )
                    for selected_feature in selected_feature_list
                )
                sum_mi = sum(score_tuple[0] for score_tuple in score_results)
                sum_jmi = sum(score_tuple[1] for score_tuple in score_results)
                # relevant_score = average(jmi) - average(mi)
                relevant_score_dict[candidate_feature] = (
                    (sum_jmi / selected_count) - (sum_mi / selected_count)
                )

            max_relevancy_feature_name = max(relevant_score_dict, key=relevant_score_dict.get)
            self._commit_feature(
                max_relevancy_feature_name,
                relevant_score_dict[max_relevancy_feature_name],
                feature_list,
                selected_feature_list,
                selected_feature_score_dict,
            )

        self.selected_feature_list = selected_feature_list
        self.selected_feature_score_dict = selected_feature_score_dict
        return

    def fit(self, df, feature_list, class_feature_name):
        """Fit the feature selection algorithm on your dataset.

        Duplicate names in ``feature_list`` are considered once (first
        occurrence wins). Features holding a single unique value are dropped
        from the candidates with a ``UserWarning``.

        :param df: Pandas DataFrame
        :type df: Pandas DataFrame
        :param feature_list: List of column names of the DataFrame on which
            feature selection is to be performed.
        :type feature_list: list
        :param class_feature_name: Name of the column containing your target
            variable. The column must hold exactly two unique classes.
        :type class_feature_name: str
        :raises ValueError: If a constructor parameter or an argument has an
            invalid type or value, if the class column is missing, listed as
            a feature or not binary, if null values are present, or if no
            usable feature remains.
        :return: Returns None
        :rtype: None
        """
        if not isinstance(self.max_features, int):
            raise ValueError("Invalid value for parameter 'max_features'. This parameter only accepts a value of 'integer' datatype.")
        if self.max_features < 1:
            raise ValueError("Invalid value for parameter 'max_features'. Value must be >= 1.")
        if not isinstance(self.variant, str):
            raise ValueError("Invalid value for parameter 'variant'. This parameter only accepts a value of 'string' datatype.")
        if self.variant not in ['jmim', 'njmim', 'jomic']:
            raise ValueError("Invalid value for parameter 'variant'. Supported parameters are 'jmim','njmim','jomic'")
        if not isinstance(self.verbose, bool):
            raise ValueError("Invalid value for parameter 'verbose'. This parameter only accepts a value of 'boolean' datatype.")
        if not isinstance(df, pd.DataFrame):
            raise ValueError("Invalid value for parameter 'df'. This parameter only accepts a value of 'Pandas DataFrame' datatype.")
        if not isinstance(feature_list, list):
            raise ValueError("Invalid value for parameter 'feature_list'. This parameter only accepts a value of 'list' datatype.")
        if not isinstance(class_feature_name, str):
            raise ValueError("Invalid value for parameter 'class_feature_name'. This parameter only accepts a value of 'string' datatype.")
        if len(feature_list) == 0:
            raise ValueError("Parameter 'feature_list' is empty. At least one feature name must be present.")
        if class_feature_name not in list(df.columns):
            raise ValueError("Class variable is absent in the DataFrame.")
        if class_feature_name in feature_list:
            raise ValueError("Class feature name must not be present in the candidate feature list.")
        if df[class_feature_name].nunique() != 2:
            raise ValueError("Class variable does not have two unique classes.")
        if df[feature_list + [class_feature_name]].isnull().values.any():
            raise ValueError("At least one null value is present in the DataFrame.")

        # Private, de-duplicated copy of the candidates (order preserved).
        # Without this a name listed twice could be selected twice, because
        # list.remove() only drops its first occurrence. The DataFrame itself
        # is only ever read, so it is not copied.
        feature_list = list(dict.fromkeys(feature_list))

        # Create a mapping of column name against DataFrame index
        feature_index_dict = {
            col: df.columns.get_loc(col)
            for col in feature_list + [class_feature_name]
        }

        # Identify features with only one unique value throughout
        redundent_feature_list = Parallel(n_jobs=self.n_jobs)(
            delayed(MIset._misc.uniqueArrayIdentifier)(col, df.iloc[:, feature_index_dict[col]])
            for col in feature_list
        )
        redundent_feature_list = [col for col in redundent_feature_list if col is not None]

        if len(redundent_feature_list) > 0:
            # map(str, ...) keeps the warning usable for non-string column names
            warnings.warn(f"Features identified having only one unique value throughout. These features are removed from candidate feature list. These features are : {','.join(map(str, redundent_feature_list))}", UserWarning)
            redundent_features = set(redundent_feature_list)
            feature_list = [col for col in feature_list if col not in redundent_features]
            if len(feature_list) == 0:
                raise ValueError("Parameter 'feature_list' is empty after removing all features with only one unique value. Features with more than one unique value must be present in 'feature_list'.")

        # The variant was validated above: 'jomic' has its own algorithm,
        # 'jmim' and 'njmim' share one and differ only in the score used.
        if self.variant == 'jomic':
            self._paper2FS(df, feature_list, class_feature_name, feature_index_dict)
        else:
            self._paper1FS(df, feature_list, class_feature_name, feature_index_dict)
        return

    def _require_fitted(self):
        """Raises a descriptive AttributeError when no fit results exist."""
        if not (hasattr(self, 'selected_feature_list')
                and hasattr(self, 'selected_feature_score_dict')):
            raise AttributeError(
                "This MIset instance is not fitted yet. Call 'fit' before requesting results."
            )

    def top_features(self):
        """Get a list of feature names deemed the most important by the
        feature selection algorithm.

        Each entry in the list represents the most important feature selected
        during that iteration: the first entry was selected in the first
        iteration, the second entry in the second iteration and so on.

        :raises AttributeError: If :meth:`fit` has not been called yet.
        :return: Returns the list of most important features.
        :rtype: list[str]
        """
        self._require_fitted()
        return self.selected_feature_list

    def feature_scores(self):
        """Get a dictionary where the key is the feature name and the value
        is its feature importance score as computed by your selected
        algorithm.

        :raises AttributeError: If :meth:`fit` has not been called yet.
        :return: Returns a dictionary of feature scores.
        :rtype: dict
        """
        self._require_fitted()
        return self.selected_feature_score_dict

    def feature_selection_order(self):
        """Get a dictionary which provides information on which feature was
        deemed as most important at each iteration.

        The key of the dictionary is the iteration number (starting at 1)
        while the value is the feature selected in that iteration.

        :raises AttributeError: If :meth:`fit` has not been called yet.
        :return: Returns a dictionary of most important feature at each
            iteration order.
        :rtype: dict
        """
        self._require_fitted()
        # The score dictionary preserves insertion order, i.e. selection order
        return dict(enumerate(self.selected_feature_score_dict, start=1))