Skip to content

Commit e17ada2

Browse files
fix: don't start a retry timer if connection was closed due to being idle (googleapis#2942)
1 parent f5c01da commit e17ada2

File tree

2 files changed

+59
-5
lines changed

2 files changed

+59
-5
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,31 +1273,38 @@ private void doneCallback(Throwable finalStatus) {
12731273
+ writerId
12741274
+ " Final status: "
12751275
+ finalStatus.toString());
1276+
boolean closedIdleConnection =
1277+
finalStatus.toString().contains("Closing the stream because it has been inactive");
12761278
this.lock.lock();
12771279
try {
12781280
this.streamConnectionIsConnected = false;
12791281
this.telemetryMetrics.recordConnectionEnd(
12801282
Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString());
12811283
if (connectionFinalStatus == null) {
1282-
if (connectionRetryStartTime == 0) {
1284+
if (!closedIdleConnection && connectionRetryStartTime == 0) {
12831285
connectionRetryStartTime = System.currentTimeMillis();
12841286
}
12851287
// If the error can be retried, don't set it here, let it try to retry later on.
12861288
if (isConnectionErrorRetriable(Status.fromThrowable(finalStatus).getCode())
12871289
&& !userClosed
12881290
&& (maxRetryDuration.toMillis() == 0f
1291+
|| closedIdleConnection
12891292
|| System.currentTimeMillis() - connectionRetryStartTime
12901293
<= maxRetryDuration.toMillis())) {
1291-
this.conectionRetryCountWithoutCallback++;
1292-
this.telemetryMetrics.recordConnectionStartWithRetry();
1294+
if (!closedIdleConnection) {
1295+
this.conectionRetryCountWithoutCallback++;
1296+
this.telemetryMetrics.recordConnectionStartWithRetry();
1297+
}
12931298
log.info(
12941299
"Connection is going to be reestablished with the next request. Retriable error "
12951300
+ finalStatus.toString()
12961301
+ " received, retry count "
12971302
+ conectionRetryCountWithoutCallback
12981303
+ ", millis left to retry "
12991304
+ (maxRetryDuration.toMillis()
1300-
- (System.currentTimeMillis() - connectionRetryStartTime))
1305+
- (connectionRetryStartTime > 0
1306+
? System.currentTimeMillis() - connectionRetryStartTime
1307+
: 0))
13011308
+ ", for stream "
13021309
+ streamName
13031310
+ " id:"
@@ -1311,7 +1318,10 @@ private void doneCallback(Throwable finalStatus) {
13111318
+ " for stream "
13121319
+ streamName
13131320
+ " with write id: "
1314-
+ writerId);
1321+
+ writerId
1322+
+ ", millis left to retry was "
1323+
+ (maxRetryDuration.toMillis()
1324+
- (System.currentTimeMillis() - connectionRetryStartTime)));
13151325
}
13161326
}
13171327
} finally {

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
3333
import com.google.protobuf.DescriptorProtos;
3434
import com.google.protobuf.Int64Value;
35+
import io.grpc.Status;
3536
import io.grpc.StatusRuntimeException;
3637
import io.opentelemetry.api.common.Attributes;
3738
import java.io.IOException;
@@ -898,6 +899,49 @@ public void testOpenTelemetryAttributesWithTraceId() throws Exception {
898899
exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow:c:d", "c", "d", null);
899900
}
900901

902+
@Test
903+
public void testDoubleDisconnectWithShorterRetryDuration() throws Exception {
904+
// simulate server disconnect due to idle stream
905+
testBigQueryWrite.setFailedStatus(
906+
Status.ABORTED.withDescription(
907+
"Closing the stream because it has been inactive for 600 seconds."));
908+
testBigQueryWrite.setCloseEveryNAppends(1);
909+
testBigQueryWrite.setTimesToClose(
910+
2); // Total of 2 connection failures. The time interval between the processing of these
911+
// failures will exceed the configured maxRetryDuration.
912+
testBigQueryWrite.addResponse(createAppendResponse(0));
913+
914+
ProtoSchema schema1 = createProtoSchema("foo");
915+
StreamWriter sw1 =
916+
StreamWriter.newBuilder(TEST_STREAM_1, client)
917+
.setLocation("us")
918+
.setWriterSchema(schema1)
919+
.build();
920+
ConnectionWorker connectionWorker =
921+
new ConnectionWorker(
922+
TEST_STREAM_1,
923+
"us",
924+
schema1,
925+
100000,
926+
100000,
927+
Duration.ofMillis(1), // very small maxRetryDuration
928+
FlowController.LimitExceededBehavior.Block,
929+
TEST_TRACE_ID,
930+
null,
931+
client.getSettings(),
932+
retrySettings,
933+
/*enableRequestProfiler=*/ false,
934+
/*enableOpenTelemetry=*/ false,
935+
/*isMultiplexing*/ false);
936+
937+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
938+
futures.add(
939+
sendTestMessage(
940+
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(0)}), 0));
941+
942+
assertEquals(0, futures.get(0).get().getAppendResult().getOffset().getValue());
943+
}
944+
901945
@Test
902946
public void testLocationName() throws Exception {
903947
assertEquals(

0 commit comments

Comments
 (0)