Package Bio :: Package SearchIO :: Package HmmerIO :: Module hmmer3_text
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.HmmerIO.hmmer3_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for HMMER plain text output format.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11  from Bio._utils import read_forward 
 12  from Bio.Alphabet import generic_protein 
 13  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 14   
 15  from ._base import _BaseHmmerTextIndexer 
 16   
 17  __all__ = ['Hmmer3TextParser', 'Hmmer3TextIndexer'] 
 18   
 19   
 20  # precompile regex patterns for faster processing 
 21  # regex for program name capture 
 22  _RE_PROGRAM = re.compile(r'^# (\w*hmm\w+) :: .*$') 
 23  # regex for version string capture 
 24  _RE_VERSION = re.compile(r'# \w+ ([\w+\.]+) .*; http.*$') 
 25  # regex for option string capture 
 26  _RE_OPT = re.compile(r'^# (.+):\s+(.+)$') 
 27  # regex for parsing query id and length, for parsing 
 28  _QRE_ID_LEN_PTN = r'^Query:\s*(.*)\s+\[\w=(\d+)\]' 
 29  _QRE_ID_LEN = re.compile(_QRE_ID_LEN_PTN) 
 30  # regex for hsp validation 
 31  _HRE_VALIDATE = re.compile(r'score:\s(-?\d+\.?\d+)\sbits.*value:\s(.*)') 
 32  # regexes for parsing hsp alignment blocks 
 33  _HRE_ANNOT_LINE = re.compile(r'^(\s+)(.+)\s(\w+)') 
 34  _HRE_ID_LINE = re.compile(r'^(\s+\S+\s+[0-9-]+ )(.+?)(\s+[0-9-]+)') 
 35   
 36   
37 -class Hmmer3TextParser(object):
38 39 """Parser for the HMMER 3.0 text output.""" 40
41 - def __init__(self, handle):
42 self.handle = handle 43 self.line = read_forward(self.handle) 44 self._meta = self._parse_preamble()
45
46 - def __iter__(self):
47 for qresult in self._parse_qresult(): 48 yield qresult
49
50 - def _read_until(self, bool_func):
51 """Reads the file handle until the given function returns True.""" 52 while True: 53 if not self.line or bool_func(self.line): 54 return 55 else: 56 self.line = read_forward(self.handle)
57
58 - def _parse_preamble(self):
59 """Parses HMMER preamble (lines beginning with '#').""" 60 meta = {} 61 # bool flag for storing state ~ whether we are parsing the option 62 # lines or not 63 has_opts = False 64 while True: 65 # no pound sign means we've left the preamble 66 if not self.line.startswith('#'): 67 break 68 # dashes could either mean we are entering or leaving the options 69 # section ~ so it's a switch for the has_opts flag 70 elif '- - -' in self.line: 71 if not has_opts: 72 # if flag is false, that means we're entering opts 73 # so switch the flag accordingly 74 has_opts = True 75 else: 76 # if flag is true, that means we've reached the end of opts 77 # so we can break out of the function 78 break 79 elif not has_opts: 80 # try parsing program 81 regx = re.search(_RE_PROGRAM, self.line) 82 if regx: 83 meta['program'] = regx.group(1) 84 # try parsing version 85 regx = re.search(_RE_VERSION, self.line) 86 if regx: 87 meta['version'] = regx.group(1) 88 elif has_opts: 89 regx = re.search(_RE_OPT, self.line) 90 # if target in regx.group(1), then we store the key as target 91 if 'target' in regx.group(1): 92 meta['target'] = regx.group(2).strip() 93 else: 94 meta[regx.group(1)] = regx.group(2) 95 96 self.line = read_forward(self.handle) 97 98 return meta
99
100 - def _parse_qresult(self):
101 """Parses a HMMER3 query block.""" 102 103 self._read_until(lambda line: line.startswith('Query:')) 104 105 while self.line: 106 107 # get query id and length 108 regx = re.search(_QRE_ID_LEN, self.line) 109 qid = regx.group(1).strip() 110 # store qresult attributes 111 qresult_attrs = { 112 'seq_len': int(regx.group(2)), 113 'program': self._meta.get('program'), 114 'version': self._meta.get('version'), 115 'target': self._meta.get('target'), 116 } 117 118 # get description and accession, if they exist 119 qdesc = '<unknown description>' # placeholder 120 while not self.line.startswith('Scores for '): 121 self.line = read_forward(self.handle) 122 123 if self.line.startswith('Accession:'): 124 acc = self.line.strip().split(' ', 1)[1] 125 qresult_attrs['accession'] = acc.strip() 126 elif self.line.startswith('Description:'): 127 qdesc = self.line.strip().split(' ', 1)[1].strip() 128 qresult_attrs['description'] = qdesc 129 130 # parse the query hits 131 while self.line and '//' not in self.line: 132 hit_list = self._parse_hit(qid, qdesc) 133 # read through the statistics summary 134 # TODO: parse and store this information? 135 if self.line.startswith('Internal pipeline'): 136 while self.line and '//' not in self.line: 137 self.line = read_forward(self.handle) 138 139 # create qresult, set its attributes and yield 140 # not initializing hit_list directly to handle empty hits 141 # (i.e. need to set its query description manually) 142 qresult = QueryResult(id=qid, hits=hit_list) 143 for attr, value in qresult_attrs.items(): 144 setattr(qresult, attr, value) 145 yield qresult 146 self.line = read_forward(self.handle) 147 148 # HMMER >= 3.1 outputs '[ok]' at the end of all results file, 149 # which means we can break the main loop when we see the line 150 if '[ok]' in self.line: 151 break
152
153 - def _parse_hit(self, qid, qdesc):
154 """Parses a HMMER3 hit block, beginning with the hit table.""" 155 # get to the end of the hit table delimiter and read one more line 156 self._read_until(lambda line: 157 line.startswith(' ------- ------ -----')) 158 self.line = read_forward(self.handle) 159 160 # assume every hit is in inclusion threshold until the inclusion 161 # threshold line is encountered 162 is_included = True 163 164 # parse the hit table 165 hit_attr_list = [] 166 while True: 167 if not self.line: 168 return [] 169 elif self.line.startswith(' ------ inclusion'): 170 is_included = False 171 self.line = read_forward(self.handle) 172 # if there are no hits, then there are no hsps 173 # so we forward-read until 'Internal pipeline..' 174 elif self.line.startswith(' [No hits detected that satisfy ' 175 'reporting'): 176 while True: 177 self.line = read_forward(self.handle) 178 if self.line.startswith('Internal pipeline'): 179 assert len(hit_attr_list) == 0 180 return [] 181 elif self.line.startswith('Domain annotation for each '): 182 hit_list = self._create_hits(hit_attr_list, qid, qdesc) 183 return hit_list 184 # entering hit results row 185 # parse the columns into a list 186 row = [x for x in self.line.strip().split(' ') if x] 187 # join the description words if it's >1 word 188 if len(row) > 10: 189 row[9] = ' '.join(row[9:]) 190 # if there's no description, set it to an empty string 191 elif len(row) < 10: 192 row.append('') 193 assert len(row) == 10 194 # create the hit object 195 hit_attrs = { 196 'id': row[8], 197 'query_id': qid, 198 'evalue': float(row[0]), 199 'bitscore': float(row[1]), 200 'bias': float(row[2]), 201 # row[3:6] is not parsed, since the info is available 202 # at the HSP level 203 'domain_exp_num': float(row[6]), 204 'domain_obs_num': int(row[7]), 205 'description': row[9], 206 'is_included': is_included, 207 } 208 hit_attr_list.append(hit_attrs) 209 210 self.line = read_forward(self.handle)
211
212 - def _create_hits(self, hit_attrs, qid, qdesc):
213 """Parses a HMMER3 hsp block, beginning with the hsp table.""" 214 # read through until the beginning of the hsp block 215 self._read_until(lambda line: line.startswith('Internal pipeline') 216 or line.startswith('>>')) 217 218 # start parsing the hsp block 219 hit_list = [] 220 while True: 221 if self.line.startswith('Internal pipeline'): 222 # by this time we should've emptied the hit attr list 223 assert len(hit_attrs) == 0 224 return hit_list 225 assert self.line.startswith('>>') 226 hid, hdesc = self.line[len('>> '):].split(' ', 1) 227 228 # read through the hsp table header and move one more line 229 self._read_until(lambda line: 230 line.startswith(' --- ------ ----- --------') or \ 231 line.startswith(' [No individual domains')) 232 self.line = read_forward(self.handle) 233 234 # parse the hsp table for the current hit 235 hsp_list = [] 236 while True: 237 # break out of hsp parsing if there are no hits, it's the last hsp 238 # or it's the start of a new hit 239 if self.line.startswith(' [No targets detected that satisfy') or \ 240 self.line.startswith(' [No individual domains') or \ 241 self.line.startswith('Internal pipeline statistics summary:') or \ 242 self.line.startswith(' Alignments for each domain:') or \ 243 self.line.startswith('>>'): 244 245 hit_attr = hit_attrs.pop(0) 246 hit = Hit(hsp_list) 247 for attr, value in hit_attr.items(): 248 setattr(hit, attr, value) 249 if not hit: 250 hit.query_description = qdesc 251 hit_list.append(hit) 252 break 253 254 parsed = [x for x in self.line.strip().split(' ') if x] 255 assert len(parsed) == 16 256 # parsed column order: 257 # index, is_included, bitscore, bias, evalue_cond, evalue 258 # hmmfrom, hmmto, query_ends, hit_ends, alifrom, alito, 259 # envfrom, envto, acc_avg 260 frag = HSPFragment(hid, qid) 261 # HMMER3 alphabets are always protein alphabets 262 frag.alphabet = generic_protein 263 # depending on whether the program is hmmsearch, hmmscan, or phmmer 264 # {hmm,ali}{from,to} can either be hit_{from,to} or query_{from,to} 265 # for hmmscan, hit is the hmm profile, query is the sequence 266 if self._meta.get('program') == 'hmmscan': 267 # adjust 'from' and 'to' coordinates to 0-based ones 268 frag.hit_start = int(parsed[6]) - 1 269 frag.hit_end = int(parsed[7]) 270 frag.query_start = int(parsed[9]) - 1 271 frag.query_end = int(parsed[10]) 272 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 273 # adjust 'from' and 'to' coordinates to 0-based ones 274 frag.hit_start = int(parsed[9]) - 1 275 frag.hit_end = int(parsed[10]) 276 frag.query_start = int(parsed[6]) - 1 277 frag.query_end = int(parsed[7]) 278 # strand is always 0, since HMMER now only handles protein 279 frag.hit_strand = frag.query_strand = 0 280 281 hsp = HSP([frag]) 282 hsp.domain_index = int(parsed[0]) 283 hsp.is_included = parsed[1] == '!' 284 hsp.bitscore = float(parsed[2]) 285 hsp.bias = float(parsed[3]) 286 hsp.evalue_cond = float(parsed[4]) 287 hsp.evalue = float(parsed[5]) 288 if self._meta.get('program') == 'hmmscan': 289 # adjust 'from' and 'to' coordinates to 0-based ones 290 hsp.hit_endtype = parsed[8] 291 hsp.query_endtype = parsed[11] 292 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 293 # adjust 'from' and 'to' coordinates to 0-based ones 294 hsp.hit_endtype = parsed[11] 295 hsp.query_endtype = parsed[8] 296 # adjust 'from' and 'to' coordinates to 0-based ones 297 hsp.env_start = int(parsed[12]) - 1 298 hsp.env_end = int(parsed[13]) 299 hsp.env_endtype = parsed[14] 300 hsp.acc_avg = float(parsed[15]) 301 302 hsp_list.append(hsp) 303 self.line = read_forward(self.handle) 304 305 # parse the hsp alignments 306 if self.line.startswith(' Alignments for each domain:'): 307 self._parse_aln_block(hid, hit.hsps)
308
309 - def _parse_aln_block(self, hid, hsp_list):
310 """Parses a HMMER3 HSP alignment block.""" 311 self.line = read_forward(self.handle) 312 dom_counter = 0 313 while True: 314 if self.line.startswith('>>') or \ 315 self.line.startswith('Internal pipeline'): 316 return hsp_list 317 assert self.line.startswith(' == domain %i' % (dom_counter + 1)) 318 # alias hsp to local var 319 # but note that we're still changing the attrs of the actual 320 # hsp inside the qresult as we're not creating a copy 321 frag = hsp_list[dom_counter][0] 322 # XXX: should we validate again here? regex is expensive.. 323 #regx = re.search(_HRE_VALIDATE, self.line) 324 #assert hsp.bitscore == float(regx.group(1)) 325 #assert hsp.evalue_cond == float(regx.group(2)) 326 hmmseq = '' 327 aliseq = '' 328 annot = {} 329 self.line = self.handle.readline() 330 331 # parse all the alignment blocks in the hsp 332 while True: 333 334 regx = None 335 336 # check for hit or query line 337 # we don't check for the hit or query id specifically 338 # to anticipate special cases where query id == hit id 339 regx = re.search(_HRE_ID_LINE, self.line) 340 if regx: 341 # the first hit/query self.line we encounter is the hmmseq 342 if len(hmmseq) == len(aliseq): 343 hmmseq += regx.group(2) 344 # and for subsequent self.lines, len(hmmseq) is either 345 # > or == len(aliseq) 346 elif len(hmmseq) > len(aliseq): 347 aliseq += regx.group(2) 348 assert len(hmmseq) >= len(aliseq) 349 # check for start of new domain 350 elif self.line.startswith(' == domain') or \ 351 self.line.startswith('>>') or \ 352 self.line.startswith('Internal pipeline'): 353 frag.aln_annotation = annot 354 if self._meta.get('program') == 'hmmscan': 355 frag.hit = hmmseq 356 frag.query = aliseq 357 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 358 frag.hit = aliseq 359 frag.query = hmmseq 360 dom_counter += 1 361 hmmseq = '' 362 aliseq = '' 363 annot = {} 364 break 365 # otherwise check if it's an annotation line and parse it 366 # len(hmmseq) is only != len(aliseq) when the cursor is parsing 367 # the similarity character. Since we're not parsing that, we 368 # check for when the condition is False (i.e. when it's ==) 369 elif len(hmmseq) == len(aliseq): 370 regx = re.search(_HRE_ANNOT_LINE, self.line) 371 if regx: 372 annot_name = regx.group(3) 373 if annot_name in annot: 374 annot[annot_name] += regx.group(2) 375 else: 376 annot[annot_name] = regx.group(2) 377 378 self.line = self.handle.readline()
379 380
381 -class Hmmer3TextIndexer(_BaseHmmerTextIndexer):
382 383 """Indexer class for HMMER plain text output.""" 384 385 _parser = Hmmer3TextParser 386 qresult_start = _as_bytes('Query: ') 387 qresult_end = _as_bytes('//') 388
389 - def __iter__(self):
390 handle = self._handle 391 handle.seek(0) 392 start_offset = handle.tell() 393 regex_id = re.compile(_as_bytes(_QRE_ID_LEN_PTN)) 394 395 while True: 396 line = read_forward(handle) 397 end_offset = handle.tell() 398 399 if line.startswith(self.qresult_start): 400 regx = re.search(regex_id, line) 401 qresult_key = regx.group(1).strip() 402 # qresult start offset is the offset of this line 403 # (starts with the start mark) 404 start_offset = end_offset - len(line) 405 elif line.startswith(self.qresult_end): 406 yield _bytes_to_string(qresult_key), start_offset, 0 407 start_offset = end_offset 408 elif not line: 409 break
410 411 # if not used as a module, run the doctest 412 if __name__ == "__main__": 413 from Bio._utils import run_doctest 414 run_doctest() 415