1
2
3
4
5
6
"""
This module allows for asynchronous execution of Fdist and
splitting of loads.

FDistAsync allows for the execution of FDist.

SplitFDist splits a single Fdist execution into several, taking advantage
of multi-core architectures.

"""
17
18 import os
19 import shutil
20 import thread
21 from time import sleep
22 from Bio.PopGen.Async import Local
23 from Bio.PopGen.FDist.Controller import FDistController
24
25
27 """Asynchronous FDist execution.
28 """
29
def __init__(self, fdist_dir="", ext=None):
    """Set up the controller used to launch FDist.

    Parameters:
    fdist_dir - Directory where the fdist binaries can be found; when
                left as "", they should be on the path.
    ext       - Extension of the binary names (e.g. nothing on Unix,
                ".exe" on Windows).
    """
    # All of the setup is delegated to the base controller.
    FDistController.__init__(self, fdist_dir, ext)
40
def run_job(self, parameters, input_files):
    """Run one FDist job described by a parameter dictionary.

    The usual Fdist arguments are read from ``parameters`` and handed
    to ``run_fdist`` as a plain positional call. This is normally
    executed inside a separate thread.

    Parameters:
    parameters  - Dictionary of settings. 'npops', 'nsamples', 'fst' and
                  'sample_size' are mandatory (a missing one raises
                  KeyError); 'mut', 'num_sims', 'data_dir',
                  'is_dominant', 'theta', 'beta' and 'max_freq' are
                  optional and fall back to the defaults below.
    input_files - Not used by this job.

    Returns a tuple (fst, output_files), where fst is whatever
    ``run_fdist`` returned and output_files maps 'out.dat' to a file
    object opened for reading on the out.dat of the data directory.
    The caller is responsible for closing that file.
    """
    lookup = parameters.get
    work_dir = lookup('data_dir', '.')
    estimated_fst = self.run_fdist(
        parameters['npops'],
        parameters['nsamples'],
        parameters['fst'],
        parameters['sample_size'],
        lookup('mut', 0),
        lookup('num_sims', 20000),
        work_dir,
        lookup('is_dominant', False),
        lookup('theta', 0.06),
        lookup('beta', (0.25, 0.25)),
        lookup('max_freq', 0.99))
    # Same path as work_dir + os.sep + 'out.dat'.
    result_path = os.sep.join((work_dir, 'out.dat'))
    return estimated_fst, {'out.dat': open(result_path, 'r')}
66
67
69 """Splits a FDist run.
70
71 The idea is to split a certain number of simulations in smaller
72 numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
73 allows to run simulations in parallel, thus taking advantage
74 of multi-core CPUs.
75
76 Each SplitFDist object can only be used to run a single FDist
77 simulation.
78 """
79 - def __init__(self, report_fun = None,
80 num_thr = 2, split_size = 1000, fdist_dir = '', ext = None):
81 """Constructor.
82
83 Parameters:
84 report_fun - Function that is called when a single packet is
85 run, it should have a single parameter: Fst.
86 num_thr - Number of desired threads, typically the number
87 of cores.
88 split_size - Size that a full simulation will be split in.
89 ext - Binary extension name (e.g. nothing on Unix, '.exe' on
90 Windows).
91 """
92 self.async = Local.Local(num_thr)
93 self.async.hooks['fdist'] = FDistAsync(fdist_dir, ext)
94 self.report_fun = report_fun
95 self.split_size = split_size
96
97
99 """Monitors and reports (using report_fun) execution.
100
101 Every time a partial simulation ends, calls report_fun.
102 IMPORTANT: monitor calls can be concurrent with other
103 events, ie, a tasks might end while report_fun is being
104 called. This means that report_fun should be consider that
105 other events might be happening while it is running (it
106 can call acquire/release if necessary).
107 """
108 while(True):
109 sleep(1)
110 self.async.access_ds.acquire()
111 keys = self.async.done.keys()[:]
112 self.async.access_ds.release()
113 for done in keys:
114 self.async.access_ds.acquire()
115 fst, files = self.async.done[done]
116 del self.async.done[done]
117 out_dat = files['out.dat']
118 f = open(self.data_dir + os.sep + 'out.dat','a')
119 f.writelines(out_dat.readlines())
120 f.close()
121 out_dat.close()
122 self.async.access_ds.release()
123 for file in os.listdir(self.parts[done]):
124 os.remove(self.parts[done] + os.sep + file)
125 os.rmdir(self.parts[done])
126
127 if self.report_fun:
128 self.report_fun(fst)
129 self.async.access_ds.acquire()
130 if len(self.async.waiting) == 0 and len(self.async.running) == 0 \
131 and len(self.async.done) == 0:
132 break
133 self.async.access_ds.release()
134
135
136
137
139 """Allows the external acquisition of the lock.
140 """
141 self.async.access_ds.acquire()
142
144 """Allows the external release of the lock.
145 """
146 self.async.access_ds.release()
147
148
149 - def run_fdist(self, npops, nsamples, fst, sample_size,
150 mut = 0, num_sims = 20000, data_dir='.',
151 is_dominant = False, theta = 0.06, beta = (0.25, 0.25),
152 max_freq = 0.99):
153 """Runs FDist.
154
155 Parameters can be seen on FDistController.run_fdist.
156
157 It will split a single execution in several parts and
158 create separated data directories.
159 """
160 num_parts = num_sims/self.split_size
161 self.parts = {}
162 self.data_dir = data_dir
163 for directory in range(num_parts):
164 full_path = data_dir + os.sep + str(directory)
165 try:
166 os.mkdir(full_path)
167 except OSError:
168 pass
169 if "ss_file" in os.listdir(data_dir):
170 shutil.copy(data_dir + os.sep + "ss_file", full_path)
171 id = self.async.run_program('fdist', {
172 'npops' : npops,
173 'nsamples' : nsamples,
174 'fst' : fst,
175 'sample_size' : sample_size,
176 'mut' : mut,
177 'num_sims' : self.split_size,
178 'data_dir' : full_path,
179 'is_dominant' : is_dominant,
180 'theta' : theta,
181 'beta' : beta,
182 'max_freq' : max_freq
183 }, {})
184 self.parts[id] = full_path
185 thread.start_new_thread(self.monitor, ())
186