from __future__ import annotations
from smash.solver._mw_forward import forward
from smash.core._constant import OPTIM_FUNC
from smash.core._event_segmentation import _mask_event
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from smash.core.model import Model
from smash.core.generate_samples import SampleResult
import pandas as pd
import numpy as np
from scipy.stats import gaussian_kde
import multiprocessing as mp
from tqdm import tqdm
__all__ = ["BayesResult"]
[docs]class BayesResult(dict):
"""
Represents the Bayesian estimation or optimization result.
Notes
-----
This class is essentially a subclass of dict with attribute accessors.
Attributes
----------
data : dict
Rrepresenting the generated spatially uniform Model parameters/sates and the corresponding cost values after
running the simulations on this dataset. The keys are 'cost' and the names of Model parameters/states considered.
lcurve : dict
The optimization results on the regularization parameter if the L-curve approach is used. The keys are
- 'alpha' : a list of regularization parameters to be optimized.
- 'cost' : a list of corresponding cost values.
- 'mahal_dist' : a list of corresponding Mahalanobis distance values.
- 'var' : a list of corresponding dictionaries. The keys are the names of the Model parameters/states, and each represents its variance.
- 'alpha_opt' : the optimal value of the regularization parameter.
See Also
--------
Model.bayes_estimate: Estimate prior Model parameters/states using Bayesian approach.
Model.bayes_optimize: Optimize the Model using Bayesian approach.
"""
def __getattr__(self, name):
try:
return self[name]
except KeyError as e:
raise AttributeError(name) from e
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
def __repr__(self):
if self.keys():
m = max(map(len, list(self.keys()))) + 1
return "\n".join(
[
k.rjust(m) + ": " + repr(v)
for k, v in sorted(self.items())
if not k.startswith("_")
]
)
else:
return self.__class__.__name__ + "()"
def __dir__(self):
return list(self.keys())
def _bayes_computation(
instance: Model,
sample: SampleResult,
alpha: int | float | list,
bw_method: str | None,
weights: np.ndarray | None,
algorithm: str | None,
mapping: str | None,
jobs_fun: np.ndarray,
wjobs_fun: np.ndarray,
event_seg: dict,
bounds: np.ndarray | None,
wgauge: np.ndarray,
ost: pd.Timestamp,
verbose: bool,
options: dict | None,
ncpu: int,
) -> BayesResult:
# % prior solution
prior_data = {}
# % density of data distribution
density = {}
# % returns
ret_data = {}
ret_lcurve = {}
# % verbose
if verbose:
_bayes_message(sample, alpha, ncpu)
# % Build data from sample
res_simu = _multi_simu(
instance,
sample,
algorithm,
mapping,
jobs_fun,
wjobs_fun,
event_seg,
bounds,
wgauge,
ost,
options,
ncpu,
)
prior_data["cost"] = np.array(res_simu["cost"])
ret_data["cost"] = np.array(res_simu["cost"])
for p in sample._problem["names"]:
ret_data[p] = getattr(sample, p)
dat_p = np.dstack(res_simu[p])
prior_data[p] = dat_p
density[p] = np.ones(dat_p.shape)
# % Density compute
active_mask = np.where(instance.mesh.active_cell == 1)
_compute_density(
sample,
prior_data,
active_mask,
density,
algorithm,
bw_method,
weights,
)
# % Bayes compute
if isinstance(alpha, list):
_lcurve_compute_param(
instance,
sample,
jobs_fun,
wjobs_fun,
event_seg,
wgauge,
ost,
active_mask,
prior_data,
density,
ret_lcurve,
alpha,
)
else:
_compute_param(
instance,
sample,
jobs_fun,
wjobs_fun,
event_seg,
wgauge,
ost,
active_mask,
prior_data,
density,
alpha,
)
return BayesResult(dict(zip(["data", "lcurve"], [ret_data, ret_lcurve])))
def _bayes_message(sr: SampleResult, alpha: int | float | list, ncpu: int):
sp4 = " " * 4
lcurve = True if isinstance(alpha, list) else False
ret = []
ret.append(f"{sp4}Parameters/States set size: {sr.n_sample}")
ret.append(f"Sample generator: {sr.generator}")
ret.append(f"Ncpu: {ncpu}")
ret.append(f"L-curve approach: {lcurve}")
print(f"\n{sp4}".join(ret) + "\n")
### BUILD sample ###
def _run(
instance: Model,
jobs_fun: np.ndarray,
wjobs_fun: np.ndarray,
event_seg: dict,
wgauge: np.ndarray,
ost: pd.Timestamp,
):
### SETTING MODEL TO COMPUTE COST VALUES ###
# % send mask_event to Fortran in case of event signatures based optimization
if any([fn[0] == "E" for fn in jobs_fun]):
instance.setup._optimize.mask_event = _mask_event(instance, **event_seg)
# % Set values for Fortran derived type variables
instance.setup._optimize.jobs_fun = jobs_fun
instance.setup._optimize.wjobs_fun = wjobs_fun
instance.setup._optimize.wgauge = wgauge
st = pd.Timestamp(instance.setup.start_time)
instance.setup._optimize.optimize_start_step = (
ost - st
).total_seconds() / instance.setup.dt + 1
###
# % FORWARD MODEL
cost = np.float32(0)
forward(
instance.setup,
instance.mesh,
instance.input_data,
instance.parameters,
instance.parameters.copy(),
instance.states,
instance.states.copy(),
instance.output,
cost,
)
def _unit_simu(
i: int,
instance: Model,
sample: SampleResult,
algorithm: str | None,
mapping: str | None,
jobs_fun: np.ndarray,
wjobs_fun: np.ndarray,
event_seg: dict | None,
bounds: np.ndarray,
wgauge: np.ndarray,
ost: pd.Timestamp,
options: dict | None,
) -> dict:
# % SET PARAMS/STATES
for name in sample._problem["names"]:
if name in instance.setup._parameters_name:
setattr(instance.parameters, name, getattr(sample, name)[i])
else:
setattr(instance.states, name, getattr(sample, name)[i])
# % SIMU (RUN OR OPTIMIZE)
if algorithm is None:
_run(instance, jobs_fun, wjobs_fun, event_seg, wgauge, ost)
else:
OPTIM_FUNC[algorithm](
instance,
sample._problem["names"],
mapping,
jobs_fun,
wjobs_fun,
event_seg,
bounds,
wgauge,
ost,
False,
**options,
)
res = {}
res["cost"] = instance.output.cost
for name in sample._problem["names"]:
if name in instance.setup._parameters_name:
res[name] = np.copy(
getattr(instance.parameters, name)
) # must be copy here (TODO: change in V1.0.0)
else:
res[name] = np.copy(
getattr(instance.states, name)
) # must be copy here (TODO: change in V1.0.0)
return res
def _multi_simu(
instance: Model,
sample: SampleResult,
algorithm: str | None,
mapping: str | None,
jobs_fun: np.ndarray,
wjobs_fun: np.ndarray,
event_seg: dict,
bounds: np.ndarray,
wgauge: np.ndarray,
ost: pd.Timestamp,
options: dict | None,
ncpu: int,
) -> dict:
if algorithm is None:
pgbar_mess = "</> Running forward Model on multiset"
else:
pgbar_mess = "</> Optimizing Model parameters on multiset"
if ncpu > 1:
list_instance = [instance.copy() for i in range(sample.n_sample)]
pool = mp.Pool(ncpu)
list_result = pool.starmap(
_unit_simu,
[
(
i,
instance,
sample,
algorithm,
mapping,
jobs_fun,
wjobs_fun,
event_seg,
bounds,
wgauge,
ost,
options,
)
for i, instance in tqdm(enumerate(list_instance), desc=pgbar_mess)
],
)
pool.close()
elif ncpu == 1:
list_result = []
for i in tqdm(range(sample.n_sample), desc=pgbar_mess):
list_result.append(
_unit_simu(
i,
instance,
sample,
algorithm,
mapping,
jobs_fun,
wjobs_fun,
event_seg,
bounds,
wgauge,
ost,
options,
)
)
else:
raise ValueError(f"ncpu should be a positive integer, not {ncpu}")
res_keys = list(sample._problem["names"])
res_keys.append("cost")
res = {k: [] for k in res_keys}
for result in list_result:
for k in res.keys():
res[k].append(result[k])
return res
### DENSITY ESTIMATE ###
def _compute_density(
sample: SampleResult,
data: dict,
active_mask: np.ndarray,
density: dict,
algorithm: str | None,
bw_method: str | None,
weights: np.ndarray | None,
):
coord = np.dstack([active_mask[0], active_mask[1]])[0]
x, y = zip(*coord)
for p in sample._problem["names"]:
if algorithm == "l-bfgs-b": # variational Bayes optim (HD-optim)
for xi, yi in zip(x, y):
estimted_density = gaussian_kde(
data[p][xi, yi], bw_method=bw_method, weights=weights
)(data[p][xi, yi])
density[p][
xi, yi
] = estimted_density # TODO: add this term in V1.0.0: * getattr(sample, "_" + p) # compute joint probability
elif isinstance(algorithm, str): # global Bayes optim (LD-optim)
u_dis = np.mean(data[p][active_mask], axis=0)
estimted_density = gaussian_kde(
u_dis, bw_method=bw_method, weights=weights
)(u_dis)
density[p][
x, y
] = estimted_density # TODO: add this term in V1.0.0: * getattr(sample, "_" + p) # compute joint probability
else: # Bayes estim (LD-estim)
density[p][x, y] = getattr(sample, "_" + p)
### BAYES ESTIMATE AND L-CURVE
def _compute_mean_U(
U: np.ndarray, J: np.ndarray, rho: np.ndarray, alpha: float, mask: np.ndarray
) -> tuple:
# U is 3-D array
# rho is 3-D array
# J is 1-D array
L = np.exp(-(2**alpha) * (J / min(J) - 1) ** 2) # likelihood
Lrho = L * rho # 3-D array
C = np.sum(Lrho, axis=2) # C is 2-D array
U_alp = 1 / C * np.sum(U * Lrho, axis=2) # 2-D array
varU = 1 / C * np.sum((U - U_alp[..., np.newaxis]) ** 2 * Lrho, axis=2) # 2-D array
varU = np.mean(varU[mask])
Uinf = np.mean(U, axis=2) # 2-D array
D_alp = np.mean(np.square(U_alp - Uinf)[mask]) / varU
return U_alp, varU, D_alp
def _compute_param(
instance: Model,
sample: SampleResult,
jobs_fun: np.ndarray,
wjobs_fun: np.ndarray,
event_seg: dict,
wgauge: np.ndarray,
ost: pd.Timestamp,
active_mask: np.ndarray,
prior_data: dict,
density: dict,
alpha: int | float,
) -> tuple:
D_alp = []
var = {}
for name in sample._problem["names"]:
u, v, d = _compute_mean_U(
prior_data[name], prior_data["cost"], density[name], alpha, active_mask
)
if name in instance.setup._parameters_name:
setattr(instance.parameters, name, u)
else:
setattr(instance.states, name, u)
D_alp.append(d)
var[name] = v
_run(
instance,
jobs_fun,
wjobs_fun,
event_seg,
wgauge,
ost,
)
return instance.output.cost, np.mean(D_alp), var
def _lcurve_compute_param(
instance: Model,
sample: SampleResult,
jobs_fun: np.ndarray,
wjobs_fun: np.ndarray,
event_seg: dict,
wgauge: np.ndarray,
ost: pd.Timestamp,
active_mask: np.ndarray,
prior_data: dict,
density: dict,
ret_lcurve: dict,
alpha: list,
):
cost = []
D_alp = []
var = []
for alpha_i in alpha:
co, d_alp, vr = _compute_param(
instance,
sample,
jobs_fun,
wjobs_fun,
event_seg,
wgauge,
ost,
active_mask,
prior_data,
density,
alpha_i,
)
cost.append(co)
D_alp.append(d_alp)
var.append(vr)
cost_scaled = (cost - np.min(cost)) / (np.max(cost) - np.min(cost))
D_alp_scaled = (D_alp - np.min(D_alp)) / (np.max(D_alp) - np.min(D_alp))
d = np.square(cost_scaled) + np.square(D_alp_scaled)
alpha_opt = alpha[np.argmin(d)]
_compute_param(
instance,
sample,
jobs_fun,
wjobs_fun,
event_seg,
wgauge,
ost,
active_mask,
prior_data,
density,
alpha_opt,
)
ret_lcurve["alpha"] = alpha
ret_lcurve["alpha_opt"] = alpha_opt
ret_lcurve["mahal_dist"] = D_alp
ret_lcurve["cost"] = cost
ret_lcurve["var"] = var