"""
containing modules to set up MESH models
"""
# third-party libraries
import numpy as np
import xarray as xr
import pandas as pd
import geopandas as gpd
from hydrant.topology import river_graph
# built-in libraries
from typing import (
Sequence,
Iterable,
Tuple,
Dict,
)
import re
# internal imports
from .geom import _calculate_polygon_areas
def _check_geo_obj_dtype(obj):
# check `obj`'s dtype
# if it is a path string
if isinstance(obj, str):
obj = gpd.read_file(obj)
# if it is a geopandas.GeoDataFrame
elif isinstance(obj, gpd.GeoDataFrame):
pass
# if it is a list of str paths
elif isinstance(obj[0], str):
obj = gpd.GeoDataFrame(pd.concat([gpd.read_file(f) for f in obj]))
else:
raise TypeError("The type of the `riv` could be a string path, "
"a list of string paths, or a geopandas.GeoDataframe "
"object")
# return a copy of the GeoDataFrame object
return obj.copy()
def _adjust_ids(
seg_id: np.ndarray,
ds_seg_id: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
[Temporary solution]: readjusts segments (and therefore downstream
segments) to allow all segments to be in between the nodes of the
longest branch found in a given river network
"""
# function limited names for the segments and downstream
# river segments
main_id_str = 'main_id'
ds_main_id_str = 'ds_main_id'
# creating a pandas DataFrame of `seg_id` and `ds_seg_id`
riv_df = pd.concat([pd.Series(arr) for arr in [seg_id, ds_seg_id]],
names=[main_id_str, ds_main_id_str],
axis=1)
# naming columns, in case needed
riv_df.columns = [main_id_str, ds_main_id_str]
# extracting the longest branch out of "hydrant"
longest_branch = river_graph.longest_branch(
riv=riv_df,
main_id=main_id_str,
ds_main_id=ds_main_id_str)
# selecting first and last (while not counting the outlet value, so -2)
first_node = longest_branch[0]
last_node = longest_branch[-2]
# extracting the index of first and last nodes
first_idx = riv_df.index[riv_df[main_id_str] == first_node]
last_idx = riv_df.index[riv_df[main_id_str] == last_node]
# building new `riv_df` with new index values
idx = pd.Index(first_idx.to_list() + riv_df.index.to_list())
idx = idx.drop_duplicates(keep='first')
idx = pd.Index(idx.to_list() + last_idx.to_list())
idx = idx.drop_duplicates(keep='last')
riv_df = riv_df.loc[idx]
# reseting index, just to reassure
riv_df.reset_index(drop=True, inplace=True)
# extracting np.ndarrays
new_seg_id = riv_df[main_id_str].to_numpy()
new_ds_seg_id = riv_df[ds_main_id_str].to_numpy()
return new_seg_id, new_ds_seg_id
def _prepare_landcover_mesh(
landcover: pd.DataFrame,
cat_dim: str,
gru_dim: str,
landcover_ds_name: str = 'landcover',
dummy_col_name: int = None,
dummy_col_value: int = 0,
) -> xr.Dataset:
'''Implements necessary landcover manipulations by:
1) removing columns with no contribution (sum=0),
2) assigning `cat_dim` column as index,
3) assigning a dummy variables as needed by MESH,
4) QA/QC on the select parameters indicated in `cols`,
5)
'''
if dummy_col_name is None:
dummy_col_name = 9999
# remove zero-sum columns
landcover = landcover.loc[:, landcover.sum(axis=0) > 0]
# removing non-digit characters from columns, if any
landcover.columns = [int(re.sub('\D', '', c)) for c in landcover.columns]
# set column name for landcover classes
landcover.columns.name = gru_dim
# adding necessary MESH dummy landcover variable - Hard-coded
landcover.insert(len(landcover.columns), dummy_col_name, dummy_col_value)
# making an xarray.DataArray
landcover_da = landcover.stack().to_xarray()
# converting to xr.Dataset
landcover_ds = landcover_da.to_dataset(name=landcover_ds_name)
return landcover_ds
def _fill_na_ds(
ds: xr.Dataset,
na_values: Dict[str, float],
) -> xr.Dataset:
'''Replacing NA values within an xarray.Dataset
object for different DataAarrays (corresponding to
the keys of the `na_values`) with the corresponding
values of `na_values`.
'''
# make a copy of the dataset
ds = ds.copy()
# replacing NAs
for var, val in na_values.items():
ds[var] = xr.where(ds[var] == np.nan, val, ds[var])
return ds
def _downcast_to_int(
ds: xr.Dataset,
int_vars: Iterable[str],
) -> xr.Dataset:
'''Downcasts the variables of `ds` to int32 type
if they are not already int32.
Parameters
----------
ds : xarray.Dataset
Dataset object of interest
int_vars : Iterable[str]
Iterable of variable names to be downcasted to int32
Returns
-------
ds : xarray.Dataset
Dataset with specified variables downcasted to int32
'''
# make a copy of the dataset
ds = ds.copy()
# downcast to int32
for var in int_vars:
if var in ds and ds[var].dtype != np.int32:
ds[var] = ds[var].astype(np.int32)
return ds
def _prepare_geodataframe_mesh(
geodf: gpd.GeoDataFrame,
cat_dim: str,
geometry_dim: str = 'geometry',
) -> xr.Dataset:
'''Implements necessary geodf manipulations by:
1) removing the `geometry_dim` column of the
geopandas.GeoDataFrame object and turning it
into a pandas.DataFrame automatically
2) setting index of the new DataFrame to the
`cat_dim` indicating IDs for each element of
`geodf`
3) returning an xarray.Dataset
'''
# drop the `geometry_var` column
geodf = geodf.copy().drop(columns=geometry_dim)
return geodf.set_index(cat_dim).to_xarray()
[docs]
def prepare_mesh_ddb(
riv: gpd.GeoDataFrame,
cat: gpd.GeoDataFrame,
landcover: pd.DataFrame,
cat_dim: str,
gru_dim: str,
hru_dim: str,
gru_names: Sequence[str],
include_vars: Dict[str, str],
attr_local: Dict[str, Dict[str, str]],
attr_global: Dict[str, str],
min_values: Dict[str, float] = None,
fill_na: Dict[str, float] = None,
ordered_dims: Dict[str, Iterable] = None,
ddb_units: Dict[str, str] = None,
ddb_to_units: Dict[str, str] = None,
) -> xr.Dataset:
"""
Prepares the drainage database (ddb) for the MESH model.
This function applies a set of manipulations to river network and catchment
geospatial data to construct the drainage database required by MESH.
Parameters
----------
riv : str, list of str, or geopandas.GeoDataFrame
Path(s) to ESRI Shapefile(s) or a GeoDataFrame containing river
LineString(s).
cat : str, list of str, or geopandas.GeoDataFrame
Path(s) to ESRI Shapefile(s) or a GeoDataFrame containing catchment
Polygon(s) or MultiPolygon(s).
landcover : pandas.DataFrame
DataFrame of land use class fractions (values between 0 and 1) for each
catchment. Columns are land cover classes; rows are catchment elements.
cat_dim : str
Name of the catchment dimension present in `cat`, `riv`, and `landcover`.
gru_dim : str
Name of the landcover class dimension after processing `landcover`.
hru_dim : str
Name of the hydrological response unit (HRU) dimension for output.
gru_names : Sequence of str
List of landcover class names, ordered to match processed `landcover`.
include_vars : dict
Dictionary mapping variable names in input data to output names in ddb.
attr_local : dict
Dictionary mapping output variable names to their attribute dictionaries.
attr_global : dict
Dictionary mapping global attribute names to their descriptions.
min_values : dict, optional
Minimum values for variables in ddb. Keys must match output variable names.
fill_na : dict, optional
Values to fill for NaNs in each variable of ddb. Keys are variable names.
ordered_dims : dict, optional
Dictionary mapping dimension names to ordered values for sorting ddb.
ddb_units : dict, optional
Dictionary mapping variable names to their units for quantification.
ddb_to_units : dict, optional
Dictionary mapping variable names to target units for conversion.
Returns
-------
ddb : xarray.Dataset
Drainage database as an xarray.Dataset containing all required variables
and attributes for MESH model setup.
"""
# define necessary variables
geometry_var = 'geometry' # geopandas.GeoDataFrame geometry column name
landcover_var = 'landclass' # landcover variable name in the object
dummy_col = 9999 # MESH's dummy landcover class dimension/column name
dummy_col_value = 0 # MESH's dummy landcover class value
# extract order of `cat_dim`
cat_dim_array = riv.loc[:, cat_dim].copy().to_numpy()
# check `riv` and `cat` dtypes
riv = _check_geo_obj_dtype(riv)
cat = _check_geo_obj_dtype(cat)
# calculate catchment areas in meters squared
grid_area = _calculate_polygon_areas(
gdf=cat,
target_area_unit='m ** 2',
equal_area_crs='ESRI:54009'
)
grid_area['area'] = grid_area['area'].pint.magnitude
# check `landcover` and `coords` dtypes
if not isinstance(landcover, pd.DataFrame):
raise TypeError("`landcover` and `coords` must be of type "
"pandas.DataFrame")
# make actual xarray.Dataset objects for `riv` and `cat`
riv_ds, cat_ds, grid_area_ds = (_prepare_geodataframe_mesh(
geodf,
cat_dim=cat_dim,
geometry_dim=geometry_var)
for geodf in [riv, cat, grid_area])
# make xarray.Dataset object for `landcover`
landcover_ds = _prepare_landcover_mesh(
landcover,
cat_dim=cat_dim,
gru_dim=gru_dim,
landcover_ds_name=landcover_var,
dummy_col_name=dummy_col,
dummy_col_value=dummy_col_value)
# add landcover classes names present in the setup to `ddb`
# note that the last landcover class/name is a dummy one
_gru_list = landcover_ds[gru_dim].to_numpy()
# creating dictionary for final merging
lc_names = {
"coords": {
gru_dim: {
"dims": gru_dim,
"data": _gru_list,
},
},
"dims": gru_dim,
"data_vars": {
"landclass_names": {
"dims": gru_dim,
"data": [gru_names[c] for c in _gru_list[0:-1]] + ['Dump']
},
},
}
lc_names_da = xr.Dataset.from_dict(lc_names)
# making a list of all xarray.Dataset objects
ds_list = [
landcover_ds,
lc_names_da,
riv_ds,
cat_ds,
grid_area_ds]
# merging all xarray objects into one
ddb = xr.combine_by_coords(ds_list, compat='override')
# only select `include_vars` variable(s) while renaming
# based on the _values_ (NOT KEYS) of the dictionary
ddb = ddb.rename_vars(name_dict=include_vars)
ddb = ddb[list(include_vars.values())]
# fill NA values for `ddb`
if fill_na is not None:
ddb = _fill_na_ds(ds=ddb,
na_values=fill_na)
# assigning local attributes for each variable
if attr_local:
for var, val in attr_local.items():
if var in ddb:
for attr, desc in val.items():
ddb[var].attrs[attr] = desc
# assigning global attributes for `ddb`
if attr_global:
for attr, desc in attr_global.items():
ddb.attrs[attr] = desc
# check to see if all the keys included in the `units` dictionary are
# found inside the `variables` sequence
for k in ddb_units:
if k not in ddb:
raise ValueError(f"item {k} defined in `units` cannot be found"
" in `variables`")
# if all elements of `variables` not found in `units`,
# assign them to None
for v in include_vars:
if v not in ddb_units:
ddb_units[v] = None
# now assign the units - assuming `pint`'s default unit registery
ddb = ddb.pint.quantify(units=ddb_units)
# if `to_units` is defined
if ddb_to_units:
# if dimensionless values exists, pop them
ddb_to_units = {k: v for k, v in ddb_to_units.items()
if v != 'dimensionless'}
# implement the conversion
ddb = ddb.pint.to(units=ddb_to_units)
# print the netCDF file
ddb = ddb.pint.dequantify()
# sort dimensions of `ddb`
if ordered_dims is not None:
ddb = ddb.reindex(indexers=ordered_dims,
copy=True)
# rename ddb dimension
ddb = ddb.rename({cat_dim: hru_dim})
# change the value of `gru` dimension
ddb[gru_dim] = range(0, len(ddb[gru_dim]))
# # assinging minimum values for `ddb`
if min_values is not None:
# only apply to variables that are present in `ddb`
min_values = {k: v for k, v in min_values.items()
if k in ddb}
# if min_values turns to be non-empty
if min_values:
for var, val in min_values.items():
ddb[var] = ddb[var].where(ddb[var] > val,
val,
drop=False)
# downcast specific variables to int32 if exist
int_vars = ['IAK', 'Rank', 'Next']
for var in int_vars:
if var not in ddb:
int_vars.remove(var)
# calling the downcast function
ddb = _downcast_to_int(ddb, int_vars=int_vars)
# assure the order of `hru_dim` is correct
ddb = ddb.loc[{hru_dim: cat_dim_array}].copy()
# rename coordinate variable names to avoid confusion with dimension
# names
ddb = ddb.drop_vars(gru_dim)
# assure the subbasin values are `integer`
ddb['subbasin'] = ddb['subbasin'].astype(int)
return ddb