Package Bio :: Package SeqIO :: Module AbiIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.AbiIO

  1  # Copyright 2011 by Wibowo Arindrarto (w.arindrarto@gmail.com) 
  2  # Revisions copyright 2011 by Peter Cock. 
  3  # This code is part of the Biopython distribution and governed by its 
  4  # license. Please see the LICENSE file that should have been included 
  5  # as part of this package. 
  6   
  7  """Bio.SeqIO parser for the ABI format. 
  8   
  9  ABI is the format used by Applied Biosystem's sequencing machines to store 
 10  sequencing results. 
 11   
 12  For more details on the format specification, visit: 
 13  http://www.appliedbiosystem.com/support/software_community/ABIF_File_Format.pdf 
 14   
 15  """ 
 16   
 17  __docformat__ = "epytext en" 
 18   
 19  import datetime 
 20  import struct 
 21   
 22  from os.path import basename 
 23   
 24  from Bio import Alphabet 
 25  from Bio.Alphabet.IUPAC import ambiguous_dna, unambiguous_dna 
 26  from Bio.Seq import Seq 
 27  from Bio.SeqRecord import SeqRecord 
 28  from Bio._py3k import _bytes_to_string, _as_bytes 
 29   
 30  # dictionary for determining which tags goes into SeqRecord annotation 
 31  # each key is tag_name + tag_number 
 32  # if a tag entry needs to be added, just add its key and its key 
 33  # for the annotations dictionary as the value 
 34  _EXTRACT = { 
 35      'TUBE1': 'sample_well', 
 36      'DySN1': 'dye', 
 37      'GTyp1': 'polymer', 
 38      'MODL1': 'machine_model', 
 39  } 
 40  # dictionary for tags that require preprocessing before use in creating 
 41  # seqrecords 
 42  _SPCTAGS = [ 
 43      'PBAS2',    # base-called sequence 
 44      'PCON2',    # quality values of base-called sequence 
 45      'SMPL1',    # sample id inputted before sequencing run 
 46      'RUND1',    # run start date 
 47      'RUND2',    # run finish date 
 48      'RUNT1',    # run start time 
 49      'RUNT2',    # run finish time 
 50  ] 
 51  # dictionary for data unpacking format 
 52  _BYTEFMT = { 
 53      1: 'b',     # byte 
 54      2: 's',     # char 
 55      3: 'H',     # word 
 56      4: 'h',     # short 
 57      5: 'i',     # long 
 58      6: '2i',    # rational, legacy unsupported 
 59      7: 'f',     # float 
 60      8: 'd',     # double 
 61      10: 'h2B',  # date 
 62      11: '4B',   # time 
 63      12: '2i2b',  # thumb 
 64      13: 'B',    # bool 
 65      14: '2h',   # point, legacy unsupported 
 66      15: '4h',   # rect, legacy unsupported 
 67      16: '2i',   # vPoint, legacy unsupported 
 68      17: '4i',   # vRect, legacy unsupported 
 69      18: 's',    # pString 
 70      19: 's',    # cString 
 71      20: '2i',   # tag, legacy unsupported 
 72  } 
 73  # header data structure (exluding 4 byte ABIF marker) 
 74  _HEADFMT = '>H4sI2H3I' 
 75  # directory data structure 
 76  _DIRFMT = '>4sI2H4I' 
 77   
 78   
79 -def AbiIterator(handle, alphabet=None, trim=False):
80 """Iterator for the Abi file format. 81 """ 82 # raise exception is alphabet is not dna 83 if alphabet is not None: 84 if isinstance(Alphabet._get_base_alphabet(alphabet), 85 Alphabet.ProteinAlphabet): 86 raise ValueError( 87 "Invalid alphabet, ABI files do not hold proteins.") 88 if isinstance(Alphabet._get_base_alphabet(alphabet), 89 Alphabet.RNAAlphabet): 90 raise ValueError("Invalid alphabet, ABI files do not hold RNA.") 91 92 # raise exception if handle mode is not 'rb' 93 if hasattr(handle, 'mode'): 94 if set('rb') != set(handle.mode.lower()): 95 raise ValueError("ABI files has to be opened in 'rb' mode.") 96 97 # check if input file is a valid Abi file 98 handle.seek(0) 99 marker = handle.read(4) 100 if not marker: 101 # handle empty file gracefully 102 raise StopIteration 103 if marker != _as_bytes('ABIF'): 104 raise IOError('File should start ABIF, not %r' % marker) 105 106 # dirty hack for handling time information 107 times = {'RUND1': '', 'RUND2': '', 'RUNT1': '', 'RUNT2': '', } 108 109 # initialize annotations 110 annot = dict(zip(_EXTRACT.values(), [None] * len(_EXTRACT))) 111 112 # parse header and extract data from directories 113 header = struct.unpack(_HEADFMT, 114 handle.read(struct.calcsize(_HEADFMT))) 115 116 for tag_name, tag_number, tag_data in _abi_parse_header(header, handle): 117 # stop iteration if all desired tags have been extracted 118 # 4 tags from _EXTRACT + 2 time tags from _SPCTAGS - 3, 119 # and seq, qual, id 120 # todo 121 122 key = tag_name + str(tag_number) 123 124 # PBAS2 is base-called sequence 125 if key == 'PBAS2': 126 seq = tag_data 127 ambigs = 'KYWMRS' 128 if alphabet is None: 129 if set(seq).intersection(ambigs): 130 alphabet = ambiguous_dna 131 else: 132 alphabet = unambiguous_dna 133 # PCON2 is quality values of base-called sequence 134 elif key == 'PCON2': 135 qual = [ord(val) for val in tag_data] 136 # SMPL1 is sample id entered before sequencing run 137 elif key == 'SMPL1': 138 sample_id = tag_data 139 elif key in times: 140 times[key] = tag_data 141 else: 142 # extract sequence annotation as defined in _EXTRACT 143 if key in _EXTRACT: 144 annot[_EXTRACT[key]] = tag_data 145 146 # set time annotations 147 annot['run_start'] = '%s %s' % (times['RUND1'], times['RUNT1']) 148 annot['run_finish'] = '%s %s' % (times['RUND2'], times['RUNT2']) 149 150 # use the file name as SeqRecord.name if available 151 try: 152 file_name = basename(handle.name).replace('.ab1', '') 153 except: 154 file_name = "" 155 156 record = SeqRecord(Seq(seq, alphabet), 157 id=sample_id, name=file_name, 158 description='', 159 annotations=annot, 160 letter_annotations={'phred_quality': qual}) 161 162 if not trim: 163 yield record 164 else: 165 yield _abi_trim(record)
166 167
168 -def _AbiTrimIterator(handle):
169 """Iterator for the Abi file format that yields trimmed SeqRecord objects. 170 """ 171 return AbiIterator(handle, trim=True)
172 173
174 -def _abi_parse_header(header, handle):
175 """Generator that returns directory contents. 176 """ 177 # header structure (after ABIF marker): 178 # file version, tag name, tag number, 179 # element type code, element size, number of elements 180 # data size, data offset, handle (not file handle) 181 head_elem_size = header[4] 182 head_elem_num = header[5] 183 head_offset = header[7] 184 index = 0 185 186 while index < head_elem_num: 187 start = head_offset + index * head_elem_size 188 # add directory offset to tuple 189 # to handle directories with data size <= 4 bytes 190 handle.seek(start) 191 dir_entry = struct.unpack(_DIRFMT, 192 handle.read(struct.calcsize(_DIRFMT))) + (start,) 193 index += 1 194 # only parse desired dirs 195 key = _bytes_to_string(dir_entry[0]) 196 key += str(dir_entry[1]) 197 if key in (list(_EXTRACT.keys()) + _SPCTAGS): 198 tag_name = _bytes_to_string(dir_entry[0]) 199 tag_number = dir_entry[1] 200 elem_code = dir_entry[2] 201 elem_num = dir_entry[4] 202 data_size = dir_entry[5] 203 data_offset = dir_entry[6] 204 tag_offset = dir_entry[8] 205 # if data size <= 4 bytes, data is stored inside tag 206 # so offset needs to be changed 207 if data_size <= 4: 208 data_offset = tag_offset + 20 209 handle.seek(data_offset) 210 data = handle.read(data_size) 211 yield tag_name, tag_number, \ 212 _parse_tag_data(elem_code, elem_num, data)
213 214
215 -def _abi_trim(seq_record):
216 """Trims the sequence using Richard Mott's modified trimming algorithm. 217 218 seq_record - SeqRecord object to be trimmed. 219 220 Trimmed bases are determined from their segment score, which is a 221 cumulative sum of each base's score. Base scores are calculated from 222 their quality values. 223 224 More about the trimming algorithm: 225 http://www.phrap.org/phredphrap/phred.html 226 http://www.clcbio.com/manual/genomics/Quality_abif_trimming.html 227 """ 228 229 start = False # flag for starting position of trimmed sequence 230 segment = 20 # minimum sequence length 231 trim_start = 0 # init start index 232 cutoff = 0.05 # default cutoff value for calculating base score 233 234 if len(seq_record) <= segment: 235 return seq_record 236 else: 237 # calculate base score 238 score_list = [cutoff - (10 ** (qual / -10.0)) for qual in 239 seq_record.letter_annotations['phred_quality']] 240 241 # calculate cummulative score 242 # if cummulative value < 0, set it to 0 243 # first value is set to 0, because of the assumption that 244 # the first base will always be trimmed out 245 cummul_score = [0] 246 for i in range(1, len(score_list)): 247 score = cummul_score[-1] + score_list[i] 248 if score < 0: 249 cummul_score.append(0) 250 else: 251 cummul_score.append(score) 252 if not start: 253 # trim_start = value when cummulative score is first > 0 254 trim_start = i 255 start = True 256 257 # trim_finish = index of highest cummulative score, 258 # marking the end of sequence segment with highest cummulative score 259 trim_finish = cummul_score.index(max(cummul_score)) 260 261 return seq_record[trim_start:trim_finish]
262 263
264 -def _parse_tag_data(elem_code, elem_num, raw_data):
265 """Returns single data value. 266 267 elem_code - What kind of data 268 elem_num - How many data points 269 raw_data - abi file object from which the tags would be unpacked 270 """ 271 if elem_code in _BYTEFMT: 272 # because '>1s' unpack differently from '>s' 273 if elem_num == 1: 274 num = '' 275 else: 276 num = str(elem_num) 277 fmt = '>' + num + _BYTEFMT[elem_code] 278 279 assert len(raw_data) == struct.calcsize(fmt) 280 data = struct.unpack(fmt, raw_data) 281 282 # no need to use tuple if len(data) == 1 283 # also if data is date / time 284 if elem_code not in [10, 11] and len(data) == 1: 285 data = data[0] 286 287 # account for different data types 288 if elem_code == 2: 289 return _bytes_to_string(data) 290 elif elem_code == 10: 291 return str(datetime.date(*data)) 292 elif elem_code == 11: 293 return str(datetime.time(*data[:3])) 294 elif elem_code == 13: 295 return bool(data) 296 elif elem_code == 18: 297 return _bytes_to_string(data[1:]) 298 elif self.elem_code == 19: 299 return _bytes_to_string(data[:-1]) 300 else: 301 return data 302 else: 303 return None
304 305 if __name__ == '__main__': 306 pass 307