Commit b4bba47

[core] Fix empty resource update to GCS resource manager. (#39648) (#39664)
* [core] Fix empty resource update to GCS resource manager. (#39648)

---------

Signed-off-by: rickyyx <[email protected]>

* update

Signed-off-by: rickyyx <[email protected]>

---------

Signed-off-by: rickyyx <[email protected]>
1 parent 8cfb98a commit b4bba47

File tree: 7 files changed, +208 −43 lines

python/ray/autoscaler/v2/schema.py (+6)

@@ -196,6 +196,12 @@ class ClusterStatus:
     # Query metics
     stats: Stats = field(default_factory=Stats)

+    def total_resources(self) -> Dict[str, float]:
+        return {r.resource_name: r.total for r in self.cluster_resource_usage}
+
+    def available_resources(self) -> Dict[str, float]:
+        return {r.resource_name: r.total - r.used for r in self.cluster_resource_usage}
+
     # TODO(rickyx): we don't show infeasible requests as of now.
     # (They will just be pending forever as part of the demands)
     # We should show them properly in the future.
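A minimal sketch of how these new helpers might be used (illustration only, not part of the commit; get_cluster_status is the autoscaler v2 SDK entry point exercised in the test below):

import ray
from ray.autoscaler.v2.sdk import get_cluster_status

ray.init("auto")
status = get_cluster_status()
# Both helpers aggregate over cluster_resource_usage, keyed by resource name.
print(status.total_resources())      # e.g. {"CPU": 12.0, ...}
print(status.available_resources())  # per-resource total minus used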

python/ray/autoscaler/v2/tests/test_e2e.py (+74)

@@ -315,6 +315,80 @@ def verify():
     cluster.shutdown()


+def test_non_corrupted_resources():
+    """
+    Test that when a node's local GC happens due to object store pressure,
+    the message doesn't corrupt the resource view on the GCS.
+    See issue https://github.com/ray-project/ray/issues/39644
+    """
+    num_worker_nodes = 5
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 2, "object_store_memory": 100 * 1024 * 1024},
+        worker_node_types={
+            "type-1": {
+                "resources": {"CPU": 2},
+                "node_config": {},
+                "min_workers": num_worker_nodes,
+                "max_workers": num_worker_nodes,
+            },
+        },
+    )
+
+    driver_script = """
+
+import ray
+import time
+
+ray.init("auto")
+
+@ray.remote(num_cpus=1)
+def foo():
+    ray.put(bytearray(1024 * 1024 * 50))
+
+
+while True:
+    ray.get([foo.remote() for _ in range(50)])
+"""
+
+    try:
+        # This should trigger many COMMANDS messages from NodeManager.
+        cluster.start(
+            _system_config={
+                "debug_dump_period_milliseconds": 10,
+                "raylet_report_resources_period_milliseconds": 10000,
+                "global_gc_min_interval_s": 1,
+                "local_gc_interval_s": 1,
+                "high_plasma_storage_usage": 0.2,
+                "raylet_check_gc_period_milliseconds": 10,
+            },
+        )
+        ray.init("auto")
+
+        from ray.autoscaler.v2.sdk import get_cluster_status
+
+        def nodes_up():
+            cluster_state = get_cluster_status()
+            return len(cluster_state.healthy_nodes) == num_worker_nodes + 1
+
+        wait_for_condition(nodes_up)
+
+        # Schedule tasks
+        run_string_as_driver_nonblocking(driver_script)
+        start = time.time()
+
+        # Check the cluster state for 10 seconds
+        while time.time() - start < 10:
+            cluster_state = get_cluster_status()
+
+            # Verify total cluster resources never change
+            assert len(cluster_state.healthy_nodes) == num_worker_nodes + 1
+            assert cluster_state.total_resources()["CPU"] == 2 * (num_worker_nodes + 1)
+
+    finally:
+        ray.shutdown()
+        cluster.shutdown()
+
+
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))

python/ray/cluster_utils.py (+6)

@@ -79,6 +79,12 @@ def start(self, _system_config=None, override_env: Optional[Dict] = None):
            cmd.append("--num-cpus={}".format(self._head_resources.pop("CPU")))
        if "GPU" in self._head_resources:
            cmd.append("--num-gpus={}".format(self._head_resources.pop("GPU")))
+       if "object_store_memory" in self._head_resources:
+           cmd.append(
+               "--object-store-memory={}".format(
+                   self._head_resources.pop("object_store_memory")
+               )
+           )
        if self._head_resources:
            cmd.append("--resources='{}'".format(json.dumps(self._head_resources)))
        if _system_config is not None:
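A minimal sketch of the new head-node option in use (illustration only, not part of the diff; it mirrors the setup in test_non_corrupted_resources above). With this change, "object_store_memory" in head_resources is forwarded as the dedicated --object-store-memory flag instead of being serialized into --resources:

from ray.cluster_utils import AutoscalingCluster

# Head node gets 2 CPUs and a 100 MiB object store.
cluster = AutoscalingCluster(
    head_resources={"CPU": 2, "object_store_memory": 100 * 1024 * 1024},
    worker_node_types={},
)
cluster.start()
try:
    pass  # run workloads against the cluster here
finally:
    cluster.shutdown()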

src/ray/gcs/gcs_server/gcs_resource_manager.cc (+31 −14)

@@ -43,7 +43,13 @@ void GcsResourceManager::ConsumeSyncMessage(
        rpc::ResourcesData resources;
        resources.ParseFromString(message->sync_message());
        resources.set_node_id(message->node_id());
-       UpdateFromResourceReport(resources);
+       if (message->message_type() == syncer::MessageType::COMMANDS) {
+         UpdateFromResourceCommand(resources);
+       } else if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
+         UpdateFromResourceView(resources);
+       } else {
+         RAY_LOG(FATAL) << "Unsupported message type: " << message->message_type();
+       }
      },
      "GcsResourceManager::Update");
}

@@ -124,7 +130,7 @@ void GcsResourceManager::HandleGetAllAvailableResources(
  ++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
}

-void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
+void GcsResourceManager::UpdateFromResourceView(const rpc::ResourcesData &data) {
  NodeID node_id = NodeID::FromBinary(data.node_id());
  // When gcs detects task pending, we may receive an local update. But it can be ignored
  // here because gcs' syncer has already broadcast it.

@@ -134,10 +140,11 @@ void GcsResourceManager::UpdateFromResourceView(const rpc::ResourcesData &data
  if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
    UpdateNodeNormalTaskResources(node_id, data);
  } else {
+   // We will only update the node's resources if it's from resource view reports.
    if (!cluster_resource_manager_.UpdateNode(scheduling::NodeID(node_id.Binary()),
                                              data)) {
      RAY_LOG(INFO)
-         << "[UpdateFromResourceReport]: received resource usage from unknown node id "
+         << "[UpdateFromResourceView]: received resource usage from unknown node id "
          << node_id;
    }
  }

@@ -167,7 +174,7 @@ void GcsResourceManager::HandleReportResourceUsage(
    rpc::ReportResourceUsageRequest request,
    rpc::ReportResourceUsageReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
- UpdateFromResourceReport(request.resources());
+ UpdateFromResourceView(request.resources());

  GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];

@@ -260,6 +267,19 @@ void GcsResourceManager::HandleGetAllResourceUsage(
  ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}

+void GcsResourceManager::UpdateFromResourceCommand(const rpc::ResourcesData &data) {
+ const auto node_id = NodeID::FromBinary(data.node_id());
+ auto iter = node_resource_usages_.find(node_id);
+ if (iter == node_resource_usages_.end()) {
+   return;
+ }
+
+ // TODO(rickyx): We should change this to be part of RESOURCE_VIEW.
+ // This is being populated from NodeManager as part of COMMANDS
+ iter->second.set_cluster_full_of_actors_detected(
+     data.cluster_full_of_actors_detected());
+}
+
void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
                                                 const rpc::ResourcesData &resources) {
  auto iter = node_resource_usages_.find(node_id);

@@ -268,18 +288,15 @@ void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
    // If the node is not registered to GCS,
    // we are guaranteed that no resource usage will be reported.
    return;
- } else {
-   if (resources.resources_total_size() > 0) {
-     (*iter->second.mutable_resources_total()) = resources.resources_total();
-   }
+ }
+ if (resources.resources_total_size() > 0) {
+   (*iter->second.mutable_resources_total()) = resources.resources_total();
+ }

-   (*iter->second.mutable_resources_available()) = resources.resources_available();
+ (*iter->second.mutable_resources_available()) = resources.resources_available();

-   if (resources.resources_normal_task_changed()) {
-     (*iter->second.mutable_resources_normal_task()) = resources.resources_normal_task();
-   }
-   iter->second.set_cluster_full_of_actors_detected(
-       resources.cluster_full_of_actors_detected());
+ if (resources.resources_normal_task_changed()) {
+   (*iter->second.mutable_resources_normal_task()) = resources.resources_normal_task();
  }
}
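For readers skimming the diff, a rough Python sketch of the routing this change introduces (illustration only, with simplified hypothetical names; the authoritative logic is the C++ above). The commit title and the new test suggest the bug was that COMMANDS sync messages, which carry no resource totals or availability, were fed through the same path as RESOURCE_VIEW reports and so overwrote a node's resource view with empty values:

from dataclasses import dataclass, field
from typing import Dict

RESOURCE_VIEW, COMMANDS = "RESOURCE_VIEW", "COMMANDS"

@dataclass
class NodeView:
    resources_total: Dict[str, float] = field(default_factory=dict)
    resources_available: Dict[str, float] = field(default_factory=dict)
    cluster_full_of_actors_detected: bool = False

def consume_sync_message(msg_type, data, node_views):
    if msg_type == COMMANDS:
        # COMMANDS only populates this one flag; do not touch resources.
        view = node_views.get(data["node_id"])
        if view is not None:
            view.cluster_full_of_actors_detected = data.get(
                "cluster_full_of_actors_detected", False)
    elif msg_type == RESOURCE_VIEW:
        # Full resource report: safe to update totals and availability.
        view = node_views.get(data["node_id"])
        if view is not None:
            if data.get("resources_total"):
                view.resources_total = data["resources_total"]
            view.resources_available = data.get("resources_available", {})
    else:
        raise ValueError(f"Unsupported message type: {msg_type}")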

src/ray/gcs/gcs_server/gcs_resource_manager.h (+14 −2)

@@ -123,13 +123,25 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
  /// Update resource usage of given node.
  ///
  /// \param node_id Node id.
- /// \param request Request containing resource usage.
+ /// \param resources The resource usage of the node.
+ /// \param from_resource_view Whether the resource report is from resource view, i.e.
+ /// syncer::MessageType::RESOURCE_VIEW.
  void UpdateNodeResourceUsage(const NodeID &node_id,
                               const rpc::ResourcesData &resources);

  /// Process a new resource report from a node, independent of the rpc handler it came
  /// from.
- void UpdateFromResourceReport(const rpc::ResourcesData &data);
+ ///
+ /// \param data The resource report.
+ /// \param from_resource_view Whether the resource report is from resource view, i.e.
+ /// syncer::MessageType::RESOURCE_VIEW.
+ void UpdateFromResourceView(const rpc::ResourcesData &data);
+
+ /// Update the resource usage of a node from syncer COMMANDS
+ ///
+ /// This is currently used for setting cluster full of actors info from syncer.
+ /// \param data The resource report.
+ void UpdateFromResourceCommand(const rpc::ResourcesData &data);

  /// Update the placement group load information so that it will be reported through
  /// heartbeat.

src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc (+14 −14)

@@ -146,7 +146,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
    return reply.is_accepted();
  }

- void UpdateFromResourceReportSync(
+ void UpdateFromResourceViewSync(
      const NodeID &node_id,
      const absl::flat_hash_map<std::string, double> &available_resources,
      const absl::flat_hash_map<std::string, double> &total_resources,

@@ -159,7 +159,7 @@
        total_resources,
        idle_ms,
        is_draining);
-   gcs_resource_manager_->UpdateFromResourceReport(resources_data);
+   gcs_resource_manager_->UpdateFromResourceView(resources_data);
  }

  rpc::autoscaler::GetClusterStatusReply GetClusterStatusSync() {

@@ -364,9 +364,9 @@ TEST_F(GcsAutoscalerStateManagerTest, TestNodeAddUpdateRemove) {

  // Update available resources.
  {
-   UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
-                                {/* available */ {"CPU", 1.75}},
-                                /* total*/ {{"CPU", 2}, {"GPU", 1}});
+   UpdateFromResourceViewSync(NodeID::FromBinary(node->node_id()),
+                              {/* available */ {"CPU", 1.75}},
+                              /* total*/ {{"CPU", 2}, {"GPU", 1}});

    const auto &state = GetClusterResourceStateSync();
    ASSERT_EQ(state.node_states_size(), 1);

@@ -721,11 +721,11 @@ TEST_F(GcsAutoscalerStateManagerTest, TestDrainingStatus) {
  }

  // Report draining info.
- UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
-                              {/* available */ {"CPU", 2}, {"GPU", 1}},
-                              /* total*/ {{"CPU", 2}, {"GPU", 1}},
-                              /* idle_duration_ms */ 10,
-                              /* is_draining */ true);
+ UpdateFromResourceViewSync(NodeID::FromBinary(node->node_id()),
+                            {/* available */ {"CPU", 2}, {"GPU", 1}},
+                            /* total*/ {{"CPU", 2}, {"GPU", 1}},
+                            /* idle_duration_ms */ 10,
+                            /* is_draining */ true);
  {
    const auto &state = GetClusterResourceStateSync();
    ASSERT_EQ(state.node_states(0).status(), rpc::autoscaler::NodeStatus::DRAINING);

@@ -758,10 +758,10 @@ TEST_F(GcsAutoscalerStateManagerTest, TestIdleTime) {
  }

  // Report idle node info.
- UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
-                              {/* available */ {"CPU", 2}, {"GPU", 1}},
-                              /* total*/ {{"CPU", 2}, {"GPU", 1}},
-                              /* idle_duration_ms */ 10);
+ UpdateFromResourceViewSync(NodeID::FromBinary(node->node_id()),
+                            {/* available */ {"CPU", 2}, {"GPU", 1}},
+                            /* total*/ {{"CPU", 2}, {"GPU", 1}},
+                            /* idle_duration_ms */ 10);

  // Check report idle time is set.
  {
