Package Bio :: Package SearchIO :: Package HmmerIO :: Module hmmer2_text
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.HmmerIO.hmmer2_text

  1  # Copyright 2012 by Kai Blin. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for HMMER 2 text output.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11  from Bio._utils import read_forward 
 12  from Bio.Alphabet import generic_protein 
 13  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 14   
 15  from ._base import _BaseHmmerTextIndexer 
 16   
 17  __all__ = ['Hmmer2TextParser', 'Hmmer2TextIndexer'] 
 18   
 19  _HSP_ALIGN_LINE = re.compile(r'(\S+):\s+domain (\d+) of (\d+)') 
 20   
 21   
22 -class _HitPlaceholder(object):
23 - def createHit(self, hsp_list):
24 hit = Hit(hsp_list) 25 hit.id_ = self.id_ 26 hit.evalue = self.evalue 27 hit.bitscore = self.bitscore 28 if self.description: 29 hit.description = self.description 30 hit.domain_obs_num = self.domain_obs_num 31 return hit
32 33
34 -class Hmmer2TextParser(object):
35 """Iterator for the HMMER 2.0 text output.""" 36
37 - def __init__(self, handle):
38 self.handle = handle 39 self.buf = [] 40 self._meta = self.parse_preamble()
41
42 - def __iter__(self):
43 for qresult in self.parse_qresult(): 44 qresult.program = self._meta.get('program') 45 qresult.target = self._meta.get('target') 46 qresult.version = self._meta.get('version') 47 yield qresult
48
49 - def read_next(self, rstrip=True):
50 """Return the next non-empty line, trailing whitespace removed""" 51 if len(self.buf) > 0: 52 return self.buf.pop() 53 self.line = self.handle.readline() 54 while self.line and rstrip and not self.line.strip(): 55 self.line = self.handle.readline() 56 if self.line: 57 if rstrip: 58 self.line = self.line.rstrip() 59 return self.line
60
61 - def push_back(self, line):
62 """Un-read a line that should not be parsed yet""" 63 self.buf.append(line)
64
65 - def parse_key_value(self):
66 """Parse key-value pair separated by colon (:)""" 67 key, value = self.line.split(':', 1) 68 return key.strip(), value.strip()
69
70 - def parse_preamble(self):
71 """Parse HMMER2 preamble.""" 72 meta = {} 73 state = "GENERIC" 74 while self.read_next(): 75 if state == "GENERIC": 76 if self.line.startswith('hmm'): 77 meta['program'] = self.line.split('-')[0].strip() 78 elif self.line.startswith('HMMER is'): 79 continue 80 elif self.line.startswith('HMMER'): 81 meta['version'] = self.line.split()[1] 82 elif self.line.count('-') == 36: 83 state = "OPTIONS" 84 continue 85 86 assert state == "OPTIONS" 87 assert 'program' in meta 88 89 if self.line.count('-') == 32: 90 break 91 92 key, value = self.parse_key_value() 93 if meta['program'] == 'hmmsearch': 94 if key == 'Sequence database': 95 meta['target'] = value 96 continue 97 elif meta['program'] == 'hmmpfam': 98 if key == 'HMM file': 99 meta['target'] = value 100 continue 101 meta[key] = value 102 103 return meta
104
105 - def parse_qresult(self):
106 """Parse a HMMER2 query block.""" 107 while self.read_next(): 108 if not self.line.startswith('Query'): 109 raise StopIteration() 110 _, id_ = self.parse_key_value() 111 self.qresult = QueryResult(id=id_) 112 113 description = None 114 115 while self.read_next() and not self.line.startswith('Scores'): 116 if self.line.startswith('Accession'): 117 self.qresult.accession = self.parse_key_value()[1] 118 if self.line.startswith('Description'): 119 description = self.parse_key_value()[1] 120 121 hit_placeholders = self.parse_hits() 122 if len(hit_placeholders) > 0: 123 self.parse_hsps(hit_placeholders) 124 self.parse_hsp_alignments() 125 126 while not self.line.startswith('Query'): 127 self.read_next() 128 if not self.line: 129 break 130 self.buf.append(self.line) 131 132 if description is not None: 133 self.qresult.description = description 134 yield self.qresult
135
136 - def parse_hits(self):
137 """Parse a HMMER2 hit block, beginning with the hit table.""" 138 139 hit_placeholders = [] 140 while self.read_next(): 141 if self.line.startswith('Parsed'): 142 break 143 if self.line.find('no hits') > -1: 144 break 145 146 if self.line.startswith('Sequence') or \ 147 self.line.startswith('Model') or \ 148 self.line.startswith('-------- '): 149 continue 150 151 fields = self.line.split() 152 id_ = fields.pop(0) 153 domain_obs_num = int(fields.pop()) 154 evalue = float(fields.pop()) 155 bitscore = float(fields.pop()) 156 description = ' '.join(fields).strip() 157 158 159 hit = _HitPlaceholder() 160 hit.id_ = id_ 161 hit.evalue = evalue 162 hit.bitscore = bitscore 163 hit.description = description 164 hit.domain_obs_num = domain_obs_num 165 hit_placeholders.append(hit) 166 167 return hit_placeholders
168
169 - def parse_hsps(self, hit_placeholders):
170 """Parse a HMMER2 hsp block, beginning with the hsp table.""" 171 # HSPs may occur in different order than the hits 172 # so store Hit objects separately first 173 unordered_hits = {} 174 while self.read_next(): 175 if self.line.startswith('Alignments') or \ 176 self.line.startswith('Histogram') or \ 177 self.line == '//': 178 break 179 if self.line.startswith('Model') or \ 180 self.line.startswith('Sequence') or \ 181 self.line.startswith('--------'): 182 continue 183 184 id_, domain, seq_f, seq_t, seq_compl, hmm_f, hmm_t, hmm_compl, \ 185 score, evalue = self.line.split() 186 187 frag = HSPFragment(id_, self.qresult.id) 188 frag.alphabet = generic_protein 189 if self._meta['program'] == 'hmmpfam': 190 frag.hit_start = int(hmm_f) - 1 191 frag.hit_end = int(hmm_t) 192 frag.query_start = int(seq_f) - 1 193 frag.query_end = int(seq_t) 194 elif self._meta['program'] == 'hmmsearch': 195 frag.query_start = int(hmm_f) - 1 196 frag.query_end = int(hmm_t) 197 frag.hit_start = int(seq_f) - 1 198 frag.hit_end = int(seq_t) 199 200 hsp = HSP([frag]) 201 hsp.evalue = float(evalue) 202 hsp.bitscore = float(score) 203 hsp.domain_index = int(domain.split('/')[0]) 204 if self._meta['program'] == 'hmmpfam': 205 hsp.hit_endtype = hmm_compl 206 hsp.query_endtype = seq_compl 207 elif self._meta['program'] == 'hmmsearch': 208 hsp.query_endtype = hmm_compl 209 hsp.hit_endtype = seq_compl 210 211 if id_ not in unordered_hits: 212 placeholder = [ p for p in hit_placeholders if p.id_ == id_][0] 213 hit = placeholder.createHit([hsp]) 214 unordered_hits[id_] = hit 215 else: 216 hit = unordered_hits[id_] 217 hsp.hit_description = hit.description 218 hit.append(hsp) 219 220 # The placeholder list is in the correct order, so use that order for 221 # the Hit objects in the qresult 222 for p in hit_placeholders: 223 self.qresult.append(unordered_hits[p.id_])
224
225 - def parse_hsp_alignments(self):
226 """Parse a HMMER2 HSP alignment block.""" 227 if not self.line.startswith('Alignments'): 228 return 229 230 while self.read_next(): 231 if self.line == '//' or self.line.startswith('Histogram'): 232 break 233 234 match = re.search(_HSP_ALIGN_LINE, self.line) 235 if match is None: 236 continue 237 238 id_ = match.group(1) 239 idx = int(match.group(2)) 240 num = int(match.group(3)) 241 242 hit = self.qresult[id_] 243 if hit.domain_obs_num != num: 244 continue 245 246 frag = hit[idx-1][0] 247 248 hmmseq = '' 249 consensus = '' 250 otherseq = '' 251 structureseq = '' 252 pad = 0 253 while self.read_next() and self.line.startswith(' '): 254 # if there's structure information, parse that 255 if self.line[16:18] == 'CS': 256 structureseq += self.line[19:].strip() 257 258 if not self.read_next(): 259 break 260 261 # skip the *-> start marker if it exists 262 if self.line[19] == '*': 263 seq = self.line[22:] 264 pad = 3 265 else: 266 seq = self.line[19:] 267 pad = 0 268 269 # get rid of the end marker 270 if seq.endswith('<-*'): 271 seq = seq[:-3] 272 273 hmmseq += seq 274 line_len = len(seq) 275 if not self.read_next(rstrip=False): 276 break 277 consensus += self.line[19+pad:19+pad+line_len] 278 # If there's no consensus sequence, hmmer2 doesn't 279 # bother to put spaces here, so add extra padding 280 extra_padding = len(hmmseq) - len(consensus) 281 consensus += ' ' * extra_padding 282 283 if not self.read_next(): 284 break 285 otherseq += self.line[19:].split()[0].strip() 286 287 self.push_back(self.line) 288 289 # add similarity sequence to annotation 290 frag.aln_annotation['similarity'] = consensus 291 292 # if there's structure information, add it to the fragment 293 if structureseq: 294 frag.aln_annotation['CS'] = structureseq 295 296 if self._meta['program'] == 'hmmpfam': 297 frag.hit = hmmseq 298 frag.query = otherseq 299 else: 300 frag.hit = otherseq 301 frag.query = hmmseq
302 303
304 -class Hmmer2TextIndexer(_BaseHmmerTextIndexer):
305 306 """Indexer for hmmer2-text format.""" 307 308 _parser = Hmmer2TextParser 309 qresult_start = _as_bytes('Query') 310 # qresults_ends for hmmpfam and hmmsearch 311 # need to anticipate both since hmmsearch have different query end mark 312 qresult_end = _as_bytes('//') 313
314 - def __iter__(self):
315 handle = self._handle 316 handle.seek(0) 317 start_offset = handle.tell() 318 regex_id = re.compile(_as_bytes(r'Query\s*(?:sequence|HMM)?:\s*(.*)')) 319 320 # determine flag for hmmsearch 321 is_hmmsearch = False 322 line = read_forward(handle) 323 if line.startswith(_as_bytes('hmmsearch')): 324 is_hmmsearch = True 325 326 while True: 327 end_offset = handle.tell() 328 329 if line.startswith(self.qresult_start): 330 regx = re.search(regex_id, line) 331 qresult_key = regx.group(1).strip() 332 # qresult start offset is the offset of this line 333 # (starts with the start mark) 334 start_offset = end_offset - len(line) 335 elif line.startswith(self.qresult_end): 336 yield _bytes_to_string(qresult_key), start_offset, 0 337 start_offset = end_offset 338 elif not line: 339 # HACK: since hmmsearch can only have one query result 340 if is_hmmsearch: 341 yield _bytes_to_string(qresult_key), start_offset, 0 342 break 343 344 line = read_forward(handle)
345 346 # if not used as a module, run the doctest 347 if __name__ == "__main__": 348 from Bio._utils import run_doctest 349 run_doctest() 350