
Commit 6ef05de

feat: Added the PipelineJob.from_pipeline_func method (#1415)
The new factory method reduces the pipeline submission boilerplate to the absolute minimum.

```python
aiplatform.PipelineJob.from_pipeline_func(training_pipeline).submit()
```

What it does:

1. Compiles the pipeline.
2. Provides sensible default values for the pipeline display name, job_id, context, etc.
3. Generates a GCS directory for the pipeline output artifacts if needed.
4. Creates the GCS bucket for the artifacts if it does not exist (and gives the Pipelines service account the required permissions).

Example usage:

```python
def training_pipeline(number_of_epochs: int = 10):
    train_op(
        number_of_epochs=number_of_epochs,
        learning_rate="0.1",
    )


job = aiplatform.PipelineJob.from_pipeline_func(training_pipeline)
job.submit()
```

---

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-aiplatform/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
1 parent 91ed3a6 commit 6ef05de
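For orientation, here is a slightly fuller sketch of calling the new factory method with explicit overrides. The toy component, the bucket URI, and the parameter override below are illustrative placeholders rather than part of this commit, and the sketch assumes `aiplatform.init` (or application-default credentials) already points at a project:

```python
from kfp import components

from google.cloud import aiplatform


# A toy component and pipeline, mirroring the example in the commit message.
def train(number_of_epochs: int):
    print(f"number_of_epochs={number_of_epochs}")


train_op = components.create_component_from_func(train)


def training_pipeline(number_of_epochs: int = 10):
    train_op(number_of_epochs=number_of_epochs)


# from_pipeline_func compiles the function and fills in the display name,
# job_id, and artifacts directory; the arguments below only override defaults.
job = aiplatform.PipelineJob.from_pipeline_func(
    pipeline_func=training_pipeline,
    parameter_values={"number_of_epochs": 5},  # placeholder runtime override
    output_artifacts_gcs_dir="gs://my-bucket/pipeline-artifacts/",  # placeholder bucket
    enable_caching=False,
)
job.submit()
```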

File tree

5 files changed (+372 -4 lines changed)

google/cloud/aiplatform/pipeline_jobs.py

+139 -2

```diff
@@ -19,7 +19,8 @@
 import logging
 import time
 import re
-from typing import Any, Dict, List, Optional, Union
+import tempfile
+from typing import Any, Callable, Dict, List, Optional, Union
 
 from google.auth import credentials as auth_credentials
 from google.cloud import aiplatform
@@ -33,6 +34,7 @@
 from google.cloud.aiplatform.metadata import constants as metadata_constants
 from google.cloud.aiplatform.metadata import experiment_resources
 from google.cloud.aiplatform.metadata import utils as metadata_utils
+from google.cloud.aiplatform.utils import gcs_utils
 from google.cloud.aiplatform.utils import yaml_utils
 from google.cloud.aiplatform.utils import pipeline_utils
 from google.protobuf import json_format
@@ -131,7 +133,9 @@ def __init__(
                 Optional. The unique ID of the job run.
                 If not specified, pipeline name + timestamp will be used.
             pipeline_root (str):
-                Optional. The root of the pipeline outputs. Default to be staging bucket.
+                Optional. The root of the pipeline outputs. If not set, the staging bucket
+                set in aiplatform.init will be used. If that's not set a pipeline-specific
+                artifacts bucket will be used.
             parameter_values (Dict[str, Any]):
                 Optional. The mapping from runtime parameter names to its values that
                 control the pipeline run.
@@ -219,6 +223,13 @@ def __init__(
             or pipeline_job["pipelineSpec"].get("defaultPipelineRoot")
             or initializer.global_config.staging_bucket
         )
+        pipeline_root = (
+            pipeline_root
+            or gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
+                project=project,
+                location=location,
+            )
+        )
         builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
             pipeline_job
         )
@@ -332,6 +343,13 @@ def submit(
         if network:
             self._gca_resource.network = network
 
+        gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+            output_artifacts_gcs_dir=self.pipeline_spec.get("gcsOutputDirectory"),
+            service_account=self._gca_resource.service_account,
+            project=self.project,
+            location=self.location,
+        )
+
         # Prevents logs from being supressed on TFX pipelines
         if self._gca_resource.pipeline_spec.get("sdkVersion", "").startswith("tfx"):
             _LOGGER.setLevel(logging.INFO)
@@ -772,6 +790,125 @@ def clone(
 
         return cloned
 
+    @staticmethod
+    def from_pipeline_func(
+        # Parameters for the PipelineJob constructor
+        pipeline_func: Callable,
+        parameter_values: Optional[Dict[str, Any]] = None,
+        output_artifacts_gcs_dir: Optional[str] = None,
+        enable_caching: Optional[bool] = None,
+        context_name: Optional[str] = "pipeline",
+        display_name: Optional[str] = None,
+        labels: Optional[Dict[str, str]] = None,
+        job_id: Optional[str] = None,
+        # Parameters for the Vertex SDK
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        encryption_spec_key_name: Optional[str] = None,
+    ) -> "PipelineJob":
+        """Creates PipelineJob by compiling a pipeline function.
+
+        Args:
+            pipeline_func (Callable):
+                Required. A pipeline function to compile.
+                A pipeline function creates instances of components and connects
+                component inputs to outputs.
+            parameter_values (Dict[str, Any]):
+                Optional. The mapping from runtime parameter names to its values that
+                control the pipeline run.
+            output_artifacts_gcs_dir (str):
+                Optional. The GCS location of the pipeline outputs.
+                A GCS bucket for artifacts will be created if not specified.
+            enable_caching (bool):
+                Optional. Whether to turn on caching for the run.
+
+                If this is not set, defaults to the compile time settings, which
+                are True for all tasks by default, while users may specify
+                different caching options for individual tasks.
+
+                If this is set, the setting applies to all tasks in the pipeline.
+
+                Overrides the compile time settings.
+            context_name (str):
+                Optional. The name of metadata context. Used for cached execution reuse.
+            display_name (str):
+                Optional. The user-defined name of this Pipeline.
+            labels (Dict[str, str]):
+                Optional. The user defined metadata to organize PipelineJob.
+            job_id (str):
+                Optional. The unique ID of the job run.
+                If not specified, pipeline name + timestamp will be used.
+
+            project (str):
+                Optional. The project that you want to run this PipelineJob in. If not set,
+                the project set in aiplatform.init will be used.
+            location (str):
+                Optional. Location to create PipelineJob. If not set,
+                location set in aiplatform.init will be used.
+            credentials (auth_credentials.Credentials):
+                Optional. Custom credentials to use to create this PipelineJob.
+                Overrides credentials set in aiplatform.init.
+            encryption_spec_key_name (str):
+                Optional. The Cloud KMS resource identifier of the customer
+                managed encryption key used to protect the job. Has the
+                form:
+                ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
+                The key needs to be in the same region as where the compute
+                resource is created.
+
+                If this is set, then all
+                resources created by the PipelineJob will
+                be encrypted with the provided encryption key.
+
+                Overrides encryption_spec_key_name set in aiplatform.init.
+
+        Returns:
+            A Vertex AI PipelineJob.
+
+        Raises:
+            ValueError: If job_id or labels have incorrect format.
+        """
+
+        # Importing the KFP module here to prevent import errors when the kfp package is not installed.
+        try:
+            from kfp.v2 import compiler as compiler_v2
+        except ImportError as err:
+            raise RuntimeError(
+                "Cannot import the kfp.v2.compiler module. Please install or update the kfp package."
+            ) from err
+
+        automatic_display_name = " ".join(
+            [
+                pipeline_func.__name__.replace("_", " "),
+                datetime.datetime.now().isoformat(sep=" "),
+            ]
+        )
+        display_name = display_name or automatic_display_name
+        job_id = job_id or re.sub(
+            r"[^-a-z0-9]", "-", automatic_display_name.lower()
+        ).strip("-")
+        pipeline_file = tempfile.mktemp(suffix=".json")
+        compiler_v2.Compiler().compile(
+            pipeline_func=pipeline_func,
+            pipeline_name=context_name,
+            package_path=pipeline_file,
+        )
+        pipeline_job = PipelineJob(
+            template_path=pipeline_file,
+            parameter_values=parameter_values,
+            pipeline_root=output_artifacts_gcs_dir,
+            enable_caching=enable_caching,
+            display_name=display_name,
+            job_id=job_id,
+            labels=labels,
+            project=project,
+            location=location,
+            credentials=credentials,
+            encryption_spec_key_name=encryption_spec_key_name,
+        )
+        return pipeline_job
+
     def get_associated_experiment(self) -> Optional["aiplatform.Experiment"]:
         """Gets the aiplatform.Experiment associated with this PipelineJob,
         or None if this PipelineJob is not associated with an experiment.
```
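As a side note on the defaulting logic added above: the automatic display name is the function name with underscores replaced by spaces plus a timestamp, and the job ID is that string lowercased and reduced to dashes and alphanumerics. A minimal sketch of that derivation, where `_derive_defaults` is a hypothetical helper written only for illustration:

```python
import datetime
import re


def _derive_defaults(pipeline_func_name: str):
    """Illustrative copy of the display_name/job_id defaulting in from_pipeline_func."""
    automatic_display_name = " ".join(
        [
            pipeline_func_name.replace("_", " "),
            datetime.datetime.now().isoformat(sep=" "),
        ]
    )
    # Everything outside [-a-z0-9] is replaced with "-" to keep the job ID valid.
    job_id = re.sub(r"[^-a-z0-9]", "-", automatic_display_name.lower()).strip("-")
    return automatic_display_name, job_id


# e.g. ("training pipeline 2022-06-08 12:34:56.789012",
#       "training-pipeline-2022-06-08-12-34-56-789012"), timestamp will differ
print(_derive_defaults("training_pipeline"))
```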

google/cloud/aiplatform/utils/gcs_utils.py

+102 -2

```diff
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-# Copyright 2021 Google LLC
+# Copyright 2022 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
 from google.cloud import storage
 
 from google.cloud.aiplatform import initializer
-
+from google.cloud.aiplatform.utils import resource_manager_utils
 
 _logger = logging.getLogger(__name__)
 
@@ -163,3 +163,103 @@ def stage_local_data_in_gcs(
     )
 
     return staged_data_uri
+
+
+def generate_gcs_directory_for_pipeline_artifacts(
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+):
+    """Gets or creates the GCS directory for Vertex Pipelines artifacts.
+
+    Args:
+        service_account: Optional. Google Cloud service account that will be used
+            to run the pipelines. If this function creates a new bucket it will give
+            permission to the specified service account to access the bucket.
+            If not provided, the Google Cloud Compute Engine service account will be used.
+        project: Optional. Google Cloud Project that contains the staging bucket.
+        location: Optional. Google Cloud location to use for the staging bucket.
+
+    Returns:
+        Google Cloud Storage URI of the staged data.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+
+    pipelines_bucket_name = project + "-vertex-pipelines-" + location
+    output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/"
+    return output_artifacts_gcs_dir
+
+
+def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+    output_artifacts_gcs_dir: Optional[str] = None,
+    service_account: Optional[str] = None,
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+    credentials: Optional[auth_credentials.Credentials] = None,
+):
+    """Gets or creates the GCS directory for Vertex Pipelines artifacts.
+
+    Args:
+        output_artifacts_gcs_dir: Optional. The GCS location for the pipeline outputs.
+            It will be generated if not specified.
+        service_account: Optional. Google Cloud service account that will be used
+            to run the pipelines. If this function creates a new bucket it will give
+            permission to the specified service account to access the bucket.
+            If not provided, the Google Cloud Compute Engine service account will be used.
+        project: Optional. Google Cloud Project that contains the staging bucket.
+        location: Optional. Google Cloud location to use for the staging bucket.
+        credentials: The custom credentials to use when making API calls.
+            If not provided, default credentials will be used.
+
+    Returns:
+        Google Cloud Storage URI of the staged data.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+    credentials = credentials or initializer.global_config.credentials
+
+    output_artifacts_gcs_dir = (
+        output_artifacts_gcs_dir
+        or generate_gcs_directory_for_pipeline_artifacts(
+            project=project,
+            location=location,
+        )
+    )
+
+    # Creating the bucket if needed
+    storage_client = storage.Client(
+        project=project,
+        credentials=credentials,
+    )
+
+    pipelines_bucket = storage.Blob.from_string(
+        uri=output_artifacts_gcs_dir,
+        client=storage_client,
+    ).bucket
+
+    if not pipelines_bucket.exists():
+        _logger.info(
+            f'Creating GCS bucket for Vertex Pipelines: "{pipelines_bucket.name}"'
+        )
+        pipelines_bucket = storage_client.create_bucket(
+            bucket_or_name=pipelines_bucket,
+            project=project,
+            location=location,
+        )
+        # Giving the service account read and write access to the new bucket
+        # Workaround for error: "Failed to create pipeline job. Error: Service account `[email protected]`
+        # does not have `[storage.objects.get, storage.objects.create]` IAM permission(s) to the bucket `xxxxxxxx-vertex-pipelines-us-central1`.
+        # Please either copy the files to the Google Cloud Storage bucket owned by your project, or grant the required IAM permission(s) to the service account."
+        if not service_account:
+            # Getting the project number to use in service account
+            project_number = resource_manager_utils.get_project_number(project)
+            service_account = f"{project_number}[email protected]"
+        bucket_iam_policy = pipelines_bucket.get_iam_policy()
+        bucket_iam_policy.setdefault("roles/storage.objectCreator", set()).add(
+            f"serviceAccount:{service_account}"
+        )
+        bucket_iam_policy.setdefault("roles/storage.objectViewer", set()).add(
+            f"serviceAccount:{service_account}"
+        )
+        pipelines_bucket.set_iam_policy(bucket_iam_policy)
+    return output_artifacts_gcs_dir
```
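A short sketch of how the two new helpers compose when called directly. They are internal SDK utilities, so this is purely illustrative; the project and location values are placeholders and default credentials are assumed:

```python
from google.cloud import aiplatform
from google.cloud.aiplatform.utils import gcs_utils

aiplatform.init(project="my-project", location="us-central1")  # placeholder values

# Derives "gs://my-project-vertex-pipelines-us-central1/output_artifacts/".
artifacts_dir = gcs_utils.generate_gcs_directory_for_pipeline_artifacts()

# Creates the bucket on first use and, when it does create it, grants the
# (default Compute Engine) service account objectCreator/objectViewer on it.
gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
    output_artifacts_gcs_dir=artifacts_dir,
)
```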

google/cloud/aiplatform/utils/resource_manager_utils.py

+27

```diff
@@ -48,3 +48,30 @@ def get_project_id(
     project = projects_client.get_project(name=f"projects/{project_number}")
 
     return project.project_id
+
+
+def get_project_number(
+    project_id: str,
+    credentials: Optional[auth_credentials.Credentials] = None,
+) -> str:
+    """Gets the project number given the project ID.
+
+    Args:
+        project_id (str):
+            Required. Google Cloud project unique ID.
+        credentials: The custom credentials to use when making API calls.
+            Optional. If not provided, default credentials will be used.
+
+    Returns:
+        str - The automatically generated unique numerical identifier for your GCP project.
+
+    """
+
+    credentials = credentials or initializer.global_config.credentials
+
+    projects_client = resourcemanager.ProjectsClient(credentials=credentials)
+
+    project = projects_client.get_project(name=f"projects/{project_id}")
+    project_number = project.name.split("/", 1)[1]
+
+    return project_number
```
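For context, this is roughly how the new helper is consumed by `gcs_utils` above to build the default Compute Engine service account email. The project ID is a placeholder and default credentials are assumed:

```python
from google.cloud.aiplatform.utils import resource_manager_utils

# Resolves a project ID such as "my-project" to its numeric identifier.
project_number = resource_manager_utils.get_project_number("my-project")

# Default Compute Engine service account, as constructed in gcs_utils above.
service_account = f"{project_number}[email protected]"
print(service_account)
```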
New system test file:

```diff
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pytest
+
+from google.cloud import aiplatform
+from tests.system.aiplatform import e2e_base
+
+
+@pytest.mark.usefixtures("tear_down_resources")
+class TestPipelineJob(e2e_base.TestEndToEnd):
+
+    _temp_prefix = "tmpvrtxsdk-e2e"
+
+    def test_add_pipeline_job_to_experiment(self, shared_state):
+        from kfp import components
+
+        # Components:
+        def train(
+            number_of_epochs: int,
+            learning_rate: float,
+        ):
+            print(f"number_of_epochs={number_of_epochs}")
+            print(f"learning_rate={learning_rate}")
+
+        train_op = components.create_component_from_func(train)
+
+        # Pipeline:
+        def training_pipeline(number_of_epochs: int = 10):
+            train_op(
+                number_of_epochs=number_of_epochs,
+                learning_rate="0.1",
+            )
+
+        # Submitting the pipeline:
+        aiplatform.init(
+            project=e2e_base._PROJECT,
+            location=e2e_base._LOCATION,
+        )
+        job = aiplatform.PipelineJob.from_pipeline_func(
+            pipeline_func=training_pipeline,
+        )
+        job.submit()
+
+        shared_state.setdefault("resources", []).append(job)
+
+        job.wait()
```
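If the test ever needed an explicit assertion on the terminal state after `job.wait()` returns, a possible follow-up (not part of this commit) could look like the following, reusing the `job` object from the test above:

```python
from google.cloud.aiplatform_v1.types import PipelineState

# Hypothetical extra check: PIPELINE_STATE_SUCCEEDED is the terminal success
# state reported by job.state once the run has finished.
assert job.state == PipelineState.PIPELINE_STATE_SUCCEEDED
```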
