Commit 4f015f3

Merge remote-tracking branch 'upstream/main' into feature/batch-prediction/service-account
2 parents: 553e04c + a6a792e

File tree

5 files changed: 110 additions & 19 deletions

google/cloud/aiplatform/models.py
Lines changed: 2 additions & 0 deletions

@@ -557,6 +557,8 @@ def _construct_sdk_resource_from_gapic(
             location=endpoint.location,
             credentials=credentials,
         )
+        endpoint.authorized_session = None
+        endpoint.raw_predict_request_url = None

         return endpoint
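The two attributes added here ensure that an Endpoint rebuilt from a GAPIC resource carries the same lazily-initialized raw-predict state as a freshly constructed one, instead of raising AttributeError on first use. A minimal sketch of the lazy-initialization pattern this supports; `_build_authorized_session` is a hypothetical helper and the URL shape is an assumption, not the SDK's confirmed wiring:

# Sketch only: shows why both attributes must exist (even as None) on every
# construction path. _build_authorized_session is a hypothetical helper.
def raw_predict(endpoint, body, headers):
    if endpoint.authorized_session is None:
        # First call: build and cache the session and request URL.
        endpoint.authorized_session = _build_authorized_session(endpoint)
        endpoint.raw_predict_request_url = (
            f"https://{endpoint.location}-aiplatform.googleapis.com/v1/"
            f"{endpoint.resource_name}:rawPredict"
        )
    return endpoint.authorized_session.post(
        endpoint.raw_predict_request_url, data=body, headers=headers
    )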

google/cloud/aiplatform/utils/gcs_utils.py
Lines changed: 0 additions & 4 deletions

@@ -172,10 +172,6 @@ def generate_gcs_directory_for_pipeline_artifacts(
     """Gets or creates the GCS directory for Vertex Pipelines artifacts.

     Args:
-        service_account: Optional. Google Cloud service account that will be used
-            to run the pipelines. If this function creates a new bucket it will give
-            permission to the specified service account to access the bucket.
-            If not provided, the Google Cloud Compute Engine service account will be used.
         project: Optional. Google Cloud Project that contains the staging bucket.
         location: Optional. Google Cloud location to use for the staging bucket.
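The removed Args entry described a service_account parameter that no longer applies to this function, so the docstring now matches its signature. For reference, the function's behavior is pinned down by a unit test added later in this commit; a quick usage sketch grounded in that test:

from google.cloud.aiplatform.utils import gcs_utils

# Expected value taken from the new test in tests/unit/aiplatform/test_utils.py.
path = gcs_utils.generate_gcs_directory_for_pipeline_artifacts("project", "us-central1")
assert path == "gs://project-vertex-pipelines-us-central1/output_artifacts/"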

tests/system/aiplatform/test_batch_prediction.py
Lines changed: 3 additions & 3 deletions

@@ -42,10 +42,10 @@

 class TestBatchPredictionJob(e2e_base.TestEndToEnd):
     _temp_prefix = "temp_e2e_batch_prediction_test_"
-    aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
-    model = aiplatform.Model(_PERMANENT_CHURN_MODEL_ID)

     def test_model_monitoring(self):
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
+        model = aiplatform.Model(_PERMANENT_CHURN_MODEL_ID)
         skew_detection_config = aiplatform.model_monitoring.SkewDetectionConfig(
             data_source=_PERMANENT_CHURN_TRAINING_DATA,
             target_field="churned",
@@ -62,7 +62,7 @@ def test_model_monitoring(self):

         bpj = aiplatform.BatchPredictionJob.create(
             job_display_name=self._make_display_name(key=_TEST_JOB_DISPLAY_NAME),
-            model_name=self.model,
+            model_name=model,
             gcs_source=_PERMANENT_CHURN_TESTING_DATA,
             gcs_destination_prefix=_PERMANENT_CHURN_GS_DEST,
             machine_type=_TEST_MACHINE_TYPE,
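Moving aiplatform.init and the Model lookup from the class body into the test method matters because class-level statements run when pytest merely imports the module: an auth or network failure there breaks collection of the entire file rather than failing one test. A minimal illustration of the hazard, with hypothetical class names and a hypothetical model ID:

from google.cloud import aiplatform

class FragileSuite:
    # Executes at import/collection time; any failure here prevents pytest
    # from collecting the module at all.
    model = aiplatform.Model("1234567890")

class RobustSuite:
    def test_job(self):
        # Executes only when the test runs, under normal test reporting.
        model = aiplatform.Model("1234567890")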

tests/system/aiplatform/test_model_monitoring.py
Lines changed: 36 additions & 9 deletions

@@ -31,7 +31,7 @@

 # constants used for testing
 USER_EMAIL = "[email protected]"
-PERMANENT_CHURN_ENDPOINT_ID = "1843089351408353280"
+PERMANENT_CHURN_MODEL_ID = "5295507484113371136"
 CHURN_MODEL_PATH = "gs://mco-mm/churn"
 DEFAULT_INPUT = {
     "cnt_ad_reward": 0,
@@ -117,15 +117,26 @@
 objective_config2 = model_monitoring.ObjectiveConfig(skew_config, drift_config2)


+@pytest.mark.usefixtures("tear_down_resources")
 class TestModelDeploymentMonitoring(e2e_base.TestEndToEnd):
     _temp_prefix = "temp_e2e_model_monitoring_test_"
-    aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
-    endpoint = aiplatform.Endpoint(PERMANENT_CHURN_ENDPOINT_ID)

-    def test_mdm_two_models_one_valid_config(self):
+    def test_create_endpoint(self, shared_state):
+        # initial setup
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
+        self.endpoint = aiplatform.Endpoint.create(self._make_display_name("endpoint"))
+        shared_state["resources"] = [self.endpoint]
+        self.model = aiplatform.Model(PERMANENT_CHURN_MODEL_ID)
+        self.endpoint.deploy(self.model)
+        self.endpoint.deploy(self.model, traffic_percentage=50)
+
+    def test_mdm_two_models_one_valid_config(self, shared_state):
         """
         Enable model monitoring on two existing models deployed to the same endpoint.
         """
+        assert len(shared_state["resources"]) == 1
+        self.endpoint = shared_state["resources"][0]
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
         # test model monitoring configurations
         job = aiplatform.ModelDeploymentMonitoringJob.create(
             display_name=self._make_display_name(key=JOB_NAME),
@@ -153,6 +164,7 @@ def test_mdm_two_models_one_valid_config(self):
             == [USER_EMAIL]
         )
         assert gapic_job.model_monitoring_alert_config.enable_logging
+        assert len(gapic_job.model_deployment_monitoring_objective_configs) == 2

         gca_obj_config = gapic_job.model_deployment_monitoring_objective_configs[
             0
@@ -181,8 +193,11 @@ def test_mdm_two_models_one_valid_config(self):
         with pytest.raises(core_exceptions.NotFound):
             job.api_client.get_model_deployment_monitoring_job(name=job_resource)

-    def test_mdm_pause_and_update_config(self):
+    def test_mdm_pause_and_update_config(self, shared_state):
         """Test objective config updates for existing MDM job"""
+        assert len(shared_state["resources"]) == 1
+        self.endpoint = shared_state["resources"][0]
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
         job = aiplatform.ModelDeploymentMonitoringJob.create(
             display_name=self._make_display_name(key=JOB_NAME),
             logging_sampling_strategy=sampling_strategy,
@@ -245,7 +260,10 @@ def test_mdm_pause_and_update_config(self):
         with pytest.raises(core_exceptions.NotFound):
             job.state

-    def test_mdm_two_models_two_valid_configs(self):
+    def test_mdm_two_models_two_valid_configs(self, shared_state):
+        assert len(shared_state["resources"]) == 1
+        self.endpoint = shared_state["resources"][0]
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
         [deployed_model1, deployed_model2] = list(
             map(lambda x: x.id, self.endpoint.list_models())
         )
@@ -302,7 +320,10 @@ def test_mdm_two_models_two_valid_configs(self):

         job.delete()

-    def test_mdm_invalid_config_incorrect_model_id(self):
+    def test_mdm_invalid_config_incorrect_model_id(self, shared_state):
+        assert len(shared_state["resources"]) == 1
+        self.endpoint = shared_state["resources"][0]
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
         with pytest.raises(ValueError) as e:
             aiplatform.ModelDeploymentMonitoringJob.create(
                 display_name=self._make_display_name(key=JOB_NAME),
@@ -318,7 +339,10 @@ def test_mdm_invalid_config_incorrect_model_id(self):
             )
         assert "Invalid model ID" in str(e.value)

-    def test_mdm_invalid_config_xai(self):
+    def test_mdm_invalid_config_xai(self, shared_state):
+        assert len(shared_state["resources"]) == 1
+        self.endpoint = shared_state["resources"][0]
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
         with pytest.raises(RuntimeError) as e:
             objective_config.explanation_config = model_monitoring.ExplanationConfig()
             aiplatform.ModelDeploymentMonitoringJob.create(
@@ -337,7 +361,10 @@ def test_mdm_invalid_config_xai(self):
             in str(e.value)
         )

-    def test_mdm_two_models_invalid_configs_xai(self):
+    def test_mdm_two_models_invalid_configs_xai(self, shared_state):
+        assert len(shared_state["resources"]) == 1
+        self.endpoint = shared_state["resources"][0]
+        aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
         [deployed_model1, deployed_model2] = list(
             map(lambda x: x.id, self.endpoint.list_models())
         )
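The suite now creates its endpoint once in test_create_endpoint, hands it to the later tests through the shared_state fixture, and leans on tear_down_resources for cleanup. A minimal sketch of how such fixtures are commonly wired, assuming the real definitions live in e2e_base (not shown in this diff):

import pytest

@pytest.fixture(scope="class")
def shared_state():
    # One dict per test class; tests stash created resources under "resources".
    return {}

@pytest.fixture(scope="class")
def tear_down_resources(shared_state):
    yield
    # After the last test in the class, delete everything the tests created.
    for resource in shared_state.get("resources", []):
        resource.delete()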

tests/unit/aiplatform/test_utils.py
Lines changed: 69 additions & 3 deletions

@@ -36,6 +36,7 @@
 from google.cloud.aiplatform import compat, utils
 from google.cloud.aiplatform.compat.types import pipeline_failure_policy
 from google.cloud.aiplatform.utils import (
+    gcs_utils,
     pipeline_utils,
     prediction_utils,
     tensorboard_utils,
@@ -52,9 +53,10 @@
 model_service_client_default = model_service_client_v1


-GCS_BUCKET = "FAKE_BUCKET"
-GCS_PREFIX = "FAKE/PREFIX"
-FAKE_FILENAME = "FAKE_FILENAME"
+GCS_BUCKET = "fake-bucket"
+GCS_PREFIX = "fake/prefix"
+FAKE_FILENAME = "fake-filename"
+EXPECTED_TIME = datetime.datetime(2023, 1, 6, 8, 54, 41, 734495)


 @pytest.fixture
@@ -78,6 +80,31 @@ def get_blobs(prefix):
     yield mock_storage_client


+@pytest.fixture()
+def mock_datetime():
+    with patch.object(datetime, "datetime", autospec=True) as mock_datetime:
+        mock_datetime.now.return_value = EXPECTED_TIME
+        yield mock_datetime
+
+
+@pytest.fixture
+def mock_storage_blob_upload_from_filename():
+    with patch(
+        "google.cloud.storage.Blob.upload_from_filename"
+    ) as mock_blob_upload_from_filename, patch(
+        "google.cloud.storage.Bucket.exists", return_value=True
+    ):
+        yield mock_blob_upload_from_filename
+
+
+@pytest.fixture()
+def mock_bucket_not_exist():
+    with patch("google.cloud.storage.Blob.from_string") as mock_bucket_not_exist, patch(
+        "google.cloud.storage.Bucket.exists", return_value=False
+    ):
+        yield mock_bucket_not_exist
+
+
 def test_invalid_region_raises_with_invalid_region():
     with pytest.raises(ValueError):
         aiplatform.utils.validate_region(region="us-east5")
@@ -458,6 +485,45 @@ def test_timestamped_unique_name():
     assert re.match(r"\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-.{5}", name)


+@pytest.mark.usefixtures("google_auth_mock")
+class TestGcsUtils:
+    def test_upload_to_gcs(self, json_file, mock_storage_blob_upload_from_filename):
+        gcs_utils.upload_to_gcs(json_file, f"gs://{GCS_BUCKET}/{GCS_PREFIX}")
+        assert mock_storage_blob_upload_from_filename.called_once_with(json_file)
+
+    def test_stage_local_data_in_gcs(
+        self, json_file, mock_datetime, mock_storage_blob_upload_from_filename
+    ):
+        timestamp = EXPECTED_TIME.isoformat(sep="-", timespec="milliseconds")
+        staging_gcs_dir = f"gs://{GCS_BUCKET}/{GCS_PREFIX}"
+        data_uri = gcs_utils.stage_local_data_in_gcs(json_file, staging_gcs_dir)
+        assert mock_storage_blob_upload_from_filename.called_once_with(json_file)
+        assert (
+            data_uri
+            == f"{staging_gcs_dir}/vertex_ai_auto_staging/{timestamp}/test.json"
+        )
+
+    def test_generate_gcs_directory_for_pipeline_artifacts(self):
+        output = gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
+            "project", "us-central1"
+        )
+        assert output == "gs://project-vertex-pipelines-us-central1/output_artifacts/"
+
+    def test_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+        self, mock_bucket_not_exist, mock_storage_client
+    ):
+        output = (
+            gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+                project="test-project", location="us-central1"
+            )
+        )
+        assert mock_storage_client.called
+        assert mock_bucket_not_exist.called
+        assert (
+            output == "gs://test-project-vertex-pipelines-us-central1/output_artifacts/"
+        )
+
+
 class TestPipelineUtils:
     SAMPLE_JOB_SPEC = {
         "pipelineSpec": {
