Source code for smcpy.mcmc.mcmc_sampler

'''
Notices:
Copyright 2018 United States Government as represented by the Administrator of 
the National Aeronautics and Space Administration. No copyright is claimed in 
the United States under Title 17, U.S. Code. All Other Rights Reserved.
 
Disclaimers
No Warranty: THE SUBJECT SOFTWARE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY OF 
ANY KIND, EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT LIMITED
TO, ANY WARRANTY THAT THE SUBJECT SOFTWARE WILL CONFORM TO SPECIFICATIONS, ANY
IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR
FREEDOM FROM INFRINGEMENT, ANY WARRANTY THAT THE SUBJECT SOFTWARE WILL BE ERROR
FREE, OR ANY WARRANTY THAT DOCUMENTATION, IF PROVIDED, WILL CONFORM TO THE
SUBJECT SOFTWARE. THIS AGREEMENT DOES NOT, IN ANY MANNER, CONSTITUTE AN
ENDORSEMENT BY GOVERNMENT AGENCY OR ANY PRIOR RECIPIENT OF ANY RESULTS,
RESULTING DESIGNS, HARDWARE, SOFTWARE PRODUCTS OR ANY OTHER APPLICATIONS
RESULTING FROM USE OF THE SUBJECT SOFTWARE.  FURTHER, GOVERNMENT AGENCY
DISCLAIMS ALL WARRANTIES AND LIABILITIES REGARDING THIRD-PARTY SOFTWARE, IF
PRESENT IN THE ORIGINAL SOFTWARE, AND DISTRIBUTES IT "AS IS."
 
Waiver and Indemnity:  RECIPIENT AGREES TO WAIVE ANY AND ALL CLAIMS AGAINST THE
UNITED STATES GOVERNMENT, ITS CONTRACTORS AND SUBCONTRACTORS, AS WELL AS ANY
PRIOR RECIPIENT.  IF RECIPIENT'S USE OF THE SUBJECT SOFTWARE RESULTS IN ANY
LIABILITIES, DEMANDS, DAMAGES, EXPENSES OR LOSSES ARISING FROM SUCH USE,
INCLUDING ANY DAMAGES FROM PRODUCTS BASED ON, OR RESULTING FROM, RECIPIENT'S
USE OF THE SUBJECT SOFTWARE, RECIPIENT SHALL INDEMNIFY AND HOLD HARMLESS THE
UNITED STATES GOVERNMENT, ITS CONTRACTORS AND SUBCONTRACTORS, AS WELL AS ANY
PRIOR RECIPIENT, TO THE EXTENT PERMITTED BY LAW.  RECIPIENT'S SOLE REMEDY FOR
ANY SUCH MATTER SHALL BE THE IMMEDIATE, UNILATERAL TERMINATION OF THIS
AGREEMENT.
'''

import numpy as np
import os
import pickle
import pymc

from copy import copy
from step_methods import DelayedRejectionAdaptiveMetropolis
from step_methods import SMC_Metropolis
from scipy.optimize import minimize
from scipy.stats import gaussian_kde


[docs]class MCMCSampler: ''' Class for MCMC sampling; based on PyMC. Uses Bayesian inference, MCMC, and a model to estimate parameters with quantified uncertainty based on a set of observations. ''' def __init__(self, data, model, params, working_dir='./', storage_backend='pickle'): ''' Set data and model class member variables, set working directory, and choose storage backend. :param data: data to use to inform MCMC parameter estimation; should be same type/shape/size as output of model. :type data: array_like :param model: model for which parameters are being estimated via MCMC; should return output in same type/shape/size as data. A baseclass exists in the Model module that is recommended to define the model object; i.e., model.__bases__ == <class model.Model.Model>,) :type model: object :param params: map where keys are the unknown parameter names (string) and values are lists that define the prior distribution of the parameter [dist name, dist. arg #1, dist. arg #2, etc.]. The distribution arguments are defined in the PyMC documentation: https://pymc-devs.github.io/pymc/. :type params: dict :storage_backend: determines which format to store mcmc data, see self.avail_backends for a list of options. :type storage_backend: str ''' self.params = params self.model = model self.working_dir = working_dir # Check for valid storage backend self.avail_backends = ['pickle', 'ram', 'no_trace', 'txt', 'sqlite', 'hdf5'] if storage_backend not in self.avail_backends: msg = 'invalid backend option for storage: %s.' % storage_backend raise NameError(msg) else: self.storage_backend = storage_backend if storage_backend == 'pickle': self.loader = pymc.database.pickle self.backend_ext = '.p' elif storage_backend == 'ram': self.loader = pymc.database.ram elif storage_backend == 'no_trace': self.loader = pymc.database.no_trace elif storage_backend == 'txt': self.loader = pymc.database.txt self.backend_ext = '.txt' elif storage_backend == 'sqlite': self.loader = pymc.database.sqlite self.backend_ext = '.db' elif storage_backend == 'hdf5': self.loader = pymc.database.hdf5 self.backend_ext = '.h5' # verify working dir exists self._verify_working_dir() # Number of data points and parameters, respectively self.n = len(data) self.p = len(params) # Verify data format - for now, must be a list of crack lengths self.data = self._verify_data_format(data) # TODO: allow for samples # Initialize pymc model and MCMC database self.pymc_mod = None self.db = None
[docs] def fit(self, q0=None, plot_residuals=False, plot_fit=False, opt_method='L-BFGS-B', repeats=1, save_results=False, fname='opt_results.p'): ''' Fits the deterministic model given in self.model to the data in self.data using ordinary least squares regression. Returns a parameter map containing the optimized model parameters and the associated sum of squared error. The optional input, q0, should be an initial guess for the parameters in the form of a parameter map (dict). The optional input repeats dictates the number of times the optimization process is repeated, each time using the previous result as the new q0. ''' # check if KDE distribution is in parameters if any(['KDE' in value for value in self.params.values()]): raise ValueError('KDE type distributions cannot be used with fit()') # ensure repeats >= 1 if repeats < 1: raise ValueError('repeats must be >= 1.') # labels are alphabetically sorted parameter keys labels = sorted(self.params.keys(), key=str.lower) # limits are defined by the prior distribution in self. limits = self._get_limits() # bounds are 0 to 1 for all normalized parameters bounds = tuple((0.0, 1.0) for i in labels) # Initial guess if q0 == None: # if none, iterate over labels, and use midpoint of bounds qnorm = [0.5 for key in labels] else: # Normalize initial guess: qnorm = self._true2norm(q0, labels, limits) for i in xrange(repeats): # Ordinary least squares regression res = minimize(self._ols, qnorm, method=opt_method, args=(labels, limits), bounds=bounds) qnorm = res.x ssq_opt = res.fun print res # set optimized result qnorm_opt = qnorm # unNormalize q_opt = self._norm2true(qnorm_opt, labels, limits) print 'q_opt = %s' % q_opt print 'ssq_opt = %s' % ssq_opt # plot residuals if requested if plot_residuals == True: self.plot_residuals(self.model, q_opt, self.data, self.working_dir+'/residuals.png') if plot_fit == True: x = np.tile(np.arange(len(self.data)), [2, 1]).transpose() y = self.model.evaluate(q_opt) y_len = len(y) d_len = len(self.data) self.plot_data(x, np.c_[y, self.data], styles=['-']*y_len+['']*d_len, colors=['k']*y_len+['r']*d_len, markers=['']*y_len+['o']*d_len, xlabel='data index', ylabel='data/fit variable', fname=self.working_dir+'/fit.png') # save results if requested if save_results == True: with open(os.path.join(self.working_dir, fname), 'w') as orf: pickle.dump({'q_opt': q_opt, 'ssq_opt': ssq_opt}, orf) return q_opt, ssq_opt
[docs] def pymcplot(self): ''' Generates a pymc plot for each parameter in self.. This plot includes a trace, histogram, and autocorrelation plot. For more control over the plots, see MCMCplots module. This is meant as a diagnostic tool only. ''' from pymc.Matplot import plot as pymc_plot for name in self.params.keys(): pymc_plot(self.db.trace(name), format='png', path=self.working_dir) print 'pymc plots generated for = %s' % self.params.keys()
[docs] def sample(self, num_samples, burnin, step_method='adaptive', interval=1000, delay=0, tune_throughout=False, scales=None, cov=None, thin=1, phi=None, verbose=0): ''' Initiates MCMC sampling of posterior distribution using the model defined using the generate_pymc_model method. Sampling is conducted using the PyMC module. Parameters are as follows: - num_samples : number of samples to draw (int) - burnin : number of samples for burn-in (int) - adaptive : toggles adaptive metropolis sampling (bool) - step_method : step method for sampling; options are: o adaptive - regular adaptive metropolis o DRAM - delayed rejection adaptive metropolis o metropolis - standard metropolis aglorithm - interval : defines frequency of covariance updates (only applicable to adaptive methods) - delay : how long before first cov update occurs (only applicable to adaptive methods) - tune_throughout : True > tune proposal covariance even after burnin, else only tune proposal covariance during burn in - scales : scale factors for the diagonal of the multivariate normal proposal distribution; must be dictionary with keys = .keys() and values = scale for that param. - phi : cooling step; only used for SMC sampler ''' # Check if pymc_mod is defined if self.pymc_mod == None: raise NameError('Cannot sample; self.pymc_mod not defined!') # Setup pymc sampler (set to output results as pickle file if self.storage_backend != 'ram': dbname = self.working_dir+'/mcmc'+self.backend_ext else: dbname = None self.MCMC = pymc.MCMC(self.pymc_mod, db=self.storage_backend, dbname=dbname) #TODO: dbmode option # Set as random variables (stochastics) # TODO rewrite using .index(param name) parameter_RVs = [self.pymc_mod[i] for i in xrange(self.p)] # Set scales if scales != None: if len(scales) != len(parameter_RVs): err_msg = 'len(scales) must be equal to num ' raise ValueError(err_msg) scales = {rv: scales[rv.__name__] for rv in parameter_RVs} else: scales = None # assign step method if step_method.lower() == 'adaptive': print 'adaptation delay = %s' % delay print 'adaptation interval = %s' % interval self.MCMC.use_step_method(pymc.AdaptiveMetropolis, parameter_RVs, shrink_if_necessary=True, interval=interval, delay=delay, scales=scales, cov=cov, verbose=verbose) elif step_method.lower() == 'dram': print 'adaptation delay = %s' % delay print 'adaptation interval = %s' % interval self.MCMC.use_step_method(DelayedRejectionAdaptiveMetropolis, parameter_RVs, shrink_if_necessary=True, interval=interval, delay=delay, scales=scales, cov=cov, verbose=verbose) elif step_method.lower() == 'smc_metropolis': self.MCMC.use_step_method(SMC_Metropolis, parameter_RVs, cov=cov, phi=phi, verbose=verbose) elif step_method.lower() == 'metropolis': # if no scales provided, if scales is None: scales = dict() for rv in parameter_RVs: scales[rv] = abs(rv.value/10.) # set Metropolis step method for each random variable for RV, scale in scales.iteritems(): self.MCMC.use_step_method(pymc.Metropolis, RV, scale=scale, verbose=verbose) else: raise KeyError('Unknown step method passed to sample()') # sample if tune_throughout: print 'tuning proposal distribution throughout mcmc' self.MCMC.sample(num_samples, burnin, tune_throughout=tune_throughout, thin=thin, verbose=verbose) self.MCMC.db.commit()
[docs] def generate_pymc_model(self, q0=None, ssq0=None, std_dev0=None, fix_var=False, model_output_stored=False): ''' PyMC stochastic model generator that uses the parameter dictionary, self. and optional inputs: - q0 : a dictionary of initial values corresponding to keys in - std_dev0 : an estimate of the initial standard deviation - ssq0 : the sum of squares error using q0 and self.data. Only used if initial var, var0, is None. - fixed_var : determines whether or not variance will be sampled (i.e., fixed_var == False) or fixed. ''' # Set up pymc objects from self. parents, pymc_mod, pymc_mod_order = self.generate_pymc_(self, q0) # Assign std_dev as random variable or fix if fix_var == True and std_dev0 != None: precision = 1./std_dev0**2 pymc_mod_addon = [] pymc_mod_order_addon = [] elif fix_var == True and std_dev0 == None and ssq0 == None: raise ValueError('If fix_var == True, standard deviation, std_dev0', ' or ssq0 must be specified!') elif fix_var == True and ssq0 != None and std_dev0 == None: std_dev0 = np.sqrt(ssq0/(self.n-self.p)) print 'Variance fixed at an estimated value of %s' %std_dev0**2 precision = 1./std_dev0**2 pymc_mod_addon = [] pymc_mod_order_addon = [] else: print 'Standard deviation will be sampled via MCMC' std_dev = pymc.Uniform('std_dev', lower=0, upper=1000) # Since variance will be sampled, need to set initial value # if ssq0 provided, calculated std_dev0 if std_dev0 == None and ssq0 != None: if self.n <= self.p: raise TypeError('Length of data <= number of parameters! ', 'Variance approximation is invalid. More ', 'data is required, or an initial std dev ', 'should be given (can be None).') std_dev0 = np.sqrt(ssq0/(self.n-self.p)) # if ssq0 and std_dev0 not provided, set default value elif std_dev0 == None and ssq0 == None: std_dev0 = 0.1 else: print 'Setting initial estimate of std_dev = %s' %std_dev0 std_dev.value = std_dev0 # assign variance as random variable or fixed value @pymc.deterministic(plot=False) def var(std_dev=std_dev): return std_dev**2 # Set up precision random variable @pymc.deterministic(plot=False) def precision(var=var): return 1.0/var # add to pymc_mod pymc_mod_addon = [precision, std_dev, var] # var must be last pymc_mod_order_addon = ['precision', 'std_dev', 'var'] # Define deterministic model model = pymc.Deterministic(self.model.evaluate, name='model_RV', doc='model_RV', parents=parents, trace=model_output_stored, plot=False) # Posterior (random variable, normally distributed, centered at model) results = pymc.Normal('results', mu=model, tau=precision, value=self.data, observed=True) # Assemble model and return pymc_mod += [model, results] + pymc_mod_addon # var is last pymc_mod_order += ['model', 'results'] + pymc_mod_order_addon self.pymc_mod = pymc_mod self.pymc_mod_order = pymc_mod_order
[docs] def generate_pymc_(self, params, q0=None): ''' Creates PyMC objects for each param in dictionary NOTE: the second argument for normal distributions is VARIANCE Prior option: An arbitrary prior distribution derived from a set of samples (e.g., a previous mcmc run) can be passed with the following syntax: = {<name> : ['KDE', <pymc_database>, <param_names>]} where <name> is the name of the distribution (e.g., 'prior' or 'joint_dist'), <pymc_database> is the pymc database containing the samples from which the prior distribution will be estimated, and <param_names> are the children parameter names corresponding to the dimension of the desired sample array. This method will use all samples of the Markov chain contained in <pymc_database> for all traces named in <param_names>. Gaussian kernel-density estimation is used to derive the joint parameter distribution, which is then treated as a prior in subsequent mcmc analyses using the current class instance. The parameters named in <param_names> will be traced as will the multivariate distribution named <name>. ''' pymc_mod = [] pymc_mod_order = [] parents = dict() # Iterate through , assign prior distributions for key, args in self.params.iteritems(): # Distribution name should be first entry in [key] dist = args[0] if dist == 'Normal': if q0 == None: RV = [pymc.Normal(key, mu=args[1], tau=1/args[2])] else: RV = [pymc.Normal(key, mu=args[1], tau=1/args[2], value=q0[key])] elif dist == 'Uniform': if q0 == None: RV = [pymc.Uniform(key, lower=args[1], upper=args[2])] else: RV = [pymc.Uniform(key, lower=args[1], upper=args[2], value=q0[key])] elif dist == 'DiscreteUniform': if q0 == None: RV = [pymc.DiscreteUniform(key, lower=args[1], upper=args[2])] else: RV = [pymc.DiscreteUniform(key, lower=args[1], upper=args[2], value=q0[key])] elif dist == 'TruncatedNormal': if q0 == None: RV = [pymc.TruncatedNormal(key, mu=args[1], tau=1/args[2], a=args[3], b=args[4])] else: RV = [pymc.TruncatedNormal(key, mu=args[1], tau=1/args[2], a=args[3], b=args[4], value=q0[key])] elif dist == 'KDE': kde = multivariate_kde_from_samples(args[1], args[2]) kde_rv, rvs = self._create_kde_stochastic(kde, key, args[2]) if q0 != None: kde_rv.value = q0 RV = [kde_rv] for rv_key, rv_value in rvs.iteritems(): parents[rv_key] = rv_value RV.append(rv_value) else: raise KeyError('The distribution "'+dist+'" is not supported.') parents[key] = RV[0] pymc_mod_order.append(key) pymc_mod += RV return parents, pymc_mod, pymc_mod_order
[docs] def save_model(self, fname='model.p'): ''' Saves model in pickle file with name working_dir + fname. ''' # store model model = {'model':self.model} # dump with open(self.working_dir+'/'+fname, 'w') as f: pickle.dump(model, f)
def _verify_working_dir(self): ''' Ensure specified working directory exists. ''' if not os.path.isdir(self.working_dir): print 'Working directory does not exist; creating...' os.mkdir(self.working_dir) self.working_dir = os.path.realpath(self.working_dir) def _verify_data_format(self, data): ''' Ensures that data is a single list. ''' # For now, data should be a list of floats (e.g., of crack lengths) if type(data) == list or type(data) == np.ndarray: return np.array(data) else: raise TypeError('Data must be a single list of floats.') def _check_mcmc_database(self, mcmc_database): ''' Checks whether a PyMC database is already loaded. If mcmc_database is given (ie not None), then it will override the currently loaded database. If neither exist, raise IOError. ''' # Check if MCMC database is already defined, load from mcmc_database if mcmc_database != None: # throw warning if overwriting self.db if self.db != None: raise Warning('Overwriting currently loaded MCMC database!') # overwrite / load if os.path.isfile(mcmc_database): self.db = self.loader.load(mcmc_database) else: raise IOError('MCMC database file '+mcmc_database+\ ' does not exist!') elif self.db == None and mcmc_database == None: raise IOError('MCMC database cannot be found: either self.db must' ' be defined (using the sample() method), or the arg' ' mcmc_database must be passed to ict().') def _create_kde_stochastic(self, kde, kde_name, param_names): ''' Creates custom pymc stochastic object based on a multivariate kernel density estimate (kde should be kde object from scipy). ''' # build kde stochastic logp = lambda value: kde.logpdf(value) random = lambda value: kde.resample(1).flatten() KDE = pymc.Stochastic(logp = logp, doc = 'multivariate KDE', value=random(0), name=kde_name, parents=dict(), random=random, trace=True, dtype=float, observed=False, plot=True) # build deterministics dependent on kde stochastic rvs = dict() eval_func_dict = dict() for i, pn in enumerate(param_names): eval_func_dict[pn] = lambda i=i, **kwargs: kwargs[kde_name][i] rvs[pn] = pymc.Deterministic(eval=eval_func_dict[pn], name=pn, parents={kde_name:KDE}, doc='model param %s' % pn, trace=True, dtype=float, plot=True) return KDE, rvs def _get_limits(self): ''' Determines parameter limits based on prior distribution in self.. ''' limits = dict() for key, val in self.params.iteritems(): dist = val[0] if dist == 'Uniform': limits[key] = [val[1], val[2]] elif dist == 'Normal': # use +/- 6 std deviations limits[key] = [val[1]-6*np.sqrt(1/val[2])] elif dist == 'TruncatedNormal': limits[key] = [val[3], val[4]] elif dist == 'DiscreteUniform': limits[key] = [val[1], val[2]] elif dist == 'KDE': limits[key] = [None, None] else: raise KeyError('The distribution "'+val[0]+'" is not supported') return limits def _ols(self, qnorm, labels, limits): ''' Least squares function ''' q = self._norm2true(qnorm, labels, limits) f = np.array(self.model.evaluate(q)) v = np.array(self.data) ssq = np.linalg.norm(v-f)**2 print 'DEBUG: this is in MCMC._ols: ssq = %s' % ssq return ssq def _true2norm(self, q, labels, limits): ''' Normalizes parameters on [0,1]. q is paramMap dictionary. qnorm is list, in alphabetical order by key. ''' qnorm = [] # step through q dictionary in alphabetical order: for key in labels: a, b = limits[key] qnorm += [(q[key]-a)/(b-a)] return qnorm def _norm2true(self, qnorm, labels, limits): ''' Translates parameter values back from normalized [0,1] list to paramMap dictionary. ''' q = dict() for i,key in enumerate(labels): a, b = limits[key] q[key] = qnorm[i]*(b-a)+a return q @staticmethod def _raise_missing_trace_error(*args, **kwargs): raise ValueError('no trace to plot; use sample() first')
# HELPFUL FUNCTIONS: def multivariate_kde_from_samples(prior_pymc_db, param_names): ''' Kernel density estimation of an arbitrary, n-dimensional distribution using gaussian kernels. Returns a custom pymc object representing this estimated distribution that can be used as a prior for mcmc. Only inputs required are a pymc database object containing the param markov chain and a list of parameter names to extract from the database. A trace will be extracted for each parameter given and a multivariate distribution will be fit using KDE to the joint traces. ''' samples = [prior_pymc_db.trace(pn)[:] for pn in param_names] samples = np.array(samples) return gaussian_kde(samples) def thin_mcmc_trace(mcmc_filename, thin): ''' Thins an existing pymc chain object with a step size <thin>. ''' mcmc = _load_mcmc_file_as_pickle(mcmc_filename) for key in mcmc.keys(): if key != '_state_': mcmc[key][0] = mcmc[key][0][0::thin] # assume 1 trace per key save_dir = os.path.split(mcmc_filename)[0] _save_thinned_mcmc_obj(mcmc, save_dir) return None def _load_mcmc_file_as_pickle(mcmc_filename): try: with open(mcmc_filename, 'r') as pf: mcmc = pickle.load(pf) except IOError: raise IOError('mcmc file %s does not exist!' % mcmc_filename) return mcmc def _save_thinned_mcmc_obj(mcmc, working_dir): thinned_path = os.path.join(working_dir, 'thinned_mcmc.p') with open(thinned_path, 'w') as pf: pickle.dump(mcmc, pf)