1
2
3
4
5
6 """Bio.SearchIO parser for HMMER plain text output format."""
7
8 import re
9
10 from Bio._py3k import _as_bytes, _bytes_to_string
11 from Bio._utils import read_forward
12 from Bio.Alphabet import generic_protein
13 from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment
14
15 from _base import _BaseHmmerTextIndexer
16
17 __all__ = ['Hmmer3TextParser', 'Hmmer3TextIndexer']
18
19
20
21
22 _RE_PROGRAM = re.compile(r'^# (\w*hmm\w+) :: .*$')
23
24 _RE_VERSION = re.compile(r'# \w+ ([\w+\.]+) .*; http.*$')
25
26 _RE_OPT = re.compile(r'^# (.+):\s+(.+)$')
27
28 _QRE_ID_LEN_PTN = r'^Query:\s*(.*)\s+\[\w=(\d+)\]'
29 _QRE_ID_LEN = re.compile(_QRE_ID_LEN_PTN)
30
31 _HRE_VALIDATE = re.compile(r'score:\s(-?\d+\.?\d+)\sbits.*value:\s(.*)')
32
33 _HRE_ANNOT_LINE = re.compile(r'^(\s+)(.+)\s(\w+)')
34 _HRE_ID_LINE = re.compile(r'^(\s+\S+\s+[0-9-]+ )(.+?)(\s+[0-9-]+)')
35
36
37 -class Hmmer3TextParser(object):
38
39 """Parser for the HMMER 3.0 text output."""
40
41 - def __init__(self, handle):
42 self.handle = handle
43 self.line = read_forward(self.handle)
44 self._meta = self._parse_preamble()
45
47 for qresult in self._parse_qresult():
48 yield qresult
49
50 - def _read_until(self, bool_func):
51 """Reads the file handle until the given function returns True."""
52 while True:
53 if not self.line or bool_func(self.line):
54 return
55 else:
56 self.line = read_forward(self.handle)
57
59 """Parses HMMER preamble (lines beginning with '#')."""
60 meta = {}
61
62
63 has_opts = False
64 while True:
65
66 if not self.line.startswith('#'):
67 break
68
69
70 elif '- - -' in self.line:
71 if not has_opts:
72
73
74 has_opts = True
75 else:
76
77
78 break
79 elif not has_opts:
80
81 regx = re.search(_RE_PROGRAM, self.line)
82 if regx:
83 meta['program'] = regx.group(1)
84
85 regx = re.search(_RE_VERSION, self.line)
86 if regx:
87 meta['version'] = regx.group(1)
88 elif has_opts:
89 regx = re.search(_RE_OPT, self.line)
90
91 if 'target' in regx.group(1):
92 meta['target'] = regx.group(2)
93 else:
94 meta[regx.group(1)] = regx.group(2)
95
96 self.line = read_forward(self.handle)
97
98 return meta
99
100 - def _parse_qresult(self):
101 """Parses a HMMER3 query block."""
102
103 self._read_until(lambda line: line.startswith('Query:'))
104
105 while self.line:
106
107
108 regx = re.search(_QRE_ID_LEN, self.line)
109 qid = regx.group(1).strip()
110
111 qresult_attrs = {
112 'seq_len': int(regx.group(2)),
113 'program': self._meta.get('program'),
114 'version': self._meta.get('version'),
115 'target': self._meta.get('target'),
116 }
117
118
119 desc = ''
120 while not self.line.startswith('Scores for '):
121 self.line = read_forward(self.handle)
122
123 if self.line.startswith('Accession:'):
124 acc = self.line.strip().split(' ', 1)[1]
125 qresult_attrs['accession'] = acc.strip()
126 elif self.line.startswith('Description:'):
127 desc = self.line.strip().split(' ', 1)[1]
128 qresult_attrs['description'] = desc.strip()
129
130
131 while self.line and '//' not in self.line:
132 hit_list = self._parse_hit(qid)
133
134
135 if self.line.startswith('Internal pipeline'):
136 while self.line and '//' not in self.line:
137 self.line = read_forward(self.handle)
138
139
140 qresult = QueryResult(qid, hits=hit_list)
141 for attr, value in qresult_attrs.items():
142 setattr(qresult, attr, value)
143 yield qresult
144 self.line = read_forward(self.handle)
145
146 - def _parse_hit(self, qid):
147 """Parses a HMMER3 hit block, beginning with the hit table."""
148
149 self._read_until(lambda line:
150 line.startswith(' ------- ------ -----'))
151 self.line = read_forward(self.handle)
152
153
154
155 is_included = True
156
157
158 hit_attr_list = []
159 while True:
160 if not self.line:
161 return []
162 elif self.line.startswith(' ------ inclusion'):
163 is_included = False
164 self.line = read_forward(self.handle)
165
166
167 elif self.line.startswith(' [No hits detected that satisfy '
168 'reporting'):
169 while True:
170 self.line = read_forward(self.handle)
171 if self.line.startswith('Internal pipeline'):
172 assert len(hit_attr_list) == 0
173 return []
174 elif self.line.startswith('Domain annotation for each '):
175 hit_list = self._create_hits(hit_attr_list, qid)
176 return hit_list
177
178
179 row = filter(None, self.line.strip().split(' '))
180
181 if len(row) > 10:
182 row[9] = ' '.join(row[9:])
183
184 elif len(row) < 10:
185 row.append('')
186 assert len(row) == 10
187
188 hit_attrs = {
189 'id': row[8],
190 'query_id': qid,
191 'evalue': float(row[0]),
192 'bitscore': float(row[1]),
193 'bias': float(row[2]),
194
195
196 'domain_exp_num': float(row[6]),
197 'domain_obs_num': int(row[7]),
198 'description': row[9],
199 'is_included': is_included,
200 }
201 hit_attr_list.append(hit_attrs)
202
203 self.line = read_forward(self.handle)
204
205 - def _create_hits(self, hit_attrs, qid):
206 """Parses a HMMER3 hsp block, beginning with the hsp table."""
207
208 self._read_until(lambda line: line.startswith('Internal pipeline')
209 or line.startswith('>>'))
210
211
212 hit_list = []
213 while True:
214 if self.line.startswith('Internal pipeline'):
215
216 assert len(hit_attrs) == 0
217 return hit_list
218 assert self.line.startswith('>>')
219 hid, hdesc = self.line[len('>> '):].split(' ', 1)
220
221
222 self._read_until(lambda line:
223 line.startswith(' --- ------ ----- --------') or \
224 line.startswith(' [No individual domains'))
225 self.line = read_forward(self.handle)
226
227
228 hsp_list = []
229 while True:
230
231
232 if self.line.startswith(' [No targets detected that satisfy') or \
233 self.line.startswith(' [No individual domains') or \
234 self.line.startswith('Internal pipeline statistics summary:') or \
235 self.line.startswith(' Alignments for each domain:') or \
236 self.line.startswith('>>'):
237
238 hit_attr = hit_attrs.pop(0)
239 hit = Hit(hsp_list)
240 for attr, value in hit_attr.items():
241 setattr(hit, attr, value)
242 hit_list.append(hit)
243 break
244
245 parsed = filter(None, self.line.strip().split(' '))
246 assert len(parsed) == 16
247
248
249
250
251 frag = HSPFragment(hid, qid)
252
253 frag.alphabet = generic_protein
254
255
256
257 if self._meta.get('program') == 'hmmscan':
258
259 frag.hit_start = int(parsed[6]) - 1
260 frag.hit_end = int(parsed[7])
261 frag.query_start = int(parsed[9]) - 1
262 frag.query_end = int(parsed[10])
263 elif self._meta.get('program') in ['hmmsearch', 'phmmer']:
264
265 frag.hit_start = int(parsed[9]) - 1
266 frag.hit_end = int(parsed[10])
267 frag.query_start = int(parsed[6]) - 1
268 frag.query_end = int(parsed[7])
269
270 frag.hit_strand = frag.query_strand = 0
271
272 hsp = HSP([frag])
273 hsp.domain_index = int(parsed[0])
274 hsp.is_included = parsed[1] == '!'
275 hsp.bitscore = float(parsed[2])
276 hsp.bias = float(parsed[3])
277 hsp.evalue_cond = float(parsed[4])
278 hsp.evalue = float(parsed[5])
279 if self._meta.get('program') == 'hmmscan':
280
281 hsp.hit_endtype = parsed[8]
282 hsp.query_endtype = parsed[11]
283 elif self._meta.get('program') in ['hmmsearch', 'phmmer']:
284
285 hsp.hit_endtype = parsed[11]
286 hsp.query_endtype = parsed[8]
287
288 hsp.env_start = int(parsed[12]) - 1
289 hsp.env_end = int(parsed[13])
290 hsp.env_endtype = parsed[14]
291 hsp.acc_avg = float(parsed[15])
292
293 hsp_list.append(hsp)
294 self.line = read_forward(self.handle)
295
296
297 if self.line.startswith(' Alignments for each domain:'):
298 self._parse_aln_block(hid, hit.hsps)
299
300 - def _parse_aln_block(self, hid, hsp_list):
301 """Parses a HMMER3 HSP alignment block."""
302 self.line = read_forward(self.handle)
303 dom_counter = 0
304 while True:
305 if self.line.startswith('>>') or \
306 self.line.startswith('Internal pipeline'):
307 return hsp_list
308 assert self.line.startswith(' == domain %i' % (dom_counter + 1))
309
310
311
312 frag = hsp_list[dom_counter][0]
313
314
315
316
317 hmmseq = ''
318 aliseq = ''
319 annot = {}
320 self.line = self.handle.readline()
321
322
323 while True:
324
325 regx = None
326
327
328
329
330 regx = re.search(_HRE_ID_LINE, self.line)
331 if regx:
332
333 if len(hmmseq) == len(aliseq):
334 hmmseq += regx.group(2)
335
336
337 elif len(hmmseq) > len(aliseq):
338 aliseq += regx.group(2)
339 assert len(hmmseq) >= len(aliseq)
340
341 elif self.line.startswith(' == domain') or \
342 self.line.startswith('>>') or \
343 self.line.startswith('Internal pipeline'):
344 frag.aln_annotation = annot
345 if self._meta.get('program') == 'hmmscan':
346 frag.hit = hmmseq
347 frag.query = aliseq
348 elif self._meta.get('program') in ['hmmsearch', 'phmmer']:
349 frag.hit = aliseq
350 frag.query = hmmseq
351 dom_counter += 1
352 hmmseq = ''
353 aliseq = ''
354 annot = {}
355 break
356
357
358
359
360 elif len(hmmseq) == len(aliseq):
361 regx = re.search(_HRE_ANNOT_LINE, self.line)
362 if regx:
363 annot_name = regx.group(3)
364 if annot_name in annot:
365 annot[annot_name] += regx.group(2)
366 else:
367 annot[annot_name] = regx.group(2)
368
369 self.line = self.handle.readline()
370
371
372 -class Hmmer3TextIndexer(_BaseHmmerTextIndexer):
373
374 """Indexer class for HMMER plain text output."""
375
376 _parser = Hmmer3TextParser
377 qresult_start = _as_bytes('Query: ')
378 qresult_end = _as_bytes('//')
379
380 - def __iter__(self):
381 handle = self._handle
382 handle.seek(0)
383 start_offset = handle.tell()
384 regex_id = re.compile(_as_bytes(_QRE_ID_LEN_PTN))
385
386 while True:
387 line = read_forward(handle)
388 end_offset = handle.tell()
389
390 if line.startswith(self.qresult_start):
391 regx = re.search(regex_id, line)
392 qresult_key = regx.group(1).strip()
393
394
395 start_offset = end_offset - len(line)
396 elif line.startswith(self.qresult_end):
397 yield _bytes_to_string(qresult_key), start_offset, 0
398 start_offset = end_offset
399 elif not line:
400 break
401
402
403 if __name__ == "__main__":
404 from Bio._utils import run_doctest
405 run_doctest()
406