import logging.config
import warnings
from collections.abc import Callable
from logging import getLogger
from typing import Any
import geopandas as gpd
import intake
import xarray as xr
from .scripting import LOGGING_CONFIG
from .utils import cache_catalog
from .validations import _validate_space_params
from .workflows import (
climate_request,
gis_request,
hydrometric_request,
user_provided_dataset,
)
# Apply the package logging configuration (imported from .scripting) at import time,
# then create this module's logger.
logging.config.dictConfig(LOGGING_CONFIG)
logger = getLogger(__name__)
# Default location of the intake catalog; used as the default `catalog_path` of `Query`.
url_path = "https://raw.githubusercontent.com/hydrocloudservices/catalogs/main/catalogs/main.yaml"
# Names exported by `from <module> import *`.
__all__ = ["Query"]
class Query:  # numpydoc ignore=PR09
    """
    The Query class.

    The Query interface facilitates access to analysis-ready earth observation datasets and allows for
    spatiotemporal operations to be performed based on user queries.

    Parameters
    ----------
    datasets : str, list, dict-like
        If a str, a dataset name, i.e.: era5_land_reanalysis.
        If a list, a list of dataset names, i.e.: [era5_single_levels_reanalysis, era5_land_reanalysis].
        If a dictionary, it should map dataset names to their corresponding requested
        content such as some desired variables. See the notes below for more details.
        The list of available datasets in this library is coming soon!
    space : dict-like
        A dictionary that maps spatial parameters with their corresponding value.
        More information on accepted key/value pairs : :py:meth:`~xdatasets.Query._resolve_space_params`.
    time : dict-like
        A dictionary that maps temporal parameters with their corresponding value.
        More information on accepted key/value pairs : :py:meth:`~xdatasets.Query._resolve_time_params`.
    catalog_path : str
        URL for the intake catalog which provides access to the datasets. While this library provides its own
        intake catalog, users have the option to provide their own catalog, which can be particularly beneficial for
        private datasets or if different configurations are needed.

    Attributes
    ----------
    catalog
        The intake catalog opened from (the cached copy of) ``catalog_path``.
    datasets
        The ``datasets`` argument, stored as provided.
    space : dict
        Spatial parameters as returned by ``_resolve_space_params``.
    time : dict
        Temporal parameters as returned by ``_resolve_time_params``.
    data
        Result of the query, set by ``load_query``.

    Notes
    -----
    The dictionary approach allows more flexibility in the request. i.e.:

    >>> query = {
    ...     "era5_land_reanalysis": {"variables": ["t2m", "tp"]},
    ...     "era5_single_levels_reanalysis": {"variables": "t2m"},
    ... }

    Currently, accepted key, value pairs for a mapping argument include the following:

    >>> {"variables": str | list[str]}

    Any other key found in a dataset's mapping is forwarded as a keyword argument
    to the underlying request.

    Examples
    --------
    Create data:

    >>> sites = {
    ...     "Montreal": (45.508888, -73.561668),
    ...     "New York": (40.730610, -73.935242),
    ...     "Miami": (25.761681, -80.191788),
    ... }
    >>> query = {
    ...     "datasets": "era5_land_reanalysis_dev",
    ...     "space": {"clip": "point", "geometry": sites},
    ...     "time": {
    ...         "timestep": "D",
    ...         "aggregation": {"tp": np.nansum, "t2m": np.nanmean},
    ...         "start": "1950-01-01",
    ...         "end": "1955-12-31",
    ...         "timezone": "America/Montreal",
    ...     },
    ... }
    >>> xds = xd.Query(**query)
    >>> xds.data
    <xarray.Dataset>
    Dimensions:      (site: 3, time: 2191, source: 1)
    Coordinates:
        latitude     (site) float64 45.5 40.7 25.8
        longitude    (site) float64 -73.6 -73.9 -80.2
      * site         (site) <U8 'Montreal' 'New York' 'Miami'
      * time         (time) datetime64[ns] 1950-01-01 1950-01-02 ... 1955-12-31
      * source       (source) <U24 'era5_land_reanalysis_dev'
    Data variables:
        t2m_nanmean  (time, site, source) float32 269.6 273.8 294.3 ... 268.1 292.1
        tp_nansum    (time, site, source) float32 0.0004192 2.792e-06 ... 0.0001207
    Attributes:
        pangeo-forge:inputs_hash:  1622c0abe9326bfa4d6ee6cdf817fccb1ef1661046f30f...
        pangeo-forge:recipe_hash:  f2b6c75f28693bbae820161d5b71ebdb9d740dcdde0666...
        pangeo-forge:version:      0.9.4
    """

    def __init__(
        self,
        datasets: str | list[str] | dict[str, Any],
        space: dict[str, Any] | None = None,
        time: dict[str, Any] | None = None,
        catalog_path: str = url_path,
    ) -> None:
        space = {} if space is None else space
        time = {} if time is None else time

        # We cache the catalog's yaml files for easier access behind corporate firewalls
        catalog_path = cache_catalog(catalog_path)
        self.catalog = intake.open_catalog(catalog_path)

        self.datasets = datasets
        self.space = self._resolve_space_params(**space)
        self.time = self._resolve_time_params(**time)

        self.load_query(datasets=self.datasets, space=self.space, time=self.time)

    def _resolve_space_params(
        self,
        clip: str | None = None,
        geometry: dict[str, tuple] | gpd.GeoDataFrame | None = None,
        averaging: bool | None = False,
        unique_id: str | None = None,
    ) -> dict:
        """
        Resolve and validate user-provided space params.

        Parameters
        ----------
        clip : str
            Which kind of clip operation to perform on geometry.
            Possible values are one of "polygon", "point" or "bbox".
        geometry : gpd.GeoDataFrame, Dict[str, Tuple]
            Geometry/geometries on which to perform spatial operations.
        averaging : bool, optional
            Whether to spatially average the arrays within a geometry or not.
        unique_id : str, optional
            A column name, if gpd.GeoDataFrame is provided, to identify each unique geometry.

        Returns
        -------
        dict
            The resolved parameters, keyed by "clip", "geometry", "averaging" and "unique_id".
            A GeoDataFrame geometry is returned with its index reset.

        Raises
        ------
        AssertionError
            If ``_validate_space_params`` returns a falsy value.
        """
        space = {
            "clip": clip,
            "geometry": geometry,
            "averaging": averaging,
            "unique_id": unique_id,
        }

        # Explicit raise rather than an `assert` statement so the validation is not
        # stripped when Python runs with -O. AssertionError is kept so that existing
        # callers catching it keep working.
        if not _validate_space_params(**space):
            raise AssertionError("Invalid space parameters.")

        if isinstance(geometry, gpd.GeoDataFrame):
            geometry = geometry.reset_index(drop=True)

        # We create a new dict based on user-provided parameters
        # TODO : adapt all parameters before requesting any operations on datasets
        return {
            "clip": clip,
            "geometry": geometry,
            "averaging": averaging,
            "unique_id": unique_id,
        }

    def _resolve_time_params(
        self,
        timestep: str | None = None,
        aggregation: dict[str, Callable[..., Any] | list[Callable[..., Any]]] | None = None,
        start: str | None = None,
        end: str | None = None,
        timezone: str | None = None,
        minimum_duration: str | None = None,
    ) -> dict:
        """
        Resolve user-provided time params.

        Parameters
        ----------
        timestep : str, optional
            In which time step should the data be returned.
            Possible values: https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases.
        aggregation : Dict[str, callable], optional
            Mapping that associates a variable name with the aggregation function to be applied to it.
            Function which can be called in the form `f(x, axis=axis, **kwargs)` to return the result of reducing an
            np.ndarray over an integer valued axis. This parameter is required should the `timestep` argument be passed.
        start : str, optional
            Start date of the selected time period.
            String format - can be year ("%Y"), year-month ("%Y-%m") or year-month-day ("%Y-%m-%d").
        end : str, optional
            End date of the selected time period.
            String format - can be year ("%Y"), year-month ("%Y-%m") or year-month-day ("%Y-%m-%d").
        timezone : str, optional
            Timezone to be used for the returned datasets.
            Possible values are listed here: https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568.
        minimum_duration : str, optional
            Minimum duration of a time series (id) in order to be kept.
            Possible values: https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases.

        Returns
        -------
        dict
            The parameters, keyed by their argument name. No validation is performed yet.
        """
        # We create a new dict based on user-provided parameters
        # TODO : validate and adapt all parameters before requesting any operations on datasets
        return {
            "timestep": timestep,
            "aggregation": aggregation,
            "start": start,
            "end": end,
            "timezone": timezone,
            "minimum_duration": minimum_duration,
        }

    def load_query(
        self,
        datasets: str | list[str] | dict[str, Any],
        space: dict[str, Any],
        time: dict[str, Any],
    ) -> "Query":
        """
        Request every dataset of the query and store the result in ``self.data``.

        Parameters
        ----------
        datasets : str, list, dict-like
            A dataset name, a list of dataset names, or a mapping of dataset names to
            their requested content (``"variables"`` plus any extra request options).
        space : dict
            Resolved spatial parameters (see ``_resolve_space_params``).
        time : dict
            Resolved temporal parameters (see ``_resolve_time_params``).

        Returns
        -------
        Query
            The instance itself, with ``data`` set.

        Raises
        ------
        ValueError
            If ``datasets`` is not a string, a list/tuple or a dictionary.
        """
        # Get all datasets in query
        if isinstance(datasets, str):
            dataset_names = [datasets]
        elif isinstance(datasets, (dict, list, tuple)):
            # Iterating a dict yields its keys, i.e. the dataset names.
            dataset_names = list(datasets)
        else:
            raise ValueError("Datasets should be a string, a list of strings, or a dictionary.")

        # Load data for each dataset
        dsets = []
        for dataset_name in dataset_names:
            variables, kwargs = self._split_dataset_request(datasets, dataset_name)
            dsets.append(
                self._process_one_dataset(
                    dataset_name=dataset_name,
                    variables=variables,
                    space=space,
                    time=time,
                    **kwargs,
                )
            )

        try:
            # Try naively merging datasets into single dataset
            # NOTE(review): when the first result is not an xr.Dataset, `merged` stays None
            # and the results are dropped - confirm whether that is intended.
            merged = None
            if isinstance(dsets[0], xr.Dataset):
                if len(dsets) > 1:
                    # With more than one dataset, "source" is added as a trailing
                    # dimension so that the datasets can be merged together.
                    for dset in dsets:
                        # Snapshot the keys: variables are reassigned while looping.
                        for var in list(dset.keys()):
                            dset[var] = dset[var].expand_dims("source", axis=-1)
                    merged = xr.merge(dsets)
                else:
                    merged = dsets[0]
        except Exception:  # best effort: fall back to the raw list of results
            warnings.warn("Couldn't merge datasets so we pass a list of datasets.", stacklevel=2)
            # Look into passing a DataTree instead
            merged = dsets

        self.data = merged
        return self

    @staticmethod
    def _split_dataset_request(datasets, dataset_name):
        """
        Split the requested content of one dataset into its variables and extra options.

        Parameters
        ----------
        datasets : str, list, dict-like
            The ``datasets`` argument of the query.
        dataset_name : str
            Name of the dataset whose content is looked up.

        Returns
        -------
        tuple
            ``(variables, kwargs)``: ``variables`` is a list of names (a single string
            is wrapped in a list) or None when none were requested; ``kwargs`` holds
            every other key of the dataset's mapping. ``(None, {})`` is returned when
            ``datasets`` carries no mapping for ``dataset_name``.
        """
        # Local import: Mapping is not among the module-level imports.
        from collections.abc import Mapping

        content = datasets.get(dataset_name) if isinstance(datasets, Mapping) else None
        if not isinstance(content, Mapping):
            return None, {}

        variables = content.get("variables")
        if isinstance(variables, str):
            variables = [variables]
        kwargs = {key: value for key, value in content.items() if key != "variables"}
        return variables, kwargs

    def _find_dataset_category(self, dataset_name):
        """
        Return the name of the first catalog category listing ``dataset_name``.

        Raises
        ------
        ValueError
            If no category of the catalog lists ``dataset_name``.
        """
        for category in self.catalog._entries.keys():
            if dataset_name in self.catalog[category]._entries.keys():
                return category
        raise ValueError(f"Dataset {dataset_name!r} was not found in the catalog.")

    def _process_one_dataset(self, dataset_name, variables, space, time, **kwargs):
        """
        Dispatch the request for a single dataset to the matching workflow.

        The workflow is selected from the catalog category holding ``dataset_name``
        ("atmosphere", "hydrology" or "geography"), unless an ``xr.Dataset`` is passed
        through the ``data`` keyword, in which case the user-provided workflow is used.
        RuntimeWarning is silenced while the workflow runs.

        Parameters
        ----------
        dataset_name : str
            Name of the dataset.
        variables : list of str or None
            Requested variables.
        space : dict
            Resolved spatial parameters.
        time : dict
            Resolved temporal parameters.
        **kwargs
            Extra request options. They are forwarded to the hydrology and geography
            workflows; ``data`` is handed to the user-provided workflow.

        Returns
        -------
        object
            Whatever the selected workflow returns.

        Raises
        ------
        ValueError
            If the dataset cannot be found in the catalog, if ``dataset_name`` is not
            a string and no ``xr.Dataset`` is provided, or if the dataset's category
            has no associated workflow.
        """
        data = kwargs.get("data")

        if isinstance(data, xr.Dataset):
            dataset_category = "user-provided"
        elif isinstance(dataset_name, str):
            dataset_category = self._find_dataset_category(dataset_name)
        else:
            raise ValueError(
                f"Dataset name should be a string unless an xarray.Dataset is provided, got {dataset_name!r}."
            )

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)

            if dataset_category == "atmosphere":
                return climate_request(dataset_name, variables, space, time, self.catalog)
            if dataset_category == "hydrology":
                return hydrometric_request(dataset_name, variables, space, time, self.catalog, **kwargs)
            if dataset_category == "geography":
                return gis_request(dataset_name, variables, space, time, self.catalog, **kwargs)
            if dataset_category == "user-provided":
                return user_provided_dataset(dataset_name, variables, space, time, data)

        raise ValueError(f"Unsupported dataset category {dataset_category!r} for dataset {dataset_name!r}.")

    def bbox_clip(self, ds, variable="weights"):
        """
        Keep only the entries of ``ds`` where ``ds[variable]`` is not null.

        Parameters
        ----------
        ds
            Object exposing ``where(..., drop=True)`` and item access returning something
            with ``isnull()`` (presumably an xarray Dataset - confirm against callers).
        variable : str, optional
            Name of the variable used as the mask. Defaults to "weights".

        Returns
        -------
        object
            The result of ``ds.where(mask, drop=True)``.
        """
        return ds.where(~ds[variable].isnull(), drop=True)