Skip to content

Commit dfa76bc

Browse files
GuestPlatform Team, copybara-github
GuestPlatform Team
authored and committed
Add a function to check if the client is dead.
AcsAgentClient tries to re-create the reactor_ and re-register the connection when the RPC is closed by the server. However, this self-recovery attempt can still fail; in that case the caller of AcsAgentClient should stop calling SendMessage and destroy the object instead. PiperOrigin-RevId: 693190583
1 parent eb7352c commit dfa76bc

File tree

3 files changed

+63
-12
lines changed

3 files changed

+63
-12
lines changed

cpp/acs_agent_client.cc

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ absl::Status AcsAgentClient::SendMessage(MessageBody message_body) {
136136
return AddRequest(request);
137137
}
138138

139+
bool AcsAgentClient::IsDead() {
140+
// If stream_state_ is kStreamFailedToInitialize or kShutdown, the client is
141+
// dead and there is no point in retrying to send messages. If stream_state_
142+
// is kStreamTemporarilyDown, the restart_client_thread_ is expected to restart
143+
// the client soon, so the caller can retry sending messages later with a
144+
// backoff mechanism. If stream_state_ is kStreamNotInitialized, the client is
145+
// not dead yet; it is just waiting for a successful registration.
146+
absl::MutexLock lock(&reactor_mtx_);
147+
return stream_state_ == ClientState::kStreamFailedToInitialize ||
148+
stream_state_ == ClientState::kShutdown;
149+
}
150+
139151
absl::Status AcsAgentClient::Init() {
140152
// Register the connection. The registration request should only be sent once.
141153
std::unique_ptr<Request> registration_request =
@@ -169,7 +181,7 @@ absl::Status AcsAgentClient::AddRequestAndWaitForResponse(
169181
absl::MutexLock lock(&reactor_mtx_);
170182
if (request.has_register_connection() &&
171183
(stream_state_ != ClientState::kStreamNotInitialized &&
172-
stream_state_ != ClientState::kStreamClosed)) {
184+
stream_state_ != ClientState::kStreamTemporarilyDown)) {
173185
return absl::InternalError(
174186
"The stream is not in the correct state to accept new registration "
175187
"request.");
@@ -272,7 +284,7 @@ void AcsAgentClient::ReactorReadCallback(Response response, bool ok) {
272284
ABSL_LOG(WARNING) << "ReactorReadCallback not ok";
273285
// Wakes up RestartClient() to restart the stream.
274286
absl::MutexLock lock(&reactor_mtx_);
275-
stream_state_ = ClientState::kStreamClosed;
287+
stream_state_ = ClientState::kStreamTemporarilyDown;
276288
return;
277289
}
278290
// Wake up ClientReadMessage().
@@ -346,7 +358,7 @@ void AcsAgentClient::RestartClient() {
346358
while (true) {
347359
reactor_mtx_.LockWhen(absl::Condition(
348360
+[](ClientState* stream_state) {
349-
return *stream_state == ClientState::kStreamClosed ||
361+
return *stream_state == ClientState::kStreamTemporarilyDown ||
350362
*stream_state == ClientState::kShutdown;
351363
},
352364
&stream_state_));
@@ -369,7 +381,7 @@ void AcsAgentClient::RestartClient() {
369381
}
370382
std::unique_ptr<AcsStub> stub = GenerateConnectionIdAndStub();
371383
if (stub == nullptr) {
372-
stream_state_ = ClientState::kStreamFailedToRestart;
384+
stream_state_ = ClientState::kStreamFailedToInitialize;
373385
reactor_mtx_.Unlock();
374386
return;
375387
}
@@ -378,7 +390,7 @@ void AcsAgentClient::RestartClient() {
378390
absl::bind_front(&AcsAgentClient::ReactorReadCallback, this),
379391
connection_id_);
380392
if (reactor_ == nullptr) {
381-
stream_state_ = ClientState::kStreamFailedToRestart;
393+
stream_state_ = ClientState::kStreamFailedToInitialize;
382394
reactor_mtx_.Unlock();
383395
ABSL_LOG(WARNING) << "Failed to generate connection id and reactor.";
384396
return;
@@ -387,7 +399,7 @@ void AcsAgentClient::RestartClient() {
387399
// Initialize the client.
388400
absl::Status init_status = Init();
389401
if (!init_status.ok()) {
390-
stream_state_ = ClientState::kStreamFailedToRestart;
402+
stream_state_ = ClientState::kStreamFailedToInitialize;
391403
reactor_mtx_.Unlock();
392404
return;
393405
}

cpp/acs_agent_client.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ class AcsAgentClient {
6767
ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_)
6868
ABSL_LOCKS_EXCLUDED(reactor_mtx_);
6969

70+
// Checks if the client is dead. If the caller of this class fails to send or
71+
// receive messages, it can call this function to check whether the client is
72+
// dead and needs a restart. If the client is dead, the caller can call
73+
// Shutdown() or directly invoke the destructor to clean up the client. If the
74+
// client is not dead, the caller can retry sending messages later with a
75+
// self-determined backoff mechanism.
76+
bool IsDead() ABSL_LOCKS_EXCLUDED(reactor_mtx_);
77+
7078
// Shuts down the client by joining the restart client thread and the
7179
// read_response_thread_, and then cancel the RPC.
7280
void Shutdown() ABSL_LOCKS_EXCLUDED(reactor_mtx_)
@@ -218,12 +226,12 @@ class AcsAgentClient {
218226
enum class ClientState {
219227
// The client is ready to read any Response from the server.
220228
kReady,
221-
// Stream not initialized,
229+
// Stream not initialized, waiting for the first registration request.
222230
kStreamNotInitialized,
223-
// The RPC is down, needs a restart.
224-
kStreamClosed,
225-
// The RPC failed to be restarted.
226-
kStreamFailedToRestart,
231+
// The RPC is temporarily down, waiting for a restart.
232+
kStreamTemporarilyDown,
233+
// The RPC failed to be initialized.
234+
kStreamFailedToInitialize,
227235
// The client is being shutdown.
228236
kShutdown,
229237
};

cpp/acs_agent_client_test.cc

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,8 +580,11 @@ TEST_F(AcsAgentClientTest, TestClientRecreatedAfterServerCancellation) {
580580
}
581581

582582
TEST_F(AcsAgentClientTest, TestFailureToRegisterConnection) {
583-
// Shutdown the client.
583+
// Shut down the client and test that IsDead() returns true.
584+
(*client_)->Shutdown();
585+
EXPECT_TRUE((*client_)->IsDead());
584586
*client_ = nullptr;
587+
585588
// Make sure server does delay response.
586589
SetServerDelay(true, absl::Seconds(5));
587590
std::chrono::system_clock::time_point deadline =
@@ -620,5 +623,33 @@ TEST_F(AcsAgentClientTest, TestFailureToRegisterConnection) {
620623
"Timeout waiting for promise to be set for message with id"));
621624
}
622625

626+
TEST_F(AcsAgentClientTest, TestFailureToRestartClientAndClientIsDead) {
627+
// Make the server delay its responses so that registration fails to
628+
// complete due to a timeout.
629+
SetServerDelay(true, absl::Seconds(5));
630+
std::chrono::system_clock::time_point deadline =
631+
std::chrono::system_clock::now() + std::chrono::seconds(1);
632+
server_->Shutdown(deadline);
633+
server_->Wait();
634+
server_ = nullptr;
635+
server_ = std::make_unique<FakeAcsAgentServer>(&service_);
636+
637+
// Create a channel to the re-created server and ensure it is connectable.
638+
grpc::ChannelArguments channel_args;
639+
// Keepalive settings.
640+
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 600 * 1000); // 600 seconds
641+
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
642+
100 * 1000); // 100 seconds
643+
std::shared_ptr<grpc::Channel> channel = grpc::CreateCustomChannel(
644+
server_->GetServerAddress(), grpc::InsecureChannelCredentials(),
645+
channel_args);
646+
deadline = std::chrono::system_clock::now() + std::chrono::seconds(10);
647+
ASSERT_TRUE(channel->WaitForConnected(deadline));
648+
649+
// The client is expected to transition to kStreamFailedToInitialize eventually.
650+
EXPECT_TRUE(WaitUntil([this]() { return (*client_)->IsDead(); },
651+
absl::Seconds(20), absl::Seconds(0.1)));
652+
}
653+
623654
} // namespace
624655
} // namespace agent_communication

0 commit comments

Comments
 (0)