Skip to content

Commit 4b23365

Browse files
committed
Use ConnectionApi to fetch source ID (#20670)
1 parent 8d658b7 commit 4b23365

File tree

3 files changed

+36
-15
lines changed

3 files changed

+36
-15
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.auth0.jwt.algorithms.Algorithm;
1010
import com.google.auth.oauth2.ServiceAccountCredentials;
1111
import io.airbyte.api.client.AirbyteApiClient;
12+
import io.airbyte.api.client.generated.ConnectionApi;
1213
import io.airbyte.api.client.generated.SourceApi;
1314
import io.airbyte.api.client.invoker.generated.ApiClient;
1415
import io.airbyte.commons.temporal.config.WorkerMode;
@@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) {
6768
return new SourceApi(apiClient);
6869
}
6970

71+
@Singleton
72+
public ConnectionApi connectionApi(final ApiClient apiClient) {
73+
return new ConnectionApi(apiClient);
74+
}
75+
7076
@Singleton
7177
public HttpClient httpClient() {
7278
return HttpClient.newHttpClient();

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import com.google.common.annotations.VisibleForTesting;
1111
import datadog.trace.api.Trace;
1212
import io.airbyte.commons.temporal.config.WorkerMode;
13+
import io.airbyte.api.client.generated.ConnectionApi;
14+
import io.airbyte.api.client.invoker.generated.ApiException;
15+
import io.airbyte.api.client.model.generated.ConnectionRead;
1316
import io.airbyte.commons.temporal.exception.RetryableException;
1417
import io.airbyte.config.Cron;
1518
import io.airbyte.config.StandardSync;
@@ -65,25 +68,29 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
6568
private final WorkspaceHelper workspaceHelper;
6669
private final Integer syncJobMaxAttempts;
6770
private final Supplier<Long> currentSecondsSupplier;
71+
private final ConnectionApi connectionApi;
6872

6973
public ConfigFetchActivityImpl(final ConfigRepository configRepository,
7074
final JobPersistence jobPersistence,
7175
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
72-
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
73-
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier);
76+
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
77+
final ConnectionApi connectionApi) {
78+
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier, connectionApi);
7479
}
7580

7681
@VisibleForTesting
7782
protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
7883
final JobPersistence jobPersistence,
7984
final WorkspaceHelper workspaceHelper,
8085
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
81-
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
86+
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
87+
final ConnectionApi connectionApi) {
8288
this.configRepository = configRepository;
8389
this.jobPersistence = jobPersistence;
8490
this.workspaceHelper = workspaceHelper;
8591
this.syncJobMaxAttempts = syncJobMaxAttempts;
8692
this.currentSecondsSupplier = currentSecondsSupplier;
93+
this.connectionApi = connectionApi;
8794
}
8895

8996
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@@ -229,9 +236,11 @@ public GetMaxAttemptOutput getMaxAttempt() {
229236
@Override
230237
public Optional<UUID> getSourceId(final UUID connectionId) {
231238
try {
232-
final StandardSync standardSync = getStandardSync(connectionId);
233-
return Optional.ofNullable(standardSync.getSourceId());
234-
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
239+
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
240+
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
241+
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
242+
return Optional.ofNullable(connectionRead.getSourceId());
243+
} catch (ApiException e) {
235244
log.info("Encountered an error fetching the connection's Source ID: ", e);
236245
return Optional.empty();
237246
}

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import static org.mockito.ArgumentMatchers.any;
88
import static org.mockito.Mockito.when;
9-
9+
import io.airbyte.api.client.generated.ConnectionApi;
1010
import io.airbyte.config.BasicSchedule;
1111
import io.airbyte.config.Cron;
1212
import io.airbyte.config.Schedule;
@@ -55,6 +55,9 @@ class ConfigFetchActivityTest {
5555
@Mock
5656
private Job mJob;
5757

58+
@Mock
59+
private ConnectionApi mConnectionApi;
60+
5861
private ConfigFetchActivityImpl configFetchActivity;
5962

6063
private final static UUID connectionId = UUID.randomUUID();
@@ -102,7 +105,7 @@ class ConfigFetchActivityTest {
102105
void setup() {
103106
configFetchActivity =
104107
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
105-
() -> Instant.now().getEpochSecond());
108+
() -> Instant.now().getEpochSecond(), mConnectionApi);
106109
}
107110

108111
@Nested
@@ -170,7 +173,8 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx
170173
@Test
171174
@DisplayName("Test we will wait the required amount of time with legacy config")
172175
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException {
173-
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
176+
configFetchActivity =
177+
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);
174178

175179
when(mJob.getStartedAtInSecond())
176180
.thenReturn(Optional.of(60L));
@@ -192,7 +196,8 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
192196
@Test
193197
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
194198
void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException {
195-
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10);
199+
configFetchActivity =
200+
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);
196201

197202
when(mJob.getStartedAtInSecond())
198203
.thenReturn(Optional.of(60L));
@@ -247,7 +252,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException
247252
@Test
248253
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
249254
void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
250-
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
255+
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);
251256

252257
when(mJob.getStartedAtInSecond())
253258
.thenReturn(Optional.of(60L));
@@ -279,7 +284,7 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException
279284

280285
configFetchActivity =
281286
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
282-
() -> mockRightNow.getTimeInMillis() / 1000L);
287+
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);
283288

284289
when(mJobPersistence.getLastReplicationJob(connectionId))
285290
.thenReturn(Optional.of(mJob));
@@ -308,7 +313,7 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti
308313

309314
configFetchActivity =
310315
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
311-
() -> mockRightNow.getTimeInMillis() / 1000L);
316+
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);
312317

313318
when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
314319
when(mJobPersistence.getLastReplicationJob(connectionId))
@@ -338,7 +343,7 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf
338343

339344
configFetchActivity =
340345
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
341-
() -> mockRightNow.getTimeInMillis() / 1000L);
346+
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);
342347

343348
when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
344349
when(mJobPersistence.getLastReplicationJob(connectionId))
@@ -362,7 +367,8 @@ class TestGetMaxAttempt {
362367
@DisplayName("Test that we are using to right service to get the maximum amount of attempt")
363368
void testGetMaxAttempt() {
364369
final int maxAttempt = 15031990;
365-
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond());
370+
configFetchActivity =
371+
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
366372
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
367373
.isEqualTo(maxAttempt);
368374
}

0 commit comments

Comments
 (0)