Skip to content

Use ConnectionApi to fetch source ID #20670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
Expand Down Expand Up @@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
public ConnectionApi connectionApi(final ApiClient apiClient) {
return new ConnectionApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -48,15 +51,18 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
private final JobPersistence jobPersistence;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
this.connectionApi = connectionApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down Expand Up @@ -176,9 +182,11 @@ public GetMaxAttemptOutput getMaxAttempt() {
@Override
public Optional<UUID> getSourceId(final UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getSourceId());
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getSourceId());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Cron;
import io.airbyte.config.Schedule;
Expand Down Expand Up @@ -48,6 +49,9 @@ class ConfigFetchActivityTest {
@Mock
private Job mJob;

@Mock
private ConnectionApi mConnectionApi;

private ConfigFetchActivityImpl configFetchActivity;

private final static UUID connectionId = UUID.randomUUID();
Expand Down Expand Up @@ -93,7 +97,8 @@ class ConfigFetchActivityTest {
@BeforeEach
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond());
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond(),
mConnectionApi);
}

@Nested
Expand Down Expand Up @@ -161,7 +166,8 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx
@Test
@DisplayName("Test we will wait the required amount of time with legacy config")
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand All @@ -183,7 +189,8 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
@Test
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -238,7 +245,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException
@Test
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -267,7 +274,8 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException
mockRightNow.set(Calendar.MILLISECOND, 0);

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L);
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L,
mConnectionApi);

Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));
Expand All @@ -293,7 +301,8 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti
mockRightNow.set(Calendar.MILLISECOND, 0);

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L);
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L,
mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
Expand All @@ -317,7 +326,8 @@ class TestGetMaxAttempt {
@DisplayName("Test that we are using to right service to get the maximum amount of attempt")
void testGetMaxAttempt() {
final int maxAttempt = 15031990;
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond());
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
.isEqualTo(maxAttempt);
}
Expand Down