Coverage for /home/deng/Projects/metatree_drawer/treeprofiler_algo/pastml/pastml/models/HKYModel.py: 27%
89 statements

import logging

import numpy as np

from pastml.models import Model, ModelWithFrequencies

HKY = 'HKY'
HKY_STATES = np.array(['A', 'C', 'G', 'T'])
A, C, G, T = 0, 1, 2, 3
KAPPA = 'kappa'


class HKYModel(ModelWithFrequencies):

    def __init__(self, forest_stats, sf=None, frequencies=None, kappa=4, tau=0,
                 frequency_smoothing=False, optimise_tau=False, parameter_file=None, reoptimise=False, **kwargs):
        self._kappa = None
        self._optimise_kappa = True
        kwargs['states'] = HKY_STATES
        ModelWithFrequencies.__init__(self, forest_stats=forest_stats,
                                      sf=sf, tau=tau, optimise_tau=optimise_tau, frequencies=frequencies,
                                      frequency_smoothing=frequency_smoothing, reoptimise=reoptimise,
                                      parameter_file=parameter_file, **kwargs)
        if self._kappa is None:
            self._kappa = kappa
        self.name = HKY

    @property
    def kappa(self):
        return self._kappa

    @kappa.setter
    def kappa(self, kappa):
        if self._optimise_kappa:
            self._kappa = kappa
        else:
            raise NotImplementedError('The kappa value is preset and cannot be changed.')

    @Model.states.setter
    def states(self, states):
        raise NotImplementedError("The HKY model is only implemented for nucleotides: "
                                  "the states are A, C, G, T and cannot be reset")

    def get_Pij_t(self, t, *args, **kwargs):
        """
        Calculates the probability matrix of substitutions i->j over time t
        under the HKY model [Hasegawa, Kishino and Yano 1985], given the state frequencies and kappa.
        Only implemented for the 4 nucleotide states.

        :param t: time
        :type t: float
        :return: probability matrix
        :rtype: numpy.ndarray
        """
        t = self.transform_t(t)

        pi_a, pi_c, pi_g, pi_t = self.frequencies
        pi_ag = pi_a + pi_g
        pi_ct = pi_c + pi_t
        beta = .5 / (pi_ag * pi_ct + self.kappa * (pi_a * pi_g + pi_c * pi_t))

        exp_min_beta_t = np.exp(-beta * t)
        exp_ct = np.exp(-beta * t * (1. + pi_ct * (self.kappa - 1.))) / pi_ct
        exp_ag = np.exp(-beta * t * (1. + pi_ag * (self.kappa - 1.))) / pi_ag
        ct_sum = (pi_ct + pi_ag * exp_min_beta_t) / pi_ct
        ag_sum = (pi_ag + pi_ct * exp_min_beta_t) / pi_ag
        p = np.ones((4, 4), dtype=np.float64) * (1 - exp_min_beta_t)
        p *= self.frequencies

        p[T, T] = pi_t * ct_sum + pi_c * exp_ct
        p[T, C] = pi_c * ct_sum - pi_c * exp_ct

        p[C, T] = pi_t * ct_sum - pi_t * exp_ct
        p[C, C] = pi_c * ct_sum + pi_t * exp_ct

        p[A, A] = pi_a * ag_sum + pi_g * exp_ag
        p[A, G] = pi_g * ag_sum - pi_g * exp_ag

        p[G, A] = pi_a * ag_sum - pi_a * exp_ag
        p[G, G] = pi_g * ag_sum + pi_a * exp_ag

        return p

    def get_num_params(self):
        """
        Returns the number of optimized parameters for this model.

        :return: the number of optimized parameters
        """
        return ModelWithFrequencies.get_num_params(self) + (1 if self._optimise_kappa else 0)

    def set_params_from_optimised(self, ps, **kwargs):
        """
        Updates this model's parameter values from a vector representing parameters
        for the likelihood optimization algorithm.

        :param ps: np.array containing parameters of the likelihood optimization algorithm
        :param kwargs: dict of eventual other arguments
        :return: void, updates this model
        """
        if self.extra_params_fixed():
            Model.set_params_from_optimised(self, ps, **kwargs)
        else:
            ModelWithFrequencies.set_params_from_optimised(self, ps, **kwargs)
            n_params = ModelWithFrequencies.get_num_params(self)
            if self._optimise_kappa:
                self.kappa = ps[n_params]

    def get_optimised_parameters(self):
        """
        Converts this model's parameters to a vector representing parameters
        for the likelihood optimization algorithm.

        :return: np.array containing parameters of the likelihood optimization algorithm
        """
        if self.extra_params_fixed():
            return Model.get_optimised_parameters(self)
        return np.hstack((ModelWithFrequencies.get_optimised_parameters(self),
                          [self.kappa] if self._optimise_kappa else []))

    def get_bounds(self):
        """
        Gets the bounds on the parameters for the likelihood optimization algorithm.

        :return: np.array containing lower and upper (potentially infinite) bounds for each parameter
        """
        if self.extra_params_fixed():
            return Model.get_bounds(self)
        return np.array((*ModelWithFrequencies.get_bounds(self),
                         *([np.array([1e-6, 20.])] if self._optimise_kappa else [])))

    def parse_parameters(self, params, reoptimise=False):
        """
        Updates this model's values from the input parameters.
        For the HKY model, apart from the basic parameters (scaling factor and smoothing factor) and the frequencies
        (see pastml.models.ModelWithFrequencies),
        the input might contain an optional kappa value,
        whose key is pastml.models.HKYModel.KAPPA.

        :param params: dict {key->value}
        :param reoptimise: whether these model parameters should be treated as starting values (True)
            or as fixed values (False)
        :return: dict with parameter values (same as input)
        """
        params = ModelWithFrequencies.parse_parameters(self, params, reoptimise)
        logger = logging.getLogger('pastml')
        if KAPPA in params:
            self._kappa = params[KAPPA]
            try:
                self._kappa = np.float64(self._kappa)
                if self._kappa <= 0:
                    logger.error(
                        'Kappa must be positive, ignoring the value given in parameters ({}).'.format(self._kappa))
                    self._kappa = None
                else:
                    self._optimise_kappa = reoptimise
            except:
                logger.error('Kappa ({}) given in parameters is not a float, ignoring it.'.format(self._kappa))
                self._kappa = None
        return params
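
    # Illustrative note (hypothetical values): a params dict passed to parse_parameters() might contain
    # {'kappa': '2.5'}; the value is converted to np.float64 and either fixed (reoptimise=False) or used
    # as a starting point for optimization (reoptimise=True); non-numeric or non-positive values are
    # ignored with an error message.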

    def _print_parameters(self):
        """
        Constructs a string representing parameter values (to be used for logging).

        :return: str representing parameter values
        """
        return '{}' \
               '\tkappa\t{:.6f}\t{}\n'.format(ModelWithFrequencies.get_optimised_parameters(self),
                                              self.kappa, '(optimised)' if self._optimise_kappa else '(fixed)')

    def freeze(self):
        """
        Prohibit parameter optimization by setting all optimization flags to False.

        :return: void
        """
        ModelWithFrequencies.freeze(self)
        self._optimise_kappa = False

    def save_parameters(self, filehandle):
        """
        Writes this model's parameter values to the parameter file (in the same format as the input parameter file).

        :param filehandle: filehandle for the file where the parameter values should be written.
        :return: void
        """
        ModelWithFrequencies.save_parameters(self, filehandle)
        filehandle.write('{}\t{:g}\n'.format(KAPPA, self.kappa))
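

# A minimal, self-contained sanity-check sketch (not part of the pastml API): it mirrors the closed-form
# P(t) from get_Pij_t with plain numpy, using illustrative frequencies and kappa (assumed values, not
# pastml defaults), and checks stochasticity, convergence to the equilibrium frequencies, and
# time-reversibility (detailed balance).
if __name__ == '__main__':
    def hky_p_t(t, freqs, kappa):
        # Same closed-form expressions as HKYModel.get_Pij_t, but without a model instance.
        pi_a, pi_c, pi_g, pi_t = freqs
        pi_ag, pi_ct = pi_a + pi_g, pi_c + pi_t
        beta = .5 / (pi_ag * pi_ct + kappa * (pi_a * pi_g + pi_c * pi_t))
        exp_min_beta_t = np.exp(-beta * t)
        exp_ct = np.exp(-beta * t * (1. + pi_ct * (kappa - 1.))) / pi_ct
        exp_ag = np.exp(-beta * t * (1. + pi_ag * (kappa - 1.))) / pi_ag
        ct_sum = (pi_ct + pi_ag * exp_min_beta_t) / pi_ct
        ag_sum = (pi_ag + pi_ct * exp_min_beta_t) / pi_ag
        p = np.ones((4, 4), dtype=np.float64) * (1 - exp_min_beta_t) * freqs
        p[T, T] = pi_t * ct_sum + pi_c * exp_ct
        p[T, C] = pi_c * ct_sum - pi_c * exp_ct
        p[C, T] = pi_t * ct_sum - pi_t * exp_ct
        p[C, C] = pi_c * ct_sum + pi_t * exp_ct
        p[A, A] = pi_a * ag_sum + pi_g * exp_ag
        p[A, G] = pi_g * ag_sum - pi_g * exp_ag
        p[G, A] = pi_a * ag_sum - pi_a * exp_ag
        p[G, G] = pi_g * ag_sum + pi_a * exp_ag
        return p

    freqs = np.array([.35, .15, .2, .3])  # illustrative A, C, G, T frequencies (assumption)
    for time in (.01, .5, 5.):
        p = hky_p_t(time, freqs, kappa=4.)
        assert np.allclose(p.sum(axis=1), 1.)  # each row of P(t) is a probability distribution
        assert np.allclose(freqs[:, np.newaxis] * p, (freqs[:, np.newaxis] * p).T)  # pi_i P_ij = pi_j P_ji
    assert np.allclose(hky_p_t(1e4, freqs, kappa=4.), freqs)  # rows tend to the frequencies as t grows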