Package Bio :: Package SearchIO :: Package ExonerateIO :: Module exonerate_text
[hide private]
[frames] | [no frames]

Source Code for Module Bio.SearchIO.ExonerateIO.exonerate_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for Exonerate plain text output format.""" 
  7   
  8  import re 
  9  from itertools import chain 
 10   
 11  from Bio._py3k import _as_bytes, _bytes_to_string 
 12  from Bio._py3k import zip 
 13   
 14   
 15  from Bio.Alphabet import generic_protein 
 16   
 17  from ._base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP, \ 
 18          _parse_hit_or_query_line 
 19  from .exonerate_vulgar import _RE_VULGAR 
 20   
 21   
# public names exported by this module
__all__ = ['ExonerateTextParser', 'ExonerateTextIndexer']


# for capturing sequences in alignment blocks
# e.g. ' 529 : ATCCCTTATCTCTTTATCTTGTA :    472'
# group 1 is the sequence between the two colons
_RE_ALN_ROW = re.compile(r'\s*\d+\s+: (.*) :\s+\d+')
# for splitting the line based on intron annotations
# e.g. '  >>>> Target Intron 1 >>>>  ' or 'gt.........................ag'
_RE_EXON = re.compile(r'[atgc ]{2,}?(?:(?:[<>]+ \w+ Intron \d+ [<>]+)|(?:\.+))[atgc ]{2,}?')
# captures the intron length
# from e.g. '61 bp // 154295 bp' (joint intron lengths) or '177446 bp'
# yields a 3-tuple per match: (joint query len, joint hit len, single len)
_RE_EXON_LEN = re.compile(r'(?:(\d+) bp // (\d+) bp)|(?:(\d+) bp)')
# for splitting lines in the NER model
_RE_NER = re.compile(r'--<\s+\d+\s+>--')
# for capturing NER gap lengths
_RE_NER_LEN = re.compile(r'--<\s+(\d+)\s+>--')
# regexes for capturing the letters inside curly braces
# no. of letters is either 1 or 2, since they are split codons
_RE_SCODON_START = re.compile(r'\{(\w{1,2})\}$')
_RE_SCODON_END = re.compile(r'^\{(\w{1,2})\}')
 42   
 43   
44 -def _flip_codons(codon_seq, target_seq):
45 """Flips the codon characters from one seq to another.""" 46 a, b = '', '' 47 for char1, char2 in zip(codon_seq, target_seq): 48 # no need to do anything if the codon seq line has nothing 49 if char1 == ' ': 50 a += char1 51 b += char2 52 else: 53 a += char2 54 b += char1 55 56 return a, b
57 58
def _get_block_coords(parsed_seq, row_dict, has_ner=False):
    """Returns a list of start, end coordinates for each given block in the sequence."""
    # NER models are split on gap markers, everything else on intron markers
    splitter = _RE_NER if has_ner else _RE_EXON
    # use the query line for reference
    seq = parsed_seq[row_dict['query']]

    coords = []
    search_from = 0
    for block in re.split(splitter, seq):
        # locate each split block in the original line, scanning onward
        # from the start of the previously located block
        search_from += seq[search_from:].find(block)
        coords.append((search_from, search_from + len(block)))

    return coords
77 78
79 -def _get_inter_coords(coords, strand=1):
80 """From the given pairs of coordinates, returns a list of pairs 81 covering the intervening ranges.""" 82 # adapted from Python's itertools guide 83 # if strand is -1, adjust coords to the ends and starts are chained 84 if strand == -1: 85 sorted_coords = [(max(a, b), min(a, b)) for a, b in coords] 86 inter_coords = list(chain(*sorted_coords))[1:-1] 87 return list(zip(inter_coords[1::2], inter_coords[::2])) 88 else: 89 inter_coords = list(chain(*coords))[1:-1] 90 return list(zip(inter_coords[::2], inter_coords[1::2]))
91 92
93 -def _stitch_rows(raw_rows):
94 """Stitches together the parsed alignment rows and returns them in a list.""" 95 # deal with possible codon surprise! 96 # (i.e. alignments with codons using cdna2genome model) 97 # by creating additional rows to contain the codons 98 try: 99 max_len = max(len(x) for x in raw_rows) 100 for row in raw_rows: 101 assert len(row) == max_len 102 except AssertionError: 103 for idx, row in enumerate(raw_rows): 104 if len(row) != max_len: 105 # codons must be present in the query and hit (so +2) 106 assert len(row) + 2 == max_len 107 # add additional empty lines to contain codons 108 raw_rows[idx] = [' ' * len(row[0])] + row + [' ' * len(row[0])] 109 110 cmbn_rows = [] 111 for idx, row in enumerate(raw_rows[0]): 112 cmbn_row = ''.join(aln_row[idx] for aln_row in raw_rows) 113 cmbn_rows.append(cmbn_row) 114 115 # the real aligned sequence is always the 'outer' one, so we want 116 # to flip them with their 'inner' pairs 117 if len(cmbn_rows) == 5: 118 # flip query sequence 119 cmbn_rows[0], cmbn_rows[1] = \ 120 _flip_codons(cmbn_rows[0], cmbn_rows[1]) 121 # flip hit sequence 122 cmbn_rows[4], cmbn_rows[3] = \ 123 _flip_codons(cmbn_rows[4], cmbn_rows[3]) 124 125 return cmbn_rows
126 127
128 -def _get_row_dict(row_len, model):
129 """Returns a dictionary of row indices for parsing alignment blocks.""" 130 idx = {} 131 # 3 lines, usually in dna vs dna models 132 if row_len == 3: 133 idx['query'] = 0 134 idx['midline'] = 1 135 idx['hit'] = 2 136 idx['qannot'], idx['hannot'] = None, None 137 # 4 lines, in protein vs dna models or dna vs protein models 138 # TODO: currently we check this from the model string; is there 139 # a better way to do it? 140 elif row_len == 4: 141 if 'protein2' in model: 142 idx['query'] = 0 143 idx['midline'] = 1 144 idx['hit'] = 2 145 idx['hannot'] = 3 146 idx['qannot'] = None 147 elif '2protein' in model: 148 idx['query'] = 1 149 idx['midline'] = 2 150 idx['hit'] = 3 151 idx['hannot'] = None 152 idx['qannot'] = 0 153 else: 154 raise ValueError("Unexpected model: " + model) 155 # 5 lines, translated dna vs translated dna 156 elif row_len == 5: 157 # set sequence indexes 158 idx['qannot'] = 0 159 idx['query'] = 1 160 idx['midline'] = 2 161 idx['hit'] = 3 162 idx['hannot'] = 4 163 else: 164 raise ValueError("Unexpected row count in alignment block: " 165 "%i" % row_len) 166 return idx
167 168
169 -def _get_blocks(rows, coords, idx):
170 """Returns a list of dictionaries of sequences split by the coordinates.""" 171 for idx_name in ('query', 'hit', 'midline', 'qannot', 'hannot'): 172 assert idx_name in idx 173 blocks = [] 174 for start, end in coords: 175 block = {} 176 # get seqs according to index 177 block['query'] = rows[idx['query']][start:end] 178 block['hit'] = rows[idx['hit']][start:end] 179 block['similarity'] = rows[idx['midline']][start:end] 180 if idx['qannot'] is not None: 181 block['query_annotation'] = rows[idx['qannot']][start:end] 182 if idx['hannot'] is not None: 183 block['hit_annotation'] = rows[idx['hannot']][start:end] 184 blocks.append(block) 185 186 return blocks
187 188
def _get_scodon_moves(tmp_seq_blocks):
    """Returns a dictionary of split codon locations relative to each
    fragment's end"""
    scodon_moves = {'query': [], 'hit': []}
    for seq_type in scodon_moves:
        coords = []
        for block in tmp_seq_blocks:
            seq = block[seq_type]
            # residues inside curly braces at either end mark split
            # codons; record how many bases each marker carries
            start_match = re.search(_RE_SCODON_START, seq)
            end_match = re.search(_RE_SCODON_END, seq)
            coords.append((len(start_match.group(1)), 0)
                          if start_match else (0, 0))
            coords.append((0, len(end_match.group(1)))
                          if end_match else (0, 0))
        scodon_moves[seq_type] = coords

    return scodon_moves
212 213
214 -def _clean_blocks(tmp_seq_blocks):
215 """Removes curly braces (split codon markers) from the given sequences.""" 216 seq_blocks = [] 217 for seq_block in tmp_seq_blocks: 218 for line_name in seq_block: 219 seq_block[line_name] = \ 220 seq_block[line_name].replace('{', '').replace('}', '') 221 seq_blocks.append(seq_block) 222 223 return seq_blocks
224 225
226 -def _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens):
227 """Returns the length of introns between fragments.""" 228 # set opposite type, for setting introns 229 opp_type = 'hit' if seq_type == 'query' else 'query' 230 # list of flags to denote if an intron follows a block 231 # it reads e.g. this line: 232 # "ATGTT{TT} >>>> Target Intron 1 >>>> {G}TGTGTGTACATT" 233 # and sets the opposing sequence type's intron (since this 234 # line is present on the opposite sequence type line) 235 has_intron_after = ['Intron' in x[seq_type] for x in 236 inter_blocks] 237 assert len(has_intron_after) == len(raw_inter_lens) 238 # create list containing coord adjustments incorporating 239 # intron lengths 240 inter_lens = [] 241 for flag, parsed_len in zip(has_intron_after, raw_inter_lens): 242 if flag: 243 # joint introns 244 if all(parsed_len[:2]): 245 # intron len is [0] if opp_type is query, otherwise it's [1] 246 intron_len = int(parsed_len[0]) if opp_type == 'query' \ 247 else int(parsed_len[1]) 248 # single hit/query introns 249 elif parsed_len[2]: 250 intron_len = int(parsed_len[2]) 251 else: 252 raise ValueError("Unexpected intron parsing " 253 "result: %r" % parsed_len) 254 else: 255 intron_len = 0 256 257 inter_lens.append(intron_len) 258 259 return inter_lens
260 261
262 -def _comp_coords(hsp, seq_type, inter_lens):
263 """Fill the block coordinates of the given hsp dictionary.""" 264 assert seq_type in ('hit', 'query') 265 # manually fill the first coord 266 seq_step = 1 if hsp['%s_strand' % seq_type] >= 0 else -1 267 fstart = hsp['%s_start' % seq_type] 268 # fend is fstart + number of residues in the sequence, minus gaps 269 fend = fstart + len( 270 hsp[seq_type][0].replace('-', '').replace('>', 271 '').replace('<', '')) * seq_step 272 coords = [(fstart, fend)] 273 # and start from the second block, after the first inter seq 274 for idx, block in enumerate(hsp[seq_type][1:]): 275 bstart = coords[-1][1] + inter_lens[idx] * seq_step 276 bend = bstart + seq_step * \ 277 len(block.replace('-', '')) 278 coords.append((bstart, bend)) 279 280 # adjust the coords so the smallest is [0], if strand is -1 281 # couldn't do this in the previous steps since we need the initial 282 # block ordering 283 if seq_step != 1: 284 for idx, coord in enumerate(coords): 285 coords[idx] = coords[idx][1], coords[idx][0] 286 287 return coords
288 289
290 -def _comp_split_codons(hsp, seq_type, scodon_moves):
291 """Computes the positions of split codons and puts the values in the given 292 HSP dictionary.""" 293 scodons = [] 294 for idx in range(len(scodon_moves[seq_type])): 295 pair = scodon_moves[seq_type][idx] 296 if not any(pair): 297 continue 298 else: 299 assert not all(pair) 300 a, b = pair 301 anchor_pair = hsp['%s_ranges' % seq_type][idx // 2] 302 strand = 1 if hsp['%s_strand' % seq_type] >= 0 else -1 303 304 if a: 305 func = max if strand == 1 else min 306 anchor = func(anchor_pair) 307 start_c, end_c = anchor + a * strand * -1, anchor 308 elif b: 309 func = min if strand == 1 else max 310 anchor = func(anchor_pair) 311 start_c, end_c = anchor + b * strand, anchor 312 scodons.append((min(start_c, end_c), max(start_c, end_c))) 313 314 return scodons
315 316
class ExonerateTextParser(_BaseExonerateParser):

    """Parser for Exonerate plain text output."""

    # marker line that starts every human-readable alignment block
    _ALN_MARK = 'C4 Alignment:'

    def parse_alignment_block(self, header):
        """Parses one alignment block into the header's component dicts.

        ``header`` must carry 'qresult', 'hit', and 'hsp' dicts whose
        coordinate and strand values were set by earlier parsing steps;
        the same three dicts are returned with sequences, per-fragment
        ranges, and annotations filled in.
        """
        qresult = header['qresult']
        hit = header['hit']
        hsp = header['hsp']
        # check for values that must have been set by previous methods
        for val_name in ('query_start', 'query_end', 'hit_start', 'hit_end',
                         'query_strand', 'hit_strand'):
            assert val_name in hsp, hsp

        # get the alignment rows
        # and stitch them so we have the full sequences in single strings
        raw_aln_blocks, vulgar_comp = self._read_alignment()
        # cmbn_rows still has split codon markers (curly braces)
        cmbn_rows = _stitch_rows(raw_aln_blocks)
        row_dict = _get_row_dict(len(cmbn_rows), qresult['model'])
        # get the sequence blocks
        has_ner = 'NER' in qresult['model'].upper()
        seq_coords = _get_block_coords(cmbn_rows, row_dict, has_ner)
        tmp_seq_blocks = _get_blocks(cmbn_rows, seq_coords, row_dict)
        # get split codon temp coords for later use
        # this result in pairs of base movement for both ends of each row
        scodon_moves = _get_scodon_moves(tmp_seq_blocks)
        # remove the split codon markers
        seq_blocks = _clean_blocks(tmp_seq_blocks)

        # adjust strands
        hsp['query_strand'] = _STRAND_MAP[hsp['query_strand']]
        hsp['hit_strand'] = _STRAND_MAP[hsp['hit_strand']]
        # cast coords into ints
        hsp['query_start'] = int(hsp['query_start'])
        hsp['query_end'] = int(hsp['query_end'])
        hsp['hit_start'] = int(hsp['hit_start'])
        hsp['hit_end'] = int(hsp['hit_end'])
        # cast score into ints
        hsp['score'] = int(hsp['score'])
        # set sequences
        hsp['query'] = [x['query'] for x in seq_blocks]
        hsp['hit'] = [x['hit'] for x in seq_blocks]
        hsp['aln_annotation'] = {}
        # set the alphabet
        # currently only limited to models with protein queries
        if 'protein2' in qresult['model'] or 'coding2' in qresult['model'] \
                or '2protein' in qresult['model']:
            hsp['alphabet'] = generic_protein
        # get the annotations if they exist
        for annot_type in ('similarity', 'query_annotation', 'hit_annotation'):
            try:
                hsp['aln_annotation'][annot_type] = \
                        [x[annot_type] for x in seq_blocks]
            except KeyError:
                # annotation rows are absent for some models; skip quietly
                pass

        # use vulgar coordinates if vulgar line is present and return
        # if vulgar_comp is not None:
        #     hsp = parse_vulgar_comp(hsp, vulgar_comp)

        #     return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

        # otherwise we need to get the coordinates from the alignment
        # get the intervening blocks first, so we can use them
        # to adjust the coordinates
        if not has_ner:
            # get intervening coordinates and blocks, only if model is not ner
            # ner models have a much more simple coordinate calculation
            inter_coords = _get_inter_coords(seq_coords)
            inter_blocks = _get_blocks(cmbn_rows, inter_coords, row_dict)
            # returns a three-component tuple of intron lengths
            # first two component filled == intron in hit and query
            # last component filled == intron in hit or query
            raw_inter_lens = re.findall(_RE_EXON_LEN,
                                        cmbn_rows[row_dict['midline']])

        # compute start and end coords for each block
        for seq_type in ('query', 'hit'):

            # ner blocks and intron blocks require different adjustments
            if not has_ner:
                opp_type = 'hit' if seq_type == 'query' else 'query'
                inter_lens = _comp_intron_lens(seq_type, inter_blocks,
                                               raw_inter_lens)
            else:
                # for NER blocks, the length of the inter-fragment gaps is
                # written on the same strand, so opp_type is seq_type
                opp_type = seq_type
                inter_lens = [int(x) for x in
                              re.findall(_RE_NER_LEN, cmbn_rows[row_dict[seq_type]])]

            # check that inter_lens's length is len opp_type block - 1
            assert len(inter_lens) == len(hsp[opp_type])-1, \
                "%r vs %r" % (len(inter_lens), len(hsp[opp_type])-1)
            # fill the hsp query and hit coordinates
            hsp['%s_ranges' % opp_type] = \
                _comp_coords(hsp, opp_type, inter_lens)
            # and fill the split codon coordinates, if model != ner
            # can't do this in the if-else clause above since we need to
            # compute the ranges first
            if not has_ner:
                hsp['%s_split_codons' % opp_type] = \
                    _comp_split_codons(hsp, opp_type, scodon_moves)

        # now that we've finished parsing coords, we can set the hit and start
        # coord according to Biopython's convention (start <= end)
        for seq_type in ('query', 'hit'):
            if hsp['%s_strand' % seq_type] == -1:
                n_start = '%s_start' % seq_type
                n_end = '%s_end' % seq_type
                hsp[n_start], hsp[n_end] = hsp[n_end], hsp[n_start]

        return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

    def _read_alignment(self):
        """Reads the raw alignment block strings, returns them in a list.

        Also returns the vulgar component string if a 'vulgar' line was
        seen while scanning (None otherwise).
        """
        raw_aln_blocks = []
        # flag to check whether we're in an alignment row
        in_aln_row = False
        # flag for vulgar line, if present, we can parse coordinates from it
        vulgar_comp = None
        while True:

            match = re.search(_RE_ALN_ROW, self.line.strip())
            # if we have a match, set flags and values
            if match and not in_aln_row:
                start_idx = self.line.index(match.group(1))
                row_len = len(match.group(1))
                in_aln_row = True
                raw_aln_block = []
            # if we're in an alignment row, grab the sequence
            if in_aln_row:
                raw_aln_block.append(self.line[start_idx:start_idx+row_len])
            # reset flags and values if the line matches, we're in an alignment
            # row, and there are more than 1 line in rows
            if match and in_aln_row and len(raw_aln_block) > 1:
                raw_aln_blocks.append(raw_aln_block)
                start_idx = None
                row_len = None
                in_aln_row = False

            self.line = self.handle.readline()
            # try to parse vulgar line if present
            if self.line.startswith('vulgar'):
                vulgar = re.search(_RE_VULGAR, self.line)
                vulgar_comp = vulgar.group(10)
            if not self.line or self.line.startswith(self._ALN_MARK):
                # HACK: this is so that the parse_qresult method does not
                # yield the objects before appending the last HSP. We are doing
                # this to keep the parser compatible with outputs without
                # human-readable alignment outputs. This also relies on the
                # fact that repeated readline() always returns '' on EOF.
                if not self.line:
                    self.line = 'mock'
                break

        return raw_aln_blocks, vulgar_comp
476 477
class ExonerateTextIndexer(_BaseExonerateIndexer):

    """Indexer class for Exonerate plain text."""

    _parser = ExonerateTextParser
    # byte marker that starts each alignment (and thus each query) block
    _query_mark = _as_bytes('C4 Alignment')

    def get_qresult_id(self, pos):
        """Returns the query ID from the nearest "Query:" line."""
        handle = self._handle
        handle.seek(pos)
        sentinel = _as_bytes('Query:')

        while True:
            line = handle.readline().strip()
            if line.startswith(sentinel):
                break
            if not line:
                # no "Query:" line found before EOF / an empty line
                # NOTE(review): presumably this follows the indexer
                # convention in _BaseExonerateIndexer — confirm
                raise StopIteration
        # extract the ID, discarding the description
        qid, desc = _parse_hit_or_query_line(_bytes_to_string(line))

        return qid

    def get_raw(self, offset):
        """Returns the raw string of a QueryResult object from the given offset."""
        handle = self._handle
        handle.seek(offset)
        qresult_key = None
        qresult_raw = _as_bytes('')

        while True:
            line = handle.readline()
            if not line:
                # EOF ends the current query's raw record
                break
            elif line.startswith(self._query_mark):
                cur_pos = handle.tell()
                if qresult_key is None:
                    # first block: remember which query we are collecting
                    qresult_key = self.get_qresult_id(cur_pos)
                else:
                    # a block belonging to a different query ends the record
                    curr_key = self.get_qresult_id(cur_pos)
                    if curr_key != qresult_key:
                        break
                # rewind past the bytes consumed by get_qresult_id
                handle.seek(cur_pos)
            qresult_raw += line

        return qresult_raw
# if not used as a module, run the doctest
if __name__ == "__main__":
    from Bio._utils import run_doctest
    run_doctest()