diff --git a/src/anemoi/datasets/create/functions/sources/accumulations.py b/src/anemoi/datasets/create/functions/sources/accumulations.py index 46a9e163c..fd87798ce 100644 --- a/src/anemoi/datasets/create/functions/sources/accumulations.py +++ b/src/anemoi/datasets/create/functions/sources/accumulations.py @@ -1,4 +1,4 @@ -# (C) Copyright 2024 Anemoi contributors. +# (C) Copyright 2024-2025 Anemoi contributors. # # This software is licensed under the terms of the Apache Licence Version 2.0 # which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. @@ -17,8 +17,6 @@ from earthkit.data.core.temporary import temp_file from earthkit.data.readers.grib.output import new_grib_output -from anemoi.datasets.create.utils import to_datetime_list - from .mars import mars LOG = logging.getLogger(__name__) @@ -32,7 +30,9 @@ def _member(field): return number -class Accumulation: +class RangeParameter: + buggy_steps = False + def __init__(self, out, /, param, date, time, number, step, frequency, **kwargs): self.out = out self.param = param @@ -85,10 +85,7 @@ def check(self, field): def write(self, template): assert self.startStep != self.endStep, (self.startStep, self.endStep) - if np.all(self.values < 0): - LOG.warning( - f"Negative values when computing accumutation for {self.param} ({self.date} {self.time}): min={np.amin(self.values)} max={np.amax(self.values)}" - ) + self.check_results() self.out.write( self.values, @@ -109,8 +106,7 @@ def add(self, field, values): if step not in self.steps: return - if not np.all(values >= 0): - warnings.warn(f"Negative values for {field}: {np.amin(values)} {np.amax(values)}") + self.check_values(field, values) assert not self.done, (self.key, step) assert step not in self.seen, (self.key, step) @@ -154,6 +150,24 @@ def mars_date_time_steps(cls, dates, step1, step2, frequency, base_times, adjust def __repr__(self) -> str: return f"{self.__class__.__name__}({self.key})" + def check_values(self, field, values): + pass + + def check_results(self): + pass + + +class Accumulation(RangeParameter): + def check_values(self, field, values): + if not np.all(values >= 0): + warnings.warn(f"Negative values for {field}: {np.amin(values)} {np.amax(values)}") + + def check_results(self): + if np.all(self.values < 0): + LOG.warning( + f"Negative values when computing accumutation for {self.param} ({self.date} {self.time}): min={np.amin(self.values)} max={np.amax(self.values)}" + ) + class AccumulationFromStart(Accumulation): buggy_steps = True @@ -201,7 +215,6 @@ def _mars_date_time_step(cls, base_date, step1, step2, add_step, frequency): class AccumulationFromLastStep(Accumulation): - buggy_steps = False def compute(self, values, startStep, endStep): @@ -241,11 +254,66 @@ def _mars_date_time_step(cls, base_date, step1, step2, add_step, frequency): ) +class Operation(RangeParameter): + + def compute(self, values, startStep, endStep): + + assert endStep - startStep == self.frequency, ( + startStep, + endStep, + self.frequency, + ) + + if self.startStep is None: + self.startStep = startStep + else: + self.startStep = min(self.startStep, startStep) + + if self.endStep is None: + self.endStep = endStep + else: + self.endStep = max(self.endStep, endStep) + + if self.values is None: + self.values = values.copy() + + self.values = self.operation(self.values, values) + + @classmethod + def _mars_date_time_step(cls, base_date, step1, step2, add_step, frequency): + assert frequency > 0, frequency + # assert step1 > 0, (step1, step2, frequency, add_step, base_date) + + steps = [] + for step in range(step1 + frequency, step2 + frequency, frequency): + steps.append(step + add_step) + return ( + base_date.year * 10000 + base_date.month * 100 + base_date.day, + base_date.hour * 100 + base_date.minute, + tuple(steps), + ) + + +class Maximum(Operation): + operation = np.maximum + + +class Mininmum(Operation): + operation = np.minimum + + def _identity(x): return x -def _compute_accumulations( +OPERATIONS = { + "accumulation": Accumulation, + "minimum": Mininmum, + "maximum": Maximum, +} + + +def _compute_range_parameter( context, dates, request, @@ -254,6 +322,7 @@ def _compute_accumulations( patch=_identity, base_times=None, use_cdsapi_dataset=None, + operation="accumulation", ): adjust_step = isinstance(user_accumulation_period, int) @@ -269,9 +338,14 @@ def _compute_accumulations( base_times = [t // 100 if t > 100 else t for t in base_times] - AccumulationClass = AccumulationFromStart if data_accumulation_period in (0, None) else AccumulationFromLastStep + if operation == "accumulation": + RangeParameterClass = ( + AccumulationFromStart if data_accumulation_period in (0, None) else AccumulationFromLastStep + ) + else: + RangeParameterClass = OPERATIONS[operation] - mars_date_time_steps = AccumulationClass.mars_date_time_steps( + mars_date_time_steps = RangeParameterClass.mars_date_time_steps( dates, step1, step2, @@ -317,7 +391,7 @@ def _compute_accumulations( ) accumulations = {} - for a in [AccumulationClass(out, frequency=frequency, **r) for r in requests]: + for a in [RangeParameterClass(out, frequency=frequency, **r) for r in requests]: for s in a.steps: key = (a.param, a.date, a.time, s, a.number) accumulations.setdefault(key, []).append(a) @@ -369,7 +443,7 @@ def _scda(request): return request -def accumulations(context, dates, use_cdsapi_dataset=None, **request): +def range_parameter(context, dates, operation, use_cdsapi_dataset=None, **request): _to_list(request["param"]) class_ = request.get("class", "od") stream = request.get("stream", "oper") @@ -391,37 +465,21 @@ def accumulations(context, dates, use_cdsapi_dataset=None, **request): kwargs = KWARGS.get((class_, stream), {}) - context.trace("🌧️", f"accumulations {request} {user_accumulation_period} {kwargs}") + context.trace("🌧️", f"{operation} {request} {user_accumulation_period} {kwargs}") - return _compute_accumulations( + return _compute_range_parameter( context, dates, request, + operation=operation, user_accumulation_period=user_accumulation_period, use_cdsapi_dataset=use_cdsapi_dataset, **kwargs, ) -execute = accumulations +def accumulations(context, dates, use_cdsapi_dataset=None, **request): + return range_parameter(context, dates, "accumulation", use_cdsapi_dataset=use_cdsapi_dataset, **request) -if __name__ == "__main__": - import yaml - - config = yaml.safe_load( - """ - class: ea - expver: '0001' - grid: 20./20. - levtype: sfc -# number: [0, 1] -# stream: enda - param: [cp, tp] -# accumulation_period: 6h - """ - ) - dates = yaml.safe_load("[2022-12-30 18:00, 2022-12-31 00:00, 2022-12-31 06:00, 2022-12-31 12:00]") - dates = to_datetime_list(dates) - for f in accumulations(None, dates, **config): - print(f, f.to_numpy().mean()) +execute = accumulations diff --git a/src/anemoi/datasets/create/functions/sources/maximum_since_previous_step.py b/src/anemoi/datasets/create/functions/sources/maximum_since_previous_step.py new file mode 100644 index 000000000..14834dbf7 --- /dev/null +++ b/src/anemoi/datasets/create/functions/sources/maximum_since_previous_step.py @@ -0,0 +1,18 @@ +# (C) Copyright 2025 Anemoi contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +import logging + +from .accumulations import range_parameter + +LOG = logging.getLogger(__name__) + + +def execute(context, dates, use_cdsapi_dataset=None, **request): + return range_parameter(context, dates, "maximum", use_cdsapi_dataset=use_cdsapi_dataset, **request) diff --git a/src/anemoi/datasets/create/functions/sources/minimum_since_previous_step.py b/src/anemoi/datasets/create/functions/sources/minimum_since_previous_step.py new file mode 100644 index 000000000..0b1f99554 --- /dev/null +++ b/src/anemoi/datasets/create/functions/sources/minimum_since_previous_step.py @@ -0,0 +1,18 @@ +# (C) Copyright 2025 Anemoi contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +import logging + +from .accumulations import range_parameter + +LOG = logging.getLogger(__name__) + + +def execute(context, dates, use_cdsapi_dataset=None, **request): + return range_parameter(context, dates, "minimum", use_cdsapi_dataset=use_cdsapi_dataset, **request)