1
2
3
4
5
6 """Deal with representations of Markov Models.
7 """
8
9 import copy
10 import math
11 import random
12
13
14
15
16 from Bio._py3k import range
17
18 from Bio.Seq import MutableSeq
19
20
22 """ Return an array of n random numbers, where the elements of the array sum
23 to 1.0"""
24 randArray = [random.random() for i in range(n)]
25 total = sum(randArray)
26 normalizedRandArray = [x/total for x in randArray]
27
28 return normalizedRandArray
29
30
32 """Calculate which symbols can be emitted in each state
33 """
34
35
36 emissions = dict()
37 for state, symbol in emission_probs:
38 try:
39 emissions[state].append(symbol)
40 except KeyError:
41 emissions[state] = [symbol]
42
43 return emissions
44
45
47 """Calculate which 'from transitions' are allowed for each state
48
49 This looks through all of the trans_probs, and uses this dictionary
50 to determine allowed transitions. It converts this information into
51 a dictionary, whose keys are source states and whose values are
52 lists of destination states reachable from the source state via a
53 transition.
54 """
55 transitions = dict()
56 for from_state, to_state in trans_probs:
57 try:
58 transitions[from_state].append(to_state)
59 except KeyError:
60 transitions[from_state] = [to_state]
61
62 return transitions
63
64
66 """Calculate which 'to transitions' are allowed for each state
67
68 This looks through all of the trans_probs, and uses this dictionary
69 to determine allowed transitions. It converts this information into
70 a dictionary, whose keys are destination states and whose values are
71 lists of source states from which the destination is reachable via a
72 transition.
73 """
74 transitions = dict()
75 for from_state, to_state in trans_probs:
76 try:
77 transitions[to_state].append(from_state)
78 except KeyError:
79 transitions[to_state] = [from_state]
80
81 return transitions
82
83
85 """Interface to build up a Markov Model.
86
87 This class is designed to try to separate the task of specifying the
88 Markov Model from the actual model itself. This is in hopes of making
89 the actual Markov Model classes smaller.
90
91 So, this builder class should be used to create Markov models instead
92 of trying to initiate a Markov Model directly.
93 """
94
95 DEFAULT_PSEUDO = 1
96
97 - def __init__(self, state_alphabet, emission_alphabet):
98 """Initialize a builder to create Markov Models.
99
100 Arguments:
101
102 o state_alphabet -- An alphabet containing all of the letters that
103 can appear in the states
104
105 o emission_alphabet -- An alphabet containing all of the letters for
106 states that can be emitted by the HMM.
107 """
108 self._state_alphabet = state_alphabet
109 self._emission_alphabet = emission_alphabet
110
111
112
113 self.initial_prob = {}
114
115
116
117 self.transition_prob = {}
118 self.emission_prob = self._all_blank(state_alphabet,
119 emission_alphabet)
120
121
122 self.transition_pseudo = {}
123 self.emission_pseudo = self._all_pseudo(state_alphabet,
124 emission_alphabet)
125
126 - def _all_blank(self, first_alphabet, second_alphabet):
127 """Return a dictionary with all counts set to zero.
128
129 This uses the letters in the first and second alphabet to create
130 a dictionary with keys of two tuples organized as
131 (letter of first alphabet, letter of second alphabet). The values
132 are all set to 0.
133 """
134 all_blank = {}
135 for first_state in first_alphabet.letters:
136 for second_state in second_alphabet.letters:
137 all_blank[(first_state, second_state)] = 0
138
139 return all_blank
140
141 - def _all_pseudo(self, first_alphabet, second_alphabet):
142 """Return a dictionary with all counts set to a default value.
143
144 This takes the letters in first alphabet and second alphabet and
145 creates a dictionary with keys of two tuples organized as:
146 (letter of first alphabet, letter of second alphabet). The values
147 are all set to the value of the class attribute DEFAULT_PSEUDO.
148 """
149 all_counts = {}
150 for first_state in first_alphabet.letters:
151 for second_state in second_alphabet.letters:
152 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
153
154 return all_counts
155
157 """Return the markov model corresponding with the current parameters.
158
159 Each markov model returned by a call to this function is unique
160 (ie. they don't influence each other).
161 """
162
163
164 if not self.initial_prob:
165 raise Exception("set_initial_probabilities must be called to " +
166 "fully initialize the Markov model")
167
168 initial_prob = copy.deepcopy(self.initial_prob)
169 transition_prob = copy.deepcopy(self.transition_prob)
170 emission_prob = copy.deepcopy(self.emission_prob)
171 transition_pseudo = copy.deepcopy(self.transition_pseudo)
172 emission_pseudo = copy.deepcopy(self.emission_pseudo)
173
174 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
175 transition_pseudo, emission_pseudo)
176
178 """Set initial state probabilities.
179
180 initial_prob is a dictionary mapping states to probabilities.
181 Suppose, for example, that the state alphabet is ['A', 'B']. Call
182 set_initial_prob({'A': 1}) to guarantee that the initial
183 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
184 to make each initial state equally probable.
185
186 This method must now be called in order to use the Markov model
187 because the calculation of initial probabilities has changed
188 incompatibly; the previous calculation was incorrect.
189
190 If initial probabilities are set for all states, then they should add up
191 to 1. Otherwise the sum should be <= 1. The residual probability is
192 divided up evenly between all the states for which the initial
193 probability has not been set. For example, calling
194 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
195 for the above example.
196 """
197 self.initial_prob = copy.copy(initial_prob)
198
199
200 for state in initial_prob:
201 assert state in self._state_alphabet.letters, \
202 "State %s was not found in the sequence alphabet" % state
203
204
205 num_states_not_set =\
206 len(self._state_alphabet.letters) - len(self.initial_prob)
207 if num_states_not_set < 0:
208 raise Exception("Initial probabilities can't exceed # of states")
209 prob_sum = sum(self.initial_prob.values())
210 if prob_sum > 1.0:
211 raise Exception("Total initial probability cannot exceed 1.0")
212 if num_states_not_set > 0:
213 prob = (1.0 - prob_sum) / num_states_not_set
214 for state in self._state_alphabet.letters:
215 if state not in self.initial_prob:
216 self.initial_prob[state] = prob
217
219 """Reset all probabilities to be an average value.
220
221 Resets the values of all initial probabilities and all allowed
222 transitions and all allowed emissions to be equal to 1 divided by the
223 number of possible elements.
224
225 This is useful if you just want to initialize a Markov Model to
226 starting values (ie. if you have no prior notions of what the
227 probabilities should be -- or if you are just feeling too lazy
228 to calculate them :-).
229
230 Warning 1 -- this will reset all currently set probabilities.
231
232 Warning 2 -- This just sets all probabilities for transitions and
233 emissions to total up to 1, so it doesn't ensure that the sum of
234 each set of transitions adds up to 1.
235 """
236
237
238 new_initial_prob = float(1) / float(len(self.transition_prob))
239 for state in self._state_alphabet.letters:
240 self.initial_prob[state] = new_initial_prob
241
242
243 new_trans_prob = float(1) / float(len(self.transition_prob))
244 for key in self.transition_prob:
245 self.transition_prob[key] = new_trans_prob
246
247
248 new_emission_prob = float(1) / float(len(self.emission_prob))
249 for key in self.emission_prob:
250 self.emission_prob[key] = new_emission_prob
251
253 """Set all initial state probabilities to a randomly generated distribution.
254 Returns the dictionary containing the initial probabilities.
255 """
256 initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
257 for state in self._state_alphabet.letters:
258 self.initial_prob[state] = initial_freqs.pop()
259
260 return self.initial_prob
261
263 """Set all allowed transition probabilities to a randomly generated distribution.
264 Returns the dictionary containing the transition probabilities.
265 """
266
267 if not self.transition_prob:
268 raise Exception("No transitions have been allowed yet. " +
269 "Allow some or all transitions by calling " +
270 "allow_transition or allow_all_transitions first.")
271
272 transitions_from = _calculate_from_transitions(self.transition_prob)
273 for from_state in transitions_from:
274 freqs = _gen_random_array(len(transitions_from[from_state]))
275 for to_state in transitions_from[from_state]:
276 self.transition_prob[(from_state, to_state)] = freqs.pop()
277
278 return self.transition_prob
279
281 """Set all allowed emission probabilities to a randomly generated
282 distribution. Returns the dictionary containing the emission
283 probabilities.
284 """
285
286 if not self.emission_prob:
287 raise Exception("No emissions have been allowed yet. " +
288 "Allow some or all emissions.")
289
290 emissions = _calculate_emissions(self.emission_prob)
291 for state in emissions:
292 freqs = _gen_random_array(len(emissions[state]))
293 for symbol in emissions[state]:
294 self.emission_prob[(state, symbol)] = freqs.pop()
295
296 return self.emission_prob
297
307
308
309
311 """A convenience function to create transitions between all states.
312
313 By default all transitions within the alphabet are disallowed; this
314 is a way to change this to allow all possible transitions.
315 """
316
317
318 all_probs = self._all_blank(self._state_alphabet,
319 self._state_alphabet)
320
321 all_pseudo = self._all_pseudo(self._state_alphabet,
322 self._state_alphabet)
323
324
325
326 for set_key in self.transition_prob:
327 all_probs[set_key] = self.transition_prob[set_key]
328
329 for set_key in self.transition_pseudo:
330 all_pseudo[set_key] = self.transition_pseudo[set_key]
331
332
333 self.transition_prob = all_probs
334 self.transition_pseudo = all_pseudo
335
336 - def allow_transition(self, from_state, to_state, probability=None,
337 pseudocount=None):
338 """Set a transition as being possible between the two states.
339
340 probability and pseudocount are optional arguments
341 specifying the probabilities and pseudo counts for the transition.
342 If these are not supplied, then the values are set to the
343 default values.
344
345 Raises:
346 KeyError -- if the two states already have an allowed transition.
347 """
348
349 for state in [from_state, to_state]:
350 assert state in self._state_alphabet.letters, \
351 "State %s was not found in the sequence alphabet" % state
352
353
354 if (from_state, to_state) not in self.transition_prob and \
355 (from_state, to_state) not in self.transition_pseudo:
356
357 if probability is None:
358 probability = 0
359 self.transition_prob[(from_state, to_state)] = probability
360
361
362 if pseudocount is None:
363 pseudcount = self.DEFAULT_PSEUDO
364 self.transition_pseudo[(from_state, to_state)] = pseudocount
365 else:
366 raise KeyError("Transition from %s to %s is already allowed."
367 % (from_state, to_state))
368
370 """Restrict transitions between the two states.
371
372 Raises:
373 KeyError if the transition is not currently allowed.
374 """
375 try:
376 del self.transition_prob[(from_state, to_state)]
377 del self.transition_pseudo[(from_state, to_state)]
378 except KeyError:
379 raise KeyError("Transition from %s to %s is already disallowed."
380 % (from_state, to_state))
381
383 """Set the probability of a transition between two states.
384
385 Raises:
386 KeyError if the transition is not allowed.
387 """
388 if (from_state, to_state) in self.transition_prob:
389 self.transition_prob[(from_state, to_state)] = probability
390 else:
391 raise KeyError("Transition from %s to %s is not allowed."
392 % (from_state, to_state))
393
395 """Set the default pseudocount for a transition.
396
397 To avoid computational problems, it is helpful to be able to
398 set a 'default' pseudocount to start with for estimating
399 transition and emission probabilities (see p62 in Durbin et al
400 for more discussion on this. By default, all transitions have
401 a pseudocount of 1.
402
403 Raises:
404 KeyError if the transition is not allowed.
405 """
406 if (from_state, to_state) in self.transition_pseudo:
407 self.transition_pseudo[(from_state, to_state)] = count
408 else:
409 raise KeyError("Transition from %s to %s is not allowed."
410 % (from_state, to_state))
411
412
413
415 """Set the probability of a emission from a particular state.
416
417 Raises:
418 KeyError if the emission from the given state is not allowed.
419 """
420 if (seq_state, emission_state) in self.emission_prob:
421 self.emission_prob[(seq_state, emission_state)] = probability
422 else:
423 raise KeyError("Emission of %s from %s is not allowed."
424 % (emission_state, seq_state))
425
427 """Set the default pseudocount for an emission.
428
429 To avoid computational problems, it is helpful to be able to
430 set a 'default' pseudocount to start with for estimating
431 transition and emission probabilities (see p62 in Durbin et al
432 for more discussion on this. By default, all emissions have
433 a pseudocount of 1.
434
435 Raises:
436 KeyError if the emission from the given state is not allowed.
437 """
438 if (seq_state, emission_state) in self.emission_pseudo:
439 self.emission_pseudo[(seq_state, emission_state)] = count
440 else:
441 raise KeyError("Emission of %s from %s is not allowed."
442 % (emission_state, seq_state))
443
444
446 """Represent a hidden markov model that can be used for state estimation.
447 """
448 - def __init__(self, initial_prob, transition_prob, emission_prob,
449 transition_pseudo, emission_pseudo):
450 """Initialize a Markov Model.
451
452 Note: You should use the MarkovModelBuilder class instead of
453 initiating this class directly.
454
455 Arguments:
456
457 o initial_prob - A dictionary of initial probabilities for all states.
458
459 o transition_prob -- A dictionary of transition probabilities for all
460 possible transitions in the sequence.
461
462 o emission_prob -- A dictionary of emission probabilities for all
463 possible emissions from the sequence states.
464
465 o transition_pseudo -- Pseudo-counts to be used for the transitions,
466 when counting for purposes of estimating transition probabilities.
467
468 o emission_pseudo -- Pseudo-counts to be used for the emissions,
469 when counting for purposes of estimating emission probabilities.
470 """
471
472 self.initial_prob = initial_prob
473
474 self._transition_pseudo = transition_pseudo
475 self._emission_pseudo = emission_pseudo
476
477 self.transition_prob = transition_prob
478 self.emission_prob = emission_prob
479
480
481
482
483 self._transitions_from = \
484 _calculate_from_transitions(self.transition_prob)
485
486
487
488
489 self._transitions_to = \
490 _calculate_to_transitions(self.transition_prob)
491
493 """Get the default transitions for the model.
494
495 Returns a dictionary of all of the default transitions between any
496 two letters in the sequence alphabet. The dictionary is structured
497 with keys as (letter1, letter2) and values as the starting number
498 of transitions.
499 """
500 return self._transition_pseudo
501
503 """Get the starting default emmissions for each sequence.
504
505 This returns a dictionary of the default emmissions for each
506 letter. The dictionary is structured with keys as
507 (seq_letter, emmission_letter) and values as the starting number
508 of emmissions.
509 """
510 return self._emission_pseudo
511
513 """Get all destination states to which there are transitions from the
514 state_letter source state.
515
516 This returns all letters which the given state_letter can transition
517 to. An empty list is returned if state_letter has no outgoing
518 transitions.
519 """
520 if state_letter in self._transitions_from:
521 return self._transitions_from[state_letter]
522 else:
523 return []
524
526 """Get all source states from which there are transitions to the
527 state_letter destination state.
528
529 This returns all letters which the given state_letter is reachable
530 from. An empty list is returned if state_letter is unreachable.
531 """
532 if state_letter in self._transitions_to:
533 return self._transitions_to[state_letter]
534 else:
535 return []
536
537 - def viterbi(self, sequence, state_alphabet):
538 """Calculate the most probable state path using the Viterbi algorithm.
539
540 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
541 al for a full explanation -- this is where I took my implementation
542 ideas from), to allow decoding of the state path, given a sequence
543 of emissions.
544
545 Arguments:
546
547 o sequence -- A Seq object with the emission sequence that we
548 want to decode.
549
550 o state_alphabet -- The alphabet of the possible state sequences
551 that can be generated.
552 """
553
554
555 log_initial = self._log_transform(self.initial_prob)
556 log_trans = self._log_transform(self.transition_prob)
557 log_emission = self._log_transform(self.emission_prob)
558
559 viterbi_probs = {}
560 pred_state_seq = {}
561 state_letters = state_alphabet.letters
562
563
564
565
566
567
568 for i in range(0, len(sequence)):
569
570 for cur_state in state_letters:
571
572 emission_part = log_emission[(cur_state, sequence[i])]
573
574 max_prob = 0
575 if i == 0:
576
577
578 max_prob = log_initial[cur_state]
579 else:
580
581 possible_state_probs = {}
582 for prev_state in self.transitions_to(cur_state):
583
584 trans_part = log_trans[(prev_state, cur_state)]
585
586
587 viterbi_part = viterbi_probs[(prev_state, i - 1)]
588 cur_prob = viterbi_part + trans_part
589
590 possible_state_probs[prev_state] = cur_prob
591
592
593 max_prob = max(possible_state_probs.values())
594
595
596 viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
597
598 if i > 0:
599
600 for state in possible_state_probs:
601 if possible_state_probs[state] == max_prob:
602 pred_state_seq[(i - 1, cur_state)] = state
603 break
604
605
606
607
608 all_probs = {}
609 for state in state_letters:
610
611 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
612
613 state_path_prob = max(all_probs.values())
614
615
616 last_state = ''
617 for state in all_probs:
618 if all_probs[state] == state_path_prob:
619 last_state = state
620
621 assert last_state != '', "Didn't find the last state to trace from!"
622
623
624 traceback_seq = MutableSeq('', state_alphabet)
625
626 loop_seq = list(range(1, len(sequence)))
627 loop_seq.reverse()
628
629
630
631
632
633 state = last_state
634 traceback_seq.append(state)
635 for i in loop_seq:
636 state = pred_state_seq[(i - 1, state)]
637 traceback_seq.append(state)
638
639
640 traceback_seq.reverse()
641
642 return traceback_seq.toseq(), state_path_prob
643
667