diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 893b903aeb..c947fcdc63 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -22,6 +22,7 @@ from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union import warnings +import cloudpickle import google.api_core.exceptions from google.cloud import ( bigquery, @@ -458,6 +459,11 @@ def wrapper(func): session=session, # type: ignore ) + # To respect the user code/environment let's use a copy of the + # original udf, especially since we would be setting some properties + # on it + func = cloudpickle.loads(cloudpickle.dumps(func)) + # In the unlikely case where the user is trying to re-deploy the same # function, cleanup the attributes we add below, first. This prevents # the pickle from having dependencies that might not otherwise be @@ -499,6 +505,18 @@ def try_delattr(attr): cloud_function_memory_mib=cloud_function_memory_mib, ) + # TODO(shobs): Find a better way to support udfs with param named "name". + # This causes an issue in the ibis compilation. + func.__signature__ = inspect.signature(func).replace( # type: ignore + parameters=[ + inspect.Parameter( + f"bigframes_{param.name}", + param.kind, + ) + for param in inspect.signature(func).parameters.values() + ] + ) + # TODO: Move ibis logic to compiler step node = ibis.udf.scalar.builtin( func, diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 39e3bfd8f0..b4c74e90d6 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -144,16 +144,21 @@ def read_gbq_function( # The name "args" conflicts with the Ibis operator, so we use # non-standard names for the arguments here. 
- def func(*ignored_args, **ignored_kwargs): + def func(*bigframes_args, **bigframes_kwargs): f"""Remote function {str(routine_ref)}.""" nonlocal node # type: ignore - expr = node(*ignored_args, **ignored_kwargs) # type: ignore + expr = node(*bigframes_args, **bigframes_kwargs) # type: ignore return ibis_client.execute(expr) func.__signature__ = inspect.signature(func).replace( # type: ignore parameters=[ - inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD) + # TODO(shobs): Find a better way to support functions with param + # named "name". This causes an issue in the ibis compilation. + inspect.Parameter( + f"bigframes_{name}", + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ) for name in ibis_signature.parameter_names ] ) diff --git a/bigframes/series.py b/bigframes/series.py index 5192a9cf49..d9e3bb19dd 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1481,12 +1481,8 @@ def apply( ex.message += f"\n{_remote_function_recommendation_message}" raise - # We are working with remote function at this point. - # Reproject as workaround to applying filter too late. This forces the - # filter to be applied before passing data to remote function, - # protecting from bad inputs causing errors. - reprojected_series = Series(self._block._force_reproject()) - result_series = reprojected_series._apply_unary_op( + # We are working with remote function at this point + result_series = self._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index b000354ed4..5ffda56f92 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import inspect import re import google.api_core.exceptions @@ -972,3 +973,112 @@ def echo_len(row): bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." ): scalars_df[[column]].apply(echo_len_remote, axis=1) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_application_repr(session, dataset_id_permanent): + # This function deliberately has a param with name "name", this is to test + # a specific ibis' internal handling of object names + def should_mask(name: str) -> bool: + hash = 0 + for char_ in name: + hash += ord(char_) + return hash % 2 == 0 + + assert "name" in inspect.signature(should_mask).parameters + + should_mask = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(should_mask) + )(should_mask) + + s = bigframes.series.Series(["Alice", "Bob", "Caroline"]) + + repr(s.apply(should_mask)) + repr(s.where(s.apply(should_mask))) + repr(s.where(~s.apply(should_mask))) + repr(s.mask(should_mask)) + repr(s.mask(should_mask, "REDACTED")) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_index): + gbq_function = f"{dataset_id}.should_mask" + + # This function deliberately has a param with name "name", this is to test + # a specific ibis' internal handling of object names + session.bqclient.query_and_wait( + f"CREATE OR REPLACE FUNCTION `{gbq_function}`(name STRING) RETURNS BOOL AS (MOD(LENGTH(name), 2) = 1)" + ) + routine = session.bqclient.get_routine(gbq_function) + assert "name" in [arg.name for arg in routine.arguments] + + # read the function and apply to dataframe + should_mask = session.read_gbq_function(gbq_function) + + s = scalars_df_index["string_col"] + + repr(s.apply(should_mask)) + repr(s.where(s.apply(should_mask))) + repr(s.where(~s.apply(should_mask))) + repr(s.mask(should_mask)) + repr(s.mask(should_mask, "REDACTED")) + + +@pytest.mark.flaky(retries=2, delay=120) +def 
test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs): + + # This function is deliberately written to not work with NA input + def plus_one(x: int) -> int: + return x + 1 + + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + + # make sure there are NA values in the test column + assert any([pd.isna(val) for val in scalars_df[int_col_name_with_nulls]]) + + # create a remote function + plus_one_remote = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(plus_one) + )(plus_one) + + # with nulls in the series the remote function application would fail + with pytest.raises( + google.api_core.exceptions.BadRequest, match="unsupported operand" + ): + scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas() + + # after filtering out nulls the remote function application should work + # similar to pandas + pd_result = scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][ + int_col_name_with_nulls + ].apply(plus_one) + bf_result = ( + scalars_df[scalars_df[int_col_name_with_nulls].notnull()][ + int_col_name_with_nulls + ] + .apply(plus_one_remote) + .to_pandas() + ) + + # ignore pandas "int64" vs bigframes "Int64" dtype difference + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent): + session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial")) + + df = session.read_gbq("bigquery-public-data.baseball.schedules")[ + ["duration_minutes"] + ] + + def plus_one(x: int) -> int: + return x + 1 + + plus_one = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(plus_one) + )(plus_one) + + df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one)) + repr(df1)