Package Bio :: Package SeqIO :: Module InsdcIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.InsdcIO

   1  # Copyright 2007-2011 by Peter Cock.  All rights reserved. 
   2  # 
   3  # This code is part of the Biopython distribution and governed by its 
   4  # license.  Please see the LICENSE file that should have been included 
   5  # as part of this package.. 
   6   
   7  """Bio.SeqIO support for the "genbank" and "embl" file formats. 
   8   
   9  You are expected to use this module via the Bio.SeqIO functions. 
  10  Note that internally this module calls Bio.GenBank to do the actual 
  11  parsing of GenBank, EMBL and IMGT files. 
  12   
  13  See also: 
  14   
  15  International Nucleotide Sequence Database Collaboration 
  16  http://www.insdc.org/ 
  17    
  18  GenBank 
  19  http://www.ncbi.nlm.nih.gov/Genbank/ 
  20   
  21  EMBL Nucleotide Sequence Database 
  22  http://www.ebi.ac.uk/embl/ 
  23   
  24  DDBJ (DNA Data Bank of Japan) 
  25  http://www.ddbj.nig.ac.jp/ 
  26   
  27  IMGT (use a variant of EMBL format with longer feature indents) 
  28  http://imgt.cines.fr/download/LIGM-DB/userman_doc.html 
  29  http://imgt.cines.fr/download/LIGM-DB/ftable_doc.html 
  30  http://www.ebi.ac.uk/imgt/hla/docs/manual.html 
  31   
  32  """ 
  33   
  34  from Bio.Seq import UnknownSeq 
  35  from Bio.GenBank.Scanner import GenBankScanner, EmblScanner, _ImgtScanner 
  36  from Bio import Alphabet 
  37  from Interfaces import SequentialSequenceWriter 
  38  from Bio import SeqFeature 
  39   
  40  from Bio._py3k import _is_int_or_long 
  41   
  42  # NOTE 
  43  # ==== 
  44  # The "brains" for parsing GenBank, EMBL and IMGT files (and any 
  45  # other flat file variants from the INSDC in future) is in 
  46  # Bio.GenBank.Scanner (plus the _FeatureConsumer in Bio.GenBank) 
  47  # However, all the writing code is in this file. 
  48   
  49   
50 -def GenBankIterator(handle):
51 """Breaks up a Genbank file into SeqRecord objects. 52 53 Every section from the LOCUS line to the terminating // becomes 54 a single SeqRecord with associated annotation and features. 55 56 Note that for genomes or chromosomes, there is typically only 57 one record.""" 58 #This calls a generator function: 59 return GenBankScanner(debug=0).parse_records(handle)
60
61 -def EmblIterator(handle):
62 """Breaks up an EMBL file into SeqRecord objects. 63 64 Every section from the LOCUS line to the terminating // becomes 65 a single SeqRecord with associated annotation and features. 66 67 Note that for genomes or chromosomes, there is typically only 68 one record.""" 69 #This calls a generator function: 70 return EmblScanner(debug=0).parse_records(handle)
71
72 -def ImgtIterator(handle):
73 """Breaks up an IMGT file into SeqRecord objects. 74 75 Every section from the LOCUS line to the terminating // becomes 76 a single SeqRecord with associated annotation and features. 77 78 Note that for genomes or chromosomes, there is typically only 79 one record.""" 80 #This calls a generator function: 81 return _ImgtScanner(debug=0).parse_records(handle)
82
83 -def GenBankCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
84 """Breaks up a Genbank file into SeqRecord objects for each CDS feature. 85 86 Every section from the LOCUS line to the terminating // can contain 87 many CDS features. These are returned as with the stated amino acid 88 translation sequence (if given). 89 """ 90 #This calls a generator function: 91 return GenBankScanner(debug=0).parse_cds_features(handle, alphabet)
92
93 -def EmblCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
94 """Breaks up a EMBL file into SeqRecord objects for each CDS feature. 95 96 Every section from the LOCUS line to the terminating // can contain 97 many CDS features. These are returned as with the stated amino acid 98 translation sequence (if given). 99 """ 100 #This calls a generator function: 101 return EmblScanner(debug=0).parse_cds_features(handle, alphabet)
102
103 -def _insdc_feature_position_string(pos, offset=0):
104 """Build a GenBank/EMBL position string (PRIVATE). 105 106 Use offset=1 to add one to convert a start position from python counting. 107 """ 108 if isinstance(pos, SeqFeature.ExactPosition): 109 return "%i" % (pos.position+offset) 110 elif isinstance(pos, SeqFeature.WithinPosition): 111 return "(%i.%i)" % (pos.position + offset, 112 pos.position + pos.extension + offset) 113 elif isinstance(pos, SeqFeature.BetweenPosition): 114 return "(%i^%i)" % (pos.position + offset, 115 pos.position + pos.extension + offset) 116 elif isinstance(pos, SeqFeature.BeforePosition): 117 return "<%i" % (pos.position + offset) 118 elif isinstance(pos, SeqFeature.AfterPosition): 119 return ">%i" % (pos.position + offset) 120 elif isinstance(pos, SeqFeature.OneOfPosition): 121 return "one-of(%s)" \ 122 % ",".join([_insdc_feature_position_string(p,offset) \ 123 for p in pos.position_choices]) 124 elif isinstance(pos, SeqFeature.AbstractPosition): 125 raise NotImplementedError("Please report this as a bug in Biopython.") 126 else: 127 raise ValueError("Expected a SeqFeature position object.")
128 129
130 -def _insdc_location_string_ignoring_strand_and_subfeatures(location, rec_length):
131 if location.ref: 132 ref = "%s:" % location.ref 133 else: 134 ref = "" 135 assert not location.ref_db 136 if isinstance(location.start, SeqFeature.ExactPosition) \ 137 and isinstance(location.end, SeqFeature.ExactPosition) \ 138 and location.start.position == location.end.position: 139 #Special case, for 12:12 return 12^13 140 #(a zero length slice, meaning the point between two letters) 141 if location.end.position == rec_length: 142 #Very special case, for a between position at the end of a 143 #sequence (used on some circular genomes, Bug 3098) we have 144 #N:N so return N^1 145 return "%s%i^1" % (ref, rec_length) 146 else: 147 return "%s%i^%i" % (ref, location.end.position, 148 location.end.position+1) 149 if isinstance(location.start, SeqFeature.ExactPosition) \ 150 and isinstance(location.end, SeqFeature.ExactPosition) \ 151 and location.start.position+1 == location.end.position: 152 #Special case, for 11:12 return 12 rather than 12..12 153 #(a length one slice, meaning a single letter) 154 return "%s%i" % (ref, location.end.position) 155 elif isinstance(location.start, SeqFeature.UnknownPosition) \ 156 or isinstance(location.end, SeqFeature.UnknownPosition): 157 #Special case for features from SwissProt/UniProt files 158 if isinstance(location.start, SeqFeature.UnknownPosition) \ 159 and isinstance(location.end, SeqFeature.UnknownPosition): 160 #import warnings 161 #warnings.warn("Feature with unknown location") 162 #return "?" 163 raise ValueError("Feature with unknown location") 164 elif isinstance(location.start, SeqFeature.UnknownPosition): 165 #Treat the unknown start position as a BeforePosition 166 return "%s<%i..%s" \ 167 % (ref, 168 location.nofuzzy_end, 169 _insdc_feature_position_string(location.end)) 170 else: 171 #Treat the unknown end position as an AfterPosition 172 return "%s%s..>%i" \ 173 % (ref, 174 _insdc_feature_position_string(location.start), 175 location.nofuzzy_start) 176 else: 177 #Typical case, e.g. 12..15 gets mapped to 11:15 178 return ref \ 179 + _insdc_feature_position_string(location.start, +1) \ 180 + ".." + \ 181 _insdc_feature_position_string(location.end)
182
183 -def _insdc_feature_location_string(feature, rec_length):
184 """Build a GenBank/EMBL location string from a SeqFeature (PRIVATE). 185 186 There is a choice of how to show joins on the reverse complement strand, 187 GenBank used "complement(join(1,10),(20,100))" while EMBL used to use 188 "join(complement(20,100),complement(1,10))" instead (but appears to have 189 now adopted the GenBank convention). Notice that the order of the entries 190 is reversed! This function therefore uses the first form. In this situation 191 we expect the parent feature and the two children to all be marked as 192 strand == -1, and in the order 0:10 then 19:100. 193 194 Also need to consider dual-strand examples like these from the Arabidopsis 195 thaliana chloroplast NC_000932: join(complement(69611..69724),139856..140650) 196 gene ArthCp047, GeneID:844801 or its CDS (protein NP_051038.1 GI:7525057) 197 which is further complicated by a splice: 198 join(complement(69611..69724),139856..140087,140625..140650) 199 200 For mixed this mixed strand feature, the parent SeqFeature should have 201 no strand (either 0 or None) while the child features should have either 202 strand +1 or -1 as appropriate, and be listed in the order given here. 203 """ 204 205 if not feature.sub_features: 206 #Non-recursive. 207 #assert feature.location_operator == "", \ 208 # "%s has no subfeatures but location_operator %s" \ 209 # % (repr(feature), feature.location_operator) 210 location = _insdc_location_string_ignoring_strand_and_subfeatures(feature.location, rec_length) 211 if feature.strand == -1: 212 location = "complement(%s)" % location 213 return location 214 # As noted above, treat reverse complement strand features carefully: 215 if feature.strand == -1: 216 for f in feature.sub_features: 217 if f.strand != -1: 218 raise ValueError("Inconsistent strands: %r for parent, %r for child" \ 219 % (feature.strand, f.strand)) 220 return "complement(%s(%s))" \ 221 % (feature.location_operator, 222 ",".join(_insdc_location_string_ignoring_strand_and_subfeatures(f.location, rec_length) \ 223 for f in feature.sub_features)) 224 #if feature.strand == +1: 225 # for f in feature.sub_features: 226 # assert f.strand == +1 227 #This covers typical forward strand features, and also an evil mixed strand: 228 assert feature.location_operator != "" 229 return "%s(%s)" % (feature.location_operator, 230 ",".join([_insdc_feature_location_string(f, rec_length) \ 231 for f in feature.sub_features]))
232 233
234 -class _InsdcWriter(SequentialSequenceWriter):
235 """Base class for GenBank and EMBL writers (PRIVATE).""" 236 MAX_WIDTH = 80 237 QUALIFIER_INDENT = 21 238 QUALIFIER_INDENT_STR = " "*QUALIFIER_INDENT 239 QUALIFIER_INDENT_TMP = " %s " # 21 if %s is empty 240
241 - def _write_feature_qualifier(self, key, value=None, quote=None):
242 if not value: 243 self.handle.write("%s/%s\n" % (self.QUALIFIER_INDENT_STR, key)) 244 return 245 #Quick hack with no line wrapping, may be useful for testing: 246 #self.handle.write('%s/%s="%s"\n' % (self.QUALIFIER_INDENT_STR, key, value)) 247 if quote is None: 248 #Try to mimic unwritten rules about when quotes can be left out: 249 if _is_int_or_long(value): 250 quote = False 251 else: 252 quote = True 253 if quote: 254 line = '%s/%s="%s"' % (self.QUALIFIER_INDENT_STR, key, value) 255 else: 256 line = '%s/%s=%s' % (self.QUALIFIER_INDENT_STR, key, value) 257 if len(line) <= self.MAX_WIDTH: 258 self.handle.write(line+"\n") 259 return 260 while line.lstrip(): 261 if len(line) <= self.MAX_WIDTH: 262 self.handle.write(line+"\n") 263 return 264 #Insert line break... 265 for index in range(min(len(line)-1, self.MAX_WIDTH), 266 self.QUALIFIER_INDENT+1,-1): 267 if line[index] == " " : break 268 if line[index] != " ": 269 #No nice place to break... 270 index = self.MAX_WIDTH 271 assert index <= self.MAX_WIDTH 272 self.handle.write(line[:index] + "\n") 273 line = self.QUALIFIER_INDENT_STR + line[index:].lstrip()
274
275 - def _wrap_location(self, location):
276 """Split a feature location into lines (break at commas).""" 277 #TODO - Rewrite this not to recurse! 278 length = self.MAX_WIDTH - self.QUALIFIER_INDENT 279 if len(location) <= length: 280 return location 281 index = location[:length].rfind(",") 282 if index == -1: 283 #No good place to split (!) 284 import warnings 285 warnings.warn("Couldn't split location:\n%s" % location) 286 return location 287 return location[:index+1] + "\n" + \ 288 self.QUALIFIER_INDENT_STR + self._wrap_location(location[index+1:])
289
290 - def _write_feature(self, feature, record_length):
291 """Write a single SeqFeature object to features table.""" 292 assert feature.type, feature 293 location = _insdc_feature_location_string(feature, record_length) 294 f_type = feature.type.replace(" ","_") 295 line = (self.QUALIFIER_INDENT_TMP % f_type)[:self.QUALIFIER_INDENT] \ 296 + self._wrap_location(location) + "\n" 297 self.handle.write(line) 298 #Now the qualifiers... 299 for key, values in feature.qualifiers.iteritems(): 300 if isinstance(values, list) or isinstance(values, tuple): 301 for value in values: 302 self._write_feature_qualifier(key, value) 303 elif values: 304 #String, int, etc 305 self._write_feature_qualifier(key, values) 306 else: 307 #e.g. a /psuedo entry 308 self._write_feature_qualifier(key)
309
310 - def _get_annotation_str(self, record, key, default=".", just_first=False):
311 """Get an annotation dictionary entry (as a string). 312 313 Some entries are lists, in which case if just_first=True the first entry 314 is returned. If just_first=False (default) this verifies there is only 315 one entry before returning it.""" 316 try: 317 answer = record.annotations[key] 318 except KeyError: 319 return default 320 if isinstance(answer, list): 321 if not just_first : assert len(answer) == 1 322 return str(answer[0]) 323 else: 324 return str(answer)
325
326 - def _split_multi_line(self, text, max_len):
327 """Returns a list of strings. 328 329 Any single words which are too long get returned as a whole line 330 (e.g. URLs) without an exception or warning. 331 """ 332 #TODO - Do the line spliting while preserving white space? 333 text = text.strip() 334 if len(text) <= max_len: 335 return [text] 336 337 words = text.split() 338 text = "" 339 while words and len(text) + 1 + len(words[0]) <= max_len: 340 text += " " + words.pop(0) 341 text = text.strip() 342 #assert len(text) <= max_len 343 answer = [text] 344 while words: 345 text = words.pop(0) 346 while words and len(text) + 1 + len(words[0]) <= max_len: 347 text += " " + words.pop(0) 348 text = text.strip() 349 #assert len(text) <= max_len 350 answer.append(text) 351 assert not words 352 return answer
353
354 - def _split_contig(self, record, max_len):
355 "Returns a list of strings, splits on commas.""" 356 #TODO - Merge this with _write_multi_line method? 357 #It would need the addition of the comma splitting logic... 358 #are there any other cases where that would be sensible? 359 contig = record.annotations.get("contig", "") 360 if isinstance(contig, list) or isinstance(contig, tuple): 361 contig = "".join(contig) 362 contig = self.clean(contig) 363 i = 0 364 answer = [] 365 while contig: 366 if len(contig) > max_len: 367 #Split lines at the commas 368 pos = contig[:max_len-1].rfind(",") 369 if pos == -1: 370 raise ValueError("Could not break up CONTIG") 371 text, contig = contig[:pos+1], contig[pos+1:] 372 else: 373 text, contig = contig, "" 374 answer.append(text) 375 return answer
376
377 -class GenBankWriter(_InsdcWriter):
378 HEADER_WIDTH = 12 379 QUALIFIER_INDENT = 21 380
381 - def _write_single_line(self, tag, text):
382 "Used in the the 'header' of each GenBank record.""" 383 assert len(tag) < self.HEADER_WIDTH 384 if len(text) > self.MAX_WIDTH - self.HEADER_WIDTH: 385 import warnings 386 warnings.warn("Annotation %r too long for %s line" % (text, tag)) 387 self.handle.write("%s%s\n" % (tag.ljust(self.HEADER_WIDTH), 388 text.replace("\n", " ")))
389
390 - def _write_multi_line(self, tag, text):
391 "Used in the the 'header' of each GenBank record.""" 392 #TODO - Do the line spliting while preserving white space? 393 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 394 lines = self._split_multi_line(text, max_len) 395 self._write_single_line(tag, lines[0]) 396 for line in lines[1:]: 397 self._write_single_line("", line)
398
399 - def _write_multi_entries(self, tag, text_list):
400 #used for DBLINK and any similar later line types. 401 #If the list of strings is empty, nothing is written. 402 for i, text in enumerate(text_list): 403 if i == 0: 404 self._write_single_line(tag, text) 405 else: 406 self._write_single_line("", text)
407
408 - def _get_date(self, record) :
409 default = "01-JAN-1980" 410 try : 411 date = record.annotations["date"] 412 except KeyError : 413 return default 414 #Cope with a list of one string: 415 if isinstance(date, list) and len(date)==1 : 416 date = date[0] 417 #TODO - allow a Python date object 418 if not isinstance(date, basestring) or len(date) != 11 \ 419 or date[2] != "-" or date[6] != "-" \ 420 or not date[:2].isdigit() or not date[7:].isdigit() \ 421 or int(date[:2]) > 31 \ 422 or date[3:6] not in ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", 423 "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"] : 424 #TODO - Check is a valid date (e.g. not 31 Feb) 425 return default 426 return date
427
428 - def _get_data_division(self, record):
429 try: 430 division = record.annotations["data_file_division"] 431 except KeyError: 432 division = "UNK" 433 if division in ["PRI", "ROD", "MAM", "VRT", "INV", "PLN", "BCT", 434 "VRL", "PHG", "SYN", "UNA", "EST", "PAT", "STS", 435 "GSS", "HTG", "HTC", "ENV", "CON"]: 436 #Good, already GenBank style 437 # PRI - primate sequences 438 # ROD - rodent sequences 439 # MAM - other mammalian sequences 440 # VRT - other vertebrate sequences 441 # INV - invertebrate sequences 442 # PLN - plant, fungal, and algal sequences 443 # BCT - bacterial sequences [plus archea] 444 # VRL - viral sequences 445 # PHG - bacteriophage sequences 446 # SYN - synthetic sequences 447 # UNA - unannotated sequences 448 # EST - EST sequences (expressed sequence tags) 449 # PAT - patent sequences 450 # STS - STS sequences (sequence tagged sites) 451 # GSS - GSS sequences (genome survey sequences) 452 # HTG - HTGS sequences (high throughput genomic sequences) 453 # HTC - HTC sequences (high throughput cDNA sequences) 454 # ENV - Environmental sampling sequences 455 # CON - Constructed sequences 456 # 457 #(plus UNK for unknown) 458 pass 459 else: 460 #See if this is in EMBL style: 461 # Division Code 462 # ----------------- ---- 463 # Bacteriophage PHG - common 464 # Environmental Sample ENV - common 465 # Fungal FUN - map to PLN (plants + fungal) 466 # Human HUM - map to PRI (primates) 467 # Invertebrate INV - common 468 # Other Mammal MAM - common 469 # Other Vertebrate VRT - common 470 # Mus musculus MUS - map to ROD (rodent) 471 # Plant PLN - common 472 # Prokaryote PRO - map to BCT (poor name) 473 # Other Rodent ROD - common 474 # Synthetic SYN - common 475 # Transgenic TGN - ??? map to SYN ??? 476 # Unclassified UNC - map to UNK 477 # Viral VRL - common 478 # 479 #(plus XXX for submiting which we can map to UNK) 480 embl_to_gbk = {"FUN":"PLN", 481 "HUM":"PRI", 482 "MUS":"ROD", 483 "PRO":"BCT", 484 "UNC":"UNK", 485 "XXX":"UNK", 486 } 487 try: 488 division = embl_to_gbk[division] 489 except KeyError: 490 division = "UNK" 491 assert len(division)==3 492 return division
493
494 - def _write_the_first_line(self, record):
495 """Write the LOCUS line.""" 496 497 locus = record.name 498 if not locus or locus == "<unknown name>": 499 locus = record.id 500 if not locus or locus == "<unknown id>": 501 locus = self._get_annotation_str(record, "accession", just_first=True) 502 if len(locus) > 16: 503 raise ValueError("Locus identifier %r is too long" % str(locus)) 504 505 if len(record) > 99999999999: 506 #Currently GenBank only officially support up to 350000, but 507 #the length field can take eleven digits 508 raise ValueError("Sequence too long!") 509 510 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 511 a = Alphabet._get_base_alphabet(record.seq.alphabet) 512 if not isinstance(a, Alphabet.Alphabet): 513 raise TypeError("Invalid alphabet") 514 elif isinstance(a, Alphabet.ProteinAlphabet): 515 units = "aa" 516 elif isinstance(a, Alphabet.NucleotideAlphabet): 517 units = "bp" 518 else: 519 #Must be something like NucleotideAlphabet or 520 #just the generic Alphabet (default for fasta files) 521 raise ValueError("Need a Nucleotide or Protein alphabet") 522 523 #Get the molecule type 524 #TODO - record this explicitly in the parser? 525 if isinstance(a, Alphabet.ProteinAlphabet): 526 mol_type = "" 527 elif isinstance(a, Alphabet.DNAAlphabet): 528 mol_type = "DNA" 529 elif isinstance(a, Alphabet.RNAAlphabet): 530 mol_type = "RNA" 531 else: 532 #Must be something like NucleotideAlphabet or 533 #just the generic Alphabet (default for fasta files) 534 raise ValueError("Need a DNA, RNA or Protein alphabet") 535 536 division = self._get_data_division(record) 537 538 assert len(units) == 2 539 assert len(division) == 3 540 #TODO - date 541 #TODO - mol_type 542 line = "LOCUS %s %s %s %s %s %s\n" \ 543 % (locus.ljust(16), 544 str(len(record)).rjust(11), 545 units, 546 mol_type.ljust(6), 547 division, 548 self._get_date(record)) 549 assert len(line) == 79+1, repr(line) #plus one for new line 550 551 assert line[12:28].rstrip() == locus, \ 552 'LOCUS line does not contain the locus at the expected position:\n' + line 553 assert line[28:29] == " " 554 assert line[29:40].lstrip() == str(len(record)), \ 555 'LOCUS line does not contain the length at the expected position:\n' + line 556 557 #Tests copied from Bio.GenBank.Scanner 558 assert line[40:44] in [' bp ', ' aa '] , \ 559 'LOCUS line does not contain size units at expected position:\n' + line 560 assert line[44:47] in [' ', 'ss-', 'ds-', 'ms-'], \ 561 'LOCUS line does not have valid strand type (Single stranded, ...):\n' + line 562 assert line[47:54].strip() == "" \ 563 or line[47:54].strip().find('DNA') != -1 \ 564 or line[47:54].strip().find('RNA') != -1, \ 565 'LOCUS line does not contain valid sequence type (DNA, RNA, ...):\n' + line 566 assert line[54:55] == ' ', \ 567 'LOCUS line does not contain space at position 55:\n' + line 568 assert line[55:63].strip() in ['', 'linear', 'circular'], \ 569 'LOCUS line does not contain valid entry (linear, circular, ...):\n' + line 570 assert line[63:64] == ' ', \ 571 'LOCUS line does not contain space at position 64:\n' + line 572 assert line[67:68] == ' ', \ 573 'LOCUS line does not contain space at position 68:\n' + line 574 assert line[70:71] == '-', \ 575 'LOCUS line does not contain - at position 71 in date:\n' + line 576 assert line[74:75] == '-', \ 577 'LOCUS line does not contain - at position 75 in date:\n' + line 578 579 self.handle.write(line)
580
581 - def _write_references(self, record):
582 number = 0 583 for ref in record.annotations["references"]: 584 if not isinstance(ref, SeqFeature.Reference): 585 continue 586 number += 1 587 data = str(number) 588 #TODO - support more complex record reference locations? 589 if ref.location and len(ref.location)==1: 590 a = Alphabet._get_base_alphabet(record.seq.alphabet) 591 if isinstance(a, Alphabet.ProteinAlphabet): 592 units = "residues" 593 else: 594 units = "bases" 595 data += " (%s %i to %i)" % (units, 596 ref.location[0].nofuzzy_start+1, 597 ref.location[0].nofuzzy_end) 598 self._write_single_line("REFERENCE", data) 599 if ref.authors: 600 #We store the AUTHORS data as a single string 601 self._write_multi_line(" AUTHORS", ref.authors) 602 if ref.consrtm: 603 #We store the consortium as a single string 604 self._write_multi_line(" CONSRTM", ref.consrtm) 605 if ref.title: 606 #We store the title as a single string 607 self._write_multi_line(" TITLE", ref.title) 608 if ref.journal: 609 #We store this as a single string - holds the journal name, 610 #volume, year, and page numbers of the citation 611 self._write_multi_line(" JOURNAL", ref.journal) 612 if ref.medline_id: 613 #This line type is obsolete and was removed from the GenBank 614 #flatfile format in April 2005. Should we write it? 615 #Note this has a two space indent: 616 self._write_multi_line(" MEDLINE", ref.medline_id) 617 if ref.pubmed_id: 618 #Note this has a THREE space indent: 619 self._write_multi_line(" PUBMED", ref.pubmed_id) 620 if ref.comment: 621 self._write_multi_line(" REMARK", ref.comment)
622 623
624 - def _write_comment(self, record):
625 #This is a bit complicated due to the range of possible 626 #ways people might have done their annotation... 627 #Currently the parser uses a single string with newlines. 628 #A list of lines is also reasonable. 629 #A single (long) string is perhaps the most natural of all. 630 #This means we may need to deal with line wrapping. 631 comment = record.annotations["comment"] 632 if isinstance(comment, basestring): 633 lines = comment.split("\n") 634 elif isinstance(comment, list) or isinstance(comment, tuple): 635 lines = comment 636 else: 637 raise ValueError("Could not understand comment annotation") 638 self._write_multi_line("COMMENT", lines[0]) 639 for line in lines[1:]: 640 self._write_multi_line("", line)
641
642 - def _write_contig(self, record):
643 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 644 lines = self._split_contig(record, max_len) 645 self._write_single_line("CONTIG", lines[0]) 646 for text in lines[1:] : 647 self._write_single_line("", text)
648
649 - def _write_sequence(self, record):
650 #Loosely based on code from Howard Salis 651 #TODO - Force lower case? 652 LETTERS_PER_LINE = 60 653 SEQUENCE_INDENT = 9 654 655 if isinstance(record.seq, UnknownSeq): 656 #We have already recorded the length, and there is no need 657 #to record a long sequence of NNNNNNN...NNN or whatever. 658 if "contig" in record.annotations: 659 self._write_contig(record) 660 else: 661 self.handle.write("ORIGIN\n") 662 return 663 664 #Catches sequence being None: 665 data = self._get_seq_string(record).lower() 666 seq_len = len(data) 667 self.handle.write("ORIGIN\n") 668 for line_number in range(0, seq_len, LETTERS_PER_LINE): 669 self.handle.write(str(line_number+1).rjust(SEQUENCE_INDENT)) 670 for words in range(line_number, 671 min(line_number+LETTERS_PER_LINE, seq_len), 10): 672 self.handle.write(" %s" % data[words:words+10]) 673 self.handle.write("\n")
674
675 - def write_record(self, record):
676 """Write a single record to the output file.""" 677 handle = self.handle 678 self._write_the_first_line(record) 679 680 accession = self._get_annotation_str(record, "accession", 681 record.id.split(".", 1)[0], 682 just_first=True) 683 acc_with_version = accession 684 if record.id.startswith(accession+"."): 685 try: 686 acc_with_version = "%s.%i" \ 687 % (accession, 688 int(record.id.split(".", 1)[1])) 689 except ValueError: 690 pass 691 gi = self._get_annotation_str(record, "gi", just_first=True) 692 693 descr = record.description 694 if descr == "<unknown description>" : descr = "." 695 self._write_multi_line("DEFINITION", descr) 696 697 self._write_single_line("ACCESSION", accession) 698 if gi != ".": 699 self._write_single_line("VERSION", "%s GI:%s" \ 700 % (acc_with_version, gi)) 701 else: 702 self._write_single_line("VERSION", "%s" % (acc_with_version)) 703 704 #The NCBI only expect two types of link so far, 705 #e.g. "Project:28471" and "Trace Assembly Archive:123456" 706 #TODO - Filter the dbxrefs list to just these? 707 self._write_multi_entries("DBLINK", record.dbxrefs) 708 709 try: 710 #List of strings 711 #Keywords should be given separated with semi colons, 712 keywords = "; ".join(record.annotations["keywords"]) 713 #with a trailing period: 714 if not keywords.endswith(".") : 715 keywords += "." 716 except KeyError: 717 #If no keywords, there should be just a period: 718 keywords = "." 719 self._write_multi_line("KEYWORDS", keywords) 720 721 if "segment" in record.annotations: 722 #Deal with SEGMENT line found only in segmented records, 723 #e.g. AH000819 724 segment = record.annotations["segment"] 725 if isinstance(segment, list): 726 assert len(segment)==1, segment 727 segment = segment[0] 728 self._write_single_line("SEGMENT", segment) 729 730 self._write_multi_line("SOURCE", \ 731 self._get_annotation_str(record, "source")) 732 #The ORGANISM line MUST be a single line, as any continuation is the taxonomy 733 org = self._get_annotation_str(record, "organism") 734 if len(org) > self.MAX_WIDTH - self.HEADER_WIDTH: 735 org = org[:self.MAX_WIDTH - self.HEADER_WIDTH-4]+"..." 736 self._write_single_line(" ORGANISM", org) 737 try: 738 #List of strings 739 #Taxonomy should be given separated with semi colons, 740 taxonomy = "; ".join(record.annotations["taxonomy"]) 741 #with a trailing period: 742 if not taxonomy.endswith(".") : 743 taxonomy += "." 744 except KeyError: 745 taxonomy = "." 746 self._write_multi_line("", taxonomy) 747 748 if "references" in record.annotations: 749 self._write_references(record) 750 751 if "comment" in record.annotations: 752 self._write_comment(record) 753 754 handle.write("FEATURES Location/Qualifiers\n") 755 rec_length = len(record) 756 for feature in record.features: 757 self._write_feature(feature, rec_length) 758 self._write_sequence(record) 759 handle.write("//\n")
760
761 -class EmblWriter(_InsdcWriter):
762 HEADER_WIDTH = 5 763 QUALIFIER_INDENT = 21 764 QUALIFIER_INDENT_STR = "FT" + " "*(QUALIFIER_INDENT-2) 765 QUALIFIER_INDENT_TMP = "FT %s " # 21 if %s is empty 766 FEATURE_HEADER = "FH Key Location/Qualifiers\n" 767
768 - def _write_contig(self, record):
769 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 770 lines = self._split_contig(record, max_len) 771 for text in lines: 772 self._write_single_line("CO", text)
773
774 - def _write_sequence(self, record):
775 LETTERS_PER_BLOCK = 10 776 BLOCKS_PER_LINE = 6 777 LETTERS_PER_LINE = LETTERS_PER_BLOCK * BLOCKS_PER_LINE 778 POSITION_PADDING = 10 779 handle = self.handle #save looking up this multiple times 780 781 if isinstance(record.seq, UnknownSeq): 782 #We have already recorded the length, and there is no need 783 #to record a long sequence of NNNNNNN...NNN or whatever. 784 if "contig" in record.annotations: 785 self._write_contig(record) 786 else: 787 #TODO - Can the sequence just be left out as in GenBank files? 788 handle.write("SQ \n") 789 return 790 791 #Catches sequence being None 792 data = self._get_seq_string(record).lower() 793 seq_len = len(data) 794 795 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 796 a = Alphabet._get_base_alphabet(record.seq.alphabet) 797 if isinstance(a, Alphabet.DNAAlphabet): 798 #TODO - What if we have RNA? 799 a_count = data.count('A') + data.count('a') 800 c_count = data.count('C') + data.count('c') 801 g_count = data.count('G') + data.count('g') 802 t_count = data.count('T') + data.count('t') 803 other = seq_len - (a_count + c_count + g_count + t_count) 804 handle.write("SQ Sequence %i BP; %i A; %i C; %i G; %i T; %i other;\n" \ 805 % (seq_len, a_count, c_count, g_count, t_count, other)) 806 else: 807 handle.write("SQ \n") 808 809 for line_number in range(0, seq_len // LETTERS_PER_LINE): 810 handle.write(" ") #Just four, not five 811 for block in range(BLOCKS_PER_LINE) : 812 index = LETTERS_PER_LINE*line_number + LETTERS_PER_BLOCK*block 813 handle.write((" %s" % data[index:index+LETTERS_PER_BLOCK])) 814 handle.write(str((line_number+1) 815 *LETTERS_PER_LINE).rjust(POSITION_PADDING)) 816 handle.write("\n") 817 if seq_len % LETTERS_PER_LINE: 818 #Final (partial) line 819 line_number = (seq_len // LETTERS_PER_LINE) 820 handle.write(" ") #Just four, not five 821 for block in range(BLOCKS_PER_LINE) : 822 index = LETTERS_PER_LINE*line_number + LETTERS_PER_BLOCK*block 823 handle.write((" %s" % data[index:index+LETTERS_PER_BLOCK]).ljust(11)) 824 handle.write(str(seq_len).rjust(POSITION_PADDING)) 825 handle.write("\n")
826
827 - def _write_single_line(self, tag, text):
828 assert len(tag)==2 829 line = tag+" "+text 830 if len(text) > self.MAX_WIDTH: 831 import warnings 832 warnings.warn("Line %r too long" % line) 833 self.handle.write(line+"\n")
834
835 - def _write_multi_line(self, tag, text):
836 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 837 lines = self._split_multi_line(text, max_len) 838 for line in lines : 839 self._write_single_line(tag, line)
840
841 - def _write_the_first_lines(self, record):
842 """Write the ID and AC lines.""" 843 if "." in record.id and record.id.rsplit(".", 1)[1].isdigit(): 844 version = "SV " + record.id.rsplit(".", 1)[1] 845 accession = self._get_annotation_str(record, "accession", 846 record.id.rsplit(".", 1)[0], 847 just_first=True) 848 else : 849 version = "" 850 accession = self._get_annotation_str(record, "accession", 851 record.id, 852 just_first=True) 853 854 if ";" in accession : 855 raise ValueError("Cannot have semi-colon in EMBL accession, %s" \ 856 % repr(str(accession))) 857 if " " in accession : 858 #This is out of practicallity... might it be allowed? 859 raise ValueError("Cannot have spaces in EMBL accession, %s" \ 860 % repr(str(accession))) 861 862 #Get the molecule type 863 #TODO - record this explicitly in the parser? 864 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 865 a = Alphabet._get_base_alphabet(record.seq.alphabet) 866 if not isinstance(a, Alphabet.Alphabet): 867 raise TypeError("Invalid alphabet") 868 elif isinstance(a, Alphabet.DNAAlphabet): 869 mol_type = "DNA" 870 units = "BP" 871 elif isinstance(a, Alphabet.RNAAlphabet): 872 mol_type = "RNA" 873 units = "BP" 874 elif isinstance(a, Alphabet.ProteinAlphabet): 875 mol_type = "PROTEIN" 876 units = "AA" 877 else: 878 #Must be something like NucleotideAlphabet 879 raise ValueError("Need a DNA, RNA or Protein alphabet") 880 881 #Get the taxonomy division 882 division = self._get_data_division(record) 883 884 #TODO - Full ID line 885 handle = self.handle 886 #ID <1>; SV <2>; <3>; <4>; <5>; <6>; <7> BP. 887 #1. Primary accession number 888 #2. Sequence version number 889 #3. Topology: 'circular' or 'linear' 890 #4. Molecule type 891 #5. Data class 892 #6. Taxonomic division 893 #7. Sequence length 894 self._write_single_line("ID", "%s; %s; ; %s; ; %s; %i %s." \ 895 % (accession, version, mol_type, 896 division, len(record), units)) 897 handle.write("XX\n") 898 self._write_single_line("AC", accession+";") 899 handle.write("XX\n")
900
901 - def _get_data_division(self, record):
902 try: 903 division = record.annotations["data_file_division"] 904 except KeyError: 905 division = "UNC" 906 if division in ["PHG", "ENV", "FUN", "HUM", "INV", "MAM", "VRT", 907 "MUS", "PLN", "PRO", "ROD", "SYN", "TGN", "UNC", 908 "VRL", "XXX"]: 909 #Good, already EMBL style 910 # Division Code 911 # ----------------- ---- 912 # Bacteriophage PHG 913 # Environmental Sample ENV 914 # Fungal FUN 915 # Human HUM 916 # Invertebrate INV 917 # Other Mammal MAM 918 # Other Vertebrate VRT 919 # Mus musculus MUS 920 # Plant PLN 921 # Prokaryote PRO 922 # Other Rodent ROD 923 # Synthetic SYN 924 # Transgenic TGN 925 # Unclassified UNC (i.e. unknown) 926 # Viral VRL 927 # 928 #(plus XXX used for submiting data to EMBL) 929 pass 930 else: 931 #See if this is in GenBank style & can be converted. 932 #Generally a problem as the GenBank groups are wider 933 #than those of EMBL. Note that GenBank use "BCT" for 934 #both bacteria and acherea thus this maps to EMBL's 935 #"PRO" nicely. 936 gbk_to_embl = {"BCT":"PRO", 937 "UNK":"UNC", 938 } 939 try: 940 division = gbk_to_embl[division] 941 except KeyError: 942 division = "UNC" 943 assert len(division)==3 944 return division
945
946 - def _write_references(self, record):
947 #The order should be RN, RC, RP, RX, RG, RA, RT, RL 948 number = 0 949 for ref in record.annotations["references"]: 950 if not isinstance(ref, SeqFeature.Reference): 951 continue 952 number += 1 953 self._write_single_line("RN", "[%i]" % number) 954 #TODO - support for RC line (needed in parser too) 955 #TODO - support more complex record reference locations? 956 if ref.location and len(ref.location)==1: 957 self._write_single_line("RP", "%i-%i" % (ref.location[0].nofuzzy_start+1, 958 ref.location[0].nofuzzy_end)) 959 #TODO - record any DOI or AGRICOLA identifier in the reference object? 960 if ref.pubmed_id: 961 self._write_single_line("RX", "PUBMED; %s." % ref.pubmed_id) 962 if ref.consrtm: 963 self._write_single_line("RG", "%s" % ref.consrtm) 964 if ref.authors: 965 #We store the AUTHORS data as a single string 966 self._write_multi_line("RA", ref.authors+";") 967 if ref.title: 968 #We store the title as a single string 969 self._write_multi_line("RT", '"%s";' % ref.title) 970 if ref.journal: 971 #We store this as a single string - holds the journal name, 972 #volume, year, and page numbers of the citation 973 self._write_multi_line("RL", ref.journal) 974 self.handle.write("XX\n")
975
976 - def _write_comment(self, record):
977 #This is a bit complicated due to the range of possible 978 #ways people might have done their annotation... 979 #Currently the parser uses a single string with newlines. 980 #A list of lines is also reasonable. 981 #A single (long) string is perhaps the most natural of all. 982 #This means we may need to deal with line wrapping. 983 comment = record.annotations["comment"] 984 if isinstance(comment, basestring): 985 lines = comment.split("\n") 986 elif isinstance(comment, list) or isinstance(comment, tuple): 987 lines = comment 988 else: 989 raise ValueError("Could not understand comment annotation") 990 #TODO - Merge this with the GenBank comment code? 991 if not lines : return 992 for line in lines: 993 self._write_multi_line("CC", line) 994 self.handle.write("XX\n")
995
996 - def write_record(self, record):
997 """Write a single record to the output file.""" 998 999 handle = self.handle 1000 self._write_the_first_lines(record) 1001 1002 #PR line (0 or 1 lines only), project identifier 1003 for xref in record.dbxrefs: 1004 if xref.startswith("Project:"): 1005 self._write_single_line("PR", xref+";") 1006 handle.write("XX\n") 1007 break 1008 1009 #TODO - DT lines (date) 1010 1011 descr = record.description 1012 if descr == "<unknown description>" : descr = "." 1013 self._write_multi_line("DE", descr) 1014 handle.write("XX\n") 1015 1016 #Should this be "source" or "organism"? 1017 self._write_multi_line("OS", self._get_annotation_str(record, "organism")) 1018 try: 1019 #List of strings 1020 taxonomy = "; ".join(record.annotations["taxonomy"]) + "." 1021 except KeyError: 1022 taxonomy = "." 1023 self._write_multi_line("OC", taxonomy) 1024 handle.write("XX\n") 1025 1026 if "references" in record.annotations: 1027 self._write_references(record) 1028 1029 if "comment" in record.annotations: 1030 self._write_comment(record) 1031 1032 handle.write(self.FEATURE_HEADER) 1033 rec_length = len(record) 1034 for feature in record.features: 1035 self._write_feature(feature, rec_length) 1036 1037 self._write_sequence(record) 1038 handle.write("//\n")
1039
1040 -class ImgtWriter(EmblWriter):
1041 HEADER_WIDTH = 5 1042 QUALIFIER_INDENT = 25 # Not 21 as in EMBL 1043 QUALIFIER_INDENT_STR = "FT" + " "*(QUALIFIER_INDENT-2) 1044 QUALIFIER_INDENT_TMP = "FT %s " # 25 if %s is empty 1045 FEATURE_HEADER = "FH Key Location/Qualifiers\n"
1046 1047 if __name__ == "__main__": 1048 print "Quick self test" 1049 import os 1050 from StringIO import StringIO 1051
1052 - def compare_record(old, new):
1053 if old.id != new.id and old.name != new.name: 1054 raise ValueError("'%s' or '%s' vs '%s' or '%s' records" \ 1055 % (old.id, old.name, new.id, new.name)) 1056 if len(old.seq) != len(new.seq): 1057 raise ValueError("%i vs %i" % (len(old.seq), len(new.seq))) 1058 if str(old.seq).upper() != str(new.seq).upper(): 1059 if len(old.seq) < 200: 1060 raise ValueError("'%s' vs '%s'" % (old.seq, new.seq)) 1061 else: 1062 raise ValueError("'%s...' vs '%s...'" % (old.seq[:100], new.seq[:100])) 1063 if old.features and new.features: 1064 return compare_features(old.features, new.features) 1065 #Just insist on at least one word in common: 1066 if (old.description or new.description) \ 1067 and not set(old.description.split()).intersection(new.description.split()): 1068 raise ValueError("%s versus %s" \ 1069 % (repr(old.description), repr(new.description))) 1070 #TODO - check annotation 1071 if "contig" in old.annotations: 1072 assert old.annotations["contig"] == \ 1073 new.annotations["contig"] 1074 return True
1075
1076 - def compare_records(old_list, new_list):
1077 """Check two lists of SeqRecords agree, raises a ValueError if mismatch.""" 1078 if len(old_list) != len(new_list): 1079 raise ValueError("%i vs %i records" % (len(old_list), len(new_list))) 1080 for old, new in zip(old_list, new_list): 1081 if not compare_record(old, new): 1082 return False 1083 return True
1084
1085 - def compare_feature(old, new, ignore_sub_features=False):
1086 """Check two SeqFeatures agree.""" 1087 if old.type != new.type: 1088 raise ValueError("Type %s versus %s" % (old.type, new.type)) 1089 if old.location.nofuzzy_start != new.location.nofuzzy_start \ 1090 or old.location.nofuzzy_end != new.location.nofuzzy_end: 1091 raise ValueError("%s versus %s:\n%s\nvs:\n%s" \ 1092 % (old.location, new.location, str(old), str(new))) 1093 if old.strand != new.strand: 1094 raise ValueError("Different strand:\n%s\nvs:\n%s" % (str(old), str(new))) 1095 if old.location.start != new.location.start: 1096 raise ValueError("Start %s versus %s:\n%s\nvs:\n%s" \ 1097 % (old.location.start, new.location.start, str(old), str(new))) 1098 if old.location.end != new.location.end: 1099 raise ValueError("End %s versus %s:\n%s\nvs:\n%s" \ 1100 % (old.location.end, new.location.end, str(old), str(new))) 1101 if not ignore_sub_features: 1102 if len(old.sub_features) != len(new.sub_features): 1103 raise ValueError("Different sub features") 1104 for a, b in zip(old.sub_features, new.sub_features): 1105 if not compare_feature(a, b): 1106 return False 1107 #This only checks key shared qualifiers 1108 #Would a white list be easier? 1109 #for key in ["name", "gene", "translation", "codon_table", "codon_start", "locus_tag"]: 1110 for key in set(old.qualifiers).intersection(new.qualifiers): 1111 if key in ["db_xref", "protein_id", "product", "note"]: 1112 #EMBL and GenBank files are use different references/notes/etc 1113 continue 1114 if old.qualifiers[key] != new.qualifiers[key]: 1115 raise ValueError("Qualifier mis-match for %s:\n%s\n%s" \ 1116 % (key, old.qualifiers[key], new.qualifiers[key])) 1117 return True
1118
1119 - def compare_features(old_list, new_list, ignore_sub_features=False):
1120 """Check two lists of SeqFeatures agree, raises a ValueError if mismatch.""" 1121 if len(old_list) != len(new_list): 1122 raise ValueError("%i vs %i features" % (len(old_list), len(new_list))) 1123 for old, new in zip(old_list, new_list): 1124 #This assumes they are in the same order 1125 if not compare_feature(old, new, ignore_sub_features): 1126 return False 1127 return True
1128
1129 - def check_genbank_writer(records):
1130 handle = StringIO() 1131 GenBankWriter(handle).write_file(records) 1132 handle.seek(0) 1133 1134 records2 = list(GenBankIterator(handle)) 1135 assert compare_records(records, records2)
1136
1137 - def check_embl_writer(records):
1138 handle = StringIO() 1139 try: 1140 EmblWriter(handle).write_file(records) 1141 except ValueError, err: 1142 print err 1143 return 1144 handle.seek(0) 1145 1146 records2 = list(EmblIterator(handle)) 1147 assert compare_records(records, records2)
1148 1149 for filename in os.listdir("../../Tests/GenBank"): 1150 if not filename.endswith(".gbk") and not filename.endswith(".gb"): 1151 continue 1152 print filename 1153 1154 handle = open("../../Tests/GenBank/%s" % filename) 1155 records = list(GenBankIterator(handle)) 1156 handle.close() 1157 1158 check_genbank_writer(records) 1159 check_embl_writer(records) 1160 1161 for filename in os.listdir("../../Tests/EMBL"): 1162 if not filename.endswith(".embl"): 1163 continue 1164 print filename 1165 1166 handle = open("../../Tests/EMBL/%s" % filename) 1167 records = list(EmblIterator(handle)) 1168 handle.close() 1169 1170 check_genbank_writer(records) 1171 check_embl_writer(records) 1172 1173 from Bio import SeqIO 1174 for filename in os.listdir("../../Tests/SwissProt"): 1175 if not filename.startswith("sp"): 1176 continue 1177 print filename 1178 1179 handle = open("../../Tests/SwissProt/%s" % filename) 1180 records = list(SeqIO.parse(handle, "swiss")) 1181 handle.close() 1182 1183 check_genbank_writer(records) 1184