1from multiprocessing
import Pool
5from copy
import deepcopy
6from typing
import Callable, Dict, List, Any, Union, Iterable
16 procs = getattr(_pool,
"_pool",
None)
23 if p
is not None and p.is_alive():
26 except (ValueError, AssertionError, OSError):
34 Initializes the MultiSimu instance for running multiple simulations with various parameters, conditions, and replicates.
38 : **simuRunner** : callable
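39 The function to run the simulation. When conditions are given it must be callable as `f(params, condition)` or `f(params, condition, replicat)`; when no conditions are given it must be callable as `f(params)`, `f(params, replicat)` or `f(params, condition, replicat)`.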
41 : **params** : dict or list of dict
42 The parameters for the simulation. If a single dictionary is provided, it is wrapped in a list. Each dictionary represents a distinct set of parameters for a simulation run.
44 : **replicat** : int, optional, default=1
45 The number of replicates to run for each parameter and condition combination.
47 : **conditions** : iterable, optional, default=None
48 An iterable of conditions to apply to each simulation run. If None, simulations are run without a condition.
50 : **batch_size_level** : int or str, optional, default=None
51 Determines the size of the batches returned by the iterator. It can be an integer specifying the number of parameter sets grouped into each batch, or a string indicating the batch level ('param', 'replicat', or 'condition'). If None, all simulations form a single batch.
53 : **cacheSize** : int or None, optional, default=200
54 The size of the cache for the number of simulations to pre-fetch in parallel.
56 : **withTqdm** : bool, optional, default=False
57 If True, progress bars will be displayed using `tqdm` to show the progress of parameter, condition, and replicate iterations.
59 : **parallel** : bool, optional, default=True
60 If True, simulations will be run in parallel using multiprocessing. If False, simulations will be run sequentially.
62 : **autoIndex** : bool, optional, default=True
63 If True, adds index columns (ID_PARAMETER, ID_CONDITION, ID_REPLICAT) to the output data for identification.
65 : **autoConcat** : bool, optional, default=True
66 If True, concatenates the results from different simulations into a single pandas DataFrame.
71 - If `params` is neither an iterable (e.g. a list) of dictionaries nor a single dictionary.
72 - If `conditions` is given and is not an iterable.
77 def dataCatcherOnStep(simu):
78 return simu.cells.countState()
80 def run_simu(params,condition,replicat):
81 steps = list(range(0,2501,10))
82 MOI, confluence = condition
83 params[
'input'][
'Scenario'][
'infection'] = [[0,MOI]]
84 params[
'input'][
'Scenario'][
'initConfluence'] = confluence
85 data = Simu(params).compute(dataCatcherOnStep,steps)
86 data.index *= params[
'input'][
'Scenario'][
'dt']
90 paramsTest = [params1,params2]
91 conditions = [(0,0.1),
98 for d
in MultiSimu(run_simu,paramsTest,replicat = 5, conditions=conditions, batch_size_level=
'param', withTqdm=
True):
103 def dataCatcherOnStep(simu):
104 return simu.cells.countState()
106 def run_simu(params):
107 steps = list(range(0,2501,10))
108 data =
Simu(params).compute(dataCatcherOnStep,steps)
109 data.index *= params[
'input'][
'Scenario'][
'dt']
112 data = MultiSimu(run_simu,params,replicat=5).
get()
116 simuRunner: Callable,
117 params: dict | list[dict],
119 conditions: Iterable =
None,
120 batch_size_level: int | str |
None =
None,
121 cacheSize: int |
None = 200,
122 withTqdm: bool =
False,
123 parallel: bool =
True,
124 autoIndex: bool =
True,
125 autoConcat: bool =
True):
126 if isinstance(params,dict): params = [params]
127 elif not isinstance(params,Iterable):
raise KeyError(
"params must a list of dict or dict")
128 withCondition = conditions
is not None
130 if not isinstance(conditions,Iterable):
raise KeyError(
"conditions must be an iterable")
131 conditions = list(conditions)
132 else: conditions = []
133 simuRunnerArity = len([p
for p
in inspect.signature(simuRunner).parameters.values()
if p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD
and p.default
is inspect._empty])
134 if simuRunnerArity == 3:
136 elif simuRunnerArity == 2
and len(conditions)>0:
137 self.
matchRunner =
lambda param,cond,rep: (param,cond)
138 elif simuRunnerArity == 2
and len(conditions)==0:
139 self.
matchRunner =
lambda param,cond,rep: (param,rep)
140 elif simuRunnerArity == 1
and len(conditions)==0:
143 sig = f
"{simuRunner.__name__}{inspect.signature(simuRunner)}"
146 expected = f
"{simuRunner.__name__}(params, condition) or {simuRunner.__name__}(params, condition, replicat)"
149 expected = f
"{simuRunner.__name__}(params) or {simuRunner.__name__}(params, replicat) or {simuRunner.__name__}(params, condition, replicat)"
150 note = f
"Note: when conditions is None, a 2-arg {simuRunner.__name__} is interpreted as (params, replicat)."
153 "simuRunner signature is not compatible with MultiSimu.\n"
155 f
"Expected: {expected}\n"
161 nbParams = len(params)
162 nbCondition = len(conditions)
if withCondition
else 1
167 if withTqdm: self.
_init_pbars(nbParams,nbCondition,replicat)
168 if isinstance(batch_size_level,int):realBatchSize=replicat*nbCondition*batch_size_level
169 elif (batch_size_level==
'param'): realBatchSize=replicat*nbCondition
170 elif batch_size_level==
'condition': realBatchSize=replicat
171 elif batch_size_level==
'replicat': realBatchSize=1
172 else: realBatchSize = replicat*nbCondition*nbParams
187 futures.append((ids,pool.apply_async(self.
simuRunner,params)))
188 except StopIteration:
204 while len(futures) > 0:
206 ids, simu = futures.pop(0)
216 except KeyboardInterrupt:
235 def get(self) -> Union[pd.DataFrame, List[pd.DataFrame]]:
237 Retrieve all simulation results at once.
241 : Union[pd.DataFrame, List[pd.DataFrame]]
242 Returns a single pandas DataFrame if there is only one batch of results.
243 Returns a list of pandas DataFrames if there are multiple batches of results.
246 if len(ret)==1:
return ret[0]
250 nbOutput = len(datas[0])
if (isinstance(datas[0],list)
or isinstance(datas[0],tuple))
else 0
251 if nbOutput==1: datas=datas[0]
253 datas = list(zip(*datas))
255 for i
in range(len(datas)):
256 datas[i] = pd.concat(datas[i],copy=
False)
258 datas = pd.concat(datas,copy=
False)
265 self.
pbars[
'param']=tqdm(total=nbParam, ncols=80, position=i ,desc=
'Parameters')
266 self.
pbars[
'param'].updateEach = nbReplicat*nbCondition
269 self.
pbars[
'condition']=tqdm(total=nbCondition, ncols=80, position=i,desc=
'Conditions')
270 self.
pbars[
'condition'].updateEach = nbReplicat
273 self.
pbars[
'replicat']=tqdm(total=nbReplicat, ncols=80, position=i ,desc=
' Replicats')
274 self.
pbars[
'replicat'].updateEach = 1
275 self.
total = nbReplicat*nbParam*nbCondition
281 if pbar.n >= pbar.total:
284 elif (self.
i%pbar.updateEach)==0:
291 if isinstance(datas,list)
or isinstance(datas,tuple):
293 if isinstance(data,pd.DataFrame):
294 if self.
WithParam: data[
'ID_PARAMETER'] = id_param
297 elif isinstance(datas,pd.DataFrame):
298 if self.
WithParam: datas[
'ID_PARAMETER'] = id_param
306 for idP,param
in enumerate(params):
308 for idC,condition
in enumerate(conditions):
311 yield ((idP,idC,idR),self.
matchRunner(deepcopy(param),condition,idR))
313 yield ((idP,idC,
None),self.
matchRunner(deepcopy(param),condition,
None))
317 yield ((idP,
None,idR),self.
matchRunner(deepcopy(param),
None,idR))
319 yield ((idP,
None,
None),self.
matchRunner(deepcopy(param),
None,
None))
322 if n < 1:
raise ValueError(
'n must be at least one')
323 iterator = iter(iterable)
324 while batch := tuple(itertools.islice(iterator, n)):
def _batched(self, iterable, n)
def _postProcess(self, datas, id_param, id_cond, replicat)
def _feedProcess(self, pool, futures, n=1)
def _autoConcat(self, datas)
def __init__(self, Callable simuRunner, dict|list[dict] params, int replicat=1, Iterable conditions=None, int|str|None batch_size_level=None, int|None cacheSize=200, bool withTqdm=False, bool parallel=True, bool autoIndex=True, bool autoConcat=True)
def _init_pbars(self, nbParam, nbCondition, nbReplicat)
Union[pd.DataFrame, List[pd.DataFrame]] get(self)
def _combine(self, params, conditions, replicat)
CompositeGenerator< T > values(T val1, T val2)