feat: support interface for BigQuery managed functions #1373

Merged: 26 commits, Mar 5, 2025
Changes from all commits
15 changes: 15 additions & 0 deletions bigframes/_config/experiment_options.py
@@ -25,6 +25,7 @@ class ExperimentOptions:
def __init__(self):
self._semantic_operators: bool = False
self._blob: bool = False
self._udf: bool = False

@property
def semantic_operators(self) -> bool:
@@ -53,3 +54,17 @@ def blob(self, value: bool):
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._blob = value

@property
def udf(self) -> bool:
return self._udf

@udf.setter
def udf(self, value: bool):
if value is True:
msg = (
"BigFrames managed function (udf) is still experimental. It may "
"not work as expected and is subject to change in the future."
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._udf = value
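A hedged usage sketch follows, showing how a caller would opt in to the experimental managed-function path via this setter (the bpd.options.experiments attribute path is assumed from the surrounding config code, not shown in this diff):

import bigframes.pandas as bpd

# Opting in emits the PreviewWarning raised by the setter above.
bpd.options.experiments.udf = True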
13 changes: 10 additions & 3 deletions bigframes/dataframe.py
@@ -4097,9 +4097,16 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
msg = "axis=1 scenario is in preview."
warnings.warn(msg, category=bfe.PreviewWarning)

# Check if the function is a remote function
if not hasattr(func, "bigframes_remote_function"):
raise ValueError("For axis=1 a remote function must be used.")
# TODO(jialuo): Deprecate the "bigframes_remote_function" attribute.
# Some predefined remote functions used in tests were created with
# "bigframes_remote_function" instead of "bigframes_bigquery_function",
# so those must be migrated before the "bigframes_remote_function"
# attribute can be deprecated.
# Check if the function is a bigframes function.
if not hasattr(func, "bigframes_remote_function") and not hasattr(
func, "bigframes_bigquery_function"
):
raise ValueError("For axis=1 a bigframes function must be used.")

is_row_processor = getattr(func, "is_row_processor")
if is_row_processor:
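An illustrative standalone helper (hypothetical, not part of this PR) that captures the duck-typed check apply(axis=1) now performs:

def _is_bigframes_function(func) -> bool:
    # Accept either marker attribute: "bigframes_bigquery_function" is the
    # new one; "bigframes_remote_function" stays until the legacy tests
    # noted in the TODO above are migrated.
    return hasattr(func, "bigframes_remote_function") or hasattr(
        func, "bigframes_bigquery_function"
    )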
180 changes: 131 additions & 49 deletions bigframes/functions/_function_client.py
@@ -21,8 +21,8 @@
import random
import shutil
import string
import tempfile
import textwrap
import types
from typing import cast, Tuple, TYPE_CHECKING

@@ -55,39 +55,86 @@


class FunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation.
_iam_wait_seconds = 120

# TODO(b/392707725): Convert all necessary parameters for cloud function
# deployment into method parameters.
def __init__(
self,
gcp_project_id,
bq_location,
bq_dataset,
bq_client,
bq_connection_id,
bq_connection_manager,
cloud_function_region=None,
cloud_functions_client=None,
cloud_function_service_account=None,
cloud_function_kms_key_name=None,
cloud_function_docker_repository=None,
*,
session: Session,
):
self._gcp_project_id = gcp_project_id
self._bq_location = bq_location
self._bq_dataset = bq_dataset
self._bq_client = bq_client
self._bq_connection_id = bq_connection_id
self._bq_connection_manager = bq_connection_manager
self._session = session

# Optional attributes only for remote functions.
self._cloud_function_region = cloud_function_region
self._cloud_functions_client = cloud_functions_client
self._cloud_function_service_account = cloud_function_service_account
self._cloud_function_kms_key_name = cloud_function_kms_key_name
self._cloud_function_docker_repository = cloud_function_docker_repository

def _create_bq_connection(self) -> None:
if self._bq_connection_manager:
self._bq_connection_manager.create_bq_connection(
self._gcp_project_id,
self._bq_location,
self._bq_connection_id,
"run.invoker",
)

def _ensure_dataset_exists(self) -> None:
# Make sure the dataset exists, creating it if necessary.
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
try:
# This check does not require the bigquery.datasets.create IAM
# permission, so if the dataset already exists the user can work
# without having that permission.
self._bq_client.get_dataset(dataset)
except google.api_core.exceptions.NotFound:
# This requires bigquery.datasets.create IAM permission.
self._bq_client.create_dataset(dataset, exists_ok=True)

def _create_bq_function(self, create_function_ddl: str) -> None:
# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
self._session.bqclient,
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
)
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

def _format_function_options(self, function_options: dict) -> str:
return ", ".join(
[
f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}"
for key, val in function_options.items()
if val is not None
]
)
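A quick worked example (values are illustrative) of what this helper produces:

opts = {
    "runtime_version": "python-3.11",    # hypothetical runtime string
    "entry_point": "bigframes_handler",
    "max_batching_rows": None,           # None entries are filtered out
}
# _format_function_options(opts) returns:
# "runtime_version='python-3.11', entry_point='bigframes_handler'"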

def create_bq_remote_function(
self,
@@ -101,13 +148,7 @@ def create_bq_remote_function(
):
"""Create a BigQuery remote function given the artifacts of a user defined
function and the http endpoint of a corresponding cloud function."""
self._create_bq_connection()

# Create BQ function
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
@@ -128,12 +169,8 @@ def create_bq_remote_function(
# bigframes specific metadata for the lack of a better option
remote_function_options["description"] = metadata

remote_function_options_str = self._format_function_options(
remote_function_options
)

create_function_ddl = f"""
@@ -144,31 +181,78 @@

logger.info(f"Creating BQ remote function: {create_function_ddl}")

self._ensure_dataset_exists()
self._create_bq_function(create_function_ddl)

def provision_bq_managed_function(
self,
func,
input_types,
output_type,
name,
packages,
is_row_processor,
):
"""Create a BigQuery managed function."""
import cloudpickle

pickled = cloudpickle.dumps(func)

# Create BQ managed function.
bq_function_args = []
bq_function_return_type = output_type

input_args = inspect.getargs(func.__code__).args
# We expect the input type annotations to be 1:1 with the input args.
for name_, type_ in zip(input_args, input_types):
bq_function_args.append(f"{name_} {type_}")

managed_function_options = {
"runtime_version": _utils.get_python_version(),
"entry_point": "bigframes_handler",
}

# Augment user package requirements with any internal package
# requirements.
packages = _utils._get_updated_package_requirements(packages, is_row_processor)
if packages:
managed_function_options["packages"] = packages
managed_function_options_str = self._format_function_options(
managed_function_options
)

session_id = None if name else self._session.session_id
bq_function_name = name
if not bq_function_name:
# Compute a unique hash representing the user code.
function_hash = _utils._get_hash(func, packages)
bq_function_name = _utils.get_bigframes_function_name(
function_hash,
session_id,
)

persistent_func_id = (
f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
)
create_function_ddl = textwrap.dedent(
f"""
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
LANGUAGE python
OPTIONS ({managed_function_options_str})
AS r'''
import cloudpickle
udf = cloudpickle.loads({pickled})
def bigframes_handler(*args):
return udf(*args)
'''
"""
).strip()

self._ensure_dataset_exists()
self._create_bq_function(create_function_ddl)

return bq_function_name
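A minimal standalone sketch of the cloudpickle round trip that the generated handler relies on (an illustration of the same pattern, not the shipped code):

import cloudpickle

def multiply(a, b):
    return a * b

pickled = cloudpickle.dumps(multiply)  # bytes interpolated into the DDL body
udf = cloudpickle.loads(pickled)       # what bigframes_handler effectively does
assert udf(3, 4) == 12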

def get_cloud_function_fully_qualified_parent(self):
"Get the fully qualified parent for a cloud function."
@@ -262,9 +346,7 @@ def create_cloud_function(
# TODO(shobs): Figure out how to achieve version compatibility, especially
# when pickle (internally used by cloudpickle) guarantees that:
# https://docs.python.org/3/library/pickle.html#:~:text=The%20pickle%20serialization%20format%20is,unique%20breaking%20change%20language%20boundary.
python_version = _utils.get_python_version(is_compat=True)

# Determine an upload URL for user code
upload_url_request = functions_v2.GenerateUploadUrlRequest(
@@ -443,7 +525,7 @@ def provision_bq_remote_function(
# Derive the name of the remote function
remote_function_name = name
if not remote_function_name:
remote_function_name = _utils.get_bigframes_function_name(
function_hash, self._session.session_id, uniq_suffix
)
rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name)