Skip to content

feat: support interface for BigQuery managed functions #1373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions bigframes/_config/experiment_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ExperimentOptions:
def __init__(self):
self._semantic_operators: bool = False
self._blob: bool = False
self._udf: bool = False

@property
def semantic_operators(self) -> bool:
Expand Down Expand Up @@ -53,3 +54,17 @@ def blob(self, value: bool):
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._blob = value

@property
def udf(self) -> bool:
return self._udf

@udf.setter
def udf(self, value: bool):
if value is True:
msg = (
"BigFrames managed function (udf) is still under experiments. "
"It may not work and subject to change in the future."
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._udf = value
8 changes: 5 additions & 3 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3997,9 +3997,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
msg = "axis=1 scenario is in preview."
warnings.warn(msg, category=bfe.PreviewWarning)

# Check if the function is a remote function
if not hasattr(func, "bigframes_remote_function"):
raise ValueError("For axis=1 a remote function must be used.")
# Check if the function is a remote function.
if not hasattr(func, "bigframes_remote_function") and not hasattr(
func, "bigframes_function"
):
raise ValueError("For axis=1 a bigframes function must be used.")

is_row_processor = getattr(func, "is_row_processor")
if is_row_processor:
Expand Down
155 changes: 117 additions & 38 deletions bigframes/functions/_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,39 +55,77 @@


class FunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation
# Wait time (in seconds) for an IAM binding to take effect after creation.
_iam_wait_seconds = 120

# TODO(b/392707725): Convert all necessary parameters for cloud function
# deployment into method parameters.
def __init__(
self,
gcp_project_id,
cloud_function_region,
cloud_functions_client,
bq_location,
bq_dataset,
bq_client,
bq_connection_id,
bq_connection_manager,
cloud_function_service_account,
cloud_function_kms_key_name,
cloud_function_docker_repository,
cloud_function_region=None,
cloud_functions_client=None,
cloud_function_service_account=None,
cloud_function_kms_key_name=None,
cloud_function_docker_repository=None,
*,
session: Session,
):
self._gcp_project_id = gcp_project_id
self._cloud_function_region = cloud_function_region
self._cloud_functions_client = cloud_functions_client
self._bq_location = bq_location
self._bq_dataset = bq_dataset
self._bq_client = bq_client
self._bq_connection_id = bq_connection_id
self._bq_connection_manager = bq_connection_manager
self._session = session

# Optional attributes only for remote functions.
self._cloud_function_region = cloud_function_region
self._cloud_functions_client = cloud_functions_client
self._cloud_function_service_account = cloud_function_service_account
self._cloud_function_kms_key_name = cloud_function_kms_key_name
self._cloud_function_docker_repository = cloud_function_docker_repository
self._session = session

def _create_bq_connection(self) -> None:
if self._bq_connection_manager:
self._bq_connection_manager.create_bq_connection(
self._gcp_project_id,
self._bq_location,
self._bq_connection_id,
"run.invoker",
)

def _ensure_dataset_exists(self) -> None:
# Make sure the dataset exists, i.e. if it doesn't exist, go ahead and
# create it.
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
try:
# This check does not require bigquery.datasets.create IAM
# permission. So, if the data set already exists, then user can work
# without having that permission.
self._bq_client.get_dataset(dataset)
except google.api_core.exceptions.NotFound:
# This requires bigquery.datasets.create IAM permission.
self._bq_client.create_dataset(dataset, exists_ok=True)

def _create_bq_function(self, create_function_ddl: str) -> None:
# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
self._session.bqclient,
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
)
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

def create_bq_remote_function(
self,
Expand All @@ -101,13 +139,7 @@ def create_bq_remote_function(
):
"""Create a BigQuery remote function given the artifacts of a user defined
function and the http endpoint of a corresponding cloud function."""
if self._bq_connection_manager:
self._bq_connection_manager.create_bq_connection(
self._gcp_project_id,
self._bq_location,
self._bq_connection_id,
"run.invoker",
)
self._create_bq_connection()

# Create BQ function
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
Expand Down Expand Up @@ -144,31 +176,78 @@ def create_bq_remote_function(

logger.info(f"Creating BQ remote function: {create_function_ddl}")

# Make sure the dataset exists. I.e. if it doesn't exist, go ahead and
# create it
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
try:
# This check does not require bigquery.datasets.create IAM
# permission. So, if the data set already exists, then user can work
# without having that permission.
self._bq_client.get_dataset(dataset)
except google.api_core.exceptions.NotFound:
# This requires bigquery.datasets.create IAM permission
self._bq_client.create_dataset(dataset, exists_ok=True)
self._ensure_dataset_exists()
self._create_bq_function(create_function_ddl)

# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
self._session.bqclient,
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
def create_bq_managed_function(
self,
func,
input_types,
output_type,
language,
runtime_version,
bq_function_name,
packages,
):
"""Create a BigQuery managed function."""
self._create_bq_connection()

import cloudpickle

pickled = cloudpickle.dumps(func)

code_block = f"""\
import cloudpickle

udf = cloudpickle.loads({pickled})

def managed_func(*args, **kwargs):
return udf(*args, **kwargs)
"""
# Create BQ managed function.
bq_function_args = []
bq_function_return_type = output_type

input_args = inspect.getargs(func.__code__).args
# We expect the input type annotations to be 1:1 with the input args.
for name, type_ in zip(input_args, input_types):
bq_function_args.append(f"{name} {type_}")

managed_function_options = {
"runtime_version": runtime_version,
"entry_point": "managed_func",
}

managed_function_options_str = ", ".join(
[
f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
for key, val in managed_function_options.items()
if val is not None
]
)
if not packages:
packages = ["cloudpickle"]
if "cloudpickle" not in packages:
packages += ["cloudpickle"]
managed_function_options_str = (
f"{managed_function_options_str}, packages={packages}"
)

logger.info(f"Created remote function {query_job.ddl_target_routine}")
persistent_func_id = (
f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
)
create_function_ddl = f"""
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
LANGUAGE {language}
OPTIONS ({managed_function_options_str})
AS r'''
{code_block}
'''
"""

self._ensure_dataset_exists()
self._create_bq_function(create_function_ddl)

def get_cloud_function_fully_qualified_parent(self):
"Get the fully qualilfied parent for a cloud function."
Expand Down
Loading