Commit 095717c

munagekar and sararob authored
feat: support autoscaling metrics when deploying models (#1197)
* feat: support autoscaling metrics when deploying models
* feat: support model deploy to endpoint with autoscaling metrics
* fix autoscaling_target_accelerator_duty_cycle check
* fix docstring: specify that autoscaling_params are optional
* bug fix: add autoscaling_target_cpu_utilization to custom_resource_spec
* add tests
* add _TEST_METRIC_NAME_CPU_UTILIZATION and _TEST_METRIC_NAME_GPU_UTILIZATION
* remove not required arguments in tests
* fix tests: wait for LRO to complete even if not sync
* fix lint: run black

Co-authored-by: Sara Robinson <[email protected]>
1 parent 15bc80b commit 095717c
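In practice, this commit adds two optional keyword arguments, autoscaling_target_cpu_utilization and autoscaling_target_accelerator_duty_cycle, to the deploy paths shown in the diff below. A minimal usage sketch, assuming an already-uploaded model; the project, location, and model ID are placeholders, not values from this commit:

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", location="us-central1")  # placeholder project/location

    model = aiplatform.Model("1234567890")  # placeholder model ID

    # Deploy with both autoscaling targets; the duty-cycle target requires an accelerator.
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_P100",
        accelerator_count=2,
        min_replica_count=1,
        max_replica_count=2,
        autoscaling_target_cpu_utilization=70,
        autoscaling_target_accelerator_duty_cycle=70,
    )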

File tree: 2 files changed, +234 -9 lines

google/cloud/aiplatform/models.py
Lines changed: 95 additions & 9 deletions
@@ -643,6 +643,8 @@ def deploy(
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
         sync=True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> None:
         """Deploys a Model to the Endpoint.

@@ -716,6 +718,13 @@ def deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Target CPU Utilization to use for Autoscaling Replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Target Accelerator Duty Cycle.
+                Must also set accelerator_type and accelerator_count if specified.
+                A default value of 60 will be used if not specified.
         """
         self._sync_gca_resource_if_skipped()

@@ -746,6 +755,8 @@ def deploy(
             metadata=metadata,
             sync=sync,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

     @base.optional_sync()
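With the pass-through above, the same knobs are available directly on Endpoint.deploy. A CPU-only sketch (endpoint and model IDs are placeholders); without an accelerator, only the CPU utilization target applies:

    from google.cloud import aiplatform

    endpoint = aiplatform.Endpoint("1234567890")  # placeholder endpoint ID
    model = aiplatform.Model("0987654321")  # placeholder model ID

    endpoint.deploy(
        model=model,
        machine_type="n1-standard-4",
        autoscaling_target_cpu_utilization=70,  # target ~70% CPU across replicas
    )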
@@ -766,6 +777,8 @@ def _deploy(
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
         sync=True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> None:
         """Deploys a Model to the Endpoint.

@@ -839,6 +852,13 @@ def _deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Target CPU Utilization to use for Autoscaling Replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Target Accelerator Duty Cycle.
+                Must also set accelerator_type and accelerator_count if specified.
+                A default value of 60 will be used if not specified.
         Raises:
             ValueError: If there is not current traffic split and traffic percentage
                 is not 0 or 100.
@@ -865,6 +885,8 @@ def _deploy(
             explanation_parameters=explanation_parameters,
             metadata=metadata,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

         _LOGGER.log_action_completed_against_resource("model", "deployed", self)
@@ -891,6 +913,8 @@ def _deploy_call(
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ):
         """Helper method to deploy model to endpoint.

@@ -964,6 +988,13 @@ def _deploy_call(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU Utilization to use for Autoscaling Replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target Accelerator Duty Cycle.
+                Must also set accelerator_type and accelerator_count if specified.
+                A default value of 60 will be used if not specified.
         Raises:
             ValueError: If there is not current traffic split and traffic percentage
                 is not 0 or 100.
@@ -979,6 +1010,14 @@ def _deploy_call(
                 "Both `accelerator_type` and `accelerator_count` should be specified or None."
             )

+        if autoscaling_target_accelerator_duty_cycle is not None and (
+            not accelerator_type or not accelerator_count
+        ):
+            raise ValueError(
+                "Both `accelerator_type` and `accelerator_count` should be set "
+                "when specifying autoscaling_target_accelerator_duty_cycle`"
+            )
+
         deployed_model = gca_endpoint_compat.DeployedModel(
             model=model.resource_name,
             display_name=deployed_model_display_name,
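The guard above fails fast, before any deploy RPC is issued. Reusing the placeholder objects from the earlier sketches, a duty-cycle target without an accelerator raises immediately:

    endpoint.deploy(
        model=model,
        machine_type="n1-standard-4",
        autoscaling_target_accelerator_duty_cycle=70,  # no accelerator_type/count set
    )
    # ValueError: Both `accelerator_type` and `accelerator_count` should be set
    # when specifying autoscaling_target_accelerator_duty_cycle`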
@@ -994,7 +1033,11 @@ def _deploy_call(
             in model.supported_deployment_resources_types
         )
         provided_custom_machine_spec = (
-            machine_type or accelerator_type or accelerator_count
+            machine_type
+            or accelerator_type
+            or accelerator_count
+            or autoscaling_target_accelerator_duty_cycle
+            or autoscaling_target_cpu_utilization
         )

         # If the model supports both automatic and dedicated deployment resources,
@@ -1006,30 +1049,51 @@ def _deploy_call(
         if provided_custom_machine_spec and not use_dedicated_resources:
             _LOGGER.info(
                 "Model does not support dedicated deployment resources. "
-                "The machine_type, accelerator_type and accelerator_count parameters are ignored."
+                "The machine_type, accelerator_type and accelerator_count,"
+                "autoscaling_target_accelerator_duty_cycle,"
+                "autoscaling_target_cpu_utilization parameters are ignored."
             )

         if use_dedicated_resources and not machine_type:
             machine_type = _DEFAULT_MACHINE_TYPE
             _LOGGER.info(f"Using default machine_type: {machine_type}")

         if use_dedicated_resources:
+
+            dedicated_resources = gca_machine_resources_compat.DedicatedResources(
+                min_replica_count=min_replica_count,
+                max_replica_count=max_replica_count,
+            )
+
             machine_spec = gca_machine_resources_compat.MachineSpec(
                 machine_type=machine_type
             )

+            if autoscaling_target_cpu_utilization:
+                autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
+                    metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization",
+                    target=autoscaling_target_cpu_utilization,
+                )
+                dedicated_resources.autoscaling_metric_specs.extend(
+                    [autoscaling_metric_spec]
+                )
+
             if accelerator_type and accelerator_count:
                 utils.validate_accelerator_type(accelerator_type)
                 machine_spec.accelerator_type = accelerator_type
                 machine_spec.accelerator_count = accelerator_count

-            deployed_model.dedicated_resources = (
-                gca_machine_resources_compat.DedicatedResources(
-                    machine_spec=machine_spec,
-                    min_replica_count=min_replica_count,
-                    max_replica_count=max_replica_count,
-                )
-            )
+            if autoscaling_target_accelerator_duty_cycle:
+                autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
+                    metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle",
+                    target=autoscaling_target_accelerator_duty_cycle,
+                )
+                dedicated_resources.autoscaling_metric_specs.extend(
+                    [autoscaling_metric_spec]
+                )
+
+            dedicated_resources.machine_spec = machine_spec
+            deployed_model.dedicated_resources = dedicated_resources

         elif supports_automatic_resources:
             deployed_model.automatic_resources = (
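For reference, the messages this branch assembles can be built standalone. A sketch assuming the gca_machine_resources_compat alias resolves to the v1 GAPIC types (the compat layer may select v1beta1 instead, with the same field names):

    from google.cloud.aiplatform_v1.types import machine_resources

    dedicated_resources = machine_resources.DedicatedResources(
        min_replica_count=1,
        max_replica_count=2,
    )

    # One AutoscalingMetricSpec per requested target, appended to the repeated
    # field, mirroring the extend() calls in the diff above.
    dedicated_resources.autoscaling_metric_specs.extend(
        [
            machine_resources.AutoscalingMetricSpec(
                metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization",
                target=70,
            )
        ]
    )

    dedicated_resources.machine_spec = machine_resources.MachineSpec(
        machine_type="n1-standard-4"
    )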
@@ -1994,6 +2058,8 @@ def deploy(
         encryption_spec_key_name: Optional[str] = None,
         sync=True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> Endpoint:
         """Deploys model to endpoint. Endpoint will be created if unspecified.

@@ -2078,6 +2144,13 @@ def deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU Utilization to use for Autoscaling Replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target Accelerator Duty Cycle.
+                Must also set accelerator_type and accelerator_count if specified.
+                A default value of 60 will be used if not specified.
         Returns:
             endpoint ("Endpoint"):
                 Endpoint with the deployed model.
@@ -2112,6 +2185,8 @@ def deploy(
             or initializer.global_config.encryption_spec_key_name,
             sync=sync,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

     @base.optional_sync(return_input_arg="endpoint", bind_future_to_self=False)
@@ -2133,6 +2208,8 @@ def _deploy(
         encryption_spec_key_name: Optional[str] = None,
         sync: bool = True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> Endpoint:
         """Deploys model to endpoint. Endpoint will be created if unspecified.

@@ -2217,6 +2294,13 @@ def _deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU Utilization to use for Autoscaling Replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target Accelerator Duty Cycle.
+                Must also set accelerator_type and accelerator_count if specified.
+                A default value of 60 will be used if not specified.
         Returns:
             endpoint ("Endpoint"):
                 Endpoint with the deployed model.
@@ -2252,6 +2336,8 @@ def _deploy(
             explanation_parameters=explanation_parameters,
             metadata=metadata,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

         _LOGGER.log_action_completed_against_resource("model", "deployed", endpoint)

tests/unit/aiplatform/test_endpoints.py
Lines changed: 139 additions & 0 deletions
@@ -103,6 +103,13 @@
 _TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_P100"
 _TEST_ACCELERATOR_COUNT = 2

+_TEST_METRIC_NAME_CPU_UTILIZATION = (
+    "aiplatform.googleapis.com/prediction/online/cpu/utilization"
+)
+_TEST_METRIC_NAME_GPU_UTILIZATION = (
+    "aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle"
+)
+
 _TEST_EXPLANATIONS = [gca_prediction_service.explanation.Explanation(attributions=[])]

 _TEST_ATTRIBUTIONS = [
@@ -1054,6 +1061,138 @@ def test_deploy_with_dedicated_resources(self, deploy_model_mock, sync):
             timeout=None,
         )

+    @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_with_autoscaling_target_cpu_utilization(
+        self, deploy_model_mock, sync
+    ):
+        test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
+        test_model = models.Model(_TEST_ID)
+        test_model._gca_resource.supported_deployment_resources_types.append(
+            aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+        )
+        test_endpoint.deploy(
+            model=test_model,
+            machine_type=_TEST_MACHINE_TYPE,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            sync=sync,
+            deploy_request_timeout=None,
+            autoscaling_target_cpu_utilization=70,
+        )
+
+        if not sync:
+            test_endpoint.wait()
+
+        expected_machine_spec = gca_machine_resources.MachineSpec(
+            machine_type=_TEST_MACHINE_TYPE,
+        )
+
+        expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec(
+            metric_name=_TEST_METRIC_NAME_CPU_UTILIZATION,
+            target=70,
+        )
+
+        expected_dedicated_resources = gca_machine_resources.DedicatedResources(
+            machine_spec=expected_machine_spec,
+            min_replica_count=1,
+            max_replica_count=1,
+        )
+        expected_dedicated_resources.autoscaling_metric_specs.extend(
+            [expected_autoscaling_metric_spec]
+        )
+
+        expected_deployed_model = gca_endpoint.DeployedModel(
+            dedicated_resources=expected_dedicated_resources,
+            model=test_model.resource_name,
+            display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
+        )
+        deploy_model_mock.assert_called_once_with(
+            endpoint=test_endpoint.resource_name,
+            deployed_model=expected_deployed_model,
+            traffic_split={"0": 100},
+            metadata=(),
+            timeout=None,
+        )
+
+    @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_with_autoscaling_target_accelerator_duty_cycle(
+        self, deploy_model_mock, sync
+    ):
+        test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
+        test_model = models.Model(_TEST_ID)
+        test_model._gca_resource.supported_deployment_resources_types.append(
+            aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+        )
+        test_endpoint.deploy(
+            model=test_model,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            sync=sync,
+            deploy_request_timeout=None,
+            autoscaling_target_accelerator_duty_cycle=70,
+        )
+
+        if not sync:
+            test_endpoint.wait()
+
+        expected_machine_spec = gca_machine_resources.MachineSpec(
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+        )
+
+        expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec(
+            metric_name=_TEST_METRIC_NAME_GPU_UTILIZATION,
+            target=70,
+        )
+
+        expected_dedicated_resources = gca_machine_resources.DedicatedResources(
+            machine_spec=expected_machine_spec,
+            min_replica_count=1,
+            max_replica_count=1,
+        )
+        expected_dedicated_resources.autoscaling_metric_specs.extend(
+            [expected_autoscaling_metric_spec]
+        )
+
+        expected_deployed_model = gca_endpoint.DeployedModel(
+            dedicated_resources=expected_dedicated_resources,
+            model=test_model.resource_name,
+            display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
+        )
+        deploy_model_mock.assert_called_once_with(
+            endpoint=test_endpoint.resource_name,
+            deployed_model=expected_deployed_model,
+            traffic_split={"0": 100},
+            metadata=(),
+            timeout=None,
+        )
+
+    @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_with_autoscaling_target_accelerator_duty_cycle_and_no_accelerator_type_or_count_raises(
+        self, sync
+    ):
+        with pytest.raises(ValueError):
+            test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
+            test_model = models.Model(_TEST_ID)
+            test_model._gca_resource.supported_deployment_resources_types.append(
+                aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+            )
+            test_endpoint.deploy(
+                model=test_model,
+                sync=sync,
+                autoscaling_target_accelerator_duty_cycle=70,
+            )
+
+            if not sync:
+                test_endpoint.wait()
+
     @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
     @pytest.mark.parametrize("sync", [True, False])
     def test_deploy_with_explanations(self, deploy_model_with_explanations_mock, sync):
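Each non-sync variant calls wait() before asserting, forcing the deploy LRO to complete; this is the "fix tests: wait for LRO to complete even if not sync" item from the commit message. To run just these tests, pytest's standard keyword filter works: pytest tests/unit/aiplatform/test_endpoints.py -k autoscaling.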
