Skip to content

feat: minimum and maximum since last step #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 96 additions & 38 deletions src/anemoi/datasets/create/functions/sources/accumulations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) Copyright 2024 Anemoi contributors.
# (C) Copyright 2024-2025 Anemoi contributors.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
Expand All @@ -17,8 +17,6 @@
from earthkit.data.core.temporary import temp_file
from earthkit.data.readers.grib.output import new_grib_output

from anemoi.datasets.create.utils import to_datetime_list

from .mars import mars

LOG = logging.getLogger(__name__)
Expand All @@ -32,7 +30,9 @@ def _member(field):
return number


class Accumulation:
class RangeParameter:
buggy_steps = False

def __init__(self, out, /, param, date, time, number, step, frequency, **kwargs):
self.out = out
self.param = param
Expand Down Expand Up @@ -85,10 +85,7 @@ def check(self, field):
def write(self, template):

assert self.startStep != self.endStep, (self.startStep, self.endStep)
if np.all(self.values < 0):
LOG.warning(
f"Negative values when computing accumutation for {self.param} ({self.date} {self.time}): min={np.amin(self.values)} max={np.amax(self.values)}"
)
self.check_results()

self.out.write(
self.values,
Expand All @@ -109,8 +106,7 @@ def add(self, field, values):
if step not in self.steps:
return

if not np.all(values >= 0):
warnings.warn(f"Negative values for {field}: {np.amin(values)} {np.amax(values)}")
self.check_values(field, values)

assert not self.done, (self.key, step)
assert step not in self.seen, (self.key, step)
Expand Down Expand Up @@ -154,6 +150,24 @@ def mars_date_time_steps(cls, dates, step1, step2, frequency, base_times, adjust
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.key})"

def check_values(self, field, values):
pass

def check_results(self):
pass


class Accumulation(RangeParameter):
def check_values(self, field, values):
if not np.all(values >= 0):
warnings.warn(f"Negative values for {field}: {np.amin(values)} {np.amax(values)}")

def check_results(self):
if np.all(self.values < 0):
LOG.warning(
f"Negative values when computing accumutation for {self.param} ({self.date} {self.time}): min={np.amin(self.values)} max={np.amax(self.values)}"
)


class AccumulationFromStart(Accumulation):
buggy_steps = True
Expand Down Expand Up @@ -201,7 +215,6 @@ def _mars_date_time_step(cls, base_date, step1, step2, add_step, frequency):


class AccumulationFromLastStep(Accumulation):
buggy_steps = False

def compute(self, values, startStep, endStep):

Expand Down Expand Up @@ -241,11 +254,66 @@ def _mars_date_time_step(cls, base_date, step1, step2, add_step, frequency):
)


class Operation(RangeParameter):

def compute(self, values, startStep, endStep):

assert endStep - startStep == self.frequency, (
startStep,
endStep,
self.frequency,
)

if self.startStep is None:
self.startStep = startStep
else:
self.startStep = min(self.startStep, startStep)

if self.endStep is None:
self.endStep = endStep
else:
self.endStep = max(self.endStep, endStep)

if self.values is None:
self.values = values.copy()

self.values = self.operation(self.values, values)

@classmethod
def _mars_date_time_step(cls, base_date, step1, step2, add_step, frequency):
assert frequency > 0, frequency
# assert step1 > 0, (step1, step2, frequency, add_step, base_date)

steps = []
for step in range(step1 + frequency, step2 + frequency, frequency):
steps.append(step + add_step)
return (
base_date.year * 10000 + base_date.month * 100 + base_date.day,
base_date.hour * 100 + base_date.minute,
tuple(steps),
)


class Maximum(Operation):
operation = np.maximum


class Mininmum(Operation):
operation = np.minimum


def _identity(x):
return x


def _compute_accumulations(
OPERATIONS = {
"accumulation": Accumulation,
"minimum": Mininmum,
"maximum": Maximum,
}


def _compute_range_parameter(
context,
dates,
request,
Expand All @@ -254,6 +322,7 @@ def _compute_accumulations(
patch=_identity,
base_times=None,
use_cdsapi_dataset=None,
operation="accumulation",
):
adjust_step = isinstance(user_accumulation_period, int)

Expand All @@ -269,9 +338,14 @@ def _compute_accumulations(

base_times = [t // 100 if t > 100 else t for t in base_times]

AccumulationClass = AccumulationFromStart if data_accumulation_period in (0, None) else AccumulationFromLastStep
if operation == "accumulation":
RangeParameterClass = (
AccumulationFromStart if data_accumulation_period in (0, None) else AccumulationFromLastStep
)
else:
RangeParameterClass = OPERATIONS[operation]

mars_date_time_steps = AccumulationClass.mars_date_time_steps(
mars_date_time_steps = RangeParameterClass.mars_date_time_steps(
dates,
step1,
step2,
Expand Down Expand Up @@ -317,7 +391,7 @@ def _compute_accumulations(
)

accumulations = {}
for a in [AccumulationClass(out, frequency=frequency, **r) for r in requests]:
for a in [RangeParameterClass(out, frequency=frequency, **r) for r in requests]:
for s in a.steps:
key = (a.param, a.date, a.time, s, a.number)
accumulations.setdefault(key, []).append(a)
Expand Down Expand Up @@ -369,7 +443,7 @@ def _scda(request):
return request


def accumulations(context, dates, use_cdsapi_dataset=None, **request):
def range_parameter(context, dates, operation, use_cdsapi_dataset=None, **request):
_to_list(request["param"])
class_ = request.get("class", "od")
stream = request.get("stream", "oper")
Expand All @@ -391,37 +465,21 @@ def accumulations(context, dates, use_cdsapi_dataset=None, **request):

kwargs = KWARGS.get((class_, stream), {})

context.trace("🌧️", f"accumulations {request} {user_accumulation_period} {kwargs}")
context.trace("🌧️", f"{operation} {request} {user_accumulation_period} {kwargs}")

return _compute_accumulations(
return _compute_range_parameter(
context,
dates,
request,
operation=operation,
user_accumulation_period=user_accumulation_period,
use_cdsapi_dataset=use_cdsapi_dataset,
**kwargs,
)


execute = accumulations
def accumulations(context, dates, use_cdsapi_dataset=None, **request):
return range_parameter(context, dates, "accumulation", use_cdsapi_dataset=use_cdsapi_dataset, **request)

if __name__ == "__main__":
import yaml

config = yaml.safe_load(
"""
class: ea
expver: '0001'
grid: 20./20.
levtype: sfc
# number: [0, 1]
# stream: enda
param: [cp, tp]
# accumulation_period: 6h
"""
)
dates = yaml.safe_load("[2022-12-30 18:00, 2022-12-31 00:00, 2022-12-31 06:00, 2022-12-31 12:00]")
dates = to_datetime_list(dates)

for f in accumulations(None, dates, **config):
print(f, f.to_numpy().mean())
execute = accumulations
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# (C) Copyright 2025 Anemoi contributors.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
#
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import logging

from .accumulations import range_parameter

LOG = logging.getLogger(__name__)


def execute(context, dates, use_cdsapi_dataset=None, **request):
return range_parameter(context, dates, "maximum", use_cdsapi_dataset=use_cdsapi_dataset, **request)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# (C) Copyright 2025 Anemoi contributors.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
#
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import logging

from .accumulations import range_parameter

LOG = logging.getLogger(__name__)


def execute(context, dates, use_cdsapi_dataset=None, **request):
return range_parameter(context, dates, "minimum", use_cdsapi_dataset=use_cdsapi_dataset, **request)
Loading