Source code for smash.core.signatures

from __future__ import annotations

from smash.solver._mw_forward import forward

from smash.core._constant import SIGNS

from smash.core._event_segmentation import (
    _detect_peaks,
    _missing_values,
    _events_grad,
    _baseflow_separation,
    _get_season,
    _check_unknown_options_event_seg,
)

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from smash.core.model import Model

import numpy as np
import pandas as pd
from SALib import ProblemSpec
from tqdm import tqdm
import warnings

__all__ = ["SignResult", "SignSensResult"]


[docs]class SignResult(dict): """ Represents signatures computation result. Notes ----- This class is essentially a subclass of dict with attribute accessors. Attributes ---------- cont : dict Continuous signatures. The keys are - 'obs': a dataframe representing observation results. - 'sim': a dataframe representing simulation results. The column names of both dataframes consist of the catchment code and studied signature names. event : dict Flood event signatures. The keys are - 'obs': a dataframe representing observation results. - 'sim': a dataframe representing simulation results. The column names of both dataframes consist of the catchment code, the season that event occurrs, the beginning/end of each event and studied signature names. See Also -------- Model.signatures: Compute continuous or/and flood event signatures of the Model. """ def __getattr__(self, name): try: return self[name] except KeyError as e: raise AttributeError(name) from e __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ def __repr__(self): if self.keys(): m = max(map(len, list(self.keys()))) + 1 return "\n".join( [ k.rjust(m) + ": " + repr(v) for k, v in sorted(self.items()) if not k.startswith("_") ] ) else: return self.__class__.__name__ + "()" def __dir__(self): return list(self.keys())
[docs]class SignSensResult(dict): """ Represents signatures sensitivity computation result. Notes ----- This class is essentially a subclass of dict with attribute accessors. Attributes ---------- cont : dict A dictionary with two keys - 'total_si' : representing the total-order Sobol indices of the hydrological model parameters to continuous signatures. - 'first_si' : representing the first-order Sobol indices of the hydrological model parameters to continuous signatures. Each value of the dictionary is a sub-dictionary with the keys are the hydrological model parameters. Then each value of each sub-dictionary (associating to a model parameter) is a dataframe containing the sensitivities of the associated model parameter to all studied signatures. The column names of each dataframe consist of the catchment code and studied signature names. event : dict A dictionary with two keys - 'total_si' : representing the total-order Sobol indices of the hydrological model parameters to flood event signatures. - 'first_si' : representing the first-order Sobol indices of the hydrological model parameters to flood event signatures. Each value of the dictionary is a sub-dictionary with the keys are the hydrological model parameters. Then each value of each sub-dictionary (associating to a model parameter) is a dataframe containing the sensitivities of the associated model parameter to all studied signatures. The column names of each dataframe consist of the catchment code, the season that event occurrs, the beginning/end of each event and studied signature names. sample: pandas.DataFrame A dataframe containing the generated samples used to compute sensitivity indices. See Also -------- Model.signatures_sensitivity: Compute the first- and total-order variance-based sensitivity (Sobol indices) of spatially uniform hydrological model parameters to the output signatures. """ def __getattr__(self, name): try: return self[name] except KeyError as e: raise AttributeError(name) from e __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ def __repr__(self): if self.keys(): m = max(map(len, list(self.keys()))) + 1 return "\n".join( [ k.rjust(m) + ": " + repr(v) for k, v in sorted(self.items()) if not k.startswith("_") ] ) else: return self.__class__.__name__ + "()" def __dir__(self): return list(self.keys())
def _standardize_signatures(sign: str | list[str] | None): if sign is None: sign = SIGNS elif isinstance(sign, str): if sign not in SIGNS: raise ValueError(f"Unknown signature {sign}. Choices: {SIGNS}") else: sign = [sign] elif isinstance(sign, list): unk_sign = tuple(s for s in sign if s not in SIGNS) if unk_sign: raise ValueError(f"Unknown signature(s) {unk_sign}. Choices: {SIGNS}") else: raise TypeError(f"Signature(s) must be a str, a list of str or None") cs = [s for s in sign if s[0] == "C"] es = [s for s in sign if s[0] == "E"] return cs, es def _continuous_signatures(p: np.ndarray, q: np.ndarray, list_signatures: list[str]): res = [] qb, qq = _baseflow_separation(q) qp = q[q >= 0] for signature in list_signatures: if signature == "Crc": try: res.append(np.sum(q) / np.sum(p)) except: res.append(np.nan) if signature == "Crchf": try: res.append(np.sum(qq) / np.sum(p)) except: res.append(np.nan) if signature == "Crclf": try: res.append(np.sum(qb) / np.sum(p)) except: res.append(np.nan) if signature == "Crch2r": try: res.append(np.sum(qq) / np.sum(q)) except: res.append(np.nan) if signature == "Cfp2": try: res.append(np.quantile(qp, 0.02)) except: res.append(np.nan) if signature == "Cfp10": try: res.append(np.quantile(qp, 0.1)) except: res.append(np.nan) if signature == "Cfp50": try: res.append(np.quantile(qp, 0.5)) except: res.append(np.nan) if signature == "Cfp90": try: res.append(np.quantile(qp, 0.9)) except: res.append(np.nan) return res def _event_signatures( p: np.ndarray, q: np.ndarray, start: int, peakp: float, peakq: float, list_signatures: list[str], ): res = [] qb, qq = _baseflow_separation(q) deteq = True for signature in list_signatures: if signature == "Eff": res.append(np.mean(qq)) if signature == "Ebf": res.append(np.mean(qb)) if signature == "Erc": try: res.append(np.sum(q) / np.sum(p)) except: res.append(np.nan) if signature == "Erchf": try: res.append(np.sum(qq) / np.sum(p)) except: res.append(np.nan) if signature == "Erclf": try: res.append(np.sum(qb) / np.sum(p)) except: res.append(np.nan) if signature == "Erch2r": try: res.append(np.sum(qq) / np.sum(q)) except: res.append(np.nan) if (signature == "Elt" or signature == "Epf") and deteq: deteq = False if peakq is None: try: peakq = ( _detect_peaks(q, mpd=len(q))[0] + start ) # detect only 1 peak except: peakq = start + len(q) - 1 if signature == "Elt": res.append(peakq - peakp) if signature == "Epf": res.append(q[peakq - start]) return res def _signatures_comp( instance: Model, cs: list[str], es: list[str], peak_quant: float, max_duration: float, obs_comp: bool, # decide if process observation computation or not. warn: bool, ): prcp_cvt = ( instance.input_data.mean_prcp.copy() * 0.001 * instance.mesh.area[..., np.newaxis] / instance.setup.dt ) # convert precip from mm to m3/s date_range = pd.date_range( start=instance.setup.start_time, periods=instance.input_data.qobs.shape[1], freq=f"{int(instance.setup.dt)}s", ) col_cs = ["code"] + cs col_es = ["code", "season", "start", "end"] + es dfsim_cs = pd.DataFrame(columns=col_cs) dfsim_es = pd.DataFrame(columns=col_es) dfobs_cs = pd.DataFrame(columns=col_cs) dfobs_es = pd.DataFrame(columns=col_es) if len(cs) + len(es) > 0: for i, catchment in enumerate(instance.mesh.code): prcp_tmp, qobs_tmp, ratio = _missing_values( prcp_cvt[i, :], instance.input_data.qobs[i, :] ) if prcp_tmp is None: if warn: warnings.warn( f"Reject data at catchment {catchment} ({round(ratio * 100, 2)}% of missing values)" ) row_cs = pd.DataFrame( [[catchment] + [np.nan] * (len(col_cs) - 1)], columns=col_cs ) row_es = pd.DataFrame( [[catchment] + [np.nan] * (len(col_es) - 1)], columns=col_es ) dfsim_cs = pd.concat([dfsim_cs, row_cs], ignore_index=True) dfsim_es = pd.concat([dfsim_es, row_es], ignore_index=True) dfobs_cs = pd.concat([dfobs_cs, row_cs], ignore_index=True) dfobs_es = pd.concat([dfobs_es, row_es], ignore_index=True) else: qsim_tmp = instance.output.qsim[i, :].copy() if len(cs) > 0: csignatures_sim = _continuous_signatures( prcp_tmp, qsim_tmp, list_signatures=cs ) rowsim_cs = pd.DataFrame( [[catchment] + csignatures_sim], columns=col_cs ) dfsim_cs = pd.concat([dfsim_cs, rowsim_cs], ignore_index=True) if obs_comp: csignatures_obs = _continuous_signatures( prcp_tmp, qobs_tmp, list_signatures=cs ) rowobs_cs = pd.DataFrame( [[catchment] + csignatures_obs], columns=col_cs ) dfobs_cs = pd.concat([dfobs_cs, rowobs_cs], ignore_index=True) if len(es) > 0: list_events = _events_grad( prcp_tmp, qobs_tmp, peak_quant, max_duration, instance.setup.dt ) if len(list_events) == 0: row_es = pd.DataFrame( [[catchment] + [np.nan] * (len(col_es) - 1)], columns=col_es, ) dfsim_es = pd.concat([dfsim_es, row_es], ignore_index=True) dfobs_es = pd.concat([dfobs_es, row_es], ignore_index=True) else: for t in list_events: ts = t["start"] te = t["end"] event_prcp = prcp_tmp[ts : te + 1] event_qobs = qobs_tmp[ts : te + 1] event_qsim = qsim_tmp[ts : te + 1] season = _get_season(date_range[ts].date()) esignatures_sim = _event_signatures( event_prcp, event_qsim, ts, t["peakP"], None, list_signatures=es, ) rowsim_es = pd.DataFrame( [ [catchment, season, date_range[ts], date_range[te]] + esignatures_sim ], columns=col_es, ) dfsim_es = pd.concat( [dfsim_es, rowsim_es], ignore_index=True ) if obs_comp: esignatures_obs = _event_signatures( event_prcp, event_qobs, ts, t["peakP"], t["peakQ"], list_signatures=es, ) rowobs_es = pd.DataFrame( [ [ catchment, season, date_range[ts], date_range[te], ] + esignatures_obs ], columns=col_es, ) dfobs_es = pd.concat( [dfobs_es, rowobs_es], ignore_index=True ) return SignResult( dict( zip( ["cont", "event"], [ {"obs": dfobs_cs, "sim": dfsim_cs}, {"obs": dfobs_es, "sim": dfsim_es}, ], ) ) ) def _signatures( instance: Model, cs: list[str], es: list[str], obs_comp: bool, peak_quant: float = 0.995, max_duration: float = 240, **unknown_options, ): if es: _check_unknown_options_event_seg(unknown_options) return _signatures_comp( instance, cs, es, peak_quant, max_duration, obs_comp=obs_comp, warn=True ) def _signatures_sensitivity( instance: Model, problem: dict, n: int, cs: list[str], es: list[str], seed: None | int, peak_quant: float = 0.995, max_duration: float = 240, **unknown_options, ): if es: _check_unknown_options_event_seg(unknown_options) second_order = False # do not compute second order sensitivity # % Define problem and initialize SALib object sp = ProblemSpec(problem) # % Generate samples sp.sample_sobol(n, calc_second_order=second_order, seed=seed) # % Compute outputs (signatures computation) dfs_cs = [] # list of dataframes for CS dfs_es = [] # list of dataframes for ES for i in tqdm(range(len(sp.samples)), desc="</> Computing signatures sensitivity"): for j, name in enumerate(problem["names"]): setattr(instance.parameters, name, sp.samples[i, j]) cost = np.float32(0) forward( instance.setup, instance.mesh, instance.input_data, instance.parameters, instance.parameters.copy(), instance.states, instance.states.copy(), instance.output, cost, ) res_sign = _signatures_comp( instance, cs, es, peak_quant, max_duration, obs_comp=False, warn=(i == 0) ) dfs_cs.append(res_sign.cont["sim"]) dfs_es.append(res_sign.event["sim"]) # % Sensitivity computation dfinfo_cs = dfs_cs[0][["code"]] dfinfo_es = dfs_es[0][["code", "season", "start", "end"]] res_sa = [] for dfinfo, dfs, signs in zip( [dfinfo_cs, dfinfo_es], [dfs_cs, dfs_es], [cs, es] ): # loop for CS and ES dict_sa_tot = {key: [] for key in problem["names"]} dict_sa_first = {key: [] for key in problem["names"]} for name in signs: total_si = {key: [] for key in problem["names"]} first_si = {key: [] for key in problem["names"]} for j in range(len(dfinfo["code"])): y = np.array( [dfs[i][name].loc[j] for i in range(len(dfs))] ) # signature output sp.set_results(y) # send the output to the SALib interface sp.analyze_sobol(calc_second_order=second_order) # estimate sensitivity tsi, fsi = sp.to_df() # get estimated results for ip, param in enumerate(problem["names"]): total_si[param].append(tsi.iloc[:, 0].iloc[ip]) first_si[param].append(fsi.iloc[:, 0].iloc[ip]) for param in problem["names"]: dict_sa_tot[param].append(pd.Series(total_si[param], name=name)) dict_sa_first[param].append(pd.Series(first_si[param], name=name)) res_sa.append( { "total_si": { param: pd.concat( [dfinfo] + dict_sa_tot[param], axis=1, join="inner" ) for param in problem["names"] }, "first_si": { param: pd.concat( [dfinfo] + dict_sa_first[param], axis=1, join="inner" ) for param in problem["names"] }, } ) # concat dfinfo and dict_sa return SignSensResult( dict( zip( ["cont", "event", "sample"], res_sa + [pd.DataFrame(sp.samples, columns=problem["names"])], ) ) )