From 982f7da552b63e1488bb6ca4cdafc0dc953c83ab Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 21 Nov 2022 19:20:27 -0800 Subject: [PATCH 1/8] Add application logic to retry api calls. --- .../temporal/TemporalAttemptExecution.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 27531fce2919e..5b466575bf534 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -20,6 +20,7 @@ import io.temporal.activity.ActivityExecutionContext; import java.nio.file.Path; import java.util.Optional; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,6 +43,8 @@ public class TemporalAttemptExecution implements Supplier private static final Logger LOGGER = LoggerFactory.getLogger(TemporalAttemptExecution.class); + private static final Random RANDOM = new Random(); + private final JobRunConfig jobRunConfig; private final Path jobRoot; private final CheckedSupplier, Exception> workerSupplier; @@ -139,7 +142,7 @@ public OUTPUT get() { } LOGGER.info("Executing worker wrapper. 
Airbyte version: {}", airbyteVersion); - saveWorkflowIdForCancellation(airbyteApiClient); + retryWithJitter(airbyteApiClient); final Worker worker = workerSupplier.get(); final CompletableFuture outputFuture = new CompletableFuture<>(); @@ -165,6 +168,25 @@ public OUTPUT get() { } } + private void retryWithJitter(final AirbyteApiClient airbyteApiClient) throws InterruptedException { + final int maxRetries = 3; + int currRetries = 0; + boolean needToSend = true; + + while (needToSend && currRetries < maxRetries) { + try { + LOGGER.info("Attempt {} to save workflow id", currRetries); + saveWorkflowIdForCancellation(airbyteApiClient); + needToSend = false; + } catch (final ApiException e) { + LOGGER.info("Workflow ID attempt {} save error: {}", currRetries, e); + currRetries++; + // Sleep anywhere from 1 to 10 seconds. + Thread.sleep(Math.min(RANDOM.nextInt(11), 1) * 1000); + } + } + } + private void saveWorkflowIdForCancellation(final AirbyteApiClient airbyteApiClient) throws ApiException { // If the jobId is not a number, it means the job is a synchronous job. No attempt is created for // it, and it cannot be cancelled, so do not save the workflowId. See From 623464a935e368d7fe9915394840a6459b3f7b7b Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 22 Nov 2022 13:19:06 -0800 Subject: [PATCH 2/8] Sleep for ten mins on the last attempt. 
--- .../workers/temporal/TemporalAttemptExecution.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 5b466575bf534..8f6201e248a8c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -169,7 +169,7 @@ public OUTPUT get() { } private void retryWithJitter(final AirbyteApiClient airbyteApiClient) throws InterruptedException { - final int maxRetries = 3; + final int maxRetries = 4; int currRetries = 0; boolean needToSend = true; @@ -182,7 +182,15 @@ private void retryWithJitter(final AirbyteApiClient airbyteApiClient) throws Int LOGGER.info("Workflow ID attempt {} save error: {}", currRetries, e); currRetries++; // Sleep anywhere from 1 to 10 seconds. - Thread.sleep(Math.min(RANDOM.nextInt(11), 1) * 1000); + var backoffTime = Math.min(RANDOM.nextInt(11), 1) * 1000; + + if (currRetries == maxRetries - 1) { + // sleep for ten mins on the last attempt. + backoffTime = 10 * 60 * 1000; + } + + Thread.sleep(backoffTime); + } } } From 0ff88130baf82525da2ef5d8eb429d27f1373b9c Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 22 Nov 2022 15:28:30 -0800 Subject: [PATCH 3/8] Pull jitter out into a separate method. Inject retry with jitter into all the spots we call the api. 
--- .../temporal/TemporalAttemptExecution.java | 21 +++++++++++++------ .../sync/DbtTransformationActivityImpl.java | 3 ++- .../sync/NormalizationActivityImpl.java | 2 +- ...NormalizationSummaryCheckActivityImpl.java | 7 ++++--- .../sync/PersistStateActivityImpl.java | 20 +++++++++++------- .../sync/ReplicationActivityImpl.java | 2 +- 6 files changed, 35 insertions(+), 20 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 8f6201e248a8c..1ddaf0abc9323 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -21,6 +21,7 @@ import java.nio.file.Path; import java.util.Optional; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -142,7 +143,10 @@ public OUTPUT get() { } LOGGER.info("Executing worker wrapper. 
Airbyte version: {}", airbyteVersion); - retryWithJitter(airbyteApiClient); + retryWithJitter(() -> { + saveWorkflowIdForCancellation(airbyteApiClient); + return null; + }); final Worker worker = workerSupplier.get(); final CompletableFuture outputFuture = new CompletableFuture<>(); @@ -168,17 +172,18 @@ public OUTPUT get() { } } - private void retryWithJitter(final AirbyteApiClient airbyteApiClient) throws InterruptedException { + public static T retryWithJitter(final Callable call) { final int maxRetries = 4; int currRetries = 0; boolean needToSend = true; + T data = null; while (needToSend && currRetries < maxRetries) { try { LOGGER.info("Attempt {} to save workflow id", currRetries); - saveWorkflowIdForCancellation(airbyteApiClient); + data = call.call(); needToSend = false; - } catch (final ApiException e) { + } catch (final Exception e) { LOGGER.info("Workflow ID attempt {} save error: {}", currRetries, e); currRetries++; // Sleep anywhere from 1 to 10 seconds. @@ -189,10 +194,14 @@ private void retryWithJitter(final AirbyteApiClient airbyteApiClient) throws Int backoffTime = 10 * 60 * 1000; } - Thread.sleep(backoffTime); - + try { + Thread.sleep(backoffTime); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } } } + return data; } private void saveWorkflowIdForCancellation(final AirbyteApiClient airbyteApiClient) throws ApiException { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index 768f397ed86f3..d8ac4f4090ca5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -157,7 +157,8 @@ private CheckedSupplier, Exception> getContainerL throws ApiException { final JobIdRequestBody id = new JobIdRequestBody(); 
id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(); + + final var jobScope = TemporalAttemptExecution.retryWithJitter(() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId()); final var connectionId = UUID.fromString(jobScope); return () -> new DbtLauncherWorker( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index b37e41e8aa33b..7816317dd89fc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -166,7 +166,7 @@ private CheckedSupplier, Except throws ApiException { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(); + final var jobScope = TemporalAttemptExecution.retryWithJitter(() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId()); final var connectionId = UUID.fromString(jobScope); return () -> new NormalizationLauncherWorker( connectionId, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index 38bf7111eb25a..6afedca9f7a16 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -10,11 +10,11 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import 
io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead; import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.metrics.lib.ApmTraceUtils; +import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.temporal.activity.Activity; import jakarta.inject.Singleton; import java.util.Comparator; @@ -48,8 +48,9 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList; try { - AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)); - } catch (final ApiException e) { + AttemptNormalizationStatusReadList = TemporalAttemptExecution.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId))); + } catch (final Exception e) { throw Activity.wrap(e); } final AtomicLong totalRecordsCommitted = new AtomicLong(0L); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index f9ed3057066ca..efbd136f0d810 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.ConnectionStateCreateOrUpdate; @@ -27,6 +26,7 @@ import 
io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.workers.helper.StateConverter; +import io.airbyte.workers.temporal.TemporalAttemptExecution; import jakarta.inject.Singleton; import java.util.List; import java.util.Map; @@ -54,8 +54,9 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut try { final Optional maybeStateWrapper = StateMessageHelper.getTypedState(state.getState(), featureFlags.useStreamCapableState()); if (maybeStateWrapper.isPresent()) { - final ConnectionState previousState = airbyteApiClient.getStateApi() - .getState(new ConnectionIdRequestBody().connectionId(connectionId)); + final ConnectionState previousState = + TemporalAttemptExecution + .retryWithJitter(() -> airbyteApiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); if (featureFlags.needStateValidation() && previousState != null) { final StateType newStateType = maybeStateWrapper.get().getStateType(); final StateType prevStateType = convertClientStateTypeToInternal(previousState.getStateType()); @@ -65,12 +66,15 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut } } - airbyteApiClient.getStateApi().createOrUpdateState( - new ConnectionStateCreateOrUpdate() - .connectionId(connectionId) - .connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null)))); + TemporalAttemptExecution.retryWithJitter(() -> { + airbyteApiClient.getStateApi().createOrUpdateState( + new ConnectionStateCreateOrUpdate() + .connectionId(connectionId) + .connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null)))); + return null; + }); } - } catch (final ApiException e) { + } catch (final Exception e) { throw new RuntimeException(e); } return true; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java 
b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index b7104497e71aa..c818ed6ace677 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -307,7 +307,7 @@ private CheckedSupplier, Exception> throws ApiException { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobInfo = airbyteApiClient.getJobsApi().getJobInfoLight(id); + final var jobInfo = TemporalAttemptExecution.retryWithJitter(() -> airbyteApiClient.getJobsApi().getJobInfoLight(id)); LOGGER.info("received response from from jobsApi.getJobInfoLight: {}", jobInfo); final var jobScope = jobInfo.getJob().getConfigId(); final var connectionId = UUID.fromString(jobScope); From ae94145e3fc4c5f77e5a1563093a7544c5cb9580 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Nov 2022 09:16:18 -0800 Subject: [PATCH 4/8] Retry to make more usable. 
--- .../airbyte/api/client/AirbyteApiClient.java | 52 +++++++++++++++++++ .../temporal/TemporalAttemptExecution.java | 39 +------------- .../sync/DbtTransformationActivityImpl.java | 7 +-- .../sync/NormalizationActivityImpl.java | 4 +- ...NormalizationSummaryCheckActivityImpl.java | 5 +- .../sync/PersistStateActivityImpl.java | 21 ++++---- .../sync/ReplicationActivityImpl.java | 4 +- 7 files changed, 79 insertions(+), 53 deletions(-) diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index 89c08ff3a45ca..6ace0d1f861b3 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -4,6 +4,7 @@ package io.airbyte.api.client; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.api.client.generated.AttemptApi; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.DbMigrationApi; @@ -19,6 +20,10 @@ import io.airbyte.api.client.generated.StateApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiClient; +import java.util.Random; +import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is meant to consolidate all our API endpoints into a fluent-ish client. 
Currently, all @@ -34,6 +39,14 @@ */ public class AirbyteApiClient { + private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteApiClient.class); + private static final Random RANDOM = new Random(); + + public static final int DEFAULT_MAX_RETRIES = 4; + public static final int DEFAULT_RETRY_INTERVAL_SECS = 10; + + public static final int DEFAULT_FINAL_INTERVAL_SECS = 10; + private final ConnectionApi connectionApi; private final DestinationDefinitionApi destinationDefinitionApi; private final DestinationApi destinationApi; @@ -128,4 +141,43 @@ public StateApi getStateApi() { return stateApi; } + public static T retryWithJitter(final Callable call, final String desc) { + return retryWithJitter(call, DEFAULT_RETRY_INTERVAL_SECS, DEFAULT_FINAL_INTERVAL_SECS, DEFAULT_MAX_RETRIES, desc); + } + + @VisibleForTesting + public static T retryWithJitter(final Callable call, int jitterIntervalSecs, int finalIntervalMins, int maxTries, final String desc) { + int currRetries = 0; + boolean keepTrying = true; + + T data = null; + while (keepTrying && currRetries < maxTries) { + try { + LOGGER.info("Attempt {} to {}", currRetries, desc); + data = call.call(); + + keepTrying = false; + } catch (final Exception e) { + LOGGER.info("Attempt {} to {} error: {}", currRetries, desc, e); + currRetries++; + + // Sleep anywhere from 1 to jitterIntervalSecs seconds. + final var backoffTimeSecs = Math.min(RANDOM.nextInt(jitterIntervalSecs + 1), 1); + var backoffTimeMs = backoffTimeSecs * 1000; + + if (currRetries == maxTries - 1) { + // sleep for finalIntervalMins on the last attempt. 
+ backoffTimeMs = finalIntervalMins * 60 * 1000; + } + + try { + Thread.sleep(backoffTimeMs); + } catch (final InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + return data; + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 1ddaf0abc9323..0cde59df545fd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -21,7 +21,6 @@ import java.nio.file.Path; import java.util.Optional; import java.util.Random; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -44,8 +43,6 @@ public class TemporalAttemptExecution implements Supplier private static final Logger LOGGER = LoggerFactory.getLogger(TemporalAttemptExecution.class); - private static final Random RANDOM = new Random(); - private final JobRunConfig jobRunConfig; private final Path jobRoot; private final CheckedSupplier, Exception> workerSupplier; @@ -143,10 +140,10 @@ public OUTPUT get() { } LOGGER.info("Executing worker wrapper. 
Airbyte version: {}", airbyteVersion); - retryWithJitter(() -> { + AirbyteApiClient.retryWithJitter(() -> { saveWorkflowIdForCancellation(airbyteApiClient); return null; - }); + }, "save workflow id for cancellation"); final Worker worker = workerSupplier.get(); final CompletableFuture outputFuture = new CompletableFuture<>(); @@ -172,38 +169,6 @@ public OUTPUT get() { } } - public static T retryWithJitter(final Callable call) { - final int maxRetries = 4; - int currRetries = 0; - boolean needToSend = true; - - T data = null; - while (needToSend && currRetries < maxRetries) { - try { - LOGGER.info("Attempt {} to save workflow id", currRetries); - data = call.call(); - needToSend = false; - } catch (final Exception e) { - LOGGER.info("Workflow ID attempt {} save error: {}", currRetries, e); - currRetries++; - // Sleep anywhere from 1 to 10 seconds. - var backoffTime = Math.min(RANDOM.nextInt(11), 1) * 1000; - - if (currRetries == maxRetries - 1) { - // sleep for ten mins on the last attempt. - backoffTime = 10 * 60 * 1000; - } - - try { - Thread.sleep(backoffTime); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - } - return data; - } - private void saveWorkflowIdForCancellation(final AirbyteApiClient airbyteApiClient) throws ApiException { // If the jobId is not a number, it means the job is a synchronous job. No attempt is created for // it, and it cannot be cancelled, so do not save the workflowId. 
See diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index d8ac4f4090ca5..592fe874f14fc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -153,12 +153,13 @@ private CheckedSupplier, Exception> getContainerL final WorkerConfigs workerConfigs, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, - final Supplier activityContext) - throws ApiException { + final Supplier activityContext) { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobScope = TemporalAttemptExecution.retryWithJitter(() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId()); + final var jobScope = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(), + "get job scope"); final var connectionId = UUID.fromString(jobScope); return () -> new DbtLauncherWorker( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 7816317dd89fc..da695efd6f2de 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -166,7 +166,9 @@ private CheckedSupplier, Except throws ApiException { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobScope = TemporalAttemptExecution.retryWithJitter(() -> 
airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId()); + final var jobScope = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(), + "get job scope"); final var connectionId = UUID.fromString(jobScope); return () -> new NormalizationLauncherWorker( connectionId, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index 6afedca9f7a16..6d7564f5e96cd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -48,8 +48,9 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList; try { - AttemptNormalizationStatusReadList = TemporalAttemptExecution.retryWithJitter( - () -> airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId))); + AttemptNormalizationStatusReadList = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)), + "get normalization statuses"); } catch (final Exception e) { throw Activity.wrap(e); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index efbd136f0d810..54952bc7ca129 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -55,8 +55,9 @@ public boolean persist(final UUID connectionId, final 
StandardSyncOutput syncOut final Optional maybeStateWrapper = StateMessageHelper.getTypedState(state.getState(), featureFlags.useStreamCapableState()); if (maybeStateWrapper.isPresent()) { final ConnectionState previousState = - TemporalAttemptExecution - .retryWithJitter(() -> airbyteApiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)), + "get state"); if (featureFlags.needStateValidation() && previousState != null) { final StateType newStateType = maybeStateWrapper.get().getStateType(); final StateType prevStateType = convertClientStateTypeToInternal(previousState.getStateType()); @@ -66,13 +67,15 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut } } - TemporalAttemptExecution.retryWithJitter(() -> { - airbyteApiClient.getStateApi().createOrUpdateState( - new ConnectionStateCreateOrUpdate() - .connectionId(connectionId) - .connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null)))); - return null; - }); + AirbyteApiClient.retryWithJitter( + () -> { + airbyteApiClient.getStateApi().createOrUpdateState( + new ConnectionStateCreateOrUpdate() + .connectionId(connectionId) + .connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null)))); + return null; + }, + "create or update state"); } } catch (final Exception e) { throw new RuntimeException(e); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index c818ed6ace677..e5494e8702bf7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -307,7 +307,9 @@ private 
CheckedSupplier, Exception> throws ApiException { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); - final var jobInfo = TemporalAttemptExecution.retryWithJitter(() -> airbyteApiClient.getJobsApi().getJobInfoLight(id)); + final var jobInfo = AirbyteApiClient.retryWithJitter( + () -> airbyteApiClient.getJobsApi().getJobInfoLight(id), + "get job info light"); LOGGER.info("received response from from jobsApi.getJobInfoLight: {}", jobInfo); final var jobScope = jobInfo.getJob().getConfigId(); final var connectionId = UUID.fromString(jobScope); From 0b7741d7b9d502d33ce9040c32c0c8e5cd8f6813 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Nov 2022 09:48:46 -0800 Subject: [PATCH 5/8] Pass PMD and check. --- .../airbyte/api/client/AirbyteApiClient.java | 13 +++-- .../api/client/AirbyteApiClientTest.java | 55 +++++++++++++++++++ .../temporal/TemporalAttemptExecution.java | 1 - .../sync/DbtTransformationActivityImpl.java | 1 - .../sync/NormalizationActivityImpl.java | 4 +- ...NormalizationSummaryCheckActivityImpl.java | 1 - .../sync/PersistStateActivityImpl.java | 1 - 7 files changed, 65 insertions(+), 11 deletions(-) create mode 100644 airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index 6ace0d1f861b3..b7716da07cbaf 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -45,7 +45,7 @@ public class AirbyteApiClient { public static final int DEFAULT_MAX_RETRIES = 4; public static final int DEFAULT_RETRY_INTERVAL_SECS = 10; - public static final int DEFAULT_FINAL_INTERVAL_SECS = 10; + public static final int DEFAULT_FINAL_INTERVAL_SECS = 10 * 60; private final ConnectionApi connectionApi; private final DestinationDefinitionApi 
destinationDefinitionApi; @@ -142,11 +142,16 @@ public StateApi getStateApi() { } public static T retryWithJitter(final Callable call, final String desc) { - return retryWithJitter(call, DEFAULT_RETRY_INTERVAL_SECS, DEFAULT_FINAL_INTERVAL_SECS, DEFAULT_MAX_RETRIES, desc); + return retryWithJitter(call, desc, DEFAULT_RETRY_INTERVAL_SECS, DEFAULT_FINAL_INTERVAL_SECS, DEFAULT_MAX_RETRIES); } @VisibleForTesting - public static T retryWithJitter(final Callable call, int jitterIntervalSecs, int finalIntervalMins, int maxTries, final String desc) { + @SuppressWarnings("PMD.PreserveStackTrace") + public static T retryWithJitter(final Callable call, + final String desc, + final int jitterIntervalSecs, + final int finalIntervalSecs, + final int maxTries) { int currRetries = 0; boolean keepTrying = true; @@ -167,7 +172,7 @@ public static T retryWithJitter(final Callable call, int jitterIntervalS if (currRetries == maxTries - 1) { // sleep for finalIntervalMins on the last attempt. - backoffTimeMs = finalIntervalMins * 60 * 1000; + backoffTimeMs = finalIntervalSecs * 1000; } try { diff --git a/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java b/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java new file mode 100644 index 0000000000000..7c725b05c91af --- /dev/null +++ b/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.api.client; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Callable; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +public class AirbyteApiClientTest { + + // These set of configurations are so each test case takes ~3 secs. 
+ private static final int TEST_JITTER_INTERVAL_SECS = 1; + private static final int TEST_FINAL_INTERVAL_SECS = 1; + private static final int TEST_MAX_RETRIES = 2; + @Mock + private Callable mockCallable; + + @Nested + class RetryWithJitter { + + @Test + @DisplayName("Should not retry on success") + void ifSucceedShouldNotRetry() throws Exception { + mockCallable = mock(Callable.class); + when(mockCallable.call()).thenReturn("Success!"); + + AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, TEST_MAX_RETRIES); + + verify(mockCallable, times(1)).call(); + } + + @Test + @DisplayName("Should make ") + void ifErrorShouldRetry() throws Exception { + mockCallable = mock(Callable.class); + when(mockCallable.call()).thenThrow(new RuntimeException("Bomb!")); + + AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, TEST_MAX_RETRIES); + + verify(mockCallable, times(TEST_MAX_RETRIES)).call(); + + } + + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 0cde59df545fd..78b74764a8034 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -20,7 +20,6 @@ import io.temporal.activity.ActivityExecutionContext; import java.nio.file.Path; import java.util.Optional; -import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index 592fe874f14fc..381d6ebbebfb1 100644 --- 
a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -11,7 +11,6 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index da695efd6f2de..81410f668bf92 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -11,7 +11,6 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; @@ -162,8 +161,7 @@ private CheckedSupplier, Except final WorkerConfigs workerConfigs, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, - final Supplier activityContext) - throws ApiException { + final Supplier activityContext) { final JobIdRequestBody id = new JobIdRequestBody(); id.setId(Long.valueOf(jobRunConfig.getJobId())); final var jobScope = AirbyteApiClient.retryWithJitter( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index 6d7564f5e96cd..eec1fd1af591a 100644 --- 
a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -14,7 +14,6 @@ import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.metrics.lib.ApmTraceUtils; -import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.temporal.activity.Activity; import jakarta.inject.Singleton; import java.util.Comparator; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index 54952bc7ca129..4b6dd00753b16 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -26,7 +26,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.workers.helper.StateConverter; -import io.airbyte.workers.temporal.TemporalAttemptExecution; import jakarta.inject.Singleton; import java.util.List; import java.util.Map; From c744b0224a5ae5a0ce4d012038b5f2b3113b42ba Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Nov 2022 10:10:01 -0800 Subject: [PATCH 6/8] Better comments. 
--- .../airbyte/api/client/AirbyteApiClient.java | 26 ++++++++++++++++--- .../api/client/AirbyteApiClientTest.java | 19 ++++++++++++-- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index b7716da07cbaf..4b465f8a783f1 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -44,7 +44,6 @@ public class AirbyteApiClient { public static final int DEFAULT_MAX_RETRIES = 4; public static final int DEFAULT_RETRY_INTERVAL_SECS = 10; - public static final int DEFAULT_FINAL_INTERVAL_SECS = 10 * 60; private final ConnectionApi connectionApi; @@ -141,15 +140,34 @@ public StateApi getStateApi() { return stateApi; } + /** + * Default to 3 retries with a randomised 1 - 10 seconds interval between the first two retries and + * an 10 minute wait for the last retry. + */ public static T retryWithJitter(final Callable call, final String desc) { return retryWithJitter(call, desc, DEFAULT_RETRY_INTERVAL_SECS, DEFAULT_FINAL_INTERVAL_SECS, DEFAULT_MAX_RETRIES); } + /** + * Provides a simple retry wrapper for api calls. This retry behaviour is slightly different from + * generally available retries libraries - the last retry is able to wait an interval inconsistent + * with regular intervals/exponential backoff. + *

+ * Since the primary retries use case is long-running workflows, the benefit of waiting a couple of + * minutes as a last ditch effort to outlast networking disruption outweighs the cost of slightly + * longer jobs. + * + * @param call method to execute + * @param desc short readable explanation of why this method is executed + * @param jitterMaxIntervalSecs upper limit of the randomised retry interval. Minimum value is 1. + * @param finalIntervalSecs retry interval before the last retry. + */ @VisibleForTesting + // This is okay since we are logging the stack trace, which PMD is not detecting. @SuppressWarnings("PMD.PreserveStackTrace") public static T retryWithJitter(final Callable call, final String desc, - final int jitterIntervalSecs, + final int jitterMaxIntervalSecs, final int finalIntervalSecs, final int maxTries) { int currRetries = 0; @@ -166,8 +184,8 @@ public static T retryWithJitter(final Callable call, LOGGER.info("Attempt {} to {} error: {}", currRetries, desc, e); currRetries++; - // Sleep anywhere from 1 to jitterIntervalSecs seconds. - final var backoffTimeSecs = Math.min(RANDOM.nextInt(jitterIntervalSecs + 1), 1); + // Sleep anywhere from 1 to jitterMaxIntervalSecs seconds. 
+ final var backoffTimeSecs = Math.min(RANDOM.nextInt(jitterMaxIntervalSecs + 1), 1); var backoffTimeMs = backoffTimeSecs * 1000; if (currRetries == maxTries - 1) { diff --git a/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java b/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java index 7c725b05c91af..2c96d9ef264c8 100644 --- a/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java +++ b/airbyte-api/src/test/java/io/airbyte/api/client/AirbyteApiClientTest.java @@ -39,8 +39,8 @@ void ifSucceedShouldNotRetry() throws Exception { } @Test - @DisplayName("Should make ") - void ifErrorShouldRetry() throws Exception { + @DisplayName("Should retry up to the configured max retries on continued errors") + void onlyRetryTillMaxRetries() throws Exception { mockCallable = mock(Callable.class); when(mockCallable.call()).thenThrow(new RuntimeException("Bomb!")); @@ -50,6 +50,21 @@ void ifErrorShouldRetry() throws Exception { } + @Test + @DisplayName("Should retry only if there are errors") + void onlyRetryOnErrors() throws Exception { + mockCallable = mock(Callable.class); + // Because we succeed on the second try, we should only call the method twice. + when(mockCallable.call()) + .thenThrow(new RuntimeException("Bomb!")) + .thenReturn("Success!"); + + AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, 3); + + verify(mockCallable, times(2)).call(); + + } + } } From 44e8863035f7c56cfd3c357c2f48f14ba09a4faa Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Nov 2022 10:18:12 -0800 Subject: [PATCH 7/8] Better comments. 
--- .../src/main/java/io/airbyte/api/client/AirbyteApiClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index 4b465f8a783f1..e54e321ff9683 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -142,7 +142,7 @@ public StateApi getStateApi() { /** * Default to 3 retries with a randomised 1 - 10 seconds interval between the first two retries and - * an 10 minute wait for the last retry. + * a 10-minute wait for the last retry. */ public static T retryWithJitter(final Callable call, final String desc) { return retryWithJitter(call, desc, DEFAULT_RETRY_INTERVAL_SECS, DEFAULT_FINAL_INTERVAL_SECS, DEFAULT_MAX_RETRIES); From ca0ddbb18a92f6f8071b75d028a1172ae4aa22bd Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Nov 2022 21:39:24 -0800 Subject: [PATCH 8/8] Fix comment. Fix Math.max. --- .../src/main/java/io/airbyte/api/client/AirbyteApiClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index e54e321ff9683..7a2efa51b7220 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -141,7 +141,7 @@ public StateApi getStateApi() { } /** - * Default to 3 retries with a randomised 1 - 10 seconds interval between the first two retries and + * Default to 4 retries with a randomised 1 - 10 seconds interval between the first two retries and * a 10-minute wait for the last retry.
*/ public static T retryWithJitter(final Callable call, final String desc) { @@ -185,7 +185,7 @@ public static T retryWithJitter(final Callable call, currRetries++; // Sleep anywhere from 1 to jitterMaxIntervalSecs seconds. - final var backoffTimeSecs = Math.min(RANDOM.nextInt(jitterMaxIntervalSecs + 1), 1); + final var backoffTimeSecs = Math.max(RANDOM.nextInt(jitterMaxIntervalSecs + 1), 1); var backoffTimeMs = backoffTimeSecs * 1000; if (currRetries == maxTries - 1) {