Skip to content

Commit 3156569

Browse files
authored
Enable expression-based Dask Dataframe support (#4325)
**[WIP]** I'm using this PR to debug/add support for `DASK_DATAFRAME__QUERY_PLANNING=True`.

**NOTES**:
- Depends on dask/dask-expr#1041 [Merged]
- Depends on dask/dask-expr#1044

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- Rick Ratzel (https://github.com/rlratzel)
- Ray Douglass (https://github.com/raydouglass)

URL: #4325
1 parent 1c3f3a8 commit 3156569

File tree

12 files changed

+38
-37
lines changed

12 files changed

+38
-37
lines changed

benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ def generate_rmat_dataset(
344344
del label_df
345345
gc.collect()
346346

347-
dask_label_df = dask_cudf.from_dask_dataframe(dask_label_df)
347+
dask_label_df = dask_label_df.to_backend("cudf")
348348

349349
node_offsets = {"paper": 0}
350350
edge_offsets = {("paper", "cites", "paper"): 0}

ci/test_python.sh

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33

44
set -euo pipefail
55

6-
# TODO: Enable dask query planning (by default) once some bugs are fixed.
7-
# xref: https://github.com/rapidsai/cudf/issues/15027
8-
export DASK_DATAFRAME__QUERY_PLANNING=False
9-
106
# Support invoking test_python.sh outside the script directory
117
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../
128

ci/test_wheel.sh

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33

44
set -eoxu pipefail
55

6-
# TODO: Enable dask query planning (by default) once some bugs are fixed.
7-
# xref: https://github.com/rapidsai/cudf/issues/15027
8-
export DASK_DATAFRAME__QUERY_PLANNING=False
9-
106
package_name=$1
117
package_dir=$2
128

python/cugraph/cugraph/dask/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
1+
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
22
# Licensed under the Apache License, Version 2.0 (the "License");
33
# you may not use this file except in compliance with the License.
44
# You may obtain a copy of the License at
@@ -11,6 +11,8 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14+
from dask import config
15+
1416
from .link_analysis.pagerank import pagerank
1517
from .link_analysis.hits import hits
1618
from .traversal.bfs import bfs
@@ -34,3 +36,6 @@
3436
from .link_prediction.sorensen import sorensen
3537
from .link_prediction.overlap import overlap
3638
from .community.leiden import leiden
39+
40+
# Avoid "p2p" shuffling in dask for now
41+
config.set({"dataframe.shuffle.method": "tasks"})

python/cugraph/cugraph/dask/common/input_utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
1+
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -15,8 +15,8 @@
1515

1616
from collections.abc import Sequence
1717
from collections import OrderedDict
18-
from dask_cudf.core import DataFrame as dcDataFrame
19-
from dask_cudf.core import Series as daskSeries
18+
from dask_cudf import DataFrame as dcDataFrame
19+
from dask_cudf import Series as daskSeries
2020

2121
import cugraph.dask.comms.comms as Comms
2222

python/cugraph/cugraph/dask/common/part_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import collections
1919
import dask_cudf
2020
from dask.array.core import Array as daskArray
21-
from dask_cudf.core import DataFrame as daskDataFrame
22-
from dask_cudf.core import Series as daskSeries
21+
from dask_cudf import DataFrame as daskDataFrame
22+
from dask_cudf import Series as daskSeries
2323
from functools import reduce
2424
import cugraph.dask.comms.comms as Comms
2525
from dask.delayed import delayed

python/cugraph/cugraph/structure/convert_matrix.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def from_edgelist(
4040
4141
Parameters
4242
----------
43-
df : cudf.DataFrame, pandas.DataFrame, dask_cudf.core.DataFrame
43+
df : cudf.DataFrame, pandas.DataFrame, dask_cudf.DataFrame
4444
This DataFrame contains columns storing edge source vertices,
4545
destination (or target following NetworkX's terminology) vertices, and
4646
(optional) weights.
@@ -95,7 +95,7 @@ def from_edgelist(
9595
renumber=renumber,
9696
)
9797

98-
elif df_type is dask_cudf.core.DataFrame:
98+
elif df_type is dask_cudf.DataFrame:
9999
if create_using is None:
100100
G = Graph()
101101
elif isinstance(create_using, Graph):

python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -285,19 +285,20 @@ def __from_edgelist(
285285
symmetrize=not self.properties.directed,
286286
)
287287

288+
# Create a dask_cudf dataframe from the cudf series
289+
# or dataframe objects obtained from symmetrization
288290
if isinstance(source_col, dask_cudf.Series):
289-
# Create a dask_cudf dataframe from the cudf series obtained
290-
# from symmetrization
291-
input_ddf = source_col.to_frame()
292-
input_ddf = input_ddf.rename(columns={source_col.name: source})
293-
input_ddf[destination] = dest_col
291+
frames = [
292+
source_col.to_frame(name=source),
293+
dest_col.to_frame(name=destination),
294+
]
294295
else:
295-
# Multi column dask_cudf dataframe
296-
input_ddf = dask_cudf.concat([source_col, dest_col], axis=1)
296+
frames = [source_col, dest_col]
297297

298298
if value_col is not None:
299-
for vc in value_col_names:
300-
input_ddf[vc] = value_col[vc]
299+
frames.append(value_col[value_col_names])
300+
301+
input_ddf = dask_cudf.concat(frames, axis=1)
301302

302303
self.input_df = input_ddf
303304

python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ def df_type_id(dataframe_type):
159159
return s + "cudf.DataFrame"
160160
if dataframe_type == pd.DataFrame:
161161
return s + "pandas.DataFrame"
162-
if dataframe_type == dask_cudf.core.DataFrame:
163-
return s + "dask_cudf.core.DataFrame"
162+
if dataframe_type == dask_cudf.DataFrame:
163+
return s + "dask_cudf.DataFrame"
164164
return s + "?"
165165

166166

python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,14 +232,17 @@ def test_mg_symmetrize(dask_client, read_datasets):
232232

233233
# create a dask DataFrame from the dask Series
234234
if isinstance(sym_src, dask_cudf.Series):
235-
ddf2 = sym_src.to_frame()
236-
ddf2 = ddf2.rename(columns={sym_src.name: "src"})
237-
ddf2["dst"] = sym_dst
235+
frames = [
236+
sym_src.to_frame(name="src"),
237+
sym_dst.to_frame(name="dst"),
238+
]
238239
else:
239-
ddf2 = dask_cudf.concat([sym_src, sym_dst], axis=1)
240+
frames = [sym_src, sym_dst]
240241

241242
if val_col_name is not None:
242-
ddf2["weight"] = sym_val
243+
frames.append(sym_val.to_frame(name="weight"))
244+
245+
ddf2 = dask_cudf.concat(frames, axis=1)
243246

244247
compare(ddf, ddf2, src_col_name, dst_col_name, val_col_name)
245248

python/cugraph/cugraph/tests/structure/test_graph_mg.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ def test_nodes_functionality(dask_client, input_combo):
9999
expected_nodes = (
100100
dask_cudf.concat([ddf["src"], ddf["dst"]])
101101
.drop_duplicates()
102-
.to_frame()
103-
.sort_values(0)
102+
.to_frame(name="0")
103+
.sort_values("0")
104104
)
105105

106106
expected_nodes = expected_nodes.compute().reset_index(drop=True)
107107

108-
result_nodes["expected_nodes"] = expected_nodes[0]
108+
result_nodes["expected_nodes"] = expected_nodes["0"]
109109

110110
compare = result_nodes.query("result_nodes != expected_nodes")
111111

python/cugraph/cugraph/tests/utils/test_dataset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def test_reader_dask(dask_client, dataset):
198198
E = dataset.get_dask_edgelist(download=True)
199199

200200
assert E is not None
201-
assert isinstance(E, dask_cudf.core.DataFrame)
201+
assert isinstance(E, dask_cudf.DataFrame)
202202
dataset.unload()
203203

204204

0 commit comments

Comments
 (0)