Package Bio :: Package SeqIO :: Module SeqXmlIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.SeqXmlIO

  1  # Copyright 2010 by Thomas Schmitt. 
  2  # All rights reserved. 
  3  # 
  4  # This module is for reading and writing SeqXML format files as 
  5  # SeqRecord objects, and is expected to be used via the Bio.SeqIO API. 
  6  """Bio.SeqIO support for the "seqxml" file format, SeqXML. 
  7   
  8  You are expected to use this module via the Bio.SeqIO functions. 
  9   
 10  SeqXML is a lightweight XML format which is supposed be an alternative for 
 11  FASTA files. For more Information see http://www.seqXML.org and Schmitt et al 
 12  (2011), http://dx.doi.org/10.1093/bib/bbr025 
 13  """ 
 14   
 15  from xml.sax.saxutils import XMLGenerator 
 16  from xml.sax.xmlreader import AttributesImpl 
 17  from xml.dom import pulldom 
 18  from xml.sax import SAXParseException 
 19   
 20  from Bio import Alphabet 
 21  from Bio.Seq import Seq 
 22  from Bio.Seq import UnknownSeq 
 23  from Bio.SeqRecord import SeqRecord 
 24  from Interfaces import SequentialSequenceWriter 
 25   
 26   
27 -class XMLRecordIterator:
28 """Base class for building iterators for record style XML formats. 29 30 It is assumed that all information for one record can be found within a 31 record element or above. Two types of methods are called when the start 32 tag of an element is reached. To receive only the attributes of an 33 element before its end tag is reached implement _attr_TAGNAME. 34 To get an element and its children as a DOM tree implement _elem_TAGNAME. 35 Everything that is part of the DOM tree will not trigger any further 36 method calls. 37 """ 38
39 - def __init__(self, handle, recordTag, namespace=None):
40 """Creating the object and initializing the XML parser.""" 41 42 self._recordTag = recordTag 43 self._namespace = namespace 44 self._events = pulldom.parse(handle)
45
46 - def __iter__(self):
47 """Iterate over the records in the XML file. 48 Returns the last parsed record.""" 49 50 record = None 51 try: 52 for event, node in self._events: 53 54 if event == "START_ELEMENT" and node.namespaceURI == self._namespace: 55 56 if node.localName == self._recordTag: 57 #create an empty SeqRecord 58 record = SeqRecord('', id='') 59 60 #call matching methods with attributes only 61 if hasattr(self, "_attr_" + node.localName): 62 getattr(self, "_attr_" + node.localName)( 63 self._attributes(node), record) 64 65 #call matching methods with DOM tree 66 if hasattr(self, "_elem_" + node.localName): 67 #read the element and all nested elements into a DOM tree 68 self._events.expandNode(node) 69 node.normalize() 70 71 getattr(self, "_elem_" + node.localName)(node, record) 72 73 elif event == "END_ELEMENT" and node.namespaceURI == self._namespace and node.localName == self._recordTag: 74 yield record 75 76 except SAXParseException, e: 77 78 if e.getLineNumber() == 1 and e.getColumnNumber() == 0: 79 #empty file 80 pass 81 else: 82 import os 83 if e.getLineNumber() == 1 and e.getColumnNumber() == 1 \ 84 and os.name == "java": 85 #empty file, see http://bugs.jython.org/issue1774 86 pass 87 else: 88 raise
89
90 - def _attributes(self, node):
91 """Return the attributes of a DOM node as dictionary.""" 92 93 return dict((node.attributes.item(i).name, node.attributes.item(i).value) for i in xrange(node.attributes.length))
94 95
96 -class SeqXmlIterator(XMLRecordIterator):
97 """Breaks seqXML file into SeqRecords. 98 99 Assumes valid seqXML please validate beforehand.""" 100
101 - def __init__(self, handle):
102 """Create the object.""" 103 XMLRecordIterator.__init__(self, handle, "entry") 104 105 self._source = None 106 self._source_version = None 107 self._version = None 108 self._speciesName = None 109 self._ncbiTaxId = None
110
111 - def _attr_seqXML(self, attr_dict, record):
112 """Parse the document metadata.""" 113 114 if "source" in attr_dict: 115 self._source = attr_dict["source"] 116 if "sourceVersion" in attr_dict: 117 self._source_version = attr_dict["sourceVersion"] 118 if "version" in attr_dict: 119 self._version = attr_dict["seqXMLversion"] 120 if "ncbiTaxID" in attr_dict: 121 self._ncbiTaxId = attr_dict["ncbiTaxID"] 122 if "speciesName" in attr_dict: 123 self._speciesName = attr_dict["speciesName"]
124
125 - def _attr_property(self, attr_dict, record):
126 """Parse key value pair properties and store them as annotations.""" 127 128 if "name" not in attr_dict: 129 raise ValueError("Malformed property element.") 130 131 value = attr_dict.get("value", None) 132 133 if attr_dict["name"] not in record.annotations: 134 record.annotations[attr_dict["name"]] = value 135 elif isinstance(record.annotations[attr_dict["name"]], list): 136 record.annotations[attr_dict["name"]].append(value) 137 else: 138 record.annotations[attr_dict["name"]] = [ 139 record.annotations[attr_dict["name"]], value]
140
141 - def _attr_species(self, attr_dict, record):
142 """Parse the species information.""" 143 144 if "name" not in attr_dict or "ncbiTaxID" not in attr_dict: 145 raise ValueError("Malformed species element!") 146 147 #the keywords for the species annotation are taken from SwissIO 148 record.annotations["organism"] = attr_dict["name"] 149 record.annotations["ncbi_taxid"] = attr_dict["ncbiTaxID"]
150
151 - def _attr_entry(self, attr_dict, record):
152 """New entry set id and the optional entry source.""" 153 154 if "id" not in attr_dict: 155 raise ValueError("Malformed entry! Identifier is missing.") 156 157 record.id = attr_dict["id"] 158 if "source" in attr_dict: 159 record.annotations["source"] = attr_dict["source"] 160 elif self._source is not None: 161 record.annotations["source"] = self._source 162 163 #initialize entry with global species definition 164 #the keywords for the species annotation are taken from SwissIO 165 if self._ncbiTaxId is not None: 166 record.annotations["ncbi_taxid"] = self._ncbiTaxId 167 if self._speciesName is not None: 168 record.annotations["organism"] = self._speciesName
169
170 - def _elem_DNAseq(self, node, record):
171 """Parse DNA sequence.""" 172 173 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 174 raise ValueError("Sequence length should be greater than 0.") 175 176 record.seq = Seq(node.firstChild.data, Alphabet.generic_dna)
177
178 - def _elem_RNAseq(self, node, record):
179 """Parse RNA sequence.""" 180 181 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 182 raise ValueError("Sequence length should be greater than 0.") 183 184 record.seq = Seq(node.firstChild.data, Alphabet.generic_rna)
185
186 - def _elem_AAseq(self, node, record):
187 """Parse protein sequence.""" 188 189 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 190 raise ValueError("Sequence length should be greater than 0.") 191 192 record.seq = Seq(node.firstChild.data, Alphabet.generic_protein)
193
194 - def _elem_description(self, node, record):
195 """Parse the description.""" 196 197 if node.hasChildNodes() and len(node.firstChild.data) > 0: 198 record.description = node.firstChild.data
199
200 - def _attr_DBRef(self, attr_dict, record):
201 """Parse a database cross reference""" 202 203 if "source" not in attr_dict or "id" not in attr_dict: 204 raise ValueError("Invalid DB cross reference.") 205 206 if "%s:%s" % (attr_dict["source"], attr_dict["id"]) not in record.dbxrefs: 207 record.dbxrefs.append( 208 "%s:%s" % (attr_dict["source"], attr_dict["id"]))
209 210
211 -class SeqXmlWriter(SequentialSequenceWriter):
212 """Writes SeqRecords into seqXML file. 213 214 SeqXML requires the sequence alphabet be explicitly RNA, DNA or protein, 215 i.e. an instance or subclass of Bio.Alphapet.RNAAlphabet, 216 Bio.Alphapet.DNAAlphabet or Bio.Alphapet.ProteinAlphabet. 217 """ 218
219 - def __init__(self, handle, source=None, source_version=None, 220 species=None, ncbiTaxId=None):
221 """Create Object and start the xml generator.""" 222 223 SequentialSequenceWriter.__init__(self, handle) 224 225 self.xml_generator = XMLGenerator(handle, "utf-8") 226 self.xml_generator.startDocument() 227 self.source = source 228 self.source_version = source_version 229 self.species = species 230 self.ncbiTaxId = ncbiTaxId
231
232 - def write_header(self):
233 """Write root node with document metadata.""" 234 SequentialSequenceWriter.write_header(self) 235 236 attrs = {"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", 237 "xsi:noNamespaceSchemaLocation": "http://www.seqxml.org/0.4/seqxml.xsd", 238 "seqXMLversion": "0.4"} 239 240 if self.source is not None: 241 attrs["source"] = self.source 242 if self.source_version is not None: 243 attrs["sourceVersion"] = self.source_ersion 244 if self.species is not None: 245 if not isinstance(species, basestring): 246 raise TypeError("species should be of type string") 247 attrs["speciesName"] = self.species 248 if self.ncbiTaxId is not None: 249 if not isinstance(self.ncbiTaxId, (basestring, int)): 250 raise TypeError("ncbiTaxID should be of type string or int") 251 attrs["ncbiTaxID"] = self.ncbiTaxId 252 253 self.xml_generator.startElement("seqXML", AttributesImpl(attrs))
254
255 - def write_record(self, record):
256 """Write one record.""" 257 258 if not record.id or record.id == "<unknown id>": 259 raise ValueError("SeqXML requires identifier") 260 261 if not isinstance(record.id, basestring): 262 raise TypeError("Identifier should be of type string") 263 264 attrb = {"id": record.id} 265 266 if "source" in record.annotations and self.source != record.annotations["source"]: 267 if not isinstance(record.annotations["source"], basestring): 268 raise TypeError("source should be of type string") 269 attrb["source"] = record.annotations["source"] 270 271 self.xml_generator.startElement("entry", AttributesImpl(attrb)) 272 self._write_species(record) 273 self._write_description(record) 274 self._write_seq(record) 275 self._write_dbxrefs(record) 276 self._write_properties(record) 277 self.xml_generator.endElement("entry")
278 286
287 - def _write_species(self, record):
288 """Write the species if given.""" 289 290 if "organism" in record.annotations and "ncbi_taxid" in record.annotations: 291 292 if not isinstance(record.annotations["organism"], basestring): 293 raise TypeError("organism should be of type string") 294 295 if not isinstance(record.annotations["ncbi_taxid"], (basestring, int)): 296 raise TypeError("ncbiTaxID should be of type string or int") 297 298 #The local species definition is only written if it differs from the global species definition 299 if record.annotations["organism"] != self.species or record.annotations["ncbi_taxid"] != self.ncbiTaxId: 300 301 attr = {"name": record.annotations["organism"], 302 "ncbiTaxID": record.annotations["ncbi_taxid"]} 303 self.xml_generator.startElement( 304 "species", AttributesImpl(attr)) 305 self.xml_generator.endElement("species")
306
307 - def _write_description(self, record):
308 """Write the description if given.""" 309 310 if record.description: 311 312 if not isinstance(record.description, basestring): 313 raise TypeError("Description should be of type string") 314 315 description = record.description 316 if description == "<unknown description>": 317 description = "" 318 319 if len(record.description) > 0: 320 self.xml_generator.startElement( 321 "description", AttributesImpl({})) 322 self.xml_generator.characters(description) 323 self.xml_generator.endElement("description")
324
325 - def _write_seq(self, record):
326 """Write the sequence. 327 328 Note that SeqXML requires a DNA, RNA or protein alphabet. 329 """ 330 331 if isinstance(record.seq, UnknownSeq): 332 raise TypeError( 333 "Sequence type is UnknownSeq but SeqXML requires sequence") 334 335 seq = str(record.seq) 336 337 if not len(seq) > 0: 338 raise ValueError("The sequence length should be greater than 0") 339 340 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 341 alpha = Alphabet._get_base_alphabet(record.seq.alphabet) 342 if isinstance(alpha, Alphabet.RNAAlphabet): 343 seqElem = "RNAseq" 344 elif isinstance(alpha, Alphabet.DNAAlphabet): 345 seqElem = "DNAseq" 346 elif isinstance(alpha, Alphabet.ProteinAlphabet): 347 seqElem = "AAseq" 348 else: 349 raise ValueError("Need a DNA, RNA or Protein alphabet") 350 351 self.xml_generator.startElement(seqElem, AttributesImpl({})) 352 self.xml_generator.characters(seq) 353 self.xml_generator.endElement(seqElem)
354
355 - def _write_dbxrefs(self, record):
356 """Write all database cross references.""" 357 if record.dbxrefs is not None: 358 359 for dbxref in record.dbxrefs: 360 361 if not isinstance(dbxref, basestring): 362 raise TypeError("dbxrefs should be of type list of string") 363 if dbxref.find(':') < 1: 364 raise ValueError("dbxrefs should be in the form ['source:id', 'source:id' ]") 365 366 dbsource, dbid = dbxref.split(':', 1) 367 368 attr = {"source": dbsource, "id": dbid} 369 self.xml_generator.startElement("DBRef", AttributesImpl(attr)) 370 self.xml_generator.endElement("DBRef")
371
372 - def _write_properties(self, record):
373 """Write all annotations that are key value pairs with values of a primitive type or list of primitive types.""" 374 375 for key, value in record.annotations.items(): 376 377 if key not in ("organism", "ncbi_taxid", "source"): 378 379 if value is None: 380 381 attr = {"name": key} 382 self.xml_generator.startElement( 383 "property", AttributesImpl(attr)) 384 self.xml_generator.endElement("property") 385 386 elif isinstance(value, list): 387 388 for v in value: 389 if isinstance(value, (int, float, basestring)): 390 attr = {"name": key, "value": v} 391 self.xml_generator.startElement( 392 "property", AttributesImpl(attr)) 393 self.xml_generator.endElement("property") 394 395 elif isinstance(value, (int, float, basestring)): 396 397 attr = {"name": key, "value": str(value)} 398 self.xml_generator.startElement( 399 "property", AttributesImpl(attr)) 400 self.xml_generator.endElement("property")
401 402 if __name__ == "__main__": 403 print "Running quick self test" 404 405 from Bio import SeqIO 406 import sys 407 408 fileHandle = open("Tests/SeqXML/protein_example.xml", "r") 409 records = list(SeqIO.parse(fileHandle, "seqxml")) 410 411 from StringIO import StringIO 412 stringHandle = StringIO() 413 414 SeqIO.write(records, stringHandle, "seqxml") 415 SeqIO.write(records, sys.stdout, "seqxml") 416 print 417 418 stringHandle.seek(0) 419 records = list(SeqIO.parse(stringHandle, "seqxml")) 420 421 SeqIO.write(records, sys.stdout, "seqxml") 422 print 423