Skip to content

Commit 7dd2743

Browse files
ctuguinayleewujung
andauthored
Enhance compute_MVBS feasability (#1470)
* add range var max and reindex to compute_MVBS parameters and drop flox maximum version * add tests for compute MVBS changes * add todo for compute NASC * add np.nan fill value * Update echopype/commongrid/api.py Co-authored-by: Wu-Jung Lee <[email protected]> --------- Co-authored-by: Wu-Jung Lee <[email protected]>
1 parent e5d1b2a commit 7dd2743

File tree

4 files changed

+86
-7
lines changed

4 files changed

+86
-7
lines changed

echopype/commongrid/api.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@ def compute_MVBS(
3232
range_var: Literal["echo_range", "depth"] = "echo_range",
3333
range_bin: str = "20m",
3434
ping_time_bin: str = "20s",
35-
method="map-reduce",
36-
skipna=True,
35+
method: str = "map-reduce",
36+
reindex: bool = False,
37+
skipna: bool = True,
38+
fill_value: float = np.nan,
3739
closed: Literal["left", "right"] = "left",
40+
range_var_max: str = None,
3841
**flox_kwargs,
3942
):
4043
"""
@@ -63,11 +66,23 @@ def compute_MVBS(
6366
The flox strategy for reduction of dask arrays only.
6467
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
6568
for more details.
69+
reindex: bool, default False
70+
If False, reindex after the blockwise stage. If True, reindex at the blockwise stage.
71+
Generally, `reindex=False` results in less memory at the cost of computation speed.
72+
Can only be used when method='map-reduce'.
73+
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
74+
for more details.
6675
skipna: bool, default True
6776
If true, the mean operation skips NaN values.
6877
Else, the mean operation includes NaN values.
78+
fill_value: float, default np.nan
79+
Fill value when no group data exists to aggregate.
6980
closed: {'left', 'right'}, default 'left'
7081
Which side of bin interval is closed.
82+
range_var_max: str, default None
83+
Range variable maximum. Can be true range variable maximum or the maximum depth the
84+
user wishes to regrid to. If known, users can pass in range variable maximum to
85+
ensure that `compute_MVBS` can lazily run without any computation.
7186
**flox_kwargs
7287
Additional keyword arguments to be passed
7388
to flox reduction function.
@@ -76,6 +91,8 @@ def compute_MVBS(
7691
-------
7792
A dataset containing bin-averaged Sv
7893
"""
94+
if method != "map-reduce" and reindex is not None:
95+
raise ValueError(f"Passing in reindex={reindex} is only allowed when method='map_reduce'.")
7996

8097
# Setup and validate
8198
# * Sv dataset must contain specified range_var
@@ -86,10 +103,15 @@ def compute_MVBS(
86103
if not isinstance(ping_time_bin, str):
87104
raise TypeError("ping_time_bin must be a string")
88105

89-
# create bin information for echo_range
90-
# this computes the echo range max since there might NaNs in the data
91-
echo_range_max = ds_Sv[range_var].max()
92-
range_interval = np.arange(0, echo_range_max + range_bin, range_bin)
106+
# Create bin information for the range variable
107+
if range_var_max is None:
108+
# This computes the range variable max since there might be NaNs in the data
109+
range_var_max = ds_Sv[range_var].max(skipna=True)
110+
else:
111+
# Parse string and small increase to ensure that we get the bin
112+
# corresponding to range_var_max
113+
range_var_max = _parse_x_bin(range_var_max) + 1e-8
114+
range_interval = np.arange(0, range_var_max + range_bin, range_bin)
93115

94116
# create bin information needed for ping_time
95117
d_index = (
@@ -109,7 +131,9 @@ def compute_MVBS(
109131
ping_interval,
110132
range_var=range_var,
111133
method=method,
134+
reindex=reindex,
112135
skipna=skipna,
136+
fill_value=fill_value,
113137
**flox_kwargs,
114138
)
115139

@@ -275,6 +299,7 @@ def compute_NASC(
275299
) -> xr.Dataset:
276300
"""
277301
Compute Nautical Areal Scattering Coefficient (NASC) from an Sv dataset.
302+
TODO: Add range_var_max and reindex parameters to match `compute_MVBS`.
278303
279304
Parameters
280305
----------

echopype/commongrid/utils.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ def compute_raw_MVBS(
2020
ping_interval: Union[pd.IntervalIndex, np.ndarray],
2121
range_var: Literal["echo_range", "depth"] = "echo_range",
2222
method="map-reduce",
23+
reindex=False,
2324
skipna=True,
25+
fill_value=np.nan,
2426
**flox_kwargs,
2527
):
2628
"""
@@ -46,9 +48,17 @@ def compute_raw_MVBS(
4648
The flox strategy for reduction of dask arrays only.
4749
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
4850
for more details.
51+
reindex: bool, default False
52+
If False, reindex after the blockwise stage. If True, reindex at the blockwise stage.
53+
Generally, `reindex=False` results in less memory at the cost of computation speed.
54+
Can only be used when method='map-reduce'.
55+
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
56+
for more details.
4957
skipna: bool, default True
5058
If true, the mean operation skips NaN values.
5159
Else, the mean operation includes NaN values.
60+
fill_value: float, default np.nan
61+
Fill value when no group data exists to aggregate.
5262
**flox_kwargs
5363
Additional keyword arguments to be passed
5464
to flox reduction function.
@@ -69,8 +79,10 @@ def compute_raw_MVBS(
6979
x_var=x_var,
7080
range_var=range_var,
7181
method=method,
82+
reindex=reindex,
7283
func="nanmean" if skipna else "mean",
7384
skipna=skipna,
85+
fill_value=fill_value,
7486
**flox_kwargs,
7587
)
7688

@@ -495,8 +507,10 @@ def _groupby_x_along_channels(
495507
x_var: Literal["ping_time", "distance_nmi"] = "ping_time",
496508
range_var: Literal["echo_range", "depth"] = "echo_range",
497509
method: str = "map-reduce",
510+
reindex: bool = False,
498511
func: str = "nanmean",
499512
skipna: bool = True,
513+
fill_value: float = np.nan,
500514
**flox_kwargs,
501515
) -> xr.Dataset:
502516
"""
@@ -534,6 +548,12 @@ def _groupby_x_along_channels(
534548
The flox strategy for reduction of dask arrays only.
535549
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
536550
for more details.
551+
reindex: bool, default False
552+
If False, reindex after the blockwise stage. If True, reindex at the blockwise stage.
553+
Generally, `reindex=False` results in less memory at the cost of computation speed.
554+
Can only be used when method='map-reduce'.
555+
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
556+
for more details.
537557
func: str, default 'nanmean'
538558
The aggregation function used for reducing the data array.
539559
By default, 'nanmean' is used. Other options can be found in the flox `documentation
@@ -543,6 +563,8 @@ def _groupby_x_along_channels(
543563
Else, aggregation function includes NaN values.
544564
Note that if ``func`` is set to 'mean' and ``skipna`` is set to True, then aggregation
545565
will have the same behavior as if func is set to 'nanmean'.
566+
fill_value: float, default np.nan
567+
Fill value when no group data exists to aggregate.
546568
**flox_kwargs
547569
Additional keyword arguments to be passed
548570
to flox reduction function.
@@ -593,8 +615,10 @@ def _groupby_x_along_channels(
593615
expected_groups=(None, x_interval, range_interval),
594616
isbin=[False, True, True],
595617
method=method,
618+
reindex=reindex,
596619
func=func,
597620
skipna=skipna,
621+
fill_value=fill_value,
598622
**flox_kwargs,
599623
)
600624
return sv_mean

echopype/tests/commongrid/test_commongrid_api.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,3 +555,33 @@ def test_assign_actual_range(request):
555555
round(float(ds_MVBS["Sv"].max().values), 2),
556556
]
557557
)
558+
559+
560+
@pytest.mark.integration
561+
def test_compute_MVBS_range_var_max(request):
562+
"""
563+
Tests compute_MVBS when user specified range_var_max is passed in.
564+
"""
565+
# Grab mock Sv dataset
566+
ds_Sv = request.getfixturevalue("mock_Sv_dataset_regular")
567+
568+
# Compute MVBS
569+
ds_MVBS = ep.commongrid.compute_MVBS(ds_Sv, range_bin="1m", range_var_max="8m")
570+
571+
# Ensure that last echo range value matches range variable maximum
572+
assert ds_MVBS["echo_range"].max().compute() == 8
573+
574+
575+
@pytest.mark.integration
576+
def test_compute_reindex_non_NaN_not_map_reduce(request):
577+
"""
578+
Tests compute_MVBS when user passes in reindex non-NaN without method as map-reduce.
579+
"""
580+
# Grab mock Sv dataset
581+
ds_Sv = request.getfixturevalue("mock_Sv_dataset_regular")
582+
583+
# Compute MVBS with invalid parameters
584+
for method in ["blockwise", "cohorts"]:
585+
for reindex in [True, False]:
586+
with pytest.raises(ValueError, match=f"Passing in reindex={reindex} is only allowed when method='map_reduce'."):
587+
ep.commongrid.compute_MVBS(ds_Sv, method=method, reindex=reindex)

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ aiohttp
22
bottleneck
33
dask[array,distributed]
44
dask-image
5-
flox>=0.7.2,<1.0.0
5+
flox>=0.7.2
66
fsspec
77
geopy
88
jinja2

0 commit comments

Comments
 (0)