Skip to content

Commit 8371f18

Browse files
committed
feat: zarr3
1 parent 2983cc3 commit 8371f18

File tree

11 files changed

+370
-152
lines changed

11 files changed

+370
-152
lines changed

src/anemoi/datasets/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from typing import List
1111

12+
from .add_zarr_support import ZarrSupport
1213
from .data import MissingDateError
1314
from .data import add_dataset_path
1415
from .data import add_named_dataset
@@ -30,4 +31,5 @@
3031
"MissingDateError",
3132
"open_dataset",
3233
"__version__",
34+
"ZarrSupport",
3335
]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# (C) Copyright 2025 Anemoi contributors.
2+
#
3+
# This software is licensed under the terms of the Apache Licence Version 2.0
4+
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5+
#
6+
# In applying this licence, ECMWF does not waive the privileges and immunities
7+
# granted to it by virtue of its status as an intergovernmental organisation
8+
# nor does it submit to any jurisdiction.
9+
import logging
10+
11+
import zarr
12+
13+
LOG = logging.getLogger(__name__)
14+
15+
16+
class ZarrSupportV2:
17+
@classmethod
18+
def base_store(cls):
19+
return zarr.storage.BaseStore
20+
21+
@classmethod
22+
def is_zarr_group(cls, obj):
23+
return isinstance(obj, zarr.hierarchy.Group)
24+
25+
@classmethod
26+
def create_array(cls, zarr_root, *args, **kwargs):
27+
return zarr_root.create_dataset(*args, **kwargs)
28+
29+
@classmethod
30+
def change_dtype_datetime64(cls, dtype):
31+
return dtype
32+
33+
@classmethod
34+
def cast_dtype_datetime64(cls, array, dtype):
35+
return array, dtype
36+
37+
@classmethod
38+
def get_not_found_exception(cls):
39+
return zarr.errors.PathNotFoundError
40+
41+
@classmethod
42+
def zarr_open_mode_append(cls):
43+
return "w+"
44+
45+
@classmethod
46+
def zarr_open_to_patch_in_tests(cls):
47+
return "zarr.convenience.open"
48+
49+
@classmethod
50+
def get_read_only_store_class(cls):
51+
class ReadOnlyStore(zarr.storage.BaseStore):
52+
"""A base class for read-only stores."""
53+
54+
def __delitem__(self, key: str) -> None:
55+
"""Prevent deletion of items."""
56+
raise NotImplementedError()
57+
58+
def __setitem__(self, key: str, value: bytes) -> None:
59+
"""Prevent setting of items."""
60+
raise NotImplementedError()
61+
62+
def __len__(self) -> int:
63+
"""Return the number of items in the store."""
64+
raise NotImplementedError()
65+
66+
def __iter__(self) -> iter:
67+
"""Return an iterator over the store."""
68+
raise NotImplementedError()
69+
70+
return ReadOnlyStore
71+
72+
73+
class ZarrSupportV3:
74+
@classmethod
75+
def base_store(cls):
76+
return zarr.abc.store.Store
77+
78+
@classmethod
79+
def is_zarr_group(cls, obj):
80+
return isinstance(obj, zarr.Group)
81+
82+
@classmethod
83+
def create_array(cls, zarr_root, *args, **kwargs):
84+
return zarr_root.create_array(*args, **kwargs)
85+
86+
@classmethod
87+
def get_not_found_exception(cls):
88+
return FileNotFoundError
89+
90+
@classmethod
91+
def zarr_open_mode_append(cls):
92+
return "a"
93+
94+
@classmethod
95+
def change_dtype_datetime64(cls, dtype):
96+
# remove this flag (and the relevant code) when Zarr 3 supports datetime64
97+
# https://github.com/zarr-developers/zarr-python/issues/2616
98+
import numpy as np
99+
100+
if dtype == "datetime64[s]":
101+
dtype = np.dtype("int64")
102+
return dtype
103+
104+
@classmethod
105+
def cast_dtype_datetime64(cls, array, dtype):
106+
# remove this flag (and the relevant code) when Zarr 3 supports datetime64
107+
# https://github.com/zarr-developers/zarr-python/issues/2616
108+
import numpy as np
109+
110+
if dtype == np.dtype("datetime64[s]"):
111+
dtype = "int64"
112+
array = array.astype(dtype)
113+
114+
return array, dtype
115+
116+
@classmethod
117+
def zarr_open_to_patch_in_tests(cls):
118+
return "zarr.open"
119+
120+
@classmethod
121+
def get_read_only_store_class(cls):
122+
raise NotImplementedError("TODO")
123+
124+
125+
if zarr.__version__.startswith("3"):
126+
ZarrSupport = ZarrSupportV3
127+
else:
128+
LOG.warning("Using Zarr 2 : only zarr datasets build with zarr 2 are supported")
129+
ZarrSupport = ZarrSupportV2

src/anemoi/datasets/commands/copy.py

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from anemoi.utils.remote import Transfer
2121
from anemoi.utils.remote import TransferMethodNotImplementedError
2222

23+
from anemoi.datasets import ZarrSupport
24+
2325
from . import Command
2426

2527
LOG = logging.getLogger(__name__)
@@ -49,8 +51,6 @@ class ZarrCopier:
4951
Flag to resume copying an existing dataset.
5052
verbosity : int
5153
Verbosity level of logging.
52-
nested : bool
53-
Flag to use ZARR's nested directory backend.
5454
rechunk : str
5555
Rechunk size for the target data array.
5656
"""
@@ -64,7 +64,6 @@ def __init__(
6464
overwrite: bool,
6565
resume: bool,
6666
verbosity: int,
67-
nested: bool,
6867
rechunk: str,
6968
**kwargs: Any,
7069
) -> None:
@@ -86,8 +85,6 @@ def __init__(
8685
Flag to resume copying an existing dataset.
8786
verbosity : int
8887
Verbosity level of logging.
89-
nested : bool
90-
Flag to use ZARR's nested directory backend.
9188
rechunk : str
9289
Rechunk size for the target data array.
9390
**kwargs : Any
@@ -100,7 +97,6 @@ def __init__(
10097
self.overwrite = overwrite
10198
self.resume = resume
10299
self.verbosity = verbosity
103-
self.nested = nested
104100
self.rechunk = rechunk
105101

106102
self.rechunking = rechunk.split(",") if rechunk else []
@@ -113,27 +109,6 @@ def __init__(
113109
raise NotImplementedError("Rechunking with SSH not implemented.")
114110
assert NotImplementedError("SSH not implemented.")
115111

116-
def _store(self, path: str, nested: bool = False) -> Any:
117-
"""Get the storage path.
118-
119-
Parameters
120-
----------
121-
path : str
122-
Path to the storage.
123-
nested : bool, optional
124-
Flag to use nested directory storage.
125-
126-
Returns
127-
-------
128-
Any
129-
Storage path.
130-
"""
131-
if nested:
132-
import zarr
133-
134-
return zarr.storage.NestedDirectoryStore(path)
135-
return path
136-
137112
def copy_chunk(self, n: int, m: int, source: Any, target: Any, _copy: Any, verbosity: int) -> Optional[slice]:
138113
"""Copy a chunk of data from source to target.
139114
@@ -237,7 +212,8 @@ def copy_data(self, source: Any, target: Any, _copy: Any, verbosity: int) -> Non
237212
target_data = (
238213
target["data"]
239214
if "data" in target
240-
else target.create_dataset(
215+
else ZarrSupport.create_array(
216+
target,
241217
"data",
242218
shape=source_data.shape,
243219
chunks=self.data_chunks,
@@ -317,13 +293,12 @@ def copy_group(self, source: Any, target: Any, _copy: Any, verbosity: int) -> No
317293
verbosity : int
318294
Verbosity level of logging.
319295
"""
320-
import zarr
321296

322297
for k, v in source.attrs.items():
323298
target.attrs[k] = v
324299

325300
for name in sorted(source.keys()):
326-
if isinstance(source[name], zarr.hierarchy.Group):
301+
if ZarrSupport.is_zarr_group(source[name]):
327302
group = target[name] if name in target else target.create_group(name)
328303
self.copy_group(
329304
source[name],
@@ -376,13 +351,13 @@ def run(self) -> None:
376351

377352
def target_exists() -> bool:
378353
try:
379-
zarr.open(self._store(self.target), mode="r")
354+
zarr.open(self.target, mode="r")
380355
return True
381356
except ValueError:
382357
return False
383358

384359
def target_finished() -> bool:
385-
target = zarr.open(self._store(self.target), mode="r")
360+
target = zarr.open(self.target, mode="r")
386361
if "_copy" in target:
387362
done = sum(1 if x else 0 for x in target["_copy"])
388363
todo = len(target["_copy"])
@@ -400,19 +375,19 @@ def target_finished() -> bool:
400375
def open_target() -> Any:
401376

402377
if not target_exists():
403-
return zarr.open(self._store(self.target, self.nested), mode="w")
378+
return zarr.open(self.target, mode="w")
404379

405380
if self.overwrite:
406381
LOG.error("Target already exists, overwriting.")
407-
return zarr.open(self._store(self.target, self.nested), mode="w")
382+
return zarr.open(self.target, mode="w")
408383

409384
if self.resume:
410385
if target_finished():
411386
LOG.error("Target already exists and is finished.")
412387
sys.exit(0)
413388

414389
LOG.error("Target already exists, resuming copy.")
415-
return zarr.open(self._store(self.target, self.nested), mode="w+")
390+
return zarr.open(self.target, mode=ZarrSupport.zarr_open_mode_append())
416391

417392
LOG.error("Target already exists, use either --overwrite or --resume.")
418393
sys.exit(1)
@@ -421,7 +396,7 @@ def open_target() -> Any:
421396

422397
assert target is not None, target
423398

424-
source = zarr.open(self._store(self.source), mode="r")
399+
source = zarr.open(self.source, mode="r")
425400
self.copy(source, target, self.verbosity)
426401

427402

@@ -455,7 +430,6 @@ def add_arguments(self, command_parser: Any) -> None:
455430
help="Verbosity level. 0 is silent, 1 is normal, 2 is verbose.",
456431
default=1,
457432
)
458-
command_parser.add_argument("--nested", action="store_true", help="Use ZARR's nested directpry backend.")
459433
command_parser.add_argument(
460434
"--rechunk", help="Rechunk the target data array. Rechunk size should be a diviser of the block size."
461435
)

src/anemoi/datasets/create/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from earthkit.data.core.order import build_remapping
3333

3434
from anemoi.datasets import MissingDateError
35+
from anemoi.datasets import ZarrSupport
3536
from anemoi.datasets import open_dataset
3637
from anemoi.datasets.create.input.trace import enable_trace
3738
from anemoi.datasets.create.persistent import build_storage
@@ -154,7 +155,7 @@ def _path_readable(path: str) -> bool:
154155
try:
155156
zarr.open(path, "r")
156157
return True
157-
except zarr.errors.PathNotFoundError:
158+
except ZarrSupport.get_not_found_exception():
158159
return False
159160

160161

@@ -208,7 +209,7 @@ def update_metadata(self, **kwargs: Any) -> None:
208209
import zarr
209210

210211
LOG.debug(f"Updating metadata {kwargs}")
211-
z = zarr.open(self.path, mode="w+")
212+
z = zarr.open(self.path, mode=ZarrSupport.zarr_open_mode_append())
212213
for k, v in kwargs.items():
213214
if isinstance(v, np.datetime64):
214215
v = v.astype(datetime.datetime)
@@ -1520,7 +1521,7 @@ def run(self) -> None:
15201521

15211522
LOG.info(stats)
15221523

1523-
if not all(self.registry.get_flags(sync=False)):
1524+
if not all(self.registry.get_flags()):
15241525
raise Exception(f"❗Zarr {self.path} is not fully built, not writing statistics into dataset.")
15251526

15261527
for k in ["mean", "stdev", "minimum", "maximum", "sums", "squares", "count", "has_nans"]:

src/anemoi/datasets/create/patch.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import zarr
1616

17+
from anemoi.datasets import ZarrSupport
18+
1719
LOG = logging.getLogger(__name__)
1820

1921

@@ -134,7 +136,7 @@ def apply_patch(path: str, verbose: bool = True, dry_run: bool = False) -> None:
134136

135137
try:
136138
attrs = zarr.open(path, mode="r").attrs.asdict()
137-
except zarr.errors.PathNotFoundError as e:
139+
except ZarrSupport.get_not_found_exception() as e:
138140
LOG.error(f"Failed to open {path}")
139141
LOG.error(e)
140142
exit(0)

0 commit comments

Comments
 (0)