
Refactor polars configuration #18516


Open
TomAugspurger wants to merge 19 commits into branch-25.06

Conversation

TomAugspurger (Contributor)

Description

This updates our internal handling of the user-provided configuration for our polars GPUEngine. We use a set of dataclasses to manage the configuration options.

There aren't any user-facing changes.
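
For context, here is a minimal sketch of what the dataclass-based layout looks like. The class names below appear elsewhere in this PR, but the specific fields and defaults are illustrative assumptions, not the exact ones in the diff:

from __future__ import annotations

import dataclasses
from typing import Literal


@dataclasses.dataclass
class ParquetOptions:
    chunked: bool = True  # assumed default; mirrors "parquet_options.chunked" below


@dataclasses.dataclass
class StreamingExecutor:
    name: Literal["streaming"] = dataclasses.field(default="streaming", init=False)
    scheduler: Literal["synchronous", "distributed"] = "synchronous"  # values assumed


@dataclasses.dataclass
class InMemoryExecutor:
    name: Literal["in-memory"] = dataclasses.field(default="in-memory", init=False)


@dataclasses.dataclass
class ConfigOptions:
    executor: StreamingExecutor | InMemoryExecutor = dataclasses.field(
        default_factory=InMemoryExecutor
    )
    parquet_options: ParquetOptions = dataclasses.field(default_factory=ParquetOptions)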

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.


copy-pr-bot bot commented Apr 17, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@github-actions bot added the Python (Affects Python cuDF API.) and cudf.polars (Issues specific to cudf.polars) labels on Apr 17, 2025
- Restore old keyword name
- Fixed type passed through to callback
- Adjust the valid shuffle options
@TomAugspurger (Contributor Author):

We need to figure out exactly how much validation to do, and which errors we raise when we encounter an issue.
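
For example, eager validation could live in __post_init__, so a bad GPUEngine config fails at construction time rather than mid-query. A sketch, reusing the broadcast_join_limit check that appears later in this review (the default is an assumption):

import dataclasses


@dataclasses.dataclass
class StreamingExecutor:
    broadcast_join_limit: int = 32  # illustrative default

    def __post_init__(self) -> None:
        # Raise eagerly so the error points at the configuration,
        # not at some operator deep inside query execution.
        if not isinstance(self.broadcast_join_limit, int):
            raise TypeError("broadcast_join_limit must be an int")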

@TomAugspurger added the improvement (Improvement / enhancement to an existing function) and non-breaking (Non-breaking change) labels on Apr 18, 2025
- move to enums
- validation
- more tests
- fixed type issues
@TomAugspurger marked this pull request as ready for review April 21, 2025 13:44
@TomAugspurger requested a review from a team as a code owner April 21, 2025 13:44
@TomAugspurger (Contributor Author) commented Apr 21, 2025

Looking into the CI failures:

_______________________ test_parallel_scan[csv-scan_csv] _______________________
[gw0] linux -- Python 3.10.17 /opt/conda/envs/test/bin/python
Traceback (most recent call last):
  File "/opt/conda/envs/test/lib/python3.10/site-packages/_pytest/runner.py", line 341, in from_call
    result: Optional[TResult] = func()
  File "/opt/conda/envs/test/lib/python3.10/site-packages/_pytest/runner.py", line 262, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_hooks.py", line 513, in __call__
    return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_manager.py", line 120, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_callers.py", line 182, in _multicall
    return outcome.get_result()
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_result.py", line 100, in get_result
    raise exc.with_traceback(exc.__traceback__)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_callers.py", line 103, in _multicall
    res = hook_impl.function(*args)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/_pytest/runner.py", line 177, in pytest_runtest_call
    raise e
  File "/opt/conda/envs/test/lib/python3.10/site-packages/_pytest/runner.py", line 169, in pytest_runtest_call
    item.runtest()
  File "/opt/conda/envs/test/lib/python3.10/site-packages/_pytest/python.py", line 1792, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_hooks.py", line 513, in __call__
    return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_manager.py", line 120, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_callers.py", line 139, in _multicall
    raise exception.with_traceback(exception.__traceback__)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/pluggy/_callers.py", line 103, in _multicall
    res = hook_impl.function(*args)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/_pytest/python.py", line 194, in pytest_pyfunc_call
    result = testfunction(**testargs)
  File "/__w/cudf/cudf/python/cudf_polars/tests/experimental/test_scan.py", line 60, in test_parallel_scan
    assert_gpu_result_equal(q, engine=engine)
  File "/__w/cudf/cudf/python/cudf_polars/cudf_polars/testing/asserts.py", line 107, in assert_gpu_result_equal
    got = lazydf.collect(**final_cudf_collect_kwargs, engine=engine)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/polars/_utils/deprecation.py", line 93, in wrapper
    return function(*args, **kwargs)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 2188, in collect
    return wrap_df(ldf.collect(engine, callback))
polars.exceptions.ComputeError: TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7fdec297a500>\n 0. 140594695501376\n>')
------------------------------ Captured log call -------------------------------
ERROR    distributed.protocol.pickle:pickle.py:79 Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fdec297a500>
 0. 140594695501376
>.
Traceback (most recent call last):
  File "/opt/conda/envs/test/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
  File "<stringsource>", line 2, in rmm.pylibrmm.memory_resource.CudaMemoryResource.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/test/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
  File "<stringsource>", line 2, in rmm.pylibrmm.memory_resource.CudaMemoryResource.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/test/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/opt/conda/envs/test/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
  File "<stringsource>", line 2, in rmm.pylibrmm.memory_resource.CudaMemoryResource.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

I wasn't able to immediately reproduce the failure.

Edit: I wasn't passing the --executor streaming --scheduler distributed flags. With those I can reproduce it.

@TomAugspurger (Contributor Author):

One other thing to figure out: we have a couple of other configuration options floating around as environment variables:

  • CUDF_POLARS_NUM_WORKERS
  • POLARS_GPU_ENABLE_CUDA_MANAGED_MEMORY
  • POLARS_VERBOSE

Perhaps not POLARS_VERBOSE since it's a general polars setting, but maybe the others. I'd suggest making all our configuration options configurable through environment variables (with a consistent prefix) and through kwargs passed to GPUEngine.

But that can probably wait for a separate PR.
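
Roughly, a consistent prefix would look like this sketch (the helper name and prefix are assumptions, not part of this PR):

import os


def _env_option(name: str, default: str) -> str:
    # Hypothetical helper: resolve an option from the environment under
    # one consistent prefix, e.g. CUDF_POLARS_NUM_WORKERS.
    return os.environ.get(f"CUDF_POLARS_{name.upper()}", default)


num_workers = int(_env_option("num_workers", "1"))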

ConfigOptions are serialized when using the distributed scheduler.
It's not immediately clear what serializing / deserializing a
concrete MemoryResource object means.
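
One way to sidestep that (a sketch, not necessarily this PR's approach): keep the process-local memory resource out of the pickled state entirely. The field names here are illustrative:

import dataclasses
from typing import Any


@dataclasses.dataclass
class ConfigOptions:
    # Hypothetical fields for illustration only.
    executor_name: str = "streaming"
    memory_resource: Any = None  # e.g. an rmm memory resource; not picklable

    def __getstate__(self) -> dict[str, Any]:
        # Drop the process-local memory resource before pickling;
        # the receiving worker must construct its own.
        state = self.__dict__.copy()
        state["memory_resource"] = None
        return state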
@Matt711 (Contributor) left a comment

Thanks @TomAugspurger, this is a nice change to make! I left some suggestions.

less than or equal to 1. Each factor estimates the fractional number of
unique values in the column. By default, a value of ``1.0`` is used
for any column not found in ``cardinality_factor``.
parquet_blocksize
Contributor:

parquet_blocksize is related to partitioning, so it makes sense that it is in the StreamingExecutor. Can we name it something like partition_blocksize to avoid confusion since there's a ParquetOptions dataclass too?

@TomAugspurger (Contributor Author):

Good call. It's a bit verbose, but I think parquet_partition_blocksize might be best since this is specific to parquet. @rjzamora how does that sound? I can put in a bit of compat code to accept either the new or old name.
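
The compat shim could be as small as this sketch (names hypothetical, assuming the rename goes through):

import warnings
from typing import Any


def _handle_blocksize_compat(options: dict[str, Any]) -> dict[str, Any]:
    # Accept the old spelling, but warn and map it to the new name.
    if "parquet_blocksize" in options:
        warnings.warn(
            "'parquet_blocksize' is deprecated; use "
            "'parquet_partition_blocksize' instead.",
            FutureWarning,
        )
        options.setdefault(
            "parquet_partition_blocksize", options.pop("parquet_blocksize")
        )
    return options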

@rjzamora (Member) left a comment

Took a first pass at this. I really like the overall design!

I pointed out a few details that need to be adjusted/resolved somehow.

Comment on lines +216 to +218
assert ir.config_options.executor.name == "streaming", (
"'in-memory' executor not supported in 'generate_ir_tasks'"
)
Member:

I don't think these kinds of assertions are necessary. Is there a specific reason you think the "in-memory" executor might end up here?

@TomAugspurger (Contributor Author):

The type signature of ir.config_options.executor is StreamingExecutor | InMemoryExecutor, so without this we get:

$ mypy python/cudf_polars/cudf_polars
python/cudf_polars/cudf_polars/experimental/groupby.py:222: error: Item "InMemoryExecutor" of "StreamingExecutor | InMemoryExecutor" has no attribute "cardinality_factor"  [union-attr]
python/cudf_polars/cudf_polars/experimental/groupby.py:286: error: Item "InMemoryExecutor" of "StreamingExecutor | InMemoryExecutor" has no attribute "groupby_n_ary"  [union-attr]
Found 2 errors in 1 file (checked 50 source files)

So as far as mypy knows, it is possible to get here with an InMemoryExecutor.

I think that if we wanted to avoid the asserts, we would need to make IR generic over config_options and config_options generic over executor. I can attempt that (though IR subclasses the generic Node, so it might get a little tricky; we'll see).
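
For reference, the assert works for mypy because name is a Literal-typed tag that narrows the union. A self-contained sketch of the pattern (field set illustrative):

from __future__ import annotations

import dataclasses
from typing import Literal


@dataclasses.dataclass
class StreamingExecutor:
    name: Literal["streaming"] = dataclasses.field(default="streaming", init=False)
    cardinality_factor: dict[str, float] = dataclasses.field(default_factory=dict)


@dataclasses.dataclass
class InMemoryExecutor:
    name: Literal["in-memory"] = dataclasses.field(default="in-memory", init=False)


def use(executor: StreamingExecutor | InMemoryExecutor) -> dict[str, float]:
    # Comparing the Literal-typed tag narrows the union to
    # StreamingExecutor, so the attribute access below type-checks.
    assert executor.name == "streaming", (
        "'in-memory' executor not supported here"
    )
    return executor.cardinality_factor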

Member:

Ah, okay. No need to avoid the assertions if it's a pain.

@TomAugspurger (Contributor Author):

Someone more familiar with mypy and generics could probably make this work, but I got stuck on the first part: making ConfigOptions generic over the executor. Here's my test program that failed:

from __future__ import annotations

import dataclasses
import typing


@dataclasses.dataclass
class A:
    name: typing.Literal["a"] = dataclasses.field(default="a", init=False)


@dataclasses.dataclass
class B:
    name: typing.Literal["b"] = dataclasses.field(default="b", init=False)


E = typing.TypeVar("E", A, B)


@dataclasses.dataclass
class C(typing.Generic[E]):
    value: E = dataclasses.field(default_factory=A)

    @classmethod
    def from_value(cls, value: str) -> C[E]:
        match value:
            case "a":
                return cls(value=A())
            case "b":
                return cls(value=B())
            case _:
                raise ValueError(f"Invalid value: {value}")


reveal_type(C.from_value("a"))
reveal_type(C.from_value("b"))

which gives

$ mypy t.py
t.py:22: error: Incompatible types in assignment (expression has type "A", variable has type "E")  [assignment]
t.py:28: error: Argument "value" to "C" has incompatible type "A"; expected "B"  [arg-type]
t.py:30: error: Argument "value" to "C" has incompatible type "B"; expected "A"  [arg-type]
t.py:35: note: Revealed type is "t.C[t.A]"
t.py:36: note: Revealed type is "t.C[t.A]"
Found 3 errors in 1 file (checked 1 source file)
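
A possible workaround (a sketch, untested beyond mypy) is to keep C generic but move the dispatch into overloads keyed on Literal argument types, reusing A, B, and E from the program above and dropping the default on value:

@dataclasses.dataclass
class C(typing.Generic[E]):
    value: E


@typing.overload
def from_value(value: typing.Literal["a"]) -> C[A]: ...
@typing.overload
def from_value(value: typing.Literal["b"]) -> C[B]: ...
def from_value(value: str) -> C[A] | C[B]:
    match value:
        case "a":
            return C(value=A())
        case "b":
            return C(value=B())
        case _:
            raise ValueError(f"Invalid value: {value}")


reveal_type(from_value("a"))  # Revealed type is "C[A]"
reveal_type(from_value("b"))  # Revealed type is "C[B]"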


>>> chunked = config_options.get("parquet_options.chunked")
TASKS = "tasks"
RAPIDSMPF = "rapidsmpf"
Member:

We probably need an "auto" option to emulate the None default we had previously (i.e. we should try using "rapidsmpf" when we have a distributed scheduler and rapidsmpf is available, otherwise use "tasks").

@TomAugspurger (Contributor Author) commented Apr 21, 2025:

I'll document this, but on ConfigOptions this is shuffle_method: ShuffleMethod | None and the default is to try rapidsmpf and fall back.

We could add "auto" as an allowed value here, but that would maybe require importing rapidsmpf when we initialize the config.

Member:

> I'll document this, but on ConfigOptions this is shuffle_method: ShuffleMethod | None and the default is to try rapidsmpf and fall back.

Ah, okay.

> We could add "auto" as an allowed value here, but that would maybe require importing rapidsmpf when we initialize the config.

I was imagining the case that "auto" is the shuffle method as far as the ShuffleMethod is concerned. We wouldn't check if rapidsmpf was available until we needed to build the graph.

@TomAugspurger (Contributor Author):

Cool, that's what we should have now; it's just spelled executor.shuffle_method is None.
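
That is, resolution works roughly like this sketch (the helper name and signature are assumptions):

def resolve_shuffle_method(shuffle_method: str | None, scheduler: str) -> str:
    # None means "auto": prefer rapidsmpf on the distributed scheduler
    # when it is importable, otherwise fall back to task-based shuffling.
    if shuffle_method is not None:
        return shuffle_method
    if scheduler == "distributed":
        try:
            import rapidsmpf  # noqa: F401
        except ImportError:
            pass
        else:
            return "rapidsmpf"
    return "tasks"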

@TomAugspurger (Contributor Author) commented Apr 21, 2025

Is there an easy way to see which lines aren't covered in the CI failure here https://github.com/rapidsai/cudf/actions/runs/14580875852/job/40898672037?pr=18516#step:10:703?

I have 100% coverage locally when I run ./ci/run_cudf_polars_pytests.sh --cov cudf_polars --cov-fail-under=100 --cov-config=./pyproject.toml --junitxml=junit-cudf-polars.xml

Edit: never mind, I can reproduce it now. At least locally, it's just from the if TYPE_CHECKING: block not being measured.

Edit again: I think my failing run was getting overwritten by a subsequent run in that script. Anyway, the actual failure is python -m pytest --cov cudf_polars --cov-fail-under=100 --cov-config=pyproject.toml --executor streaming tests, and coverage report -m shows that it's from

cudf_polars/dsl/ir.py                          789      1    99%   632

That line isn't changed, but clearly I've changed the semantics of some configuration value. I'll investigate.

This is a convenience class to help manage the nested
dictionary of user-accessible `GPUEngine` options.
Upon encountering an unsupported operation, the streaming executor will fall
back to using a single-partition, which might use a large amount of memory.
Member:

Suggested change
back to using a single-partition, which might use a large amount of memory.
back to using a single partition, which might use a large amount of memory.

Nit.

Comment on lines +33 to +34
* ``FallbackMode.WARN`` : Emit a warning and fall back to the CPU engine.
* ``FallbackMode.SILENT``: Silently fall back to the CPU engine.
Member:

Suggested change
* ``FallbackMode.WARN`` : Emit a warning and fall back to the CPU engine.
* ``FallbackMode.SILENT``: Silently fall back to the CPU engine.
* ``FallbackMode.WARN`` : Emit a warning and fall back to a single partition.
* ``FallbackMode.SILENT``: Silently fall back to a single partition.

raise TypeError("broadcast_join_limit must be an int")

def __hash__(self) -> int:
# cardinatlity factory, a dict, isn't natively hashable. We'll dump it
Member:

Suggested change
# cardinatlity factory, a dict, isn't natively hashable. We'll dump it
# cardinality factor, a dict, isn't natively hashable. We'll dump it
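
For context, "dump it" refers to hashing the dict via a stable serialization, roughly like this sketch (field set illustrative):

import dataclasses
import json


@dataclasses.dataclass
class StreamingExecutor:
    cardinality_factor: dict[str, float] = dataclasses.field(default_factory=dict)

    def __hash__(self) -> int:
        # cardinality_factor, a dict, isn't natively hashable; dump it
        # to a sorted JSON string so the whole object can be hashed.
        return hash(json.dumps(self.cardinality_factor, sort_keys=True))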

Labels
  • cudf.polars: Issues specific to cudf.polars
  • improvement: Improvement / enhancement to an existing function
  • non-breaking: Non-breaking change
  • Python: Affects Python cuDF API.
3 participants