Skip to content

Commit c6715bd

Browse files
authored
Remove config repo dependency for getStatus (#21033)
1 parent 40b4ad1 commit c6715bd

File tree

3 files changed

+12
-9
lines changed

3 files changed

+12
-9
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
package io.airbyte.workers.temporal.scheduling.activities;
66

7+
import io.airbyte.api.client.model.generated.ConnectionStatus;
78
import io.airbyte.config.StandardSync;
8-
import io.airbyte.config.StandardSync.Status;
99
import io.airbyte.config.persistence.ConfigNotFoundException;
1010
import io.airbyte.validation.json.JsonValidationException;
1111
import io.temporal.activity.ActivityInterface;
@@ -25,7 +25,7 @@ public interface ConfigFetchActivity {
2525
Optional<UUID> getSourceId(UUID connectionId);
2626

2727
@ActivityMethod
28-
Optional<Status> getStatus(UUID connectionId);
28+
Optional<ConnectionStatus> getStatus(UUID connectionId);
2929

3030
@Data
3131
@NoArgsConstructor

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.airbyte.api.client.generated.ConnectionApi;
1313
import io.airbyte.api.client.invoker.generated.ApiException;
1414
import io.airbyte.api.client.model.generated.ConnectionRead;
15+
import io.airbyte.api.client.model.generated.ConnectionStatus;
1516
import io.airbyte.commons.temporal.config.WorkerMode;
1617
import io.airbyte.commons.temporal.exception.RetryableException;
1718
import io.airbyte.config.Cron;
@@ -248,11 +249,13 @@ public Optional<UUID> getSourceId(final UUID connectionId) {
248249
}
249250

250251
@Override
251-
public Optional<Status> getStatus(final UUID connectionId) {
252+
public Optional<ConnectionStatus> getStatus(final UUID connectionId) {
252253
try {
253-
final StandardSync standardSync = getStandardSync(connectionId);
254-
return Optional.ofNullable(standardSync.getStatus());
255-
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
254+
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
255+
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
256+
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
257+
return Optional.ofNullable(connectionRead.getStatus());
258+
} catch (ApiException e) {
256259
log.info("Encountered an error fetching the connection's status: ", e);
257260
return Optional.empty();
258261
}

airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.mockito.Mockito.when;
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
19+
import io.airbyte.api.client.model.generated.ConnectionStatus;
1920
import io.airbyte.commons.json.Jsons;
2021
import io.airbyte.commons.temporal.TemporalUtils;
2122
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
@@ -26,7 +27,6 @@
2627
import io.airbyte.config.OperatorWebhookInput;
2728
import io.airbyte.config.ResourceRequirements;
2829
import io.airbyte.config.StandardSync;
29-
import io.airbyte.config.StandardSync.Status;
3030
import io.airbyte.config.StandardSyncInput;
3131
import io.airbyte.config.StandardSyncOperation;
3232
import io.airbyte.config.StandardSyncOperation.OperatorType;
@@ -159,7 +159,7 @@ void setUp() {
159159

160160
when(configFetchActivity.getSourceId(sync.getConnectionId())).thenReturn(Optional.of(SOURCE_ID));
161161
when(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)).thenReturn(true);
162-
when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(Status.ACTIVE));
162+
when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(ConnectionStatus.ACTIVE));
163163

164164
longActivityOptions = ActivityOptions.newBuilder()
165165
.setScheduleToCloseTimeout(Duration.ofDays(3))
@@ -405,7 +405,7 @@ void testWebhookOperation() {
405405
@Test
406406
@Disabled("Temporarily disabled to address OC issue #1210")
407407
void testSkipReplicationAfterRefreshSchema() {
408-
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE));
408+
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
409409
final StandardSyncOutput output = execute();
410410
verifyShouldRefreshSchema(refreshSchemaActivity);
411411
verifyRefreshSchema(refreshSchemaActivity, sync);

0 commit comments

Comments
 (0)