Package Bio :: Package SeqIO :: Module AbiIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.AbiIO

  1  # Copyright 2011 by Wibowo Arindrarto (w.arindrarto@gmail.com) 
  2  # Revisions copyright 2011 by Peter Cock. 
  3  # This code is part of the Biopython distribution and governed by its 
  4  # license. Please see the LICENSE file that should have been included 
  5  # as part of this package. 
  6   
  7  """Bio.SeqIO parser for the ABI format. 
  8   
  9  ABI is the format used by Applied Biosystem's sequencing machines to store 
 10  sequencing results. 
 11   
 12  For more details on the format specification, visit: 
 13  http://www.appliedbiosystem.com/support/software_community/ABIF_File_Format.pdf 
 14   
 15  """ 
 16   
 17  __docformat__ = "epytext en" 
 18   
 19  import datetime 
 20  import struct 
 21   
 22  from os.path import basename 
 23   
 24  from Bio import Alphabet 
 25  from Bio.Alphabet.IUPAC import ambiguous_dna, unambiguous_dna 
 26  from Bio.Seq import Seq 
 27  from Bio.SeqRecord import SeqRecord 
 28   
 29  from Bio._py3k import _bytes_to_string, _as_bytes 
 30  from Bio._py3k import zip 
 31   
 32  # dictionary for determining which tags goes into SeqRecord annotation 
 33  # each key is tag_name + tag_number 
 34  # if a tag entry needs to be added, just add its key and its key 
 35  # for the annotations dictionary as the value 
 36  _EXTRACT = { 
 37      'TUBE1': 'sample_well', 
 38      'DySN1': 'dye', 
 39      'GTyp1': 'polymer', 
 40      'MODL1': 'machine_model', 
 41  } 
 42  # dictionary for tags that require preprocessing before use in creating 
 43  # seqrecords 
 44  _SPCTAGS = [ 
 45      'PBAS2',    # base-called sequence 
 46      'PCON2',    # quality values of base-called sequence 
 47      'SMPL1',    # sample id inputted before sequencing run 
 48      'RUND1',    # run start date 
 49      'RUND2',    # run finish date 
 50      'RUNT1',    # run start time 
 51      'RUNT2',    # run finish time 
 52  ] 
 53  # dictionary for data unpacking format 
 54  _BYTEFMT = { 
 55      1: 'b',     # byte 
 56      2: 's',     # char 
 57      3: 'H',     # word 
 58      4: 'h',     # short 
 59      5: 'i',     # long 
 60      6: '2i',    # rational, legacy unsupported 
 61      7: 'f',     # float 
 62      8: 'd',     # double 
 63      10: 'h2B',  # date 
 64      11: '4B',   # time 
 65      12: '2i2b',  # thumb 
 66      13: 'B',    # bool 
 67      14: '2h',   # point, legacy unsupported 
 68      15: '4h',   # rect, legacy unsupported 
 69      16: '2i',   # vPoint, legacy unsupported 
 70      17: '4i',   # vRect, legacy unsupported 
 71      18: 's',    # pString 
 72      19: 's',    # cString 
 73      20: '2i',   # tag, legacy unsupported 
 74  } 
 75  # header data structure (exluding 4 byte ABIF marker) 
 76  _HEADFMT = '>H4sI2H3I' 
 77  # directory data structure 
 78  _DIRFMT = '>4sI2H4I' 
 79   
 80   
81 -def AbiIterator(handle, alphabet=None, trim=False):
82 """Iterator for the Abi file format. 83 """ 84 # raise exception is alphabet is not dna 85 if alphabet is not None: 86 if isinstance(Alphabet._get_base_alphabet(alphabet), 87 Alphabet.ProteinAlphabet): 88 raise ValueError( 89 "Invalid alphabet, ABI files do not hold proteins.") 90 if isinstance(Alphabet._get_base_alphabet(alphabet), 91 Alphabet.RNAAlphabet): 92 raise ValueError("Invalid alphabet, ABI files do not hold RNA.") 93 94 # raise exception if handle mode is not 'rb' 95 if hasattr(handle, 'mode'): 96 if set('rb') != set(handle.mode.lower()): 97 raise ValueError("ABI files has to be opened in 'rb' mode.") 98 99 # check if input file is a valid Abi file 100 handle.seek(0) 101 marker = handle.read(4) 102 if not marker: 103 # handle empty file gracefully 104 raise StopIteration 105 if marker != _as_bytes('ABIF'): 106 raise IOError('File should start ABIF, not %r' % marker) 107 108 # dirty hack for handling time information 109 times = {'RUND1': '', 'RUND2': '', 'RUNT1': '', 'RUNT2': '', } 110 111 # initialize annotations 112 annot = dict(zip(_EXTRACT.values(), [None] * len(_EXTRACT))) 113 114 # parse header and extract data from directories 115 header = struct.unpack(_HEADFMT, 116 handle.read(struct.calcsize(_HEADFMT))) 117 118 for tag_name, tag_number, tag_data in _abi_parse_header(header, handle): 119 # stop iteration if all desired tags have been extracted 120 # 4 tags from _EXTRACT + 2 time tags from _SPCTAGS - 3, 121 # and seq, qual, id 122 # todo 123 124 key = tag_name + str(tag_number) 125 126 # PBAS2 is base-called sequence 127 if key == 'PBAS2': 128 seq = tag_data 129 ambigs = 'KYWMRS' 130 if alphabet is None: 131 if set(seq).intersection(ambigs): 132 alphabet = ambiguous_dna 133 else: 134 alphabet = unambiguous_dna 135 # PCON2 is quality values of base-called sequence 136 elif key == 'PCON2': 137 qual = [ord(val) for val in tag_data] 138 # SMPL1 is sample id entered before sequencing run 139 elif key == 'SMPL1': 140 sample_id = tag_data 141 elif key in times: 142 times[key] = tag_data 143 else: 144 # extract sequence annotation as defined in _EXTRACT 145 if key in _EXTRACT: 146 annot[_EXTRACT[key]] = tag_data 147 148 # set time annotations 149 annot['run_start'] = '%s %s' % (times['RUND1'], times['RUNT1']) 150 annot['run_finish'] = '%s %s' % (times['RUND2'], times['RUNT2']) 151 152 # use the file name as SeqRecord.name if available 153 try: 154 file_name = basename(handle.name).replace('.ab1', '') 155 except: 156 file_name = "" 157 158 record = SeqRecord(Seq(seq, alphabet), 159 id=sample_id, name=file_name, 160 description='', 161 annotations=annot, 162 letter_annotations={'phred_quality': qual}) 163 164 if not trim: 165 yield record 166 else: 167 yield _abi_trim(record)
168 169
170 -def _AbiTrimIterator(handle):
171 """Iterator for the Abi file format that yields trimmed SeqRecord objects. 172 """ 173 return AbiIterator(handle, trim=True)
174 175
176 -def _abi_parse_header(header, handle):
177 """Generator that returns directory contents. 178 """ 179 # header structure (after ABIF marker): 180 # file version, tag name, tag number, 181 # element type code, element size, number of elements 182 # data size, data offset, handle (not file handle) 183 head_elem_size = header[4] 184 head_elem_num = header[5] 185 head_offset = header[7] 186 index = 0 187 188 while index < head_elem_num: 189 start = head_offset + index * head_elem_size 190 # add directory offset to tuple 191 # to handle directories with data size <= 4 bytes 192 handle.seek(start) 193 dir_entry = struct.unpack(_DIRFMT, 194 handle.read(struct.calcsize(_DIRFMT))) + (start,) 195 index += 1 196 # only parse desired dirs 197 key = _bytes_to_string(dir_entry[0]) 198 key += str(dir_entry[1]) 199 if key in (list(_EXTRACT) + _SPCTAGS): 200 tag_name = _bytes_to_string(dir_entry[0]) 201 tag_number = dir_entry[1] 202 elem_code = dir_entry[2] 203 elem_num = dir_entry[4] 204 data_size = dir_entry[5] 205 data_offset = dir_entry[6] 206 tag_offset = dir_entry[8] 207 # if data size <= 4 bytes, data is stored inside tag 208 # so offset needs to be changed 209 if data_size <= 4: 210 data_offset = tag_offset + 20 211 handle.seek(data_offset) 212 data = handle.read(data_size) 213 yield tag_name, tag_number, \ 214 _parse_tag_data(elem_code, elem_num, data)
215 216
217 -def _abi_trim(seq_record):
218 """Trims the sequence using Richard Mott's modified trimming algorithm. 219 220 seq_record - SeqRecord object to be trimmed. 221 222 Trimmed bases are determined from their segment score, which is a 223 cumulative sum of each base's score. Base scores are calculated from 224 their quality values. 225 226 More about the trimming algorithm: 227 http://www.phrap.org/phredphrap/phred.html 228 http://www.clcbio.com/manual/genomics/Quality_abif_trimming.html 229 """ 230 231 start = False # flag for starting position of trimmed sequence 232 segment = 20 # minimum sequence length 233 trim_start = 0 # init start index 234 cutoff = 0.05 # default cutoff value for calculating base score 235 236 if len(seq_record) <= segment: 237 return seq_record 238 else: 239 # calculate base score 240 score_list = [cutoff - (10 ** (qual / -10.0)) for qual in 241 seq_record.letter_annotations['phred_quality']] 242 243 # calculate cummulative score 244 # if cummulative value < 0, set it to 0 245 # first value is set to 0, because of the assumption that 246 # the first base will always be trimmed out 247 cummul_score = [0] 248 for i in range(1, len(score_list)): 249 score = cummul_score[-1] + score_list[i] 250 if score < 0: 251 cummul_score.append(0) 252 else: 253 cummul_score.append(score) 254 if not start: 255 # trim_start = value when cummulative score is first > 0 256 trim_start = i 257 start = True 258 259 # trim_finish = index of highest cummulative score, 260 # marking the end of sequence segment with highest cummulative score 261 trim_finish = cummul_score.index(max(cummul_score)) 262 263 return seq_record[trim_start:trim_finish]
264 265
266 -def _parse_tag_data(elem_code, elem_num, raw_data):
267 """Returns single data value. 268 269 elem_code - What kind of data 270 elem_num - How many data points 271 raw_data - abi file object from which the tags would be unpacked 272 """ 273 if elem_code in _BYTEFMT: 274 # because '>1s' unpack differently from '>s' 275 if elem_num == 1: 276 num = '' 277 else: 278 num = str(elem_num) 279 fmt = '>' + num + _BYTEFMT[elem_code] 280 281 assert len(raw_data) == struct.calcsize(fmt) 282 data = struct.unpack(fmt, raw_data) 283 284 # no need to use tuple if len(data) == 1 285 # also if data is date / time 286 if elem_code not in [10, 11] and len(data) == 1: 287 data = data[0] 288 289 # account for different data types 290 if elem_code == 2: 291 return _bytes_to_string(data) 292 elif elem_code == 10: 293 return str(datetime.date(*data)) 294 elif elem_code == 11: 295 return str(datetime.time(*data[:3])) 296 elif elem_code == 13: 297 return bool(data) 298 elif elem_code == 18: 299 return _bytes_to_string(data[1:]) 300 elif elem_code == 19: 301 return _bytes_to_string(data[:-1]) 302 else: 303 return data 304 else: 305 return None
306 307 if __name__ == '__main__': 308 pass 309