Source code for smash.core.generate_samples

from __future__ import annotations

from smash.core._constant import (
    STRUCTURE_PARAMETERS,
    STRUCTURE_STATES,
    SAMPLE_GENERATORS,
    PROBLEM_KEYS,
)

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from smash.solver._mwd_setup import SetupDT

import warnings

import numpy as np
import pandas as pd
from scipy.stats import truncnorm


# Public API of this module.
__all__ = ["generate_samples", "SampleResult"]


class SampleResult(dict):
    """
    Represents the generated sample result.

    Notes
    -----
    This class is essentially a subclass of dict with attribute accessors and
    four additional methods, which are:

    - `SampleResult.to_numpy`: Convert the `SampleResult` object to a numpy.ndarray.
    - `SampleResult.to_dataframe`: Convert the `SampleResult` object to a pandas.DataFrame.
    - `SampleResult.slice`: Slice the `SampleResult` object.
    - `SampleResult.iterslice`: Iterate over the `SampleResult` object by slices.

    This may have additional attributes not listed here depending on the specific
    names provided in the argument ``problem`` in the `smash.generate_samples` method.

    Attributes
    ----------
    generator : str
        The generator used to generate the samples.
    n_sample : int
        The number of generated samples.

    See Also
    --------
    smash.generate_samples: Generate a multiple set of spatially uniform Model parameters/states.

    Examples
    --------
    >>> problem = {"num_vars": 2, "names": ["cp", "lr"], "bounds": [[1,200], [1,500]]}
    >>> sr = smash.generate_samples(problem, n=5, random_state=1)

    Convert the result to a pandas.DataFrame:

    >>> sr.to_dataframe()
               cp          lr
    0   83.987379   47.076959
    1  144.344574   93.943845
    2    1.022761  173.434803
    3   61.164182  198.986970
    4   30.204422  269.869550

    Slice the first two sets:

    >>> sr.slice(2).to_numpy(axis=-1)
    array([[ 83.98737894,  47.07695879],
           [144.34457419,  93.94384548]])

    Iterate on pairs of sets:

    >>> for slc_i in sr.iterslice(2):
    >>>     slc_i.to_numpy(axis=-1)
    """

    def __getattr__(self, name):
        # Attribute access falls back to dict lookup so that sr.cp == sr["cp"].
        try:
            return self[name]
        except KeyError as e:
            raise AttributeError(name) from e

    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

    def __repr__(self):
        # Pretty-print public entries only; keys starting with "_" (internal
        # data such as "_problem" and the "_<name>" density arrays) are hidden.
        if self.keys():
            m = max(map(len, list(self.keys()))) + 1
            return "\n".join(
                [
                    k.rjust(m) + ": " + repr(v)
                    for k, v in sorted(self.items())
                    if not k.startswith("_")
                ]
            )
        else:
            return self.__class__.__name__ + "()"

    def __dir__(self):
        return list(self.keys())

    def slice(self, end: int, start: int = 0):
        """
        Slice the `SampleResult` object.

        The attribute arrays are sliced along a user-specified start and end index.

        Parameters
        ----------
        end : int
            The end index of the slice (exclusive).
        start : int, default 0
            The start index of the slice. Must be lower than or equal to **end**
            (``start == end`` yields an empty slice).

        Returns
        -------
        res : SampleResult
            The `SampleResult` object sliced according to **start** and **end** arguments.

        Raises
        ------
        ValueError
            If ``end < start``, ``start < 0`` or ``end > n_sample``.
        """
        if end < start:
            raise ValueError(
                f"start argument {start} must be lower than end argument {end}"
            )
        if start < 0:
            raise ValueError(f"start argument {start} must be greater or equal to 0")
        if end > self.n_sample:
            raise ValueError(
                f"end argument {end} must be lower or equal to the sample size {self.n_sample}"
            )

        slc_n = end - start

        # Slice both the sample arrays and their companion density arrays ("_<name>").
        slc_names = [key for key in self._problem["names"]] + [
            "_" + key for key in self._problem["names"]
        ]

        slc_dict = {key: self[key][start:end] for key in slc_names}
        slc_dict["generator"] = self.generator
        slc_dict["n_sample"] = slc_n
        slc_dict["_problem"] = self._problem.copy()

        return SampleResult(slc_dict)

    def iterslice(self, by: int = 1):
        """
        Iterate on the `SampleResult` object by slices.

        Parameters
        ----------
        by : int, default 1
            The size of the `SampleResult` slice. Must be at least 1.
            If **by** is not a multiple of the sample size :math:`n` the last
            slice iteration size will be updated to the maximum range. It results
            in :math:`k=\\lfloor{\\frac{n}{by}}\\rfloor` iterations of size
            :math:`by` and one last iteration of size :math:`n - k \\times by`.

        Yields
        ------
        slice : SampleResult
            The `SampleResult` object sliced according to **by** arguments.

        Raises
        ------
        ValueError
            If ``by < 1`` or ``by > n_sample``.

        See Also
        --------
        SampleResult.slice: Slice the `SampleResult` object.
        """
        # Guard against by <= 0: by == 0 would silently yield nothing and a
        # negative value would surface as a confusing error from slice().
        if by < 1:
            raise ValueError(f"by argument {by} must be greater or equal to 1")
        if by > self.n_sample:
            raise ValueError(
                f"by argument {by} must be lower or equal to the sample size {self.n_sample}"
            )

        ind_start = 0
        ind_end = by
        # The loop stops when the window can no longer advance (ind_end is
        # clamped to n_sample, so eventually ind_start == ind_end).
        while ind_start != ind_end:
            yield self.slice(start=ind_start, end=ind_end)
            ind_start = ind_end
            ind_end = np.minimum(ind_end + by, self.n_sample)

    def to_numpy(self, axis=0):
        """
        Convert the `SampleResult` object to a numpy.ndarray.

        The attribute arrays are stacked along a user-specified axis of the
        resulting array.

        Parameters
        ----------
        axis : int, default 0
            The axis along which the generated samples of each Model
            parameter/state will be joined.

        Returns
        -------
        res : numpy.ndarray
            The `SampleResult` object as a numpy.ndarray.
        """
        return np.stack([self[k] for k in self._problem["names"]], axis=axis)

    def to_dataframe(self):
        """
        Convert the `SampleResult` object to a pandas.DataFrame.

        Returns
        -------
        res : pandas.DataFrame
            The `SampleResult` object as a pandas.DataFrame
            (one column per Model parameter/state).
        """
        return pd.DataFrame({k: self[k] for k in self._problem["names"]})
def generate_samples(
    problem: dict,
    generator: str = "uniform",
    n: int = 1000,
    random_state: int | None = None,
    mean: dict | None = None,
    coef_std: float | None = None,
):
    """
    Generate a multiple set of spatially uniform Model parameters/states.

    Parameters
    ----------
    problem : dict
        Problem definition. The keys are

        - 'num_vars' : the number of Model parameters/states.
        - 'names' : the name of Model parameters/states.
        - 'bounds' : the upper and lower bounds of each Model parameter/state
          (a sequence of ``(min, max)``).

        .. hint::
            This problem can be created using the Model object.
            See `smash.Model.get_bound_constraints` for more.

    generator : str, default 'uniform'
        Samples generator. Should be one of

        - 'uniform'
        - 'normal' or 'gaussian'

    n : int, default 1000
        Number of generated samples.

    random_state : int or None, default None
        Random seed used to generate samples.

        .. note::
            If not given, generates parameters sets with a random seed.

    mean : dict or None, default None
        If the samples are generated using a Gaussian distribution, **mean** is
        used to define the mean of the distribution for each Model
        parameter/state. It is a dictionary where keys are the name of the
        parameters/states defined in the **problem** argument. In this case,
        the truncated normal distribution may be used with respect to the
        boundary conditions defined in **problem**. None value inside the
        dictionary will be filled in with the center of the parameter/state bounds.

        .. note::
            If not given and Gaussian distribution is used, the mean of the
            distribution will be set to the center of the parameter/state bounds.

    coef_std : float or None
        A coefficient related to the standard deviation in case of Gaussian
        generator:

        .. math::
            std = \\frac{u - l}{coef\\_std}

        where :math:`u` and :math:`l` are the upper and lower bounds of Model
        parameters/states.

        .. note::
            If not given and Gaussian distribution is used, **coef_std** is set
            to 3 as default.

    Returns
    -------
    res : SampleResult
        The generated samples result represented as a `SampleResult` object.

    See Also
    --------
    SampleResult: Represents the generated samples using `smash.generate_samples` method.
    Model.get_bound_constraints: Get the boundary constraints of the Model parameters/states.

    Examples
    --------
    >>> problem = {
    ...     'num_vars': 4,
    ...     'names': ['cp', 'cft', 'exc', 'lr'],
    ...     'bounds': [[1,2000], [1,1000], [-20,5], [1,1000]]
    ... }
    >>> sr = smash.generate_samples(problem, n=3, random_state=99)
    >>> sr.to_dataframe()
                cp         cft        exc          lr
    0  1344.884839   32.414941 -12.559438    7.818907
    1   976.668720  808.241913 -18.832607  770.023235
    2  1651.164853  566.051802   4.765685  747.020334
    """
    # Validate the problem/generator and resolve the per-parameter means
    # (for Gaussian generators, defaulting to the center of the bounds).
    generator, mean = _standardize_generate_samples_args(problem, generator, mean)

    ret_dict = {key: [] for key in problem["names"]}
    ret_dict["generator"] = generator
    ret_dict["n_sample"] = n
    ret_dict["_problem"] = problem.copy()

    if random_state is not None:
        np.random.seed(random_state)

    # Hoisted out of the loop: the std coefficient is the same for every
    # parameter/state.
    std_coef = 3 if coef_std is None else coef_std

    for p, (low, upp) in zip(problem["names"], problem["bounds"]):
        if generator == "uniform":
            ret_dict[p] = np.random.uniform(low, upp, n)
            # Constant density of a uniform distribution: 1 / (upp - low).
            ret_dict["_" + p] = np.ones(n) / (upp - low)

        elif generator in ["normal", "gaussian"]:
            sd = (upp - low) / std_coef

            # Truncated normal respects the bound constraints of the problem.
            trunc_normal = _get_truncated_normal(mean[p], sd, low, upp)
            ret_dict[p] = trunc_normal.rvs(size=n)
            ret_dict["_" + p] = trunc_normal.pdf(ret_dict[p])

    return SampleResult(ret_dict)
def _get_truncated_normal(mean: float, sd: float, low: float, upp: float): return truncnorm((low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd) def _get_bound_constraints(setup: SetupDT, states: bool): if states: control_vector = STRUCTURE_STATES[setup.structure] else: control_vector = STRUCTURE_PARAMETERS[setup.structure] bounds = [] for name in control_vector: if name in setup._states_name: ind = np.argwhere(setup._states_name == name) l = setup._optimize.lb_states[ind].item() u = setup._optimize.ub_states[ind].item() else: ind = np.argwhere(setup._parameters_name == name) l = setup._optimize.lb_parameters[ind].item() u = setup._optimize.ub_parameters[ind].item() bounds += [[l, u]] problem = { "num_vars": len(control_vector), "names": control_vector, "bounds": bounds, } return problem def _standardize_problem(problem: dict | None, setup: SetupDT, states: bool): if problem is None: problem = _get_bound_constraints(setup, states) elif isinstance(problem, dict): prl_keys = problem.keys() if not all(k in prl_keys for k in PROBLEM_KEYS): raise KeyError( f"Problem dictionary should be defined with required keys {PROBLEM_KEYS}" ) unk_keys = [k for k in prl_keys if k not in PROBLEM_KEYS] if unk_keys: warnings.warn( f"Unknown key(s) found in the problem definition {unk_keys}. 
Choices: {PROBLEM_KEYS}" ) else: raise TypeError("The problem definition must be a dictionary or None") return problem def _standardize_generate_samples_args(problem: dict, generator: str, user_mean: dict): if isinstance(problem, dict): # simple check problem _standardize_problem(problem, None, None) else: raise TypeError("problem must be a dictionary") if isinstance(generator, str): # check generator generator = generator.lower() if generator not in SAMPLE_GENERATORS: raise ValueError( f"Unknown generator '{generator}': Choices: {SAMPLE_GENERATORS}" ) elif generator in ["normal", "gaussian"]: # check mean mean = dict(zip(problem["names"], np.mean(problem["bounds"], axis=1))) if user_mean is None: pass elif isinstance(user_mean, dict): for name, um in user_mean.items(): if not name in problem["names"]: warnings.warn( f"Key '{name}' does not match any existing names in the problem definition {problem['names']}" ) if isinstance(um, (int, float)): mean.update({name: um}) else: raise TypeError("mean value must be float or integer") else: raise TypeError("mean must be None or a dictionary") else: mean = user_mean else: raise TypeError("generator must be a string") return generator, mean