
Commit f90c9cc

yinghsienwu authored and copybara-github committed
feat: Enable Ray cluster creation with custom_image for each Resource
PiperOrigin-RevId: 614291640
1 parent d6490ff commit f90c9cc

14 files changed: +535 −338
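
For context, a minimal usage sketch based on the updated create_ray_cluster docstring in this commit. Project, image, and network names are placeholders, and the import paths assume the preview package layout shown in the diffs below; treat this as an illustration, not the authoritative API reference.

    from google.cloud import aiplatform
    from google.cloud.aiplatform.preview import vertex_ray
    from google.cloud.aiplatform.preview.vertex_ray.util.resources import Resources

    # Placeholder project/region; initialize the SDK before creating the cluster.
    aiplatform.init(project="my-project", location="us-central1")

    # Each Resources object can now carry its own custom_image.
    head_node_type = Resources(
        machine_type="n1-standard-16",
        node_count=1,
        custom_image="us-docker.pkg.dev/my-project/ray-cpu-image.2.9:latest",  # Optional
    )
    worker_node_types = [
        Resources(
            machine_type="n1-standard-8",
            node_count=2,
            custom_image="us-docker.pkg.dev/my-project/ray-gpu-image.2.9:latest",  # Optional
        )
    ]

    cluster_resource_name = vertex_ray.create_ray_cluster(
        head_node_type=head_node_type,
        network="projects/my-project-number/global/networks/my-vpc-name",
        worker_node_types=worker_node_types,
        ray_version="2.9",
    )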

google/cloud/aiplatform/preview/vertex_ray/client_builder.py

+20 −3

@@ -120,9 +120,26 @@ def __init__(self, address: Optional[str]) -> None:
             )
         local_ray_verion = _validation_utils.get_local_ray_version()
         if cluster.ray_version != local_ray_verion:
-            raise ValueError(
-                f"[Ray on Vertex AI]: Local runtime has Ray version {local_ray_verion}, but the cluster runtime has {cluster.ray_version}. Please ensure that the Ray versions match."
-            )
+            if cluster.head_node_type.custom_image is None:
+                install_ray_version = _validation_utils.SUPPORTED_RAY_VERSIONS.get(
+                    cluster.ray_version
+                )
+                logging.info(
+                    "[Ray on Vertex]: Local runtime has Ray version %s"
+                    ", but the requested cluster runtime has %s. Please "
+                    "ensure that the Ray versions match for client connectivity. You may "
+                    '"pip install --user --force-reinstall ray[default]==%s"'
+                    " and restart runtime before cluster connection.",
+                    local_ray_verion,
+                    cluster.ray_version,
+                    install_ray_version,
+                )
+            else:
+                logging.info(
+                    "[Ray on Vertex]: Local runtime has Ray version %s."
+                    "Please ensure that the Ray versions match for client connectivity.",
+                    local_ray_verion,
+                )
         super().__init__(address)

     def connect(self) -> _VertexRayClientContext:
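
As an aside, a small illustrative sketch of how the pip hint in the log message above can be derived from the SUPPORTED_RAY_VERSIONS mapping introduced in _validation_utils.py further down in this commit. The pip_hint helper is hypothetical, not part of the SDK.

    from typing import Optional
    from immutabledict import immutabledict

    # Mirrors the mapping added in _validation_utils.py in this commit.
    SUPPORTED_RAY_VERSIONS = immutabledict({"2.4": "2.4.0", "2.9": "2.9.3"})

    def pip_hint(cluster_ray_version: str) -> Optional[str]:
        # .get() returns None for versions the SDK does not ship images for,
        # e.g. when the cluster was built from a custom image.
        wheel = SUPPORTED_RAY_VERSIONS.get(cluster_ray_version)
        if wheel is None:
            return None
        return f"pip install --user --force-reinstall ray[default]=={wheel}"

    print(pip_hint("2.9"))  # pip install --user --force-reinstall ray[default]==2.9.3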

google/cloud/aiplatform/preview/vertex_ray/cluster_init.py

+51 −25

@@ -43,8 +43,8 @@

 def create_ray_cluster(
     head_node_type: Optional[resources.Resources] = resources.Resources(),
-    python_version: Optional[str] = "3_10",
-    ray_version: Optional[str] = "2_4",
+    python_version: Optional[str] = "3.10",
+    ray_version: Optional[str] = "2.9",
     network: Optional[str] = None,
     cluster_name: Optional[str] = None,
     worker_node_types: Optional[List[resources.Resources]] = None,
@@ -62,19 +62,22 @@ def create_ray_cluster(
             node_count=1,
             accelerator_type="NVIDIA_TESLA_K80",
             accelerator_count=1,
+            custom_image="us-docker.pkg.dev/my-project/ray-cpu-image.2.9:latest",  # Optional
         )

         worker_node_types = [Resources(
             machine_type="n1-standard-8",
             node_count=2,
             accelerator_type="NVIDIA_TESLA_K80",
             accelerator_count=1,
+            custom_image="us-docker.pkg.dev/my-project/ray-gpu-image.2.9:latest",  # Optional
         )]

         cluster_resource_name = vertex_ray.create_ray_cluster(
             head_node_type=head_node_type,
             network="projects/my-project-number/global/networks/my-vpc-name",
             worker_node_types=worker_node_types,
+            ray_version="2.9",
         )

     After a ray cluster is set up, you can call
@@ -100,7 +103,10 @@ def create_ray_cluster(
         worker_node_types: The list of Resources of the worker nodes. The same
             Resources object should not appear multiple times in the list.
         custom_images: The NodeImages which specifies head node and worker nodes
-            images. Allowlist only.
+            images. All the workers will share the same image. If each Resource
+            has a specific custom image, use `Resources.custom_image` for
+            head/worker_node_type(s). Note that configuring `Resources.custom_image`
+            will override `custom_images` here. Allowlist only.
         labels:
             The labels with user-defined metadata to organize Ray cluster.

@@ -121,14 +127,24 @@ def create_ray_cluster(

     local_ray_verion = _validation_utils.get_local_ray_version()
     if ray_version != local_ray_verion:
-        install_ray_version = ".".join(ray_version.split("_"))
-        logging.info(
-            f"[Ray on Vertex]: Local runtime has Ray version {local_ray_verion}"
-            + f", but the requested cluster runtime has {ray_version}. Please "
-            + "ensure that the Ray versions match for client connectivity. You may "
-            + f'"pip install --user --force-reinstall ray[default]=={install_ray_version}"'
-            + " and restart runtime before cluster connection."
-        )
+        if custom_images is None and head_node_type.custom_image is None:
+            install_ray_version = "2.9.3" if ray_version == "2.9" else "2.4.0"
+            logging.info(
+                "[Ray on Vertex]: Local runtime has Ray version %s"
+                ", but the requested cluster runtime has %s. Please "
+                "ensure that the Ray versions match for client connectivity. You may "
+                '"pip install --user --force-reinstall ray[default]==%s"'
+                " and restart runtime before cluster connection.",
+                local_ray_verion,
+                ray_version,
+                install_ray_version,
+            )
+        else:
+            logging.info(
+                "[Ray on Vertex]: Local runtime has Ray version %s."
+                "Please ensure that the Ray versions match for client connectivity.",
+                local_ray_verion,
+            )

     if cluster_name is None:
         cluster_name = "ray-cluster-" + utils.timestamped_unique_name()
@@ -161,15 +177,18 @@ def create_ray_cluster(
     resource_pool_0.disk_spec.boot_disk_size_gb = head_node_type.boot_disk_size_gb

     enable_cuda = True if head_node_type.accelerator_count > 0 else False
-    image_uri = _validation_utils.get_image_uri(
-        ray_version, python_version, enable_cuda
-    )
-    if custom_images is not None:
-        if custom_images.head is None or custom_images.worker is None:
-            raise ValueError(
-                "[Ray on Vertex AI]: custom_images.head and custom_images.worker must be specified when custom_images is set."
-            )
+    if head_node_type.custom_image is not None:
+        image_uri = head_node_type.custom_image
+    elif custom_images is None:
+        image_uri = _validation_utils.get_image_uri(
+            ray_version, python_version, enable_cuda
+        )
+    elif custom_images.head is not None and custom_images.worker is not None:
         image_uri = custom_images.head
+    else:
+        raise ValueError(
+            "[Ray on Vertex AI]: custom_images.head and custom_images.worker must be specified when custom_images is set."
+        )

     resource_pool_images[resource_pool_0.id] = image_uri

@@ -210,11 +229,16 @@ def create_ray_cluster(
         )
         worker_pools.append(resource_pool)
         enable_cuda = True if worker_node_type.accelerator_count > 0 else False
-        image_uri = _validation_utils.get_image_uri(
-            ray_version, python_version, enable_cuda
-        )
-        if custom_images is not None:
+
+        if worker_node_type.custom_image is not None:
+            image_uri = worker_node_type.custom_image
+        elif custom_images is None:
+            image_uri = _validation_utils.get_image_uri(
+                ray_version, python_version, enable_cuda
+            )
+        else:
             image_uri = custom_images.worker
+
         resource_pool_images[resource_pool.id] = image_uri

         i += 1
@@ -395,8 +419,10 @@ def update_ray_cluster(
     if len(worker_node_types) != len(previous_worker_node_types):
         raise ValueError(
             "[Ray on Vertex AI]: Desired number of worker_node_types "
-            + f"({len(worker_node_types)}) does not match the number of the "
-            + f"existing worker_node_type({len(previous_worker_node_types)})."
+            + "(%i) does not match the number of the "
+            + "existing worker_node_type(%i).",
+            len(worker_node_types),
+            len(previous_worker_node_types),
         )

     # Merge worker_node_type and head_node_type if they share
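
To summarize the image selection above: the precedence is Resources.custom_image first, then the shared custom_images NodeImages, then the prebuilt image returned by get_image_uri. A simplified sketch of that precedence follows; resolve_image_uri is an illustrative helper, not an SDK function, and it omits the head/worker validation error shown in the diff.

    from typing import Optional

    def resolve_image_uri(
        resource_custom_image: Optional[str],
        shared_custom_image: Optional[str],
        default_image: str,
    ) -> str:
        # Per-Resource custom_image wins over the shared NodeImages entry,
        # which in turn wins over the prebuilt ray-<version> image.
        if resource_custom_image is not None:
            return resource_custom_image
        if shared_custom_image is not None:
            return shared_custom_image
        return default_image

    # e.g. a head node with its own image keeps it even if custom_images is also set:
    resolve_image_uri(
        "us-docker.pkg.dev/my-project/ray-cpu-image.2.9:latest",
        "us-docker.pkg.dev/my-project/shared-image:latest",
        "us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest",
    )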

google/cloud/aiplatform/preview/vertex_ray/util/_gapic_utils.py

+35 −21

@@ -28,7 +28,6 @@
 from google.cloud.aiplatform.preview.vertex_ray.util import _validation_utils
 from google.cloud.aiplatform.preview.vertex_ray.util.resources import (
     Cluster,
-    NodeImages,
     Resources,
 )
 from google.cloud.aiplatform_v1beta1.types.persistent_resource import (
@@ -39,6 +38,10 @@
 )


+_PRIVATE_PREVIEW_IMAGE = "-docker.pkg.dev/vertex-ai/training/tf-"
+_OFFICIAL_IMAGE = "-docker.pkg.dev/vertex-ai/training/ray-"
+
+
 def create_persistent_resource_client():
     # location is inhereted from the global configuration at aiplatform.init().
     return initializer.global_config.create_client(
@@ -131,7 +134,7 @@ def get_persistent_resource(

 def persistent_resource_to_cluster(
     persistent_resource: PersistentResource,
-) -> Cluster:
+) -> Optional[Cluster]:
     """Format a PersistentResource to a dictionary.

     Args:
@@ -156,51 +159,52 @@ def persistent_resource_to_cluster(
             persistent_resource.name,
         )
         return
+    resource_pools = persistent_resource.resource_pools

+    head_resource_pool = resource_pools[0]
+    head_id = head_resource_pool.id
     head_image_uri = (
-        persistent_resource.resource_runtime_spec.ray_spec.resource_pool_images[
-            "head-node"
-        ]
+        persistent_resource.resource_runtime_spec.ray_spec.resource_pool_images[head_id]
     )
-    worker_image_uri = (
-        persistent_resource.resource_runtime_spec.ray_spec.resource_pool_images.get(
-            "worker-pool1", None
-        )
-    )
-    if worker_image_uri is None:
-        worker_image_uri = head_image_uri

     if not head_image_uri:
         head_image_uri = persistent_resource.resource_runtime_spec.ray_spec.image_uri
+
     try:
         python_version, ray_version = _validation_utils.get_versions_from_image_uri(
             head_image_uri
         )
     except IndexError:
-        logging.info(
-            "[Ray on Vertex AI]: The image of cluster %s is outdated. It is recommended to delete and recreate the cluster to obtain the latest image.",
-            persistent_resource.name,
-        )
-        return
+        if _PRIVATE_PREVIEW_IMAGE in head_image_uri:
+            # If using outdated images
+            logging.info(
+                "[Ray on Vertex AI]: The image of cluster %s is outdated. It is recommended to delete and recreate the cluster to obtain the latest image.",
+                persistent_resource.name,
+            )
+            return None
+        else:
+            # Custom image might also cause IndexError
+            python_version = None
+            ray_version = None
     cluster.python_version = python_version
     cluster.ray_version = ray_version
-    cluster.node_images = NodeImages(head=head_image_uri, worker=worker_image_uri)

-    resource_pools = persistent_resource.resource_pools
-
-    head_resource_pool = resource_pools[0]
     accelerator_type = head_resource_pool.machine_spec.accelerator_type
     if accelerator_type.value != 0:
         accelerator_type = accelerator_type.name
     else:
         accelerator_type = None
+    if _OFFICIAL_IMAGE in head_image_uri:
+        # Official training image is not custom
+        head_image_uri = None
     head_node_type = Resources(
         machine_type=head_resource_pool.machine_spec.machine_type,
         accelerator_type=accelerator_type,
         accelerator_count=head_resource_pool.machine_spec.accelerator_count,
         boot_disk_type=head_resource_pool.disk_spec.boot_disk_type,
         boot_disk_size_gb=head_resource_pool.disk_spec.boot_disk_size_gb,
         node_count=1,
+        custom_image=head_image_uri,
     )
     worker_node_types = []
     if head_resource_pool.replica_count > 1:
@@ -215,6 +219,7 @@ def persistent_resource_to_cluster(
                 boot_disk_type=head_resource_pool.disk_spec.boot_disk_type,
                 boot_disk_size_gb=head_resource_pool.disk_spec.boot_disk_size_gb,
                 node_count=worker_node_count,
+                custom_image=head_image_uri,
             )
         )
     for i in range(len(resource_pools) - 1):
@@ -225,6 +230,14 @@ def persistent_resource_to_cluster(
             accelerator_type = accelerator_type.name
         else:
             accelerator_type = None
+        worker_image_uri = (
+            persistent_resource.resource_runtime_spec.ray_spec.resource_pool_images[
+                resource_pools[i + 1].id
+            ]
+        )
+        if _OFFICIAL_IMAGE in worker_image_uri:
+            # Official training image is not custom
+            worker_image_uri = None
         worker_node_types.append(
             Resources(
                 machine_type=resource_pools[i + 1].machine_spec.machine_type,
@@ -233,6 +246,7 @@ def persistent_resource_to_cluster(
                 accelerator_type=accelerator_type,
                 boot_disk_type=resource_pools[i + 1].disk_spec.boot_disk_type,
                 boot_disk_size_gb=resource_pools[i + 1].disk_spec.boot_disk_size_gb,
                 node_count=resource_pools[i + 1].replica_count,
+                custom_image=worker_image_uri,
             )
         )

google/cloud/aiplatform/preview/vertex_ray/util/_validation_utils.py

+20 −10

@@ -20,10 +20,13 @@
 import logging
 import ray
 import re
+from immutabledict import immutabledict

 from google.cloud.aiplatform import initializer
 from google.cloud.aiplatform.utils import resource_manager_utils

+SUPPORTED_RAY_VERSIONS = immutabledict({"2.4": "2.4.0", "2.9": "2.9.3"})
+SUPPORTED_PY_VERSION = ["3.10"]

 # Artifact Repository available regions.
 _AVAILABLE_REGIONS = ["us", "europe", "asia"]
@@ -73,25 +76,28 @@ def get_local_ray_version():
     ray_version = ray.__version__.split(".")
     if len(ray_version) == 3:
         ray_version = ray_version[:2]
-    return "_".join(ray_version)
+    return ".".join(ray_version)


 def get_image_uri(ray_version, python_version, enable_cuda):
     """Image uri for a given ray version and python version."""
-    if ray_version not in ["2_4", "2_9"]:
+    if ray_version not in SUPPORTED_RAY_VERSIONS:
         raise ValueError(
-            "[Ray on Vertex AI]: The supported Ray versions are 2_4 (2.4.0) and 2_9 (2.9.3)."
+            "[Ray on Vertex AI]: The supported Ray versions are %s (%s) and %s (%s).",
+            list(SUPPORTED_RAY_VERSIONS.keys())[0],
+            list(SUPPORTED_RAY_VERSIONS.values())[0],
+            list(SUPPORTED_RAY_VERSIONS.keys())[1],
+            list(SUPPORTED_RAY_VERSIONS.values())[1],
         )
-    if python_version not in ["3_10"]:
-        raise ValueError("[Ray on Vertex AI]: The supported Python version is 3_10.")
+    if python_version not in SUPPORTED_PY_VERSION:
+        raise ValueError("[Ray on Vertex AI]: The supported Python version is 3.10.")

     location = initializer.global_config.location
     region = location.split("-")[0]
     if region not in _AVAILABLE_REGIONS:
         region = _DEFAULT_REGION
-    ray_version = ray_version.replace("_", "-")
+    ray_version = ray_version.replace(".", "-")
     if enable_cuda:
-        # TODO(b/292003337) update eligible image uris
         return f"{region}-docker.pkg.dev/vertex-ai/training/ray-gpu.{ray_version}.py310:latest"
     else:
         return f"{region}-docker.pkg.dev/vertex-ai/training/ray-cpu.{ray_version}.py310:latest"
@@ -101,9 +107,13 @@ def get_versions_from_image_uri(image_uri):
     """Get ray version and python version from image uri."""
     logging.info(f"[Ray on Vertex AI]: Getting versions from image uri: {image_uri}")
     image_label = image_uri.split("/")[-1].split(":")[0]
-    py_version = image_label[-3] + "_" + image_label[-2:]
-    ray_version = image_label.split(".")[1].replace("-", "_")
-    return py_version, ray_version
+    py_version = image_label[-3] + "." + image_label[-2:]
+    ray_version = image_label.split(".")[1].replace("-", ".")
+    if ray_version in SUPPORTED_RAY_VERSIONS and py_version in SUPPORTED_PY_VERSION:
+        return py_version, ray_version
+    else:
+        # May not parse custom image and get the versions correctly
+        return None, None


 def valid_dashboard_address(address):
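
For illustration, here is roughly how get_versions_from_image_uri above parses a prebuilt image URI, and why a custom image can fall back to (None, None). The example URI follows the format built by get_image_uri; it is a sketch, not SDK code.

    # Prebuilt image label "ray-cpu.2-9.py310" -> python "3.10", ray "2.9".
    uri = "us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest"
    image_label = uri.split("/")[-1].split(":")[0]             # "ray-cpu.2-9.py310"
    py_version = image_label[-3] + "." + image_label[-2:]      # "3.10"
    ray_version = image_label.split(".")[1].replace("-", ".")  # "2.9"

    # A custom image label that does not follow this pattern may raise IndexError
    # (handled by the caller in _gapic_utils.py) or yield versions outside
    # SUPPORTED_RAY_VERSIONS / SUPPORTED_PY_VERSION, in which case the function
    # returns (None, None).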
