import itertools
from multiprocessing import Pool
from copy import deepcopy
from typing import Callable, Dict, List, Any, Union, Iterable

import pandas as pd
from tqdm import tqdm
Initializes the MultiSimu instance for running multiple simulations with
various parameters, conditions, and replicates.

**simuRunner** : callable
    The function to run the simulation. It should accept the parameters
    and conditions as input.
**params** : dict or list of dict
    The parameters for the simulation. If a single dictionary is provided,
    it is wrapped in a list. Each dictionary represents a distinct set of
    parameters for a simulation run.
**replicat** : int, optional, default=1
    The number of replicates to run for each parameter and condition
    combination.
**conditions** : list, optional, default=[0]
    A list of conditions to apply to each simulation run.
**batch_size_level** : int or str, optional, default=None
    Determines the size of the batches returned by the iterator. It can be
    an integer specifying the number of batches, or a string indicating the
    batch level ('param', 'replicat', or 'condition').
**cacheSize** : int or None, optional, default=200
    The number of simulations to pre-fetch in parallel (the cache size).
**withTqdm** : bool, optional, default=False
    If True, progress bars will be displayed using `tqdm` to show the
    progress of parameter, condition, and replicate iterations.
**parallel** : bool, optional, default=True
    If True, simulations will be run in parallel using multiprocessing.
    If False, simulations will be run sequentially.
**autoIndex** : bool, optional, default=True
    If True, adds index columns (ID_PARAMETER, ID_CONDITION, ID_REPLICAT)
    to the output data for identification.
**autoConcat** : bool, optional, default=True
    If True, concatenates the results from different simulations into a
    single pandas DataFrame.

- If `params` is neither a list of dictionaries nor a single dictionary.
- If `conditions` is not a list.
def dataCatcherOnStep(simu):
    return simu.cells.countState()

def run_simu(params, condition, replicat):
    steps = list(range(0, 2501, 10))
    MOI, confluence = condition
    params['input']['Scenario']['infection'] = [[0, MOI]]
    params['input']['Scenario']['initConfluence'] = confluence
    data = Simu(params).compute(dataCatcherOnStep, steps)
    data.index *= params['input']['Scenario']['dt']
    return data

paramsTest = [params1, params2]
conditions = [(0, 0.1),
              ...]  # further (MOI, confluence) pairs elided in the source
for d in MultiSimu(run_simu, paramsTest, replicat=5, conditions=conditions,
                   batch_size_level='param', withTqdm=True):
    ...
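With batch_size_level='param' and the default autoIndex=True and autoConcat=True, each `d` yielded by this loop should be a single DataFrame covering every condition and replicate of one parameter set, carrying the ID_CONDITION and ID_REPLICAT index columns described above.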
def dataCatcherOnStep(simu):
    return simu.cells.countState()

def run_simu(params):
    steps = list(range(0, 2501, 10))
    data = Simu(params).compute(dataCatcherOnStep, steps)
    data.index *= params['input']['Scenario']['dt']
    return data

data = MultiSimu(run_simu, params, replicat=5).get()
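With a single parameter dictionary and no batch_size_level, every run falls into one batch, so get() should return a single concatenated DataFrame here; with several parameter sets or an explicit batch level it may instead return a list of DataFrames, as described under get() below.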
def __init__(self,
             simuRunner: Callable,
             params: dict | list[dict],
             replicat: int = 1,
             conditions: list[int] = [0],
             batch_size_level: int | str | None = None,
             cacheSize: int | None = 200,
             withTqdm: bool = False,
             parallel: bool = True,
             autoIndex: bool = True,
             autoConcat: bool = True):
    if isinstance(params, dict):
        params = [params]
    elif not isinstance(params, Iterable):
        raise KeyError("params must be a list of dict or a dict")
    if not isinstance(conditions, list):
        raise KeyError("conditions must be a list")

    nbParams = len(params)
    nbCondition = len(conditions)

    if withTqdm:
        self._init_pbars(nbParams, nbCondition, replicat)
    if isinstance(batch_size_level, int):
        realBatchSize = replicat * nbCondition * batch_size_level
    elif batch_size_level == 'param':
        realBatchSize = replicat * nbCondition
    elif batch_size_level == 'condition':
        realBatchSize = replicat
    elif batch_size_level == 'replicat':
        realBatchSize = 1
    else:
        realBatchSize = replicat * nbCondition * nbParams
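    # Illustrative example (not from the original source): with 3 parameter sets,
    # 2 conditions and 5 replicates, batch_size_level='param' gives
    # realBatchSize = 5 * 2 = 10, 'condition' gives 5, 'replicat' gives 1,
    # and the default None gives 5 * 2 * 3 = 30, i.e. one batch holding everything.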
        futures.append((ids, pool.apply_async(self.simuRunner, params)))
    except StopIteration:

    while len(futures) > 0:
        ids, simu = futures.pop(0)
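        # (Assumption inferred from the cacheSize docstring: _feedProcess keeps up to
        #  cacheSize apply_async futures in flight, and results are consumed here in
        #  submission order via futures.pop(0).)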
def get(self) -> Union[pd.DataFrame, List[pd.DataFrame]]:
    Retrieve all simulation results at once.

    Union[pd.DataFrame, List[pd.DataFrame]]
        Returns a single pandas DataFrame if there is only one batch of results.
        Returns a list of pandas DataFrames if there are multiple batches of results.

    if len(ret) == 1:
        return ret[0]
    nbOutput = len(datas[0]) if isinstance(datas[0], (list, tuple)) else 0
    if nbOutput == 1:
        datas = datas[0]
    datas = list(zip(*datas))
    for i in range(len(datas)):
        datas[i] = pd.concat(datas[i], copy=False)
    datas = pd.concat(datas, copy=False)
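    # Illustrative note (assumption, not from the source): when simuRunner returns a
    # tuple of outputs, zip(*datas) regroups the per-simulation results by output
    # position so that each position is concatenated into its own DataFrame; a single
    # output is concatenated directly with pd.concat.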
    # updateEach is read by the update fragments below: a bar appears to advance when
    # the running simulation count self.i is a multiple of it.
    self.pbars['param'] = tqdm(total=nbParam, ncols=80, position=i, desc='Parameters')
    self.pbars['param'].updateEach = nbReplicat * nbCondition

    self.pbars['condition'] = tqdm(total=nbCondition, ncols=80, position=i, desc='Conditions')
    self.pbars['condition'].updateEach = nbReplicat

    self.pbars['replicat'] = tqdm(total=nbReplicat, ncols=80, position=i, desc=' Replicats')
    self.pbars['replicat'].updateEach = 1
    self.total = nbReplicat * nbParam * nbCondition

    if pbar.n >= pbar.total:

    elif (self.i % pbar.updateEach) == 0:
    if isinstance(datas, (list, tuple)):
        for data in datas:
            if isinstance(data, pd.DataFrame):
                if self.WithParam:
                    data['ID_PARAMETER'] = id_param
    elif isinstance(datas, pd.DataFrame):
        if self.WithParam:
            datas['ID_PARAMETER'] = id_param
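    # (Assumption based on the autoIndex docstring: the lines elided here add the
    #  ID_CONDITION and ID_REPLICAT columns in the same way.)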
    for idP, param in enumerate(params):
        for idC, condition in enumerate(conditions):
            yield ((idP, idC, idR), (deepcopy(param), condition))
            yield ((idP, idC, None), (deepcopy(param), condition))
        yield ((idP, None, idR), (deepcopy(param),))
        yield ((idP, None, None), (deepcopy(param),))
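    # Each yielded item pairs an index triple (parameter, condition, replicate; None
    # where a level is unused) with the positional arguments later passed to simuRunner,
    # e.g. ((0, 1, 3), (param_copy, condition)); the concrete values are illustrative only.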
    if n < 1:
        raise ValueError('n must be at least one')
    iterator = iter(iterable)
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch
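A minimal standalone sketch of the same batching recipe (the free function and sample values are illustrative, not part of the class):

import itertools

def batched(iterable, n):
    if n < 1:
        raise ValueError('n must be at least one')
    iterator = iter(iterable)
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch

print(list(batched(range(7), 3)))   # [(0, 1, 2), (3, 4, 5), (6,)]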
def _batched(self, iterable, n)
def _postProcess(self, datas, id_param, id_cond, replicat)
def _feedProcess(self, pool, futures, n=1)
def _autoConcat(self, datas)
def __init__(self, simuRunner: Callable, params: dict | list[dict], replicat: int = 1, conditions: list[int] = [0], batch_size_level: int | str | None = None, cacheSize: int | None = 200, withTqdm: bool = False, parallel: bool = True, autoIndex: bool = True, autoConcat: bool = True)
def _init_pbars(self, nbParam, nbCondition, nbReplicat)
def get(self) -> Union[pd.DataFrame, List[pd.DataFrame]]
def _combine(self, params, conditions, replicat)