Package Bio :: Module ParserSupport
[hide private]
[frames | no frames]

Source Code for Module Bio.ParserSupport

  1  # Copyright 1999 by Jeffrey Chang.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Code to support writing parsers (DEPRECATED). 
  7   
  8  Classes: 
  9  AbstractParser         Base class for parsers. 
 10  AbstractConsumer       Base class of all Consumers. 
 11  TaggingConsumer        Consumer that tags output with its event.  For debugging 
 12  EventGenerator         Generate Biopython Events from Martel XML output 
 13                         (note that Martel has been removed) 
 14   
 15  Functions: 
 16  safe_readline          Read a line from a handle, with check for EOF. 
 17  safe_peekline          Peek at next line, with check for EOF. 
 18  read_and_call          Read a line from a handle and pass it to a method. 
 19  read_and_call_while    Read many lines, as long as a condition is met. 
 20  read_and_call_until    Read many lines, until a condition is met. 
 21  attempt_read_and_call  Like read_and_call, but forgiving of errors. 
 22  is_blank_line          Test whether a line is blank. 
 23   
 24  """ 
 25   
 26  from Bio import BiopythonDeprecationWarning 
 27  import warnings 
 28  warnings.warn("Bio.ParserSupport is now deprecated will be removed in a " 
 29                "future release of Biopython.", BiopythonDeprecationWarning) 
 30   
 31  import sys 
 32  try: 
 33      from types import InstanceType 
 34  except ImportError: 
 35      #Python 3, see http://bugs.python.org/issue8206 
 36      InstanceType = object 
 37  from types import MethodType 
 38   
 39  from Bio._py3k import StringIO 
 40   
 41  from Bio import File 
 42   
 43  # XML from python 2.0 
 44  try: 
 45      from xml.sax import handler 
 46      xml_support = 1 
 47  except ImportError: 
 48      sys.stderr.write("Warning: Could not import SAX for dealing with XML.\n" + 
 49                       "This causes problems with some ParserSupport modules\n") 
 50      xml_support = 0 
 51   
 52   
class AbstractParser(object):
    """Abstract base for parsers.

    Subclasses provide parse(); the string and file helpers below just
    wrap their input in a handle and delegate to it.

    """

    def parse(self, handle):
        """Parse the data available from handle (must be overridden)."""
        raise NotImplementedError("Please implement in a derived class")

    def parse_str(self, string):
        """Parse string by exposing it through a StringIO handle."""
        handle = StringIO(string)
        return self.parse(handle)

    def parse_file(self, filename):
        """Open filename, parse its contents and return the result."""
        # The file is closed by the context manager before we return.
        with open(filename) as handle:
            return self.parse(handle)
67 68
class AbstractConsumer(object):
    """Base class for other Consumers.

    Subclass this and define a method for every event you care about.
    Any event without a matching method is silently swallowed by one of
    the two no-op fallbacks below.

    """

    def _unhandled_section(self):
        """Fallback for 'start_*' / 'end_*' events: do nothing."""

    def _unhandled(self, data):
        """Fallback for every other event: discard data."""

    def __getattr__(self, attr):
        """Return the no-op fallback matching the kind of event requested.

        Only reached when normal attribute lookup fails, i.e. for events
        the subclass did not implement.
        """
        if attr.startswith(('start_', 'end_')):
            return self._unhandled_section
        return self._unhandled
88 89
class TaggingConsumer(AbstractConsumer):
    """Debugging Consumer that echoes every event to a handle.

    Each event is written as one line, labelled with the event name.

    """

    def __init__(self, handle=None, colwidth=15, maxwidth=80):
        """TaggingConsumer(handle=sys.stdout, colwidth=15, maxwidth=80)"""
        # sys.stdout is looked up here rather than in the signature:
        # a default argument is evaluated only once, so it would pin
        # whatever sys.stdout was at definition time, which may have been
        # replaced (or closed) by the time an instance is created.
        self._handle = sys.stdout if handle is None else handle
        self._colwidth = colwidth
        self._maxwidth = maxwidth

    def unhandled_section(self):
        """Report an 'unhandled_section' event."""
        self._print_name('unhandled_section')

    def unhandled(self, data):
        """Report an 'unhandled' event together with its data."""
        self._print_name('unhandled', data)

    def _print_name(self, name, data=None):
        """Write one output line for the event called name.

        Without data a section marker (a row of asterisks plus the name)
        is written.  With data the name is padded/truncated to colwidth
        and followed by the data, truncated so the line fits maxwidth.
        """
        width = self._colwidth
        if data is not None:
            # Tag and (truncated, right-stripped) line.
            tag = name[:width]
            text = data[:self._maxwidth - width - 2].rstrip()
            self._handle.write("%-*s: %s\n" % (width, tag, text))
            return
        # Name of a section.
        self._handle.write("%s %s\n" % ("*" * width, name))

    def __getattr__(self, attr):
        """Build a printing callback for any event not defined explicitly.

        Section events ('start_*' / 'end_*') take no argument; all other
        events take the data to print.
        """
        is_section = attr[:6] == 'start_' or attr[:4] == 'end_'
        if is_section:
            return lambda a=attr, s=self: s._print_name(a)
        return lambda x, a=attr, s=self: s._print_name(a, x)
# Only define the EventGenerator if XML (SAX) handling is available.
if xml_support:
    class EventGenerator(handler.ContentHandler):
        """Handler to generate events associated with a Martel parsed file.

        This acts like a normal SAX handler, and accepts XML generated by
        Martel during parsing. These events are then converted into
        'Biopython events', which can then be caught by a standard
        biopython consumer.

        Note that Martel is now DEPRECATED.
        """

        def __init__(self, consumer, interest_tags, callback_finalizer=None,
                     exempt_tags=None):
            """Initialize to begin catching and firing off events.

            Arguments:
            o consumer - The consumer that we'll send Biopython events to.

            o interest_tags - A listing of all the tags we are interested in.

            o callback_finalizer - A function to deal with the collected
            information before passing it on to the consumer. By default
            the collected information is a list of all of the lines read
            for a particular tag -- if there are multiple tags in a row
            like:

            <some_info>Spam</some_info>
            <some_info>More Spam</some_info>

            In this case the list of information would be:

            ['Spam', 'More Spam']

            This list of lines will be passed to the callback finalizer if
            it is present. Otherwise the consumer will be called with the
            list of content information.

            o exempt_tags - A listing of particular tags that are exempt from
            being processed by the callback_finalizer. This allows you to
            use a finalizer to deal with most tags, but leave those you don't
            want touched. Defaults to no exempt tags.
            """
            handler.ContentHandler.__init__(self)

            self._consumer = consumer
            self.interest_tags = interest_tags
            self._finalizer = callback_finalizer
            # A None sentinel replaces the former mutable [] default, which
            # was one list object shared by every instance.  A list passed
            # in by the caller is still stored by reference, as before.
            if exempt_tags is None:
                exempt_tags = []
            self._exempt_tags = exempt_tags

            # a dictionary of content for each tag of interest
            # the information for each tag is held as a list of the lines.
            # This allows us to collect information from multiple tags
            # in a row, and return it all at once.
            self.info = {}
            for tag in self.interest_tags:
                self.info[tag] = []

            # the previous tag we were collecting information for.
            # We set a delay in sending info to the consumer so that we can
            # collect a bunch of tags in a row and append all of the info
            # together.
            self._previous_tag = ''

            # the current character information for a tag
            self._cur_content = []
            # whether we should be collecting information
            self._collect_characters = 0

        def startElement(self, name, attrs):
            """Determine if we should collect characters from this tag.
            """
            if name in self.interest_tags:
                self._collect_characters = 1

        def characters(self, content):
            """Extract the information if we are interested in it.
            """
            if self._collect_characters:
                self._cur_content.append(content)

        def endElement(self, name):
            """Send the information to the consumer.

            Once we've got the end element we've collected up all of the
            character information we need, and we need to send this on to
            the consumer to do something with it.

            We have a delay of one tag on doing this, so that we can collect
            all of the info from multiple calls to the same element at once.
            """
            # only deal with the tag if it is something we are
            # interested in and potentially have information for
            if not self._collect_characters:
                return
            if name not in self.interest_tags:
                # End of an uninteresting element nested inside one we are
                # collecting.  self.info has no entry for it (indexing it
                # used to raise KeyError), so keep collecting until the
                # enclosing interesting tag closes.
                return

            # add all of the information collected inside this tag
            self.info[name].append("".join(self._cur_content))
            # reset our information and flags
            self._cur_content = []
            self._collect_characters = 0

            # if we are at a new tag, pass on the info from the last tag
            if self._previous_tag and self._previous_tag != name:
                self._make_callback(self._previous_tag)

            # set this tag as the next to be passed
            self._previous_tag = name

        def _make_callback(self, name):
            """Call the callback function with the info with the given name.
            """
            # the consumer method to call has the same name as the tag
            callback_function = getattr(self._consumer, name)

            # --- pass back the information
            # if there is a finalizer, use that
            if self._finalizer is not None and name not in self._exempt_tags:
                info_to_pass = self._finalizer(self.info[name])
            # otherwise pass back the entire list of information
            else:
                info_to_pass = self.info[name]

            callback_function(info_to_pass)

            # reset the information for the tag
            self.info[name] = []

        def endDocument(self):
            """Make sure all of our information has been passed.

            This just flushes out any stored tags that need to be passed.
            """
            if self._previous_tag:
                self._make_callback(self._previous_tag)
263 264
def read_and_call(uhandle, method, **keywds):
    """read_and_call(uhandle, method[, start][, end][, contains][, blank][, has_re])

    Read one line from uhandle, validate it, then hand it to method.
    A ValueError is raised when the line fails validation.

    The optional keywords describe what the line must look like:
    start / end give text the line has to begin / end with (trailing
    whitespace, including EOL characters, is ignored for end);
    contains gives a substring that has to occur in the line; blank,
    when true, requires a blank line and, when false, a non-blank one;
    has_re is a compiled regular expression that must match somewhere
    in the line.

    """
    line = safe_readline(uhandle)
    problem = _fails_conditions(line, **keywds)
    if problem is None:
        method(line)
        return
    raise ValueError(problem)
285 286
def read_and_call_while(uhandle, method, **keywds):
    """read_and_call_while(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Keep reading lines from uhandle and passing them to method for as
    long as they satisfy the conditions.  The first line that does not
    is pushed back onto uhandle.  Returns how many lines were passed on.

    The condition keywords are those described for read_and_call.

    """
    count = 0
    line = safe_readline(uhandle)
    while not _fails_conditions(line, **keywds):
        method(line)
        count += 1
        line = safe_readline(uhandle)
    # This line broke the run: give it back for the next reader.
    uhandle.saveline(line)
    return count
306 307
def read_and_call_until(uhandle, method, **keywds):
    """read_and_call_until(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Keep reading lines from uhandle and passing them to method until a
    line satisfies the conditions.  That terminating line is pushed back
    onto uhandle.  Returns how many lines were passed on.

    The condition keywords are those described for read_and_call.

    """
    count = 0
    line = safe_readline(uhandle)
    while _fails_conditions(line, **keywds):
        method(line)
        count += 1
        line = safe_readline(uhandle)
    # This line met the condition: give it back for the next reader.
    uhandle.saveline(line)
    return count
328 329
def attempt_read_and_call(uhandle, method, **keywds):
    """attempt_read_and_call(uhandle, method, **keywds) -> boolean

    Like read_and_call, except that a line failing the checks is pushed
    back onto uhandle instead of triggering an exception.  The return
    value tells whether the line passed (and was given to method).

    Running out of input still raises ValueError, via safe_readline.

    The condition keywords are those described for read_and_call.

    """
    line = safe_readline(uhandle)
    if _fails_conditions(line, **keywds):
        uhandle.saveline(line)
        return False
    method(line)
    return True
348 349
def _fails_conditions(line, start=None, end=None, contains=None, blank=None,
                      has_re=None):
    """Check line against the given conditions.

    Returns None when every supplied condition holds, otherwise a string
    describing the first condition that failed.  Conditions left as None
    are not checked.

    Arguments:
    o start - text the line must begin with.
    o end - text the line must end with, ignoring trailing whitespace
    (which includes the EOL characters).
    o contains - substring that must occur in the line.
    o blank - if true the line must be blank, if false (but not None)
    it must be non-blank, as judged by is_blank_line.
    o has_re - compiled regular expression that must match somewhere in
    the line.
    """
    if start is not None and not line.startswith(start):
        return "Line does not start with '%s':\n%s" % (start, line)
    # endswith rather than line.rstrip()[-len(end):]: for end == '' that
    # slice is [-0:], i.e. the whole line, so non-blank lines were wrongly
    # rejected even though every string ends with the empty string.
    if end is not None and not line.rstrip().endswith(end):
        return "Line does not end with '%s':\n%s" % (end, line)
    if contains is not None and contains not in line:
        return "Line does not contain '%s':\n%s" % (contains, line)
    if blank is not None:
        line_is_blank = is_blank_line(line)
        if blank and not line_is_blank:
            return "Expected blank line, but got:\n%s" % line
        if not blank and line_is_blank:
            return "Expected non-blank line, but got a blank one"
    if has_re is not None and has_re.search(line) is None:
        return "Line does not match regex '%s':\n%s" % (
            has_re.pattern, line)
    return None
373 374
def is_blank_line(line, allow_spaces=0):
    """is_blank_line(line, allow_spaces=0) -> boolean

    Tell whether line is blank.  An empty string always counts as
    blank.  With allow_spaces true, a line made up only of whitespace
    (end-of-line characters included) is blank as well; otherwise the
    line is blank only if its very first character is an end-of-line
    character.

    """
    if line:
        if allow_spaces:
            stripped = line.rstrip()
            return stripped == ''
        first = line[0]
        return first == '\n' or first == '\r'
    return 1
389 390
def safe_readline(handle):
    """safe_readline(handle) -> line

    Return the next line read from handle (normally an UndoHandle).
    An empty result means there is nothing left to read, which is
    reported by raising ValueError.

    """
    line = handle.readline()
    if line:
        return line
    raise ValueError("Unexpected end of stream.")
402 403
def safe_peekline(handle):
    """safe_peekline(handle) -> line

    Return the result of handle.peekline() (handle is normally an
    UndoHandle).  An empty result means there is nothing left to peek
    at, which is reported by raising ValueError.

    """
    line = handle.peekline()
    if line:
        return line
    raise ValueError("Unexpected end of stream.")
415