Skip to content

Add Retry Logic to Airbyte API calls. #19693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 28, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.api.client;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.client.generated.AttemptApi;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.DbMigrationApi;
Expand All @@ -19,6 +20,10 @@
import io.airbyte.api.client.generated.StateApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import java.util.Random;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is meant to consolidate all our API endpoints into a fluent-ish client. Currently, all
Expand All @@ -34,6 +39,13 @@
*/
public class AirbyteApiClient {

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteApiClient.class);
private static final Random RANDOM = new Random();

public static final int DEFAULT_MAX_RETRIES = 4;
public static final int DEFAULT_RETRY_INTERVAL_SECS = 10;
public static final int DEFAULT_FINAL_INTERVAL_SECS = 10 * 60;

private final ConnectionApi connectionApi;
private final DestinationDefinitionApi destinationDefinitionApi;
private final DestinationApi destinationApi;
Expand Down Expand Up @@ -128,4 +140,67 @@ public StateApi getStateApi() {
return stateApi;
}

/**
 * Convenience overload that retries {@code call} with the default settings: the call is attempted
 * at most {@link #DEFAULT_MAX_RETRIES} times, the randomised wait between the early attempts is
 * capped at {@link #DEFAULT_RETRY_INTERVAL_SECS} seconds, and the wait before the last attempt is
 * {@link #DEFAULT_FINAL_INTERVAL_SECS} seconds (10 minutes).
 *
 * @param call method to execute
 * @param desc short readable explanation of why this method is executed
 * @return whatever the five-argument overload returns for these settings
 */
public static <T> T retryWithJitter(final Callable<T> call, final String desc) {
  final int jitterCeilingSecs = DEFAULT_RETRY_INTERVAL_SECS;
  final int lastWaitSecs = DEFAULT_FINAL_INTERVAL_SECS;
  final int attemptLimit = DEFAULT_MAX_RETRIES;
  return retryWithJitter(call, desc, jitterCeilingSecs, lastWaitSecs, attemptLimit);
}

/**
 * Provides a simple retry wrapper for api calls. This retry behaviour is slightly different from
 * generally available retries libraries - the last retry is able to wait an interval inconsistent
 * with regular intervals/exponential backoff.
 * <p>
 * Since the primary retries use case is long-running workflows, the benefit of waiting a couple of
 * minutes as a last ditch effort to outlast networking disruption outweighs the cost of slightly
 * longer jobs.
 * <p>
 * If every attempt fails, the failures are logged and {@code null} is returned; no exception from
 * {@code call} is propagated to the caller.
 *
 * @param call method to execute
 * @param desc short readable explanation of why this method is executed
 * @param jitterMaxIntervalSecs upper limit of the randomised retry interval. Minimum value is 1;
 *        smaller values are treated as 1.
 * @param finalIntervalSecs retry interval before the last retry.
 * @param maxTries maximum number of times {@code call} is invoked.
 * @return the value produced by the first successful invocation of {@code call}, or {@code null}
 *         if no invocation succeeded
 * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
 *         while waiting between attempts
 */
@VisibleForTesting
// This is okay since we are logging the stack trace, which PMD is not detecting.
@SuppressWarnings("PMD.PreserveStackTrace")
public static <T> T retryWithJitter(final Callable<T> call,
                                    final String desc,
                                    final int jitterMaxIntervalSecs,
                                    final int finalIntervalSecs,
                                    final int maxTries) {
  int currRetries = 0;
  boolean keepTrying = true;

  T data = null;
  while (keepTrying && currRetries < maxTries) {
    try {
      LOGGER.info("Attempt {} to {}", currRetries, desc);
      data = call.call();

      keepTrying = false;
    } catch (final Exception e) {
      LOGGER.info("Attempt {} to {} error: {}", currRetries, desc, e);
      currRetries++;

      if (currRetries >= maxTries) {
        // No attempt is left, so sleeping here would only delay the caller.
        break;
      }

      // Long arithmetic so that large second values cannot overflow when converted to millis.
      final long backoffTimeMs;
      if (currRetries == maxTries - 1) {
        // Sleep for finalIntervalSecs before the last attempt.
        backoffTimeMs = finalIntervalSecs * 1000L;
      } else {
        // Sleep anywhere from 1 to jitterMaxIntervalSecs seconds, each value equally likely.
        final int jitterCeilingSecs = Math.max(jitterMaxIntervalSecs, 1);
        backoffTimeMs = (1 + RANDOM.nextInt(jitterCeilingSecs)) * 1000L;
      }

      try {
        Thread.sleep(backoffTimeMs);
      } catch (final InterruptedException ex) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
      }
    }
  }
  return data;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.api.client;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.Callable;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;

/**
 * Unit tests for {@code AirbyteApiClient.retryWithJitter}, checking how many times the wrapped
 * callable is invoked for success, persistent failure and fail-then-succeed scenarios.
 */
public class AirbyteApiClientTest {

  // One-second intervals and a low attempt limit keep each test case to a few seconds.
  private static final int TEST_JITTER_INTERVAL_SECS = 1;
  private static final int TEST_FINAL_INTERVAL_SECS = 1;
  private static final int TEST_MAX_RETRIES = 2;
  @Mock
  private Callable mockCallable;

  /**
   * Runs the current mock through retryWithJitter with the shared test intervals.
   */
  private void runWithRetries(final int maxTries) {
    AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, maxTries);
  }

  @Nested
  class RetryWithJitter {

    @Test
    @DisplayName("Should not retry on success")
    void ifSucceedShouldNotRetry() throws Exception {
      mockCallable = mock(Callable.class);
      when(mockCallable.call()).thenReturn("Success!");

      runWithRetries(TEST_MAX_RETRIES);

      // A successful first call must be the only call.
      verify(mockCallable, times(1)).call();
    }

    @Test
    @DisplayName("Should retry up to the configured max retries on continued errors")
    void onlyRetryTillMaxRetries() throws Exception {
      mockCallable = mock(Callable.class);
      when(mockCallable.call()).thenThrow(new RuntimeException("Bomb!"));

      runWithRetries(TEST_MAX_RETRIES);

      // Every attempt fails, so the callable is invoked exactly the configured number of times.
      verify(mockCallable, times(TEST_MAX_RETRIES)).call();
    }

    @Test
    @DisplayName("Should retry only if there are errors")
    void onlyRetryOnErrors() throws Exception {
      mockCallable = mock(Callable.class);
      // First invocation fails, second succeeds: a third invocation must never happen even
      // though three are allowed.
      when(mockCallable.call())
          .thenThrow(new RuntimeException("Bomb!"))
          .thenReturn("Success!");

      runWithRetries(3);

      verify(mockCallable, times(2)).call();
    }

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ public OUTPUT get() {
}

LOGGER.info("Executing worker wrapper. Airbyte version: {}", airbyteVersion);
saveWorkflowIdForCancellation(airbyteApiClient);
AirbyteApiClient.retryWithJitter(() -> {
saveWorkflowIdForCancellation(airbyteApiClient);
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this return null?

Copy link
Contributor Author

@davinchia davinchia Nov 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this and I think the alternatives are messier.

We can turn the interface into a Runnable, which doesn't expect a return value. However, Runnables do not support checked exceptions, which means we would have to catch the exception thrown by saveWorkflowIdForCancellation and rethrow it as a RuntimeException.

Given that, I think it's simpler to keep this as is!

If you have a suggestion, I'm happy to edit this!

}, "save workflow id for cancellation");

final Worker<INPUT, OUTPUT> worker = workerSupplier.get();
final CompletableFuture<OUTPUT> outputFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -153,11 +152,13 @@ private CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> getContainerL
final WorkerConfigs workerConfigs,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
final Supplier<ActivityExecutionContext> activityContext)
throws ApiException {
final Supplier<ActivityExecutionContext> activityContext) {
final JobIdRequestBody id = new JobIdRequestBody();
id.setId(Long.valueOf(jobRunConfig.getJobId()));
final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId();

final var jobScope = AirbyteApiClient.retryWithJitter(
() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too

"get job scope");
final var connectionId = UUID.fromString(jobScope);

return () -> new DbtLauncherWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -162,11 +161,12 @@ private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Except
final WorkerConfigs workerConfigs,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
final Supplier<ActivityExecutionContext> activityContext)
throws ApiException {
final Supplier<ActivityExecutionContext> activityContext) {
final JobIdRequestBody id = new JobIdRequestBody();
id.setId(Long.valueOf(jobRunConfig.getJobId()));
final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId();
final var jobScope = AirbyteApiClient.retryWithJitter(
() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmossman I feel like we can switch this to getJobInfoLight?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, will do in a follow up PR.

"get job scope");
final var connectionId = UUID.fromString(jobScope);
return () -> new NormalizationLauncherWorker(
connectionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead;
import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
Expand Down Expand Up @@ -48,8 +47,10 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber

final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList;
try {
AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId));
} catch (final ApiException e) {
AttemptNormalizationStatusReadList = AirbyteApiClient.retryWithJitter(
() -> airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)),
"get normalization statuses");
} catch (final Exception e) {
throw Activity.wrap(e);
}
final AtomicLong totalRecordsCommitted = new AtomicLong(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionState;
import io.airbyte.api.client.model.generated.ConnectionStateCreateOrUpdate;
Expand Down Expand Up @@ -54,8 +53,10 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut
try {
final Optional<StateWrapper> maybeStateWrapper = StateMessageHelper.getTypedState(state.getState(), featureFlags.useStreamCapableState());
if (maybeStateWrapper.isPresent()) {
final ConnectionState previousState = airbyteApiClient.getStateApi()
.getState(new ConnectionIdRequestBody().connectionId(connectionId));
final ConnectionState previousState =
AirbyteApiClient.retryWithJitter(
() -> airbyteApiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)),
"get state");
if (featureFlags.needStateValidation() && previousState != null) {
final StateType newStateType = maybeStateWrapper.get().getStateType();
final StateType prevStateType = convertClientStateTypeToInternal(previousState.getStateType());
Expand All @@ -65,12 +66,17 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut
}
}

airbyteApiClient.getStateApi().createOrUpdateState(
new ConnectionStateCreateOrUpdate()
.connectionId(connectionId)
.connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null))));
AirbyteApiClient.retryWithJitter(
() -> {
airbyteApiClient.getStateApi().createOrUpdateState(
new ConnectionStateCreateOrUpdate()
.connectionId(connectionId)
.connectionState(StateConverter.toClient(connectionId, maybeStateWrapper.orElse(null))));
return null;
},
"create or update state");
}
} catch (final ApiException e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
throws ApiException {
final JobIdRequestBody id = new JobIdRequestBody();
id.setId(Long.valueOf(jobRunConfig.getJobId()));
final var jobInfo = airbyteApiClient.getJobsApi().getJobInfoLight(id);
final var jobInfo = AirbyteApiClient.retryWithJitter(
() -> airbyteApiClient.getJobsApi().getJobInfoLight(id),
"get job info light");
LOGGER.info("received response from from jobsApi.getJobInfoLight: {}", jobInfo);
final var jobScope = jobInfo.getJob().getConfigId();
final var connectionId = UUID.fromString(jobScope);
Expand Down