Package Bio :: Package SearchIO :: Package HmmerIO :: Module hmmer2_text
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.HmmerIO.hmmer2_text

  1  # Copyright 2012 by Kai Blin. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for HMMER 2 text output.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11  from Bio._utils import read_forward 
 12  from Bio.Alphabet import generic_protein 
 13  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 14   
 15  from ._base import _BaseHmmerTextIndexer 
 16   
 17  __all__ = ['Hmmer2TextParser', 'Hmmer2TextIndexer'] 
 18   
 19  __docformat__ = "restructuredtext en" 
 20   
 21  _HSP_ALIGN_LINE = re.compile(r'(\S+):\s+domain (\d+) of (\d+)') 
 22   
 23   
24 -class _HitPlaceholder(object):
25 - def createHit(self, hsp_list):
26 hit = Hit(hsp_list) 27 hit.id_ = self.id_ 28 hit.evalue = self.evalue 29 hit.bitscore = self.bitscore 30 if self.description: 31 hit.description = self.description 32 hit.domain_obs_num = self.domain_obs_num 33 return hit
34 35
36 -class Hmmer2TextParser(object):
37 """Iterator for the HMMER 2.0 text output.""" 38
39 - def __init__(self, handle):
40 self.handle = handle 41 self.buf = [] 42 self._meta = self.parse_preamble()
43
44 - def __iter__(self):
45 for qresult in self.parse_qresult(): 46 qresult.program = self._meta.get('program') 47 qresult.target = self._meta.get('target') 48 qresult.version = self._meta.get('version') 49 yield qresult
50
51 - def read_next(self, rstrip=True):
52 """Return the next non-empty line, trailing whitespace removed""" 53 if len(self.buf) > 0: 54 return self.buf.pop() 55 self.line = self.handle.readline() 56 while self.line and rstrip and not self.line.strip(): 57 self.line = self.handle.readline() 58 if self.line: 59 if rstrip: 60 self.line = self.line.rstrip() 61 return self.line
62
63 - def push_back(self, line):
64 """Un-read a line that should not be parsed yet""" 65 self.buf.append(line)
66
67 - def parse_key_value(self):
68 """Parse key-value pair separated by colon (:)""" 69 key, value = self.line.split(':', 1) 70 return key.strip(), value.strip()
71
72 - def parse_preamble(self):
73 """Parse HMMER2 preamble.""" 74 meta = {} 75 state = "GENERIC" 76 while self.read_next(): 77 if state == "GENERIC": 78 if self.line.startswith('hmm'): 79 meta['program'] = self.line.split('-')[0].strip() 80 elif self.line.startswith('HMMER is'): 81 continue 82 elif self.line.startswith('HMMER'): 83 meta['version'] = self.line.split()[1] 84 elif self.line.count('-') == 36: 85 state = "OPTIONS" 86 continue 87 88 assert state == "OPTIONS" 89 assert 'program' in meta 90 91 if self.line.count('-') == 32: 92 break 93 94 key, value = self.parse_key_value() 95 if meta['program'] == 'hmmsearch': 96 if key == 'Sequence database': 97 meta['target'] = value 98 continue 99 elif meta['program'] == 'hmmpfam': 100 if key == 'HMM file': 101 meta['target'] = value 102 continue 103 meta[key] = value 104 105 return meta
106
107 - def parse_qresult(self):
108 """Parse a HMMER2 query block.""" 109 while self.read_next(): 110 if not self.line.startswith('Query'): 111 raise StopIteration() 112 _, id_ = self.parse_key_value() 113 self.qresult = QueryResult(id=id_) 114 115 description = None 116 117 while self.read_next() and not self.line.startswith('Scores'): 118 if self.line.startswith('Accession'): 119 self.qresult.accession = self.parse_key_value()[1] 120 if self.line.startswith('Description'): 121 description = self.parse_key_value()[1] 122 123 hit_placeholders = self.parse_hits() 124 if len(hit_placeholders) > 0: 125 self.parse_hsps(hit_placeholders) 126 self.parse_hsp_alignments() 127 128 while not self.line.startswith('Query'): 129 self.read_next() 130 if not self.line: 131 break 132 self.buf.append(self.line) 133 134 if description is not None: 135 self.qresult.description = description 136 yield self.qresult
137
138 - def parse_hits(self):
139 """Parse a HMMER2 hit block, beginning with the hit table.""" 140 141 hit_placeholders = [] 142 while self.read_next(): 143 if self.line.startswith('Parsed'): 144 break 145 if self.line.find('no hits') > -1: 146 break 147 148 if self.line.startswith('Sequence') or \ 149 self.line.startswith('Model') or \ 150 self.line.startswith('-------- '): 151 continue 152 153 fields = self.line.split() 154 id_ = fields.pop(0) 155 domain_obs_num = int(fields.pop()) 156 evalue = float(fields.pop()) 157 bitscore = float(fields.pop()) 158 description = ' '.join(fields).strip() 159 160 hit = _HitPlaceholder() 161 hit.id_ = id_ 162 hit.evalue = evalue 163 hit.bitscore = bitscore 164 hit.description = description 165 hit.domain_obs_num = domain_obs_num 166 hit_placeholders.append(hit) 167 168 return hit_placeholders
169
170 - def parse_hsps(self, hit_placeholders):
171 """Parse a HMMER2 hsp block, beginning with the hsp table.""" 172 # HSPs may occur in different order than the hits 173 # so store Hit objects separately first 174 unordered_hits = {} 175 while self.read_next(): 176 if self.line.startswith('Alignments') or \ 177 self.line.startswith('Histogram') or \ 178 self.line == '//': 179 break 180 if self.line.startswith('Model') or \ 181 self.line.startswith('Sequence') or \ 182 self.line.startswith('--------'): 183 continue 184 185 id_, domain, seq_f, seq_t, seq_compl, hmm_f, hmm_t, hmm_compl, \ 186 score, evalue = self.line.split() 187 188 frag = HSPFragment(id_, self.qresult.id) 189 frag.alphabet = generic_protein 190 if self._meta['program'] == 'hmmpfam': 191 frag.hit_start = int(hmm_f) - 1 192 frag.hit_end = int(hmm_t) 193 frag.query_start = int(seq_f) - 1 194 frag.query_end = int(seq_t) 195 elif self._meta['program'] == 'hmmsearch': 196 frag.query_start = int(hmm_f) - 1 197 frag.query_end = int(hmm_t) 198 frag.hit_start = int(seq_f) - 1 199 frag.hit_end = int(seq_t) 200 201 hsp = HSP([frag]) 202 hsp.evalue = float(evalue) 203 hsp.bitscore = float(score) 204 hsp.domain_index = int(domain.split('/')[0]) 205 if self._meta['program'] == 'hmmpfam': 206 hsp.hit_endtype = hmm_compl 207 hsp.query_endtype = seq_compl 208 elif self._meta['program'] == 'hmmsearch': 209 hsp.query_endtype = hmm_compl 210 hsp.hit_endtype = seq_compl 211 212 if id_ not in unordered_hits: 213 placeholder = [p for p in hit_placeholders if p.id_ == id_][0] 214 hit = placeholder.createHit([hsp]) 215 unordered_hits[id_] = hit 216 else: 217 hit = unordered_hits[id_] 218 hsp.hit_description = hit.description 219 hit.append(hsp) 220 221 # The placeholder list is in the correct order, so use that order for 222 # the Hit objects in the qresult 223 for p in hit_placeholders: 224 self.qresult.append(unordered_hits[p.id_])
225
226 - def parse_hsp_alignments(self):
227 """Parse a HMMER2 HSP alignment block.""" 228 if not self.line.startswith('Alignments'): 229 return 230 231 while self.read_next(): 232 if self.line == '//' or self.line.startswith('Histogram'): 233 break 234 235 match = re.search(_HSP_ALIGN_LINE, self.line) 236 if match is None: 237 continue 238 239 id_ = match.group(1) 240 idx = int(match.group(2)) 241 num = int(match.group(3)) 242 243 hit = self.qresult[id_] 244 if hit.domain_obs_num != num: 245 continue 246 247 frag = hit[idx-1][0] 248 249 hmmseq = '' 250 consensus = '' 251 otherseq = '' 252 structureseq = '' 253 pad = 0 254 while self.read_next() and self.line.startswith(' '): 255 # if there's structure information, parse that 256 if self.line[16:18] == 'CS': 257 structureseq += self.line[19:].strip() 258 259 if not self.read_next(): 260 break 261 262 # skip the *-> start marker if it exists 263 if self.line[19] == '*': 264 seq = self.line[22:] 265 pad = 3 266 else: 267 seq = self.line[19:] 268 pad = 0 269 270 # get rid of the end marker 271 if seq.endswith('<-*'): 272 seq = seq[:-3] 273 274 hmmseq += seq 275 line_len = len(seq) 276 if not self.read_next(rstrip=False): 277 break 278 consensus += self.line[19+pad:19+pad+line_len] 279 # If there's no consensus sequence, hmmer2 doesn't 280 # bother to put spaces here, so add extra padding 281 extra_padding = len(hmmseq) - len(consensus) 282 consensus += ' ' * extra_padding 283 284 if not self.read_next(): 285 break 286 otherseq += self.line[19:].split()[0].strip() 287 288 self.push_back(self.line) 289 290 # add similarity sequence to annotation 291 frag.aln_annotation['similarity'] = consensus 292 293 # if there's structure information, add it to the fragment 294 if structureseq: 295 frag.aln_annotation['CS'] = structureseq 296 297 if self._meta['program'] == 'hmmpfam': 298 frag.hit = hmmseq 299 frag.query = otherseq 300 else: 301 frag.hit = otherseq 302 frag.query = hmmseq
303 304
305 -class Hmmer2TextIndexer(_BaseHmmerTextIndexer):
306 307 """Indexer for hmmer2-text format.""" 308 309 _parser = Hmmer2TextParser 310 qresult_start = _as_bytes('Query') 311 # qresults_ends for hmmpfam and hmmsearch 312 # need to anticipate both since hmmsearch have different query end mark 313 qresult_end = _as_bytes('//') 314
315 - def __iter__(self):
316 handle = self._handle 317 handle.seek(0) 318 start_offset = handle.tell() 319 regex_id = re.compile(_as_bytes(r'Query\s*(?:sequence|HMM)?:\s*(.*)')) 320 321 # determine flag for hmmsearch 322 is_hmmsearch = False 323 line = read_forward(handle) 324 if line.startswith(_as_bytes('hmmsearch')): 325 is_hmmsearch = True 326 327 while True: 328 end_offset = handle.tell() 329 330 if line.startswith(self.qresult_start): 331 regx = re.search(regex_id, line) 332 qresult_key = regx.group(1).strip() 333 # qresult start offset is the offset of this line 334 # (starts with the start mark) 335 start_offset = end_offset - len(line) 336 elif line.startswith(self.qresult_end): 337 yield _bytes_to_string(qresult_key), start_offset, 0 338 start_offset = end_offset 339 elif not line: 340 # HACK: since hmmsearch can only have one query result 341 if is_hmmsearch: 342 yield _bytes_to_string(qresult_key), start_offset, 0 343 break 344 345 line = read_forward(handle)
346 347 # if not used as a module, run the doctest 348 if __name__ == "__main__": 349 from Bio._utils import run_doctest 350 run_doctest() 351