CellModules
MultiSimu.py
Go to the documentation of this file.
1from multiprocessing import Pool
2import pandas as pd
3from tqdm import tqdm
4import itertools
5from copy import deepcopy
6from typing import Callable, Dict, List, Any, Union, Iterable
7import inspect
8
_pool = None  # Module-level worker pool, lazily created and reused across MultiSimu runs.


def _pool_is_usable():
    """Return True when the module-level Pool exists and has at least one live worker.

    A pool whose workers have all died (or that was terminated) must be replaced
    before submitting new jobs; this predicate lets the caller decide.
    """
    global _pool
    if _pool is None:
        return False

    # multiprocessing.Pool keeps its worker Process objects in the private
    # attribute `_pool`; absent/empty means the pool is unusable.
    procs = getattr(_pool, "_pool", None)
    if not procs:
        return False

    alive = False
    for p in procs:
        try:
            if p is not None and p.is_alive():
                alive = True
                break
        except (ValueError, AssertionError, OSError):
            # is_alive() can raise on half-torn-down process handles; treat as dead.
            continue

    return alive
30
31
32class MultiSimu():
33 r"""
34 Initializes the MultiSimu instance for running multiple simulations with various parameters, conditions, and replicates.
35
36 Parameters
37 ----------
38 : **simuRunner** : callable
39 The function to run the simulation. It should accept the parameters and conditions as input.
40
41 : **params** : dict or list of dict
42 The parameters for the simulation. If a single dictionary is provided, it is wrapped in a list. Each dictionary represents a distinct set of parameters for a simulation run.
43
44 : **replicat** : int, optional, default=1
45 The number of replicates to run for each parameter and condition combination.
46
47 : **conditions** : list, optional, default=[0]
48 A list of conditions to apply to each simulation run.
49
50 : **batch_size_level** : int or str, optional, default=None
51 Determines the size of the batches returned by the iterator. It can be an integer specifying the number of batches, or a string indicating the batch level ('param', 'replicat', or 'condition').
52
53 : **cacheSize** : int or None, optional, default=200
54 The size of the cache for the number of simulations to pre-fetch in parallel.
55
56 : **withTqdm** : bool, optional, default=False
57 If True, progress bars will be displayed using `tqdm` to show the progress of parameter, condition, and replicate iterations.
58
59 : **parallel** : bool, optional, default=True
60 If True, simulations will be run in parallel using multiprocessing. If False, simulations will be run sequentially.
61
62 : **autoIndex** : bool, optional, default=True
63 If True, adds index columns (ID_PARAMETER, ID_CONDITION, ID_REPLICAT) to the output data for identification.
64
65 : **autoConcat** : bool, optional, default=True
66 If True, concatenates the results from different simulations into a single pandas DataFrame.
67
68 Raises
69 ------
70 : **KeyError**
71 - If `params` is neither a list of dictionaries nor a single dictionary.
72 - If `conditions` is not a list.
73
74 Example
75 -------
76 ```python
77 def dataCatcherOnStep(simu):
78 return simu.cells.countState()
79
80 def run_simu(params,condition,replicat):
81 steps = list(range(0,2501,10))
82 MOI, confluence = condition
83 params['input']['Scenario']['infection'] = [[0,MOI]]
84 params['input']['Scenario']['initConfluence'] = confluence
85 data = Simu(params).compute(dataCatcherOnStep,steps)
86 data.index *= params['input']['Scenario']['dt']
87 return data
88
89
90 paramsTest = [params1,params2]
91 conditions = [(0,0.1),
92 (5,0.15),
93 (10,0.2)]
94
95 def storeData(datas):
96 pass
97
98 for d in MultiSimu(run_simu,paramsTest,replicat = 5, conditions=conditions, batch_size_level='param', withTqdm=True):
99 storeData(d)
100
101 # or simple usage for replicate :
102
103 def dataCatcherOnStep(simu):
104 return simu.cells.countState()
105
106 def run_simu(params):
107 steps = list(range(0,2501,10))
108 data = Simu(params).compute(dataCatcherOnStep,steps)
109 data.index *= params['input']['Scenario']['dt']
110 return data
111
112 data = MultiSimu(run_simu,params,replicat=5).get()
113 ```
114 """
115 def __init__(self,
116 simuRunner: Callable,
117 params: dict | list[dict],
118 replicat: int = 1,
119 conditions: Iterable = None,
120 batch_size_level: int | str | None = None,
121 cacheSize: int | None = 200,
122 withTqdm: bool = False,
123 parallel: bool = True,
124 autoIndex: bool = True,
125 autoConcat: bool = True):
126 if isinstance(params,dict): params = [params]
127 elif not isinstance(params,Iterable): raise KeyError("params must a list of dict or dict")
128 withCondition = conditions is not None
129 if withCondition:
130 if not isinstance(conditions,Iterable): raise KeyError("conditions must be an iterable")
131 conditions = list(conditions)
132 else: conditions = []
133 simuRunnerArity = len([p for p in inspect.signature(simuRunner).parameters.values() if p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD and p.default is inspect._empty])
134 if simuRunnerArity == 3:
135 self.matchRunner = lambda param,cond,rep: (param,cond,rep)
136 elif simuRunnerArity == 2 and len(conditions)>0:
137 self.matchRunner = lambda param,cond,rep: (param,cond)
138 elif simuRunnerArity == 2 and len(conditions)==0:
139 self.matchRunner = lambda param,cond,rep: (param,rep)
140 elif simuRunnerArity == 1 and len(conditions)==0:
141 self.matchRunner = lambda param,cond,rep: (param,)
142 else:
143 sig = f"{simuRunner.__name__}{inspect.signature(simuRunner)}"
144
145 if withCondition:
146 expected = f"{simuRunner.__name__}(params, condition) or {simuRunner.__name__}(params, condition, replicat)"
147 note = ""
148 else:
149 expected = f"{simuRunner.__name__}(params) or {simuRunner.__name__}(params, replicat) or {simuRunner.__name__}(params, condition, replicat)"
150 note = f"Note: when conditions is None, a 2-arg {simuRunner.__name__} is interpreted as (params, replicat)."
151
152 raise TypeError(
153 "simuRunner signature is not compatible with MultiSimu.\n"
154 f"Found: {sig}\n"
155 f"Expected: {expected}\n"
156 f"{note}")
157 self.simuRunner = simuRunner
158 self.parallel = parallel
159 self.autoConcat = autoConcat
160 self.autoIndex = autoIndex
161 nbParams = len(params)
162 nbCondition = len(conditions) if withCondition else 1
163 self.WithParam = nbParams > 1
164 self.WithCondition = withCondition
165 self.WithReplicat = replicat > 1
166 self.withTqdm = withTqdm
167 if withTqdm: self._init_pbars(nbParams,nbCondition,replicat)
168 if isinstance(batch_size_level,int):realBatchSize=replicat*nbCondition*batch_size_level
169 elif (batch_size_level=='param'): realBatchSize=replicat*nbCondition
170 elif batch_size_level=='condition': realBatchSize=replicat
171 elif batch_size_level=='replicat': realBatchSize=1
172 else: realBatchSize = replicat*nbCondition*nbParams
173 self.realBatchSize = realBatchSize
174 if self.parallel:
175 self.argsIter = self._combine(params,conditions,range(replicat))
176 else:
177 self.argsIter = self._batched(self._combine(params,conditions,range(replicat)),realBatchSize)
178 self.i = 0
179 self.end = False
180 self.cacheSize = cacheSize
181
182 def _feedProcess(self,pool,futures,n=1):
183 if not self.end:
184 for _ in range(n):
185 try:
186 ids, params = next(self.argsIter)
187 futures.append((ids,pool.apply_async(self.simuRunner,params)))
188 except StopIteration:
189 self.end = True
190 break
191
192
193 def __iter__(self):
194 global _pool
195 try:
196 if self.parallel:
197 if not _pool_is_usable():
198 _pool = Pool()
199 try:
200 data = []
201 futures = []
202 self._feedProcess(_pool, futures, self.cacheSize)
203
204 while len(futures) > 0:
205 self._feedProcess(_pool, futures)
206 ids, simu = futures.pop(0)
207 data.append(self._postProcess(simu.get(), *ids))
208
209 if len(data) == self.realBatchSize:
210 yield self._autoConcat(data)
211 data = []
212
213 if len(data) > 0:
214 yield self._autoConcat(data)
215
216 except KeyboardInterrupt:
217 _pool.terminate()
218 _pool.join()
219 _pool = None
220 raise
221 except Exception:
222 _pool.terminate()
223 _pool.join()
224 _pool = None
225 raise
226 else:
227 for batch in self.argsIter:
228 yield self._autoConcat(
229 [self._postProcess(self.simuRunner(*params), *ids) for ids, params in batch]
230 )
231
232 finally:
233 self._close_pbars()
234
235 def get(self) -> Union[pd.DataFrame, List[pd.DataFrame]]:
236 """
237 Retrieve all simulation results at once.
238
239 Returns
240 -------
241 : Union[pd.DataFrame, List[pd.DataFrame]]
242 Returns a single pandas DataFrame if there is only one batch of results.
243 Returns a list of pandas DataFrames if there are multiple batches of results.
244 """
245 ret = list(self)
246 if len(ret)==1: return ret[0]
247 else: return ret
248
249 def _autoConcat(self,datas):
250 nbOutput = len(datas[0]) if (isinstance(datas[0],list) or isinstance(datas[0],tuple)) else 0
251 if nbOutput==1: datas=datas[0]
252 if nbOutput>1:
253 datas = list(zip(*datas))
254 if self.autoConcat:
255 for i in range(len(datas)):
256 datas[i] = pd.concat(datas[i],copy=False)
257 elif self.autoConcat:
258 datas = pd.concat(datas,copy=False)
259 return datas
260
261 def _init_pbars(self,nbParam,nbCondition,nbReplicat):
262 i= 0
263 self.pbars={}
264 if self.WithParam:
265 self.pbars['param']=tqdm(total=nbParam, ncols=80, position=i ,desc='Parameters')
266 self.pbars['param'].updateEach = nbReplicat*nbCondition
267 i+=1
268 if self.WithCondition:
269 self.pbars['condition']=tqdm(total=nbCondition, ncols=80, position=i,desc='Conditions')
270 self.pbars['condition'].updateEach = nbReplicat
271 i+=1
272 if self.WithReplicat:
273 self.pbars['replicat']=tqdm(total=nbReplicat, ncols=80, position=i ,desc=' Replicats')
274 self.pbars['replicat'].updateEach = 1
275 self.total = nbReplicat*nbParam*nbCondition
276
277
278 def _updateBar(self):
279 if self.withTqdm:
280 for pbar in self.pbars.values():
281 if pbar.n >= pbar.total:
282 pbar.n=0
283 pbar.refresh()
284 elif (self.i%pbar.updateEach)==0:
285 pbar.update()
286 pbar.refresh()
287
288 def _postProcess(self,datas,id_param,id_cond,replicat):
289 self.i+=1
290 if self.autoIndex:
291 if isinstance(datas,list) or isinstance(datas,tuple):
292 for data in datas:
293 if isinstance(data,pd.DataFrame):
294 if self.WithParam: data['ID_PARAMETER'] = id_param
295 if self.WithCondition: data['ID_CONDITION'] = id_cond
296 if self.WithReplicat: data['ID_REPLICAT'] = replicat
297 elif isinstance(datas,pd.DataFrame):
298 if self.WithParam: datas['ID_PARAMETER'] = id_param
299 if self.WithCondition: datas['ID_CONDITION'] = id_cond
300 if self.WithReplicat: datas['ID_REPLICAT'] = replicat
301
302 self._updateBar()
303 return datas
304
305 def _combine(self,params,conditions,replicat):
306 for idP,param in enumerate(params):
307 if self.WithCondition:
308 for idC,condition in enumerate(conditions):
309 if self.WithReplicat:
310 for idR in replicat:
311 yield ((idP,idC,idR),self.matchRunner(deepcopy(param),condition,idR))
312 else:
313 yield ((idP,idC,None),self.matchRunner(deepcopy(param),condition,None))
314 else:
315 if self.WithReplicat:
316 for idR in replicat:
317 yield ((idP,None,idR),self.matchRunner(deepcopy(param),None,idR))
318 else:
319 yield ((idP,None,None),self.matchRunner(deepcopy(param),None,None))
320
321 def _batched(self,iterable, n):
322 if n < 1: raise ValueError('n must be at least one')
323 iterator = iter(iterable)
324 while batch := tuple(itertools.islice(iterator, n)):
325 yield batch
326
327 def _close_pbars(self):
328 if self.withTqdm:
329 for pbar in self.pbars.values():
330 pbar.close()
Definition: simu.cpp:31
def _batched(self, iterable, n)
Definition: MultiSimu.py:321
def _postProcess(self, datas, id_param, id_cond, replicat)
Definition: MultiSimu.py:288
def _feedProcess(self, pool, futures, n=1)
Definition: MultiSimu.py:182
def _autoConcat(self, datas)
Definition: MultiSimu.py:249
def __init__(self, Callable simuRunner, dict|list[dict] params, int replicat=1, Iterable conditions=None, int|str|None batch_size_level=None, int|None cacheSize=200, bool withTqdm=False, bool parallel=True, bool autoIndex=True, bool autoConcat=True)
Definition: MultiSimu.py:125
def _init_pbars(self, nbParam, nbCondition, nbReplicat)
Definition: MultiSimu.py:261
Union[pd.DataFrame, List[pd.DataFrame]] get(self)
Definition: MultiSimu.py:235
def _combine(self, params, conditions, replicat)
Definition: MultiSimu.py:305
CompositeGenerator< T > values(T val1, T val2)
Definition: catch.hpp:1968