Skip to content

Commit e87537b

Browse files
authored
Fix CONNECT_ACK state bugs in client/server (#60)
## Client bug Bug would manifest as a `AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR` if an `APPLICATION_MESSAGE` was sent immediately upon receiving a `CONNECT_ACK` Turns out the client was setting `state=CONNECT_PROCESSED` in two places: 1) When writing the CONNECT message 2) When the CONNECT message's write-completion callback fired. Fix is to ONLY set `state=CONNECT_PROCESSED` in place 1. Bug would occur if the CONNECT_ACK was received before the CONNECT's write-complete callback fired. The state would go from CONNECT_ACK_PROCESSED back to CONNECT_PROCESSED. This bug began manifesting when aws-c-io changed (PR# awslabs/aws-c-io#353) so that the write-completion callback never fired synchronously. This made it possible to receive a CONNECT_ACK before the CONNECT's write-completion callback fired. The write-completion callback always worked this way on windows though, so it is still righteous to fix the bug in aws-c-event-stream, rather than make further changes to aws-c-io. ## Server bug Server was only setting `state=CONNECT_ACK_PROCESSED` in one place, but it was setting it from the write-completion callback. Seems like a similar thing could happen: 1) server sends CONNECT_ACK 2) client receives CONNECT_ACK 3) client send APPLICATION_MESSAGE 4) server receives APPLICATION_MESSAGE. This causes error because state is not yet `CONNECT_ACK_PROCESSED` 5) CONNECT_ACK write-completion fires, we set `STATE=CONNECT_ACK_PROCESSED`, but too late connection is already shutting down in error Fix is to set `state=CONNECT_ACK_PROCESSED` when writing the CONNECT_ACK message, not when the write completes. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent c80e62b commit e87537b

File tree

3 files changed

+37
-13
lines changed

3 files changed

+37
-13
lines changed

source/event_stream_rpc_client.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,10 @@ static void s_on_protocol_message_written_fn(
412412
(void *)message);
413413

414414
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
415-
AWS_LOGF_DEBUG(
415+
AWS_LOGF_TRACE(
416416
AWS_LS_EVENT_STREAM_RPC_CLIENT,
417-
"id=%p: connect message flushed to the wire, waiting on connect ack.",
417+
"id=%p: connect message flushed to the wire.",
418418
(void *)message_args->connection);
419-
aws_atomic_store_int(&message_args->connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
420419
}
421420

422421
if (message_args->end_stream) {
@@ -535,7 +534,10 @@ static int s_send_protocol_message(
535534
}
536535

537536
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
538-
AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: sending connect message", (void *)connection);
537+
AWS_LOGF_DEBUG(
538+
AWS_LS_EVENT_STREAM_RPC_CLIENT,
539+
"id=%p: sending connect message, waiting on connect ack",
540+
(void *)connection);
539541
aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
540542
}
541543

source/event_stream_rpc_server.c

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -484,12 +484,10 @@ static void s_on_protocol_message_written_fn(
484484
aws_error_debug_str(error_code));
485485

486486
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
487-
AWS_LOGF_INFO(
487+
AWS_LOGF_TRACE(
488488
AWS_LS_EVENT_STREAM_RPC_SERVER,
489-
"id=%p: connect ack message sent, the connect handshake is completed",
489+
"id=%p: connect ack message flushed to wire",
490490
(void *)message_args->connection);
491-
aws_atomic_store_int(
492-
&message_args->connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
493491
}
494492

495493
if (message_args->end_stream) {
@@ -580,13 +578,20 @@ static int s_send_protocol_message(
580578
}
581579
}
582580

583-
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK &&
584-
!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
585-
AWS_LOGF_DEBUG(
581+
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
582+
AWS_LOGF_INFO(
586583
AWS_LS_EVENT_STREAM_RPC_SERVER,
587-
"id=%p: connection ack was rejected closing connection",
584+
"id=%p: sending connect ack message, the connect handshake is completed",
588585
(void *)connection);
589-
args->terminate_connection = true;
586+
aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
587+
588+
if (!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
589+
AWS_LOGF_DEBUG(
590+
AWS_LS_EVENT_STREAM_RPC_SERVER,
591+
"id=%p: connection ack was rejected closing connection",
592+
(void *)connection);
593+
args->terminate_connection = true;
594+
}
590595
}
591596

592597
args->flush_fn = flush_fn;

tests/event_stream_rpc_client_connection_test.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
10661066
test_data->user_data = &client_test_data;
10671067
test_data->server_continuation_user_data = &client_test_data;
10681068

1069+
/* client sends CONNECT */
10691070
struct aws_byte_buf connect_payload = aws_byte_buf_from_c_str("{ \"message\": \" connect message \" }");
10701071
struct aws_event_stream_rpc_message_args connect_args = {
10711072
.headers_count = 0,
@@ -1078,6 +1079,8 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
10781079
test_data->client_connection, &connect_args, s_rpc_client_message_flush, &client_test_data));
10791080

10801081
aws_mutex_lock(&client_test_data.sync_lock);
1082+
1083+
/* ...wait until sent and received... */
10811084
aws_condition_variable_wait_pred(
10821085
&client_test_data.sync_cvar,
10831086
&client_test_data.sync_lock,
@@ -1092,6 +1095,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
10921095
client_test_data.received_payload.len);
10931096
aws_byte_buf_clean_up(&client_test_data.received_payload);
10941097

1098+
/* server sends CONNECT_ACK */
10951099
client_test_data.received_message_type = 0;
10961100
client_test_data.message_received = false;
10971101
client_test_data.message_sent = false;
@@ -1102,6 +1106,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
11021106
ASSERT_SUCCESS(aws_event_stream_rpc_server_connection_send_protocol_message(
11031107
test_data->server_connection, &connect_args, s_rpc_client_message_flush, &client_test_data));
11041108

1109+
/* ...wait until sent and received... */
11051110
aws_condition_variable_wait_pred(
11061111
&client_test_data.sync_cvar,
11071112
&client_test_data.sync_lock,
@@ -1116,6 +1121,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
11161121
client_test_data.received_payload.len);
11171122
aws_byte_buf_clean_up(&client_test_data.received_payload);
11181123

1124+
/* client sends message creating new stream */
11191125
client_test_data.received_message_type = 0;
11201126
client_test_data.message_received = false;
11211127
client_test_data.message_sent = false;
@@ -1143,6 +1149,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
11431149
ASSERT_SUCCESS(aws_event_stream_rpc_client_continuation_activate(
11441150
client_token, operation_name, &operation_args, s_rpc_client_message_flush, &client_test_data));
11451151

1152+
/* ...wait until sent and received... */
11461153
aws_condition_variable_wait_pred(
11471154
&client_test_data.sync_cvar,
11481155
&client_test_data.sync_lock,
@@ -1164,6 +1171,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
11641171
client_test_data.last_seen_operation_name.len);
11651172
aws_byte_buf_clean_up(&client_test_data.last_seen_operation_name);
11661173

1174+
/* server sends response with TERMINATE_STREAM flag set */
11671175
client_test_data.received_message_type = 0;
11681176
client_test_data.message_received = false;
11691177
client_test_data.message_sent = false;
@@ -1174,6 +1182,14 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
11741182
ASSERT_SUCCESS(aws_event_stream_rpc_server_continuation_send_message(
11751183
client_test_data.server_token, &operation_args, s_rpc_client_message_flush, &client_test_data));
11761184

1185+
/* ...wait until sent and received... */
1186+
aws_condition_variable_wait_pred(
1187+
&client_test_data.sync_cvar,
1188+
&client_test_data.sync_lock,
1189+
s_rpc_client_message_transmission_completed_pred,
1190+
&client_test_data);
1191+
1192+
/* ...wait until client stream closed... */
11771193
aws_condition_variable_wait_pred(
11781194
&client_test_data.sync_cvar,
11791195
&client_test_data.sync_lock,
@@ -1190,6 +1206,7 @@ static int s_test_event_stream_rpc_client_connection_continuation_send_message_o
11901206

11911207
aws_byte_buf_clean_up(&client_test_data.received_payload);
11921208

1209+
/* should not be allowed to send further stream messages */
11931210
ASSERT_ERROR(
11941211
AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED,
11951212
aws_event_stream_rpc_client_continuation_send_message(

0 commit comments

Comments
 (0)