diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index 89c08ff3a45ca..7a2efa51b7220 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -4,6 +4,7 @@ package io.airbyte.api.client; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.api.client.generated.AttemptApi; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.DbMigrationApi; @@ -19,6 +20,10 @@ import io.airbyte.api.client.generated.StateApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiClient; +import java.util.Random; +import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is meant to consolidate all our API endpoints into a fluent-ish client. Currently, all @@ -34,6 +39,13 @@ */ public class AirbyteApiClient { + private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteApiClient.class); + private static final Random RANDOM = new Random(); + + public static final int DEFAULT_MAX_RETRIES = 4; + public static final int DEFAULT_RETRY_INTERVAL_SECS = 10; + public static final int DEFAULT_FINAL_INTERVAL_SECS = 10 * 60; + private final ConnectionApi connectionApi; private final DestinationDefinitionApi destinationDefinitionApi; private final DestinationApi destinationApi; @@ -128,4 +140,67 @@ public StateApi getStateApi() { return stateApi; } + /** + * Default to 4 retries with a randomised 1 - 10 seconds interval between the first two retries and + * an 10-minute wait for the last retry. + */ + public static T retryWithJitter(final Callable call, final String desc) { + return retryWithJitter(call, desc, DEFAULT_RETRY_INTERVAL_SECS, DEFAULT_FINAL_INTERVAL_SECS, DEFAULT_MAX_RETRIES); + } + + /** + * Provides a simple retry wrapper for api calls. This retry behaviour is slightly different from + * generally available retries libraries - the last retry is able to wait an interval inconsistent + * with regular intervals/exponential backoff. + *

+ * Since the primary retries use case is long-running workflows, the benefit of waiting a couple of + * minutes as a last ditch effort to outlast networking disruption outweighs the cost of slightly + * longer jobs. + * + * @param call method to execute + * @param desc short readable explanation of why this method is executed + * @param jitterMaxIntervalSecs upper limit of the randomised retry interval. Minimum value is 1. + * @param finalIntervalSecs retry interval before the last retry. + */ + @VisibleForTesting + // This is okay since we are logging the stack trace, which PMD is not detecting. + @SuppressWarnings("PMD.PreserveStackTrace") + public static T retryWithJitter(final Callable call, + final String desc, + final int jitterMaxIntervalSecs, + final int finalIntervalSecs, + final int maxTries) { + int currRetries = 0; + boolean keepTrying = true; + + T data = null; + while (keepTrying && currRetries < maxTries) { + try { + LOGGER.info("Attempt {} to {}", currRetries, desc); + data = call.call(); + + keepTrying = false; + } catch (final Exception e) { + LOGGER.info("Attempt {} to {} error: {}", currRetries, desc, e); + currRetries++; + + // Sleep anywhere from 1 to jitterMaxIntervalSecs seconds. + final var backoffTimeSecs = Math.max(RANDOM.nextInt(jitterMaxIntervalSecs + 1), 1); + var backoffTimeMs = backoffTimeSecs * 1000; + + if (currRetries == maxTries - 1) { + // sleep for finalIntervalMins on the last attempt. + backoffTimeMs = finalIntervalSecs * 1000; + } + + try { + Thread.sleep(backoffTimeMs); + } catch (final InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + return data; + } + } diff --git a/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java b/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java new file mode 100644 index 0000000000000..2c96d9ef264c8 --- /dev/null +++ b/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.api.client; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Callable; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +public class AirbyteApiClientTest { + + // These set of configurations are so each test case takes ~3 secs. + private static final int TEST_JITTER_INTERVAL_SECS = 1; + private static final int TEST_FINAL_INTERVAL_SECS = 1; + private static final int TEST_MAX_RETRIES = 2; + @Mock + private Callable mockCallable; + + @Nested + class RetryWithJitter { + + @Test + @DisplayName("Should not retry on success") + void ifSucceedShouldNotRetry() throws Exception { + mockCallable = mock(Callable.class); + when(mockCallable.call()).thenReturn("Success!"); + + AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, TEST_MAX_RETRIES); + + verify(mockCallable, times(1)).call(); + } + + @Test + @DisplayName("Should retry up to the configured max retries on continued errors") + void onlyRetryTillMaxRetries() throws Exception { + mockCallable = mock(Callable.class); + when(mockCallable.call()).thenThrow(new RuntimeException("Bomb!")); + + AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, TEST_MAX_RETRIES); + + verify(mockCallable, times(TEST_MAX_RETRIES)).call(); + + } + + @Test + @DisplayName("Should retry only if there are errors") + void onlyRetryOnErrors() throws Exception { + mockCallable = mock(Callable.class); + // Because we succeed on the second try, we should only call the method twice. + when(mockCallable.call()) + .thenThrow(new RuntimeException("Bomb!")) + .thenReturn("Success!"); + + AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, 3); + + verify(mockCallable, times(2)).call(); + + } + + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 27531fce2919e..78b74764a8034 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -139,7 +139,10 @@ public OUTPUT get() { } LOGGER.info("Executing worker wrapper. Airbyte version: {}", airbyteVersion); - saveWorkflowIdForCancellation(airbyteApiClient); + AirbyteApiClient.retryWithJitter(() -> { + saveWorkflowIdForCancellation(airbyteApiClient); + return null; + }, "save workflow id for cancellation"); final Worker worker = workerSupplier.get(); final CompletableFuture outputFuture = new CompletableFuture<>(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index 768f397ed86f3..381d6ebbebfb1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -11,7 +11,6 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; @@ -153,11 +152,13 @@ private CheckedSupplier, Exception> getContainerL final WorkerConfigs workerConfigs, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, - final Supplier activityContext) - throws ApiException { + final Supplier activityContext) { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(); + + final var jobScope = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(), + "get job scope"); final var connectionId = UUID.fromString(jobScope); return () -> new DbtLauncherWorker( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index b37e41e8aa33b..81410f668bf92 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -11,7 +11,6 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; @@ -162,11 +161,12 @@ private CheckedSupplier, Except final WorkerConfigs workerConfigs, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, - final Supplier activityContext) - throws ApiException { + final Supplier activityContext) { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(); + final var jobScope = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(), + "get job scope"); final var connectionId = UUID.fromString(jobScope); return () -> new NormalizationLauncherWorker( connectionId, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index 38bf7111eb25a..eec1fd1af591a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -10,7 +10,6 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead; import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.client.model.generated.JobIdRequestBody; @@ -48,8 +47,10 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList; try { - AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)); - } catch (final ApiException e) { + AttemptNormalizationStatusReadList = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)), + "get normalization statuses"); + } catch (final Exception e) { throw Activity.wrap(e); } final AtomicLong totalRecordsCommitted = new AtomicLong(0L); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index f9ed3057066ca..4b6dd00753b16 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.ConnectionStateCreateOrUpdate; @@ -54,8 +53,10 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut try { final Optional maybeStateWrapper = StateMessageHelper.getTypedState(state.getState(), featureFlags.useStreamCapableState()); if (maybeStateWrapper.isPresent()) { - final ConnectionState previousState = airbyteApiClient.getStateApi() - .getState(new ConnectionIdRequestBody().connectionId(connectionId)); + final ConnectionState previousState = + AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)), + "get state"); if (featureFlags.needStateValidation() && previousState != null) { final StateType newStateType = maybeStateWrapper.get().getStateType(); final StateType prevStateType = convertClientStateTypeToInternal(previousState.getStateType()); @@ -65,12 +66,17 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut } } - airbyteApiClient.getStateApi().createOrUpdateState( - new ConnectionStateCreateOrUpdate() - .connectionId(connectionId) - .connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null)))); + AirbyteApiClient.retryWithJitter( + () -> { + airbyteApiClient.getStateApi().createOrUpdateState( + new ConnectionStateCreateOrUpdate() + .connectionId(connectionId) + .connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null)))); + return null; + }, + "create or update state"); } - } catch (final ApiException e) { + } catch (final Exception e) { throw new RuntimeException(e); } return true; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index b7104497e71aa..e5494e8702bf7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -307,7 +307,9 @@ private CheckedSupplier, Exception> throws ApiException { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobInfo = airbyteApiClient.getJobsApi().getJobInfoLight(id); + final var jobInfo = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getJobInfoLight(id), + "get job info light"); LOGGER.info("received response from from jobsApi.getJobInfoLight: {}", jobInfo); final var jobScope = jobInfo.getJob().getConfigId(); final var connectionId = UUID.fromString(jobScope);