-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Add Retry Logic to Airbyte API calls. #19693
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
982f7da
623464a
0ff8813
9e92aa6
ae94145
0b7741d
c744b02
44e8863
ca0ddbb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.api.client; | ||
|
||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
import java.util.concurrent.Callable; | ||
import org.junit.jupiter.api.DisplayName; | ||
import org.junit.jupiter.api.Nested; | ||
import org.junit.jupiter.api.Test; | ||
import org.mockito.Mock; | ||
|
||
public class AirbyteApiClientTest { | ||
|
||
// These set of configurations are so each test case takes ~3 secs. | ||
private static final int TEST_JITTER_INTERVAL_SECS = 1; | ||
private static final int TEST_FINAL_INTERVAL_SECS = 1; | ||
private static final int TEST_MAX_RETRIES = 2; | ||
@Mock | ||
private Callable mockCallable; | ||
|
||
@Nested | ||
class RetryWithJitter { | ||
|
||
@Test | ||
@DisplayName("Should not retry on success") | ||
void ifSucceedShouldNotRetry() throws Exception { | ||
mockCallable = mock(Callable.class); | ||
when(mockCallable.call()).thenReturn("Success!"); | ||
|
||
AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, TEST_MAX_RETRIES); | ||
|
||
verify(mockCallable, times(1)).call(); | ||
} | ||
|
||
@Test | ||
@DisplayName("Should retry up to the configured max retries on continued errors") | ||
void onlyRetryTillMaxRetries() throws Exception { | ||
mockCallable = mock(Callable.class); | ||
when(mockCallable.call()).thenThrow(new RuntimeException("Bomb!")); | ||
|
||
AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, TEST_MAX_RETRIES); | ||
|
||
verify(mockCallable, times(TEST_MAX_RETRIES)).call(); | ||
|
||
} | ||
|
||
@Test | ||
@DisplayName("Should retry only if there are errors") | ||
void onlyRetryOnErrors() throws Exception { | ||
mockCallable = mock(Callable.class); | ||
// Because we succeed on the second try, we should only call the method twice. | ||
when(mockCallable.call()) | ||
.thenThrow(new RuntimeException("Bomb!")) | ||
.thenReturn("Success!"); | ||
|
||
AirbyteApiClient.retryWithJitter(mockCallable, "test", TEST_JITTER_INTERVAL_SECS, TEST_FINAL_INTERVAL_SECS, 3); | ||
|
||
verify(mockCallable, times(2)).call(); | ||
|
||
} | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,6 @@ | |
|
||
import datadog.trace.api.Trace; | ||
import io.airbyte.api.client.AirbyteApiClient; | ||
import io.airbyte.api.client.invoker.generated.ApiException; | ||
import io.airbyte.api.client.model.generated.JobIdRequestBody; | ||
import io.airbyte.commons.functional.CheckedSupplier; | ||
import io.airbyte.commons.json.Jsons; | ||
|
@@ -153,11 +152,13 @@ private CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> getContainerL | |
final WorkerConfigs workerConfigs, | ||
final IntegrationLauncherConfig destinationLauncherConfig, | ||
final JobRunConfig jobRunConfig, | ||
final Supplier<ActivityExecutionContext> activityContext) | ||
throws ApiException { | ||
final Supplier<ActivityExecutionContext> activityContext) { | ||
final JobIdRequestBody id = new JobIdRequestBody(); | ||
id.setId(Long.valueOf(jobRunConfig.getJobId())); | ||
final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(); | ||
|
||
final var jobScope = AirbyteApiClient.retryWithJitter( | ||
() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here too |
||
"get job scope"); | ||
final var connectionId = UUID.fromString(jobScope); | ||
|
||
return () -> new DbtLauncherWorker( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,6 @@ | |
|
||
import datadog.trace.api.Trace; | ||
import io.airbyte.api.client.AirbyteApiClient; | ||
import io.airbyte.api.client.invoker.generated.ApiException; | ||
import io.airbyte.api.client.model.generated.JobIdRequestBody; | ||
import io.airbyte.commons.functional.CheckedSupplier; | ||
import io.airbyte.commons.json.Jsons; | ||
|
@@ -162,11 +161,12 @@ private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Except | |
final WorkerConfigs workerConfigs, | ||
final IntegrationLauncherConfig destinationLauncherConfig, | ||
final JobRunConfig jobRunConfig, | ||
final Supplier<ActivityExecutionContext> activityContext) | ||
throws ApiException { | ||
final Supplier<ActivityExecutionContext> activityContext) { | ||
final JobIdRequestBody id = new JobIdRequestBody(); | ||
id.setId(Long.valueOf(jobRunConfig.getJobId())); | ||
final var jobScope = airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(); | ||
final var jobScope = AirbyteApiClient.retryWithJitter( | ||
() -> airbyteApiClient.getJobsApi().getJobInfo(id).getJob().getConfigId(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pmossman I feel like we can switch this to getJobInfoLight? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If so, will do in a follow up PR. |
||
"get job scope"); | ||
final var connectionId = UUID.fromString(jobScope); | ||
return () -> new NormalizationLauncherWorker( | ||
connectionId, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this return null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked into this and I think the alternatives are messier.
We can turn the interface into a Runnable, which doesn't expect an return value. However Runnables do not support checked exceptions, which means we would have to catch and recast the exception thrown by
saveWorkflowIdForCancellation
into a RTE.Given that, I think it's simpler to keep this as is!
If you have a suggestion, I'm happy to edit this!