Package Bio :: Package SeqIO :: Module SeqXmlIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.SeqXmlIO

  1  # Copyright 2010 by Thomas Schmitt. 
  2  # All rights reserved. 
  3  # 
  4  # This module is for reading and writing SeqXML format files as 
  5  # SeqRecord objects, and is expected to be used via the Bio.SeqIO API. 
  6  """Bio.SeqIO support for the "seqxml" file format, SeqXML. 
  7   
  8  You are expected to use this module via the Bio.SeqIO functions. 
  9   
 10  SeqXML is a lightweight XML format which is supposed be an alternative for 
 11  FASTA files. For more Information see http://www.seqXML.org and Schmitt et al 
 12  (2011), http://dx.doi.org/10.1093/bib/bbr025 
 13  """ 
 14   
 15  from __future__ import print_function 
 16   
 17  from xml.sax.saxutils import XMLGenerator 
 18  from xml.sax.xmlreader import AttributesImpl 
 19  from xml.dom import pulldom 
 20  from xml.sax import SAXParseException 
 21   
 22  from Bio._py3k import range 
 23  from Bio._py3k import basestring 
 24   
 25  from Bio import Alphabet 
 26  from Bio.Seq import Seq 
 27  from Bio.Seq import UnknownSeq 
 28  from Bio.SeqRecord import SeqRecord 
 29  from .Interfaces import SequentialSequenceWriter 
 30   
 31   
32 -class XMLRecordIterator:
33 """Base class for building iterators for record style XML formats. 34 35 It is assumed that all information for one record can be found within a 36 record element or above. Two types of methods are called when the start 37 tag of an element is reached. To receive only the attributes of an 38 element before its end tag is reached implement _attr_TAGNAME. 39 To get an element and its children as a DOM tree implement _elem_TAGNAME. 40 Everything that is part of the DOM tree will not trigger any further 41 method calls. 42 """ 43
44 - def __init__(self, handle, recordTag, namespace=None):
45 """Creating the object and initializing the XML parser.""" 46 47 self._recordTag = recordTag 48 self._namespace = namespace 49 self._events = pulldom.parse(handle)
50
51 - def __iter__(self):
52 """Iterate over the records in the XML file. 53 Returns the last parsed record.""" 54 55 record = None 56 try: 57 for event, node in self._events: 58 59 if event == "START_ELEMENT" and node.namespaceURI == self._namespace: 60 61 if node.localName == self._recordTag: 62 #create an empty SeqRecord 63 record = SeqRecord('', id='') 64 65 #call matching methods with attributes only 66 if hasattr(self, "_attr_" + node.localName): 67 getattr(self, "_attr_" + node.localName)( 68 self._attributes(node), record) 69 70 #call matching methods with DOM tree 71 if hasattr(self, "_elem_" + node.localName): 72 #read the element and all nested elements into a DOM tree 73 self._events.expandNode(node) 74 node.normalize() 75 76 getattr(self, "_elem_" + node.localName)(node, record) 77 78 elif event == "END_ELEMENT" and node.namespaceURI == self._namespace and node.localName == self._recordTag: 79 yield record 80 81 except SAXParseException as e: 82 83 if e.getLineNumber() == 1 and e.getColumnNumber() == 0: 84 #empty file 85 pass 86 else: 87 import os 88 if e.getLineNumber() == 1 and e.getColumnNumber() == 1 \ 89 and os.name == "java": 90 #empty file, see http://bugs.jython.org/issue1774 91 pass 92 else: 93 raise
94
95 - def _attributes(self, node):
96 """Return the attributes of a DOM node as dictionary.""" 97 98 return dict((node.attributes.item(i).name, node.attributes.item(i).value) 99 for i in range(node.attributes.length))
100 101
102 -class SeqXmlIterator(XMLRecordIterator):
103 """Breaks seqXML file into SeqRecords. 104 105 Assumes valid seqXML please validate beforehand.""" 106
107 - def __init__(self, handle):
108 """Create the object.""" 109 XMLRecordIterator.__init__(self, handle, "entry") 110 111 self._source = None 112 self._source_version = None 113 self._version = None 114 self._speciesName = None 115 self._ncbiTaxId = None
116
117 - def _attr_seqXML(self, attr_dict, record):
118 """Parse the document metadata.""" 119 120 if "source" in attr_dict: 121 self._source = attr_dict["source"] 122 if "sourceVersion" in attr_dict: 123 self._source_version = attr_dict["sourceVersion"] 124 if "version" in attr_dict: 125 self._version = attr_dict["seqXMLversion"] 126 if "ncbiTaxID" in attr_dict: 127 self._ncbiTaxId = attr_dict["ncbiTaxID"] 128 if "speciesName" in attr_dict: 129 self._speciesName = attr_dict["speciesName"]
130
131 - def _attr_property(self, attr_dict, record):
132 """Parse key value pair properties and store them as annotations.""" 133 134 if "name" not in attr_dict: 135 raise ValueError("Malformed property element.") 136 137 value = attr_dict.get("value", None) 138 139 if attr_dict["name"] not in record.annotations: 140 record.annotations[attr_dict["name"]] = value 141 elif isinstance(record.annotations[attr_dict["name"]], list): 142 record.annotations[attr_dict["name"]].append(value) 143 else: 144 record.annotations[attr_dict["name"]] = [ 145 record.annotations[attr_dict["name"]], value]
146
147 - def _attr_species(self, attr_dict, record):
148 """Parse the species information.""" 149 150 if "name" not in attr_dict or "ncbiTaxID" not in attr_dict: 151 raise ValueError("Malformed species element!") 152 153 #the keywords for the species annotation are taken from SwissIO 154 record.annotations["organism"] = attr_dict["name"] 155 record.annotations["ncbi_taxid"] = attr_dict["ncbiTaxID"]
156
157 - def _attr_entry(self, attr_dict, record):
158 """New entry set id and the optional entry source.""" 159 160 if "id" not in attr_dict: 161 raise ValueError("Malformed entry! Identifier is missing.") 162 163 record.id = attr_dict["id"] 164 if "source" in attr_dict: 165 record.annotations["source"] = attr_dict["source"] 166 elif self._source is not None: 167 record.annotations["source"] = self._source 168 169 #initialize entry with global species definition 170 #the keywords for the species annotation are taken from SwissIO 171 if self._ncbiTaxId is not None: 172 record.annotations["ncbi_taxid"] = self._ncbiTaxId 173 if self._speciesName is not None: 174 record.annotations["organism"] = self._speciesName
175
176 - def _elem_DNAseq(self, node, record):
177 """Parse DNA sequence.""" 178 179 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 180 raise ValueError("Sequence length should be greater than 0.") 181 182 record.seq = Seq(node.firstChild.data, Alphabet.generic_dna)
183
184 - def _elem_RNAseq(self, node, record):
185 """Parse RNA sequence.""" 186 187 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 188 raise ValueError("Sequence length should be greater than 0.") 189 190 record.seq = Seq(node.firstChild.data, Alphabet.generic_rna)
191
192 - def _elem_AAseq(self, node, record):
193 """Parse protein sequence.""" 194 195 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 196 raise ValueError("Sequence length should be greater than 0.") 197 198 record.seq = Seq(node.firstChild.data, Alphabet.generic_protein)
199
200 - def _elem_description(self, node, record):
201 """Parse the description.""" 202 203 if node.hasChildNodes() and len(node.firstChild.data) > 0: 204 record.description = node.firstChild.data
205
206 - def _attr_DBRef(self, attr_dict, record):
207 """Parse a database cross reference""" 208 209 if "source" not in attr_dict or "id" not in attr_dict: 210 raise ValueError("Invalid DB cross reference.") 211 212 if "%s:%s" % (attr_dict["source"], attr_dict["id"]) not in record.dbxrefs: 213 record.dbxrefs.append( 214 "%s:%s" % (attr_dict["source"], attr_dict["id"]))
215 216
217 -class SeqXmlWriter(SequentialSequenceWriter):
218 """Writes SeqRecords into seqXML file. 219 220 SeqXML requires the sequence alphabet be explicitly RNA, DNA or protein, 221 i.e. an instance or subclass of Bio.Alphapet.RNAAlphabet, 222 Bio.Alphapet.DNAAlphabet or Bio.Alphapet.ProteinAlphabet. 223 """ 224
225 - def __init__(self, handle, source=None, source_version=None, 226 species=None, ncbiTaxId=None):
227 """Create Object and start the xml generator.""" 228 229 SequentialSequenceWriter.__init__(self, handle) 230 231 self.xml_generator = XMLGenerator(handle, "utf-8") 232 self.xml_generator.startDocument() 233 self.source = source 234 self.source_version = source_version 235 self.species = species 236 self.ncbiTaxId = ncbiTaxId
237
238 - def write_header(self):
239 """Write root node with document metadata.""" 240 SequentialSequenceWriter.write_header(self) 241 242 attrs = {"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", 243 "xsi:noNamespaceSchemaLocation": "http://www.seqxml.org/0.4/seqxml.xsd", 244 "seqXMLversion": "0.4"} 245 246 if self.source is not None: 247 attrs["source"] = self.source 248 if self.source_version is not None: 249 attrs["sourceVersion"] = self.source_ersion 250 if self.species is not None: 251 if not isinstance(species, basestring): 252 raise TypeError("species should be of type string") 253 attrs["speciesName"] = self.species 254 if self.ncbiTaxId is not None: 255 if not isinstance(self.ncbiTaxId, (basestring, int)): 256 raise TypeError("ncbiTaxID should be of type string or int") 257 attrs["ncbiTaxID"] = self.ncbiTaxId 258 259 self.xml_generator.startElement("seqXML", AttributesImpl(attrs))
260
261 - def write_record(self, record):
262 """Write one record.""" 263 264 if not record.id or record.id == "<unknown id>": 265 raise ValueError("SeqXML requires identifier") 266 267 if not isinstance(record.id, basestring): 268 raise TypeError("Identifier should be of type string") 269 270 attrb = {"id": record.id} 271 272 if "source" in record.annotations and self.source != record.annotations["source"]: 273 if not isinstance(record.annotations["source"], basestring): 274 raise TypeError("source should be of type string") 275 attrb["source"] = record.annotations["source"] 276 277 self.xml_generator.startElement("entry", AttributesImpl(attrb)) 278 self._write_species(record) 279 self._write_description(record) 280 self._write_seq(record) 281 self._write_dbxrefs(record) 282 self._write_properties(record) 283 self.xml_generator.endElement("entry")
284 292
293 - def _write_species(self, record):
294 """Write the species if given.""" 295 296 if "organism" in record.annotations and "ncbi_taxid" in record.annotations: 297 298 if not isinstance(record.annotations["organism"], basestring): 299 raise TypeError("organism should be of type string") 300 301 if not isinstance(record.annotations["ncbi_taxid"], (basestring, int)): 302 raise TypeError("ncbiTaxID should be of type string or int") 303 304 #The local species definition is only written if it differs from the global species definition 305 if record.annotations["organism"] != self.species or record.annotations["ncbi_taxid"] != self.ncbiTaxId: 306 307 attr = {"name": record.annotations["organism"], 308 "ncbiTaxID": record.annotations["ncbi_taxid"]} 309 self.xml_generator.startElement( 310 "species", AttributesImpl(attr)) 311 self.xml_generator.endElement("species")
312
313 - def _write_description(self, record):
314 """Write the description if given.""" 315 316 if record.description: 317 318 if not isinstance(record.description, basestring): 319 raise TypeError("Description should be of type string") 320 321 description = record.description 322 if description == "<unknown description>": 323 description = "" 324 325 if len(record.description) > 0: 326 self.xml_generator.startElement( 327 "description", AttributesImpl({})) 328 self.xml_generator.characters(description) 329 self.xml_generator.endElement("description")
330
331 - def _write_seq(self, record):
332 """Write the sequence. 333 334 Note that SeqXML requires a DNA, RNA or protein alphabet. 335 """ 336 337 if isinstance(record.seq, UnknownSeq): 338 raise TypeError( 339 "Sequence type is UnknownSeq but SeqXML requires sequence") 340 341 seq = str(record.seq) 342 343 if not len(seq) > 0: 344 raise ValueError("The sequence length should be greater than 0") 345 346 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 347 alpha = Alphabet._get_base_alphabet(record.seq.alphabet) 348 if isinstance(alpha, Alphabet.RNAAlphabet): 349 seqElem = "RNAseq" 350 elif isinstance(alpha, Alphabet.DNAAlphabet): 351 seqElem = "DNAseq" 352 elif isinstance(alpha, Alphabet.ProteinAlphabet): 353 seqElem = "AAseq" 354 else: 355 raise ValueError("Need a DNA, RNA or Protein alphabet") 356 357 self.xml_generator.startElement(seqElem, AttributesImpl({})) 358 self.xml_generator.characters(seq) 359 self.xml_generator.endElement(seqElem)
360
361 - def _write_dbxrefs(self, record):
362 """Write all database cross references.""" 363 if record.dbxrefs is not None: 364 365 for dbxref in record.dbxrefs: 366 367 if not isinstance(dbxref, basestring): 368 raise TypeError("dbxrefs should be of type list of string") 369 if dbxref.find(':') < 1: 370 raise ValueError("dbxrefs should be in the form ['source:id', 'source:id' ]") 371 372 dbsource, dbid = dbxref.split(':', 1) 373 374 attr = {"source": dbsource, "id": dbid} 375 self.xml_generator.startElement("DBRef", AttributesImpl(attr)) 376 self.xml_generator.endElement("DBRef")
377
378 - def _write_properties(self, record):
379 """Write all annotations that are key value pairs with values of a primitive type or list of primitive types.""" 380 381 for key, value in record.annotations.items(): 382 383 if key not in ("organism", "ncbi_taxid", "source"): 384 385 if value is None: 386 387 attr = {"name": key} 388 self.xml_generator.startElement( 389 "property", AttributesImpl(attr)) 390 self.xml_generator.endElement("property") 391 392 elif isinstance(value, list): 393 394 for v in value: 395 if isinstance(value, (int, float, basestring)): 396 attr = {"name": key, "value": v} 397 self.xml_generator.startElement( 398 "property", AttributesImpl(attr)) 399 self.xml_generator.endElement("property") 400 401 elif isinstance(value, (int, float, basestring)): 402 403 attr = {"name": key, "value": str(value)} 404 self.xml_generator.startElement( 405 "property", AttributesImpl(attr)) 406 self.xml_generator.endElement("property")
407 408 if __name__ == "__main__": 409 print("Running quick self test") 410 411 from Bio import SeqIO 412 import sys 413 414 with open("Tests/SeqXML/protein_example.xml", "r") as fileHandle: 415 records = list(SeqIO.parse(fileHandle, "seqxml")) 416 417 from Bio._py3k import StringIO 418 stringHandle = StringIO() 419 420 SeqIO.write(records, stringHandle, "seqxml") 421 SeqIO.write(records, sys.stdout, "seqxml") 422 print("") 423 424 stringHandle.seek(0) 425 records = list(SeqIO.parse(stringHandle, "seqxml")) 426 427 SeqIO.write(records, sys.stdout, "seqxml") 428 print("") 429