Package Bio :: Package PopGen :: Package FDist :: Module Async
[hide private]
[frames] | no frames]

Source Code for Module Bio.PopGen.FDist.Async

  1  # Copyright 2007 by Tiago Antao <tiagoantao@gmail.com>.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Asynchronous execution of Fdist and splitting of loads. 
  7   
  8  FDistAsync Allows for the execution of FDist. 
  9   
 10  SplitFDist splits a single Fdist execution in several, taking advantage 
 11  of multi-core architectures. 
 12  """ 
 13   
 14  import os 
 15  import shutil 
 16  import threading 
 17  from time import sleep 
 18  from Bio.PopGen.Async import Local 
 19  from Bio.PopGen.FDist.Controller import FDistController 
 20   
 21  __docformat__ = "restructuredtext en" 
 22   
class FDistAsync(FDistController):
    """FDist controller exposing a job-style entry point.

    Adds ``run_job`` on top of ``FDistController`` so that a single FDist
    run can be described by a parameter dictionary.
    """

    def __init__(self, fdist_dir="", ext=None):
        """Initialize the controller.

        Parameters:

         - fdist_dir - Where fdist can be found, if = "", then it
           should be on the path.
         - ext - Extension of binary names (e.g. nothing on Unix,
           ".exe" on Windows).
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Run FDist from a dictionary of parameters.

        Gets typical Fdist parameters from a dictionary and makes a
        "normal" ``run_fdist`` call. This is run, normally, inside a
        separate thread.

        Parameters:

         - parameters - Dictionary that must hold 'npops', 'nsamples',
           'fst' and 'sample_size'. 'mut', 'num_sims', 'data_dir',
           'is_dominant', 'theta', 'beta' and 'max_freq' are optional and
           fall back to the defaults used below.
         - input_files - Not used by this method.

        Returns a tuple ``(fst, output_files)``: the value returned by
        ``run_fdist`` and a dictionary mapping 'out.dat' to a file handle
        opened for reading on ``<data_dir>/out.dat``. The handle is not
        closed here.
        """
        # Mandatory entries: a missing key raises KeyError.
        required = [parameters[key]
                    for key in ('npops', 'nsamples', 'fst', 'sample_size')]
        # Optional entries, in the positional order run_fdist expects.
        data_dir = parameters.get('data_dir', '.')
        optional = [
            parameters.get('mut', 0),
            parameters.get('num_sims', 20000),
            data_dir,
            parameters.get('is_dominant', False),
            parameters.get('theta', 0.06),
            parameters.get('beta', (0.25, 0.25)),
            parameters.get('max_freq', 0.99),
        ]
        result_fst = self.run_fdist(*(required + optional))
        output_files = {
            'out.dat': open(data_dir + os.sep + 'out.dat', 'r'),
        }
        return result_fst, output_files
64 65
class SplitFDist(object):
    """Splits a FDist run.

    The idea is to split a certain number of simulations in smaller
    numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
    allows to run simulations in parallel, thus taking advantage
    of multi-core CPUs.

    Each SplitFDist object can only be used to run a single FDist
    simulation.
    """

    def __init__(self, report_fun=None,
                 num_thr=2, split_size=1000, fdist_dir='', ext=None):
        """Constructor.

        Parameters:

         - report_fun - Function that is called when a single packet is
           run, it should have a single parameter: Fst.
         - num_thr - Number of desired threads, typically the number
           of cores.
         - split_size - Size that a full simulation will be split in.
         - fdist_dir - Where fdist can be found; passed on to FDistAsync.
         - ext - Binary extension name (e.g. nothing on Unix, '.exe' on
           Windows).
        """
        # ``async`` is a reserved word since Python 3.7, so the executor
        # is stored under ``async_``.
        self.async_ = Local.Local(num_thr)
        # Keep the historical attribute name reachable (via getattr, or
        # as plain ``obj.async`` on interpreters that still accept it).
        setattr(self, "async", self.async_)
        self.async_.hooks['fdist'] = FDistAsync(fdist_dir, ext)
        self.report_fun = report_fun
        self.split_size = split_size

    # There might be races when reporting...
    def monitor(self, poll_interval=1):
        """Monitors and reports (using report_fun) execution.

        Every time a partial simulation ends, its out.dat is appended to
        the out.dat of the main data directory, its part directory is
        removed and report_fun (if any) is called with its Fst.

        IMPORTANT: monitor calls can be concurrent with other
        events, i.e., a task might end while report_fun is being
        called. This means that report_fun should consider that
        other events might be happening while it is running (it
        can call acquire/release if necessary).

        Parameters:

         - poll_interval - Seconds to sleep between two polls of the
           executor (default 1).

        Returns when the executor has nothing waiting, running or done.
        The lock is always released on return.
        """
        lock = self.async_.access_ds
        while True:
            sleep(poll_interval)
            lock.acquire()
            try:
                finished = list(self.async_.done.keys())  # copy it
            finally:
                lock.release()
            for job_id in finished:
                fst = self._collect_part(job_id)
                self._remove_part_dir(self.parts[job_id])
                # Called without the lock held, see the docstring.
                if self.report_fun:
                    self.report_fun(fst)
            lock.acquire()
            try:
                all_finished = (len(self.async_.waiting) == 0 and
                                len(self.async_.running) == 0 and
                                len(self.async_.done) == 0)
            finally:
                # Released before leaving the loop, so the lock is not
                # left held once monitoring ends.
                lock.release()
            if all_finished:
                break

    def _collect_part(self, job_id):
        """Append the out.dat of a finished job to the main one; return its Fst."""
        lock = self.async_.access_ds
        lock.acquire()
        try:
            fst, files = self.async_.done.pop(job_id)
            out_dat = files['out.dat']
            try:
                with open(self.data_dir + os.sep + 'out.dat', 'a') as merged:
                    merged.writelines(out_dat)
            finally:
                out_dat.close()
        finally:
            lock.release()
        return fst

    def _remove_part_dir(self, part_dir):
        """Delete every file inside part_dir and then the directory itself."""
        for entry in os.listdir(part_dir):
            os.remove(part_dir + os.sep + entry)
        os.rmdir(part_dir)

    def acquire(self):
        """Allows the external acquisition of the lock."""
        self.async_.access_ds.acquire()

    def release(self):
        """Allows the external release of the lock."""
        self.async_.access_ds.release()

    # You can only run a fdist case at a time
    def run_fdist(self, npops, nsamples, fst, sample_size,
                  mut=0, num_sims=20000, data_dir='.',
                  is_dominant=False, theta=0.06, beta=(0.25, 0.25),
                  max_freq=0.99):
        """Runs FDist.

        Parameters can be seen on FDistController.run_fdist.

        It will split a single execution in several parts and
        create separated data directories (data_dir/0, data_dir/1, ...),
        each one running split_size simulations. If data_dir holds a
        file named ss_file it is copied into every part directory.

        The call blocks until monitor() has merged every part.
        """
        # Floor division: simulations beyond the last full packet of
        # split_size are not run.
        num_parts = num_sims // self.split_size
        self.parts = {}
        self.data_dir = data_dir
        for part in range(num_parts):
            full_path = data_dir + os.sep + str(part)
            try:
                os.mkdir(full_path)
            except OSError:
                # It is ok if it is already there; any other failure
                # (permissions, missing parent, ...) is a real error.
                if not os.path.isdir(full_path):
                    raise
            if "ss_file" in os.listdir(data_dir):
                shutil.copy(data_dir + os.sep + "ss_file", full_path)
            job_id = self.async_.run_program('fdist', {
                'npops': npops,
                'nsamples': nsamples,
                'fst': fst,
                'sample_size': sample_size,
                'mut': mut,
                'num_sims': self.split_size,
                'data_dir': full_path,
                'is_dominant': is_dominant,
                'theta': theta,
                'beta': beta,
                'max_freq': max_freq
            }, {})
            self.parts[job_id] = full_path
        # The original code used threading.Thread(target=...).run(), which
        # executes the target in the calling thread; a direct call keeps
        # that blocking behaviour without the misleading Thread object.
        self.monitor()
181