from __future__ import annotations
from smash.mesh._mw_meshing import mw_meshing
import errno
import os
import warnings
import numpy as np
from osgeo import gdal, osr
D8_VALUE = np.arange(1, 9)
__all__ = ["generate_mesh"]
def _xy_to_rowcol(x, y, xmin, ymax, xres, yres):
row = int((ymax - y) / yres)
col = int((x - xmin) / xres)
return row, col
def _rowcol_to_xy(row, col, xmin, ymax, xres, yres):
x = int(col * xres + xmin)
y = int(ymax - row * yres)
return x, y
def _trim_zeros_2d(array, shift_value=False):
for ax in [0, 1]:
mask = ~(array == 0).all(axis=ax)
inv_mask = mask[::-1]
start_ind = np.argmax(mask)
end_ind = len(inv_mask) - np.argmax(inv_mask)
if ax == 0:
scol, ecol = start_ind, end_ind
array = array[:, start_ind:end_ind]
else:
srow, erow = start_ind, end_ind
array = array[start_ind:end_ind, :]
if shift_value:
return array, scol, ecol, srow, erow
else:
return array
def _array_to_ascii(array, path, xmin, ymin, cellsize, no_data_val):
array = np.copy(array)
array[np.isnan(array)] = no_data_val
header = (
f"NCOLS {array.shape[1]} \nNROWS {array.shape[0]}"
f"\nXLLCENTER {xmin} \nYLLCENTER {ymin} \nCELLSIZE {cellsize} \nNODATA_value {no_data_val}\n"
)
with open(path, "w") as f:
f.write(header)
np.savetxt(f, array, "%5.2f")
def _get_array(ds_flwdir, bbox=None):
if bbox:
xmin, xmax, xres, ymin, ymax, yres = _get_transform(ds_flwdir)
col_off = int((bbox[0] - xmin) / xres)
row_off = int((ymax - bbox[3]) / yres)
ncol = int((bbox[1] - bbox[0]) / xres)
nrow = int((bbox[3] - bbox[2]) / yres)
flwdir = ds_flwdir.GetRasterBand(1).ReadAsArray(col_off, row_off, ncol, nrow)
else:
flwdir = ds_flwdir.GetRasterBand(1).ReadAsArray()
if np.any(~np.isin(flwdir, D8_VALUE), where=(flwdir > 0)):
raise ValueError(f"Flow direction data is invalid. Value must be in {D8_VALUE}")
return flwdir
def _get_transform(ds_flwdir):
ncol = ds_flwdir.RasterXSize
nrow = ds_flwdir.RasterYSize
transform = ds_flwdir.GetGeoTransform()
xmin = transform[0]
xres = transform[1]
xmax = xmin + ncol * xres
ymax = transform[3]
yres = -transform[5]
ymin = ymax - nrow * yres
return xmin, xmax, xres, ymin, ymax, yres
def _get_srs(ds_flwdir, epsg):
projection = ds_flwdir.GetProjection()
if projection:
srs = osr.SpatialReference(wkt=projection)
else:
if epsg:
srs = osr.SpatialReference()
srs.ImportFromEPSG(int(epsg))
else:
raise ValueError(
"Flow direction file does not contain projection information. Can be specified with the 'epsg' argument"
)
return srs
def _standardize_gauge(ds_flwdir, x, y, area, code):
x = np.array(x, dtype=np.float32, ndmin=1)
y = np.array(y, dtype=np.float32, ndmin=1)
area = np.array(area, dtype=np.float32, ndmin=1)
if (x.size != y.size) or (y.size != area.size):
raise ValueError(
f"Inconsistent size for 'x' ({x.size}), 'y' ({y.size}) and 'area' ({area.size})"
)
xmin, xmax, xres, ymin, ymax, yres = _get_transform(ds_flwdir)
if np.any((x < xmin) | (x > xmax)):
raise ValueError(f"'x' {x} value(s) out of flow directions bounds {xmin, xmax}")
if np.any((y < ymin) | (y > ymax)):
raise ValueError(f"'y' {y} value(s) out of flow directions bounds {ymin, ymax}")
if np.any(area < 0):
raise ValueError(f"Negative 'area' value(s) {area}")
# % Setting _c0, _c1, ... _cN-1 as default codes
if code is None:
code = np.array([f"_c{i}" for i in range(x.size)])
elif isinstance(code, (str, list)):
code = np.array(code, ndmin=1)
# % Only check x (y and area already check above)
if code.size != x.size:
raise ValueError(
f"Inconsistent size for 'code' ({code.size}) and 'x' ({x.size})"
)
return x, y, area, code
def _standardize_bbox(ds_flwdir, bbox):
# % Bounding Box (xmin, xmax, ymin, ymax)
if isinstance(bbox, (list, tuple, set)):
bbox = list(bbox)
if len(bbox) != 4:
raise ValueError(f"'bbox argument must be of size 4 ({len(bbox)})")
if bbox[0] > bbox[1]:
raise ValueError(
f"'bbox' xmin ({bbox[0]}) is greater than xmax ({bbox[1]})"
)
if bbox[2] > bbox[3]:
raise ValueError(
f"'bbox' ymin ({bbox[2]}) is greater than ymax ({bbox[3]})"
)
xmin, xmax, xres, ymin, ymax, yres = _get_transform(ds_flwdir)
if bbox[0] < xmin:
warnings.warn(
f"'bbox' xmin ({bbox[0]}) is out of flow directions bound ({xmin}). 'bbox' is update according to flow directions bound"
)
bbox[0] = xmin
if bbox[1] > xmax:
warnings.warn(
f"'bbox' xmax ({bbox[1]}) is out of flow directions bound ({xmax}). 'bbox' is update according to flow directions bound"
)
bbox[1] = xmax
if bbox[2] < ymin:
warnings.warn(
f"'bbox' ymin ({bbox[2]}) is out of flow directions bound ({ymin}). 'bbox' is update according to flow directions bound"
)
bbox[2] = ymin
if bbox[3] > ymax:
warnings.warn(
f"'bbox' ymax ({bbox[3]}) is out of flow directions bound ({ymax}). 'bbox' is update according to flow directions bound"
)
bbox[3] = ymax
else:
raise TypeError("'bbox' argument must be list-like object")
return bbox
def _get_path(flwacc):
ind_path = np.unravel_index(np.argsort(flwacc, axis=None), flwacc.shape)
path = np.zeros(shape=(2, flwacc.size), dtype=np.int32, order="F")
path[0, :] = ind_path[0]
path[1, :] = ind_path[1]
return path
def _get_mesh_from_xy(ds_flwdir, x, y, area, code, max_depth, epsg):
(xmin, xmax, xres, ymin, ymax, yres) = _get_transform(ds_flwdir)
srs = _get_srs(ds_flwdir, epsg)
(x, y, area, code) = _standardize_gauge(ds_flwdir, x, y, area, code)
flwdir = _get_array(ds_flwdir)
dx = xres
row_dln = np.zeros(shape=x.shape, dtype=np.int32)
col_dln = np.zeros(shape=x.shape, dtype=np.int32)
area_dln = np.zeros(shape=x.shape, dtype=np.float32)
mask_dln = np.zeros(shape=flwdir.shape, dtype=np.int32)
for ind in range(x.size):
row, col = _xy_to_rowcol(x[ind], y[ind], xmin, ymax, xres, yres)
mask_dln_imd, row_dln[ind], col_dln[ind] = mw_meshing.catchment_dln(
flwdir, row, col, xres, yres, area[ind], max_depth
)
area_dln[ind] = np.count_nonzero(mask_dln_imd == 1) * (xres * yres)
mask_dln = np.where(mask_dln_imd == 1, 1, mask_dln)
flwdir = np.ma.masked_array(flwdir, mask=(1 - mask_dln))
mask_dln, scol, ecol, srow, erow = _trim_zeros_2d(mask_dln, shift_value=True)
flwdir = flwdir[srow:erow, scol:ecol]
xmin_shifted = xmin + scol * xres
ymax_shifted = ymax - srow * yres
row_dln = row_dln - srow
col_dln = col_dln - scol
flwacc = mw_meshing.flow_accumulation(flwdir)
flwdst = mw_meshing.flow_distance(flwdir, row_dln, col_dln, area_dln, dx)
path = _get_path(flwacc)
flwdst = np.ma.masked_array(flwdst, mask=(1 - mask_dln))
flwacc = np.ma.masked_array(flwacc, mask=(1 - mask_dln))
active_cell = mask_dln.astype(np.int32)
gauge_pos = np.column_stack((row_dln, col_dln))
mesh = {
"dx": dx,
"nrow": flwdir.shape[0],
"ncol": flwdir.shape[1],
"ng": x.size,
"nac": np.count_nonzero(active_cell),
"xmin": xmin_shifted,
"ymax": ymax_shifted,
"flwdir": flwdir,
"flwdst": flwdst,
"flwacc": flwacc,
"path": path,
"gauge_pos": gauge_pos,
"code": code,
"area": area_dln,
"active_cell": active_cell,
}
return mesh
def _get_mesh_from_bbox(ds_flwdir, bbox, epsg):
(xmin, xmax, xres, ymin, ymax, yres) = _get_transform(ds_flwdir)
srs = _get_srs(ds_flwdir, epsg)
bbox = _standardize_bbox(ds_flwdir, bbox)
flwdir = _get_array(ds_flwdir, bbox)
flwdir = np.ma.masked_array(flwdir, mask=(flwdir < 1))
dx = xres
flwacc = mw_meshing.flow_accumulation(flwdir)
path = _get_path(flwacc)
flwacc = np.ma.masked_array(flwacc, mask=(flwdir < 1))
active_cell = np.zeros(shape=flwdir.shape, dtype=np.int32)
active_cell = np.where(flwdir > 0, 1, active_cell)
mesh = {
"dx": dx,
"nrow": flwdir.shape[0],
"ncol": flwdir.shape[1],
"ng": 0,
"nac": np.count_nonzero(active_cell),
"xmin": bbox[0],
"ymax": bbox[3],
"flwdir": flwdir,
"flwacc": flwacc,
"path": path,
"active_cell": active_cell,
}
return mesh
[docs]def generate_mesh(
path: str,
bbox: None | list | tuple | set = None,
x: None | float | list | tuple | set = None,
y: None | float | list | tuple | set = None,
area: None | float | list | tuple | set = None,
code: None | str | list | tuple | set = None,
max_depth: int = 1,
epsg: None | str | int = None,
) -> dict:
"""
Automatic mesh generation.
.. hint::
See the :ref:`User Guide <user_guide.in_depth.automatic_meshing>` for more.
Parameters
----------
path : str
Path to the flow directions file. The file will be opened with `gdal <https://gdal.org/api/python/osgeo.gdal.html>`__.
bbox : sequence or None, default None
Bounding box with the following convention:
``(xmin, xmax, ymin, ymax)``.
The bounding box values must respect the CRS of the flow directions file.
.. note::
If not given, **x**, **y** and **area** must be filled in.
x : float, sequence or None, default None
The x-coordinate(s) of the catchment outlet(s) to mesh.
The **x** value(s) must respect the CRS of the flow directions file.
The **x** size must be equal to **y** and area.
y : float, sequence or None, default None
The y-coordinate(s) of the catchment outlet(s) to mesh.
The **y** value(s) must respect the CRS of the flow directions file.
The **y** size must be equal to **x** and **area**.
area : float, sequence or None, default None
The area of the catchment(s) to mesh in m².
The **area** size must be equal to **x** and **y**.
code : str, sequence or None, default None
The code of the catchment(s) to mesh.
The **code** size must be equal to **x**, **y** and **area**.
In case of bouding box meshing, the **code** argument is not used.
.. note::
If not given, the default code is:
``['_c0', '_c1', ..., '_cn-1']`` with :math:`n`, the number of gauges.
max_depth : int, default 1
The maximum depth accepted by the algorithm to find the catchment outlet.
A **max_depth** of 1 means that the algorithm will search among the 2-length combinations in:
``(row - 1, row, row + 1, col - 1, col, col + 1)``, the coordinates that minimize the relative error between
the given catchment area and the modeled catchment area calculated from the flow directions file.
This can be generalized to :math:`n`.
.. image:: ../../_static/max_depth.png
:align: center
:width: 350
epsg : str, int or None, default None
The EPSG value of the flow directions file. By default, if the projection is well
defined in the flow directions file. It is not necessary to provide the value of
the EPSG. On the other hand, if the projection is not well defined in the flow directions file
(i.e. in ASCII file). The **epsg** argument must be filled in.
Returns
-------
dict
A mesh dictionary that can be used to initialize the `Model` object.
See Also
--------
Model: Primary data structure of the hydrological model `smash`.
Examples
--------
>>> flwdir = smash.load_dataset("flwdir")
>>> mesh = smash.generate_mesh(
... flwdir,
... x=[840_261, 826_553, 828_269],
... y=[6_457_807, 6_467_115, 6_469_198],
... area=[381.7 * 1e6, 107 * 1e6, 25.3 * 1e6],
... code=["V3524010", "V3515010", "V3517010"],
... )
>>> mesh.keys()
dict_keys(['dx', 'nrow', 'ncol', 'ng', 'nac', 'xmin', 'ymax',
'flwdir', 'flwdst', 'flwacc', 'path', 'gauge_pos',
'code', 'area', 'active_cell'])
"""
if os.path.isfile(path):
ds_flwdir = gdal.Open(path)
else:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)
if bbox is None:
if x is None or y is None or area is None:
raise ValueError(
"'bbox' argument or 'x', 'y' and 'area' arguments must be defined"
)
else:
return _get_mesh_from_xy(ds_flwdir, x, y, area, code, max_depth, epsg)
else:
return _get_mesh_from_bbox(ds_flwdir, bbox, epsg)