Skip to content

feat: zarr3 #220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ dependencies = [
"pyyaml",
"semantic-version",
"tqdm",
"zarr<=2.18.4",
"zarr",
]

optional-dependencies.all = [
Expand Down Expand Up @@ -94,6 +94,7 @@ optional-dependencies.docs = [

optional-dependencies.remote = [
"boto3",
"obstore",
"requests",
]

Expand Down
45 changes: 9 additions & 36 deletions src/anemoi/datasets/commands/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from anemoi.utils.remote import TransferMethodNotImplementedError

from anemoi.datasets.check import check_zarr
from anemoi.datasets.zarr_versions import zarr_2_or_3

from . import Command

Expand Down Expand Up @@ -51,8 +52,6 @@ class ZarrCopier:
Flag to resume copying an existing dataset.
verbosity : int
Verbosity level of logging.
nested : bool
Flag to use ZARR's nested directory backend.
rechunk : str
Rechunk size for the target data array.
"""
Expand All @@ -66,7 +65,6 @@ def __init__(
overwrite: bool,
resume: bool,
verbosity: int,
nested: bool,
rechunk: str,
**kwargs: Any,
) -> None:
Expand All @@ -88,8 +86,6 @@ def __init__(
Flag to resume copying an existing dataset.
verbosity : int
Verbosity level of logging.
nested : bool
Flag to use ZARR's nested directory backend.
rechunk : str
Rechunk size for the target data array.
**kwargs : Any
Expand All @@ -102,7 +98,6 @@ def __init__(
self.overwrite = overwrite
self.resume = resume
self.verbosity = verbosity
self.nested = nested
self.rechunk = rechunk

self.rechunking = rechunk.split(",") if rechunk else []
Expand All @@ -115,27 +110,6 @@ def __init__(
raise NotImplementedError("Rechunking with SSH not implemented.")
assert NotImplementedError("SSH not implemented.")

def _store(self, path: str, nested: bool = False) -> Any:
"""Get the storage path.

Parameters
----------
path : str
Path to the storage.
nested : bool, optional
Flag to use nested directory storage.

Returns
-------
Any
Storage path.
"""
if nested:
import zarr

return zarr.storage.NestedDirectoryStore(path)
return path

def copy_chunk(self, n: int, m: int, source: Any, target: Any, _copy: Any, verbosity: int) -> Optional[slice]:
"""Copy a chunk of data from source to target.

Expand Down Expand Up @@ -239,7 +213,8 @@ def copy_data(self, source: Any, target: Any, _copy: Any, verbosity: int) -> Non
target_data = (
target["data"]
if "data" in target
else target.create_dataset(
else zarr_2_or_3.create_array(
target,
"data",
shape=source_data.shape,
chunks=self.data_chunks,
Expand Down Expand Up @@ -319,7 +294,6 @@ def copy_group(self, source: Any, target: Any, _copy: Any, verbosity: int) -> No
verbosity : int
Verbosity level of logging.
"""
import zarr

if self.verbosity > 0:
LOG.info(f"Copying group {source} to {target}")
Expand All @@ -345,7 +319,7 @@ def copy_group(self, source: Any, target: Any, _copy: Any, verbosity: int) -> No
LOG.info(f"Skipping {name}")
continue

if isinstance(source[name], zarr.hierarchy.Group):
if zarr_2_or_3.is_zarr_group(source[name]):
group = target[name] if name in target else target.create_group(name)
self.copy_group(
source[name],
Expand Down Expand Up @@ -403,13 +377,13 @@ def run(self) -> None:

def target_exists() -> bool:
try:
zarr.open(self._store(self.target), mode="r")
zarr.open(self.target, mode="r")
return True
except ValueError:
return False

def target_finished() -> bool:
target = zarr.open(self._store(self.target), mode="r")
target = zarr.open(self.target, mode="r")
if "_copy" in target:
done = sum(1 if x else 0 for x in target["_copy"])
todo = len(target["_copy"])
Expand All @@ -427,19 +401,19 @@ def target_finished() -> bool:
def open_target() -> Any:

if not target_exists():
return zarr.open(self._store(self.target, self.nested), mode="w")
return zarr.open(self.target, mode="w")

if self.overwrite:
LOG.error("Target already exists, overwriting.")
return zarr.open(self._store(self.target, self.nested), mode="w")
return zarr.open(self.target, mode="w")

if self.resume:
if target_finished():
LOG.error("Target already exists and is finished.")
sys.exit(0)

LOG.error("Target already exists, resuming copy.")
return zarr.open(self._store(self.target, self.nested), mode="w+")
return zarr.open(self.target, mode=zarr_2_or_3.open_mode_append)

LOG.error("Target already exists, use either --overwrite or --resume.")
sys.exit(1)
Expand Down Expand Up @@ -495,7 +469,6 @@ def add_arguments(self, command_parser: Any) -> None:
help="Verbosity level. 0 is silent, 1 is normal, 2 is verbose.",
default=1,
)
command_parser.add_argument("--nested", action="store_true", help="Use ZARR's nested directpry backend.")
command_parser.add_argument(
"--rechunk", help="Rechunk the target data array. Rechunk size should be a diviser of the block size."
)
Expand Down
1 change: 1 addition & 0 deletions src/anemoi/datasets/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def add_arguments(self, command_parser: Any) -> None:
group.add_argument("--threads", help="Use `n` parallel thread workers.", type=int, default=0)
group.add_argument("--processes", help="Use `n` parallel process workers.", type=int, default=0)
command_parser.add_argument("--trace", action="store_true")
command_parser.add_argument("--force-zarr3", action="store_true")

def run(self, args: Any) -> None:
"""Execute the create command.
Expand Down
1 change: 1 addition & 0 deletions src/anemoi/datasets/commands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add_arguments(self, subparser: Any) -> None:
subparser.add_argument("--cache", help="Location to store the downloaded data.", metavar="DIR")

subparser.add_argument("--trace", action="store_true")
subparser.add_argument("--force-zarr3", action="store_true", help="Force the use of Zarr v3 format.")

def run(self, args: Any) -> None:
"""Execute the command with the provided arguments.
Expand Down
6 changes: 3 additions & 3 deletions src/anemoi/datasets/commands/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ def ready(self) -> bool:
if "_build_flags" not in self.zarr:
return False

build_flags = self.zarr["_build_flags"]
build_flags = self.zarr["_build_flags"][:]
return all(build_flags)

@property
Expand Down Expand Up @@ -711,15 +711,15 @@ def build_flags(self) -> Optional[NDArray]:
if "_build" not in self.zarr:
return None
build = self.zarr["_build"]
return build.get("flags")
return build.get("flags")[:]

@property
def build_lengths(self) -> Optional[NDArray]:
"""Get the build lengths for the dataset."""
if "_build" not in self.zarr:
return None
build = self.zarr["_build"]
return build.get("lengths")
return build.get("lengths")[:]


VERSIONS = {
Expand Down
46 changes: 39 additions & 7 deletions src/anemoi/datasets/create/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from anemoi.datasets.data.misc import as_first_date
from anemoi.datasets.data.misc import as_last_date
from anemoi.datasets.dates.groups import Groups
from anemoi.datasets.zarr_versions import zarr_2_or_3

from .check import DatasetName
from .check import check_data_values
Expand Down Expand Up @@ -156,7 +157,7 @@ def _path_readable(path: str) -> bool:
try:
zarr.open(path, "r")
return True
except zarr.errors.PathNotFoundError:
except zarr_2_or_3.FileNotFoundException:
return False


Expand All @@ -173,6 +174,11 @@ def __init__(self, path: str):
"""
self.path = path

# if zarr_2_or_3.version != 2:
# raise ValueError(
# f"Only zarr version 2 is supported when creating datasets, found version: {zarr.__version__}"
# )

_, ext = os.path.splitext(self.path)
if ext != ".zarr":
raise ValueError(f"Unsupported extension={ext} for path={self.path}")
Expand All @@ -192,10 +198,9 @@ def add_dataset(self, mode: str = "r+", **kwargs: Any) -> zarr.Array:
zarr.Array
The added dataset.
"""
import zarr

z = zarr.open(self.path, mode=mode)
from .zarr import add_zarr_dataset
from .misc import add_zarr_dataset

return add_zarr_dataset(zarr_root=z, **kwargs)

Expand All @@ -210,7 +215,7 @@ def update_metadata(self, **kwargs: Any) -> None:
import zarr

LOG.debug(f"Updating metadata {kwargs}")
z = zarr.open(self.path, mode="w+")
z = zarr.open(self.path, mode=zarr_2_or_3.open_mode_append)
for k, v in kwargs.items():
if isinstance(v, np.datetime64):
v = v.astype(datetime.datetime)
Expand Down Expand Up @@ -445,7 +450,7 @@ def check_missing_dates(expected: list[np.datetime64]) -> None:
"""
import zarr

z = zarr.open(path, "r")
z = zarr.open(path, mode="r")
missing_dates = z.attrs.get("missing_dates", [])
missing_dates = sorted([np.datetime64(d) for d in missing_dates])
if missing_dates != expected:
Expand Down Expand Up @@ -517,7 +522,7 @@ class HasRegistryMixin:
@cached_property
def registry(self) -> Any:
"""Get the registry."""
from .zarr import ZarrBuiltRegistry
from .misc import ZarrBuiltRegistry

return ZarrBuiltRegistry(self.path, use_threads=self.use_threads)

Expand Down Expand Up @@ -581,6 +586,7 @@ def __init__(
progress: Any = None,
test: bool = False,
cache: Optional[str] = None,
force_zarr3: bool = False,
**kwargs: Any,
):
"""Initialize an Init instance.
Expand Down Expand Up @@ -609,6 +615,32 @@ def __init__(
if _path_readable(path) and not overwrite:
raise Exception(f"{path} already exists. Use overwrite=True to overwrite.")

version = zarr_2_or_3.version
if not zarr_2_or_3.supports_datetime64():
LOG.warning("⚠️" * 80)
LOG.warning(f"This version of Zarr ({zarr.__version__}) does not support datetime64.")
LOG.warning("⚠️" * 80)

if version != 2:

pytesting = "PYTEST_CURRENT_TEST" in os.environ

if pytesting or force_zarr3:
LOG.warning("⚠️" * 80)
LOG.warning("Zarr version 3 is used, but this is an unsupported feature.")
LOG.warning("⚠️" * 80)
else:
LOG.warning("⚠️" * 80)
LOG.warning(
f"Only Zarr version 2 is supported when creating datasets, found version: {zarr.__version__}"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not support writing in the zarr3 format for the time being, only reading. Is there a use case for it?

)
LOG.warning("If you want to use Zarr version 3, please set --force-zarr3 option.")
LOG.warning("Please note that this is an unsupported feature.")
LOG.warning("⚠️" * 80)
raise ValueError(
f"Only Zarr version 2 is supported when creating datasets, found version: {zarr.__version__}"
)

super().__init__(path, cache=cache)
self.config = config
self.check_name = check_name
Expand Down Expand Up @@ -1520,7 +1552,7 @@ def run(self) -> None:

LOG.info(stats)

if not all(self.registry.get_flags(sync=False)):
if not all(self.registry.get_flags()):
raise Exception(f"❗Zarr {self.path} is not fully built, not writing statistics into dataset.")

for k in [
Expand Down
Loading