import fnmatch
import itertools
import warnings
import pandas as pd
import xarray as xr
from clisops.core.subset import subset_time
from .spatial import clip_by_bbox, clip_by_point, clip_by_polygon
from .temporal import (
ajust_dates,
change_timezone,
minimum_duration,
temporal_aggregation,
)
from .utils import open_dataset
[docs]
def climate_request(dataset_name, variables, space, time, catalog):
    """Open a gridded climate dataset and apply the requested selections.

    Operations are applied in this order: spatial dimension renaming,
    variable selection, timezone conversion, time slicing, spatial clipping,
    temporal aggregation, and finally tagging with a ``source`` coordinate.

    Parameters
    ----------
    dataset_name : str
        Name of the dataset opened through ``open_dataset``; it is also
        stored as the single value of the ``source`` coordinate.
    variables : str or list of str or None
        Data variables to keep. ``None`` keeps every variable. If the
        selection fails, a warning is emitted and all variables are kept.
    space : dict
        Spatial request. ``space["clip"]`` selects ``"polygon"``, ``"point"``
        or ``"bbox"`` clipping; the whole dict is forwarded to the clip helper.
    time : dict
        Temporal request. Reads the ``"timezone"``, ``"start"``, ``"end"``,
        ``"timestep"`` and ``"aggregation"`` keys; missing keys count as None.
    catalog
        Catalog forwarded to ``open_dataset``.

    Returns
    -------
    The processed dataset with a ``source`` coordinate set to ``[dataset_name]``.
    """
    ds = open_dataset(dataset_name, catalog)

    rename_dict = {
        # dim labels (order represents the priority when checking for the dim labels)
        "longitude": ["lon", "long", "x"],
        "latitude": ["lat", "y"],
    }
    dims = list(ds.dims)
    for key, values in rename_dict.items():
        for value in values:
            if value in dims:
                ds = ds.rename({value: key})
                # Only the highest-priority alias is renamed: renaming a second
                # alias onto the same target name would raise a conflict error.
                break

    if variables is not None:
        try:
            ds = ds[variables]
        except Exception as err:  # noqa: BLE001 - best effort: keep all variables
            warnings.warn(
                f"Could not select variables {variables!r}; keeping all variables ({err}).",
                stacklevel=2,
            )

    # Adjust timezone and then slice time dimension before moving on with spatiotemporal operations
    timezone = time.get("timezone")
    if timezone is not None:
        try:
            # Assume UTC for now, will change when the metadata database is up and running
            ds = change_timezone(ds, "UTC", timezone)
        except Exception as err:  # noqa: BLE001 - best effort: keep original timezone
            warnings.warn(
                f"Could not convert dataset to timezone {timezone!r} ({err}).",
                stacklevel=2,
            )

    start_time = time.get("start")
    end_time = time.get("end")
    if start_time is not None or end_time is not None:
        try:
            ds = subset_time(ds, start_date=start_time, end_date=end_time)
        except Exception as err:  # noqa: BLE001 - best effort: keep full time range
            warnings.warn(
                f"Could not subset time between {start_time!r} and {end_time!r} ({err}).",
                stacklevel=2,
            )

    # Spatial operations. ``spatial_agg`` stays None when no clipping is
    # requested, so the temporal aggregation below no longer hits an unbound name.
    # NOTE(review): confirm temporal_aggregation handles spatial_agg=None.
    spatial_agg = None
    if space["clip"] == "polygon":
        spatial_agg = "polygon"
        ds = clip_by_polygon(ds, space, dataset_name).load()
    elif space["clip"] == "point":
        spatial_agg = "point"
        ds = clip_by_point(ds, space, dataset_name).load()
    elif space["clip"] == "bbox":
        spatial_agg = "polygon"
        ds = clip_by_bbox(ds, space, dataset_name).load()

    if time.get("timestep") is not None and time.get("aggregation") is not None:
        ds = temporal_aggregation(ds, time, dataset_name, spatial_agg)

    # Add source name to dataset. Expanding every variable along "source"
    # was found to be very slow on large datasets, so only the coordinate is added.
    ds = ds.assign_coords(source=("source", [dataset_name]))
    return ds
[docs]
def gis_request(dataset_name, variables, space, time, catalog, **kwargs):
    """Read a geography layer from ``catalog["geography"]``, indexed by its unique id.

    If keyword filters are given, ``hydrometric_request`` is first run on the
    matching station dataset (``dataset_name`` with ``"_polygons"`` removed).
    Only geometries whose id appears in that result's ``id`` coordinate are
    then read. Without filters the whole layer is read and a warning is emitted.

    Parameters
    ----------
    dataset_name : str
        Key of the layer in ``catalog["geography"]``.
    variables, space, time, **kwargs
        Forwarded to ``hydrometric_request`` when ``kwargs`` is non-empty.
    catalog
        Catalog whose ``"geography"`` entries expose ``metadata["unique_id"]``.

    Returns
    -------
    The object returned by the source's ``read()`` (presumably a
    GeoDataFrame), indexed by the unique-id column converted to strings.
    """
    # The name of the id column comes from the catalog entry metadata.
    id_column = catalog["geography"][dataset_name].metadata["unique_id"]

    has_filters = any(kwargs)
    if not has_filters:
        geopandas_kwargs = None
        warnings.warn("No filters fetches the complete dataset (few minutes). Use filters to expedite data retrieval.", stacklevel=2)
    else:
        stations = hydrometric_request(
            dataset_name.replace("_polygons", ""),
            variables,
            space,
            time,
            catalog,
            **kwargs,
        )
        station_ids = tuple(stations.id.values.tolist())
        geopandas_kwargs = {"filters": [(id_column, "in", station_ids)]}

    layer = catalog["geography"][dataset_name](geopandas_kwargs=geopandas_kwargs)
    frame = layer.read().infer_objects().set_index(id_column)
    frame.index = frame.index.astype("str")
    return frame
[docs]
def hydrometric_request(dataset_name, variables, space, time, catalog, **kwargs):
    """Open a station dataset and apply variable, label, time and aggregation filters.

    Parameters
    ----------
    dataset_name : str
        Name of the dataset opened through ``open_dataset``.
    variables : str or list of str or None
        Variables to keep. Both forms used by station datasets are tried
        independently: labels along a ``variable`` dimension (``ds.sel``) and
        data variables (``ds[...]``). ``None`` keeps everything.
    space : dict
        Spatial request; currently unused (spatial clipping is not implemented
        for station data yet).
    time : dict
        Temporal request. Reads ``"start"``, ``"end"``, ``"minimum_duration"``,
        ``"timestep"`` and ``"aggregation"``; missing keys count as None.
    catalog
        Catalog forwarded to ``open_dataset``.
    **kwargs
        ``coordinate=labels`` filters. Each value is a label or an iterable of
        labels to keep along ``ds[coordinate]``. String labels containing
        ``*`` or ``?`` are expanded with :func:`fnmatch.filter` against the
        coordinate values. A filter that cannot be applied is skipped with a
        warning.

    Returns
    -------
    The filtered dataset.
    """
    ds = open_dataset(dataset_name, catalog)

    if variables is not None:
        # Datasets that index variables along a ``variable`` dimension.
        try:
            ds = ds.sel(variable=variables)
        except Exception:  # noqa: BLE001, S110 - no matching ``variable`` index
            pass
        # Datasets that store each variable as a data variable. This is tried
        # independently of the ``sel`` above. Previously a failing ``sel``
        # silently skipped it. A single name is wrapped in a list so the
        # result stays a Dataset.
        names = [variables] if isinstance(variables, str) else variables
        try:
            ds = ds[names]
        except Exception:  # noqa: BLE001, S110 - names are not data variables
            pass

    for key, value in kwargs.items():
        # Normalise a single label (string or any non-iterable scalar) to a list,
        # and materialise iterables so they can be traversed more than once.
        if isinstance(value, str) or not hasattr(value, "__iter__"):
            value = [value]
        value = list(value)
        try:
            # Expand wildcard patterns; non-wildcard and non-string labels are
            # kept as-is (the old substring test raised TypeError on non-strings).
            labels = []
            for pattern in value:
                if isinstance(pattern, str) and ("*" in pattern or "?" in pattern):
                    labels.extend(fnmatch.filter(ds[key].data, pattern))
                else:
                    labels.append(pattern)
            ds = ds.where(ds[key].isin(labels).load(), drop=True)
        except Exception as err:  # noqa: BLE001, PERF203 - best effort per filter
            warnings.warn(
                f"Could not filter {key!r} with {value!r}; filter ignored ({err}).",
                stacklevel=2,
            )

    # TODO: timezone adjustment needs the timezone as a coordinate for each id.

    start_time = time.get("start")
    end_time = time.get("end")
    if start_time is not None or end_time is not None:
        try:
            ds = subset_time(ds, start_date=start_time, end_date=end_time)
        except Exception as err:  # noqa: BLE001 - best effort: keep full time range
            warnings.warn(
                f"Could not subset time between {start_time!r} and {end_time!r} ({err}).",
                stacklevel=2,
            )

    # TODO: spatial operations for stations (stations within a polygon mask,
    # closest station to each point, stations within a bbox).

    # Load all coordinates into memory (done for its side effect only).
    for coord in ds.coords:
        ds[coord].load()

    if start_time is not None or end_time is not None:
        ds = ajust_dates(ds, time)

    if time.get("minimum_duration") is not None:
        ds = minimum_duration(ds, time)

    if time.get("timestep") is not None and time.get("aggregation") is not None:
        # Only aggregate when the requested timestep is coarser than the data's.
        if pd.Timedelta(1, unit=time["timestep"]) > pd.Timedelta(
            1,
            unit=xr.infer_freq(ds.time),
        ):
            spatial_agg = "polygon"
            ds = temporal_aggregation(ds, time, dataset_name, spatial_agg)

    # TODO: optionally drop dimension values left empty by the filtering above
    # (stack, dropna how="all", unstack) at the cost of a compute.
    return ds
[docs]
def user_provided_dataset(dataset_name, variables, space, time, ds):
    """Apply variable, time, spatial and temporal-aggregation selections to ``ds``.

    Parameters
    ----------
    dataset_name : str
        Label stored as the single value of the ``source`` coordinate and
        forwarded to the clip and aggregation helpers.
    variables : str or list of str or None
        Data variables to keep. ``None`` keeps every variable. If the
        selection fails, a warning is emitted and all variables are kept.
    space : dict
        Spatial request. ``space["clip"]`` selects ``"polygon"``, ``"point"``
        or ``"bbox"`` clipping; the whole dict is forwarded to the clip helper.
    time : dict
        Temporal request. Reads ``"start"``, ``"end"``, ``"timestep"`` and
        ``"aggregation"``; missing keys count as None.
    ds
        The user-provided dataset; it must have a ``time`` coordinate when
        temporal aggregation is requested.

    Returns
    -------
    The processed dataset with a ``source`` coordinate set to ``[dataset_name]``.
    """
    if variables is not None:
        try:
            ds = ds[variables]
        except Exception as err:  # noqa: BLE001 - best effort: keep all variables
            warnings.warn(
                f"Could not select variables {variables!r}; keeping all variables ({err}).",
                stacklevel=2,
            )

    # TODO: timezone adjustment needs the timezone as a coordinate for each id.

    start_time = time.get("start")
    end_time = time.get("end")
    if start_time is not None or end_time is not None:
        try:
            ds = subset_time(ds, start_date=start_time, end_date=end_time)
        except Exception as err:  # noqa: BLE001 - best effort: keep full time range
            warnings.warn(
                f"Could not subset time between {start_time!r} and {end_time!r} ({err}).",
                stacklevel=2,
            )

    # Spatial operations. ``spatial_agg`` stays None when no clipping is
    # requested, so the aggregation below no longer hits an unbound name.
    # NOTE(review): confirm temporal_aggregation handles spatial_agg=None.
    spatial_agg = None
    if space["clip"] == "polygon":
        spatial_agg = "polygon"
        ds = clip_by_polygon(ds, space, dataset_name).load()
    elif space["clip"] == "point":
        spatial_agg = "point"
        ds = clip_by_point(ds, space, dataset_name).load()
    elif space["clip"] == "bbox":
        spatial_agg = "polygon"
        ds = clip_by_bbox(ds, space, dataset_name).load()

    if time.get("timestep") is not None and time.get("aggregation") is not None:
        # Only aggregate when the requested timestep is coarser than the data's.
        if pd.Timedelta(1, unit=time["timestep"]) > pd.Timedelta(
            1,
            unit=xr.infer_freq(ds.time),
        ):
            ds = temporal_aggregation(ds, time, dataset_name, spatial_agg)

    # Add source name to dataset.
    ds = ds.assign_coords(source=("source", [dataset_name]))
    return ds