Skip to content

Commit c0838f8

Browse files
Disable auto detect schema activity bits (#20615)
* Disable auto detect schema activity bits * Disable impacted tests * Disable auto detect schema checks * Add comment as to why code has been disabled * Fix PMD warnings * Fix PMD warning Co-authored-by: Peter Hu <[email protected]>
1 parent b7113a2 commit c0838f8

File tree

4 files changed

+55
-43
lines changed

4 files changed

+55
-43
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public List<Object> checkConnectionActivities(
5858
@Requires(env = WorkerMode.CONTROL_PLANE)
5959
@Named("notifyActivities")
6060
public List<Object> notifyActivities(final NotifySchemaChangeActivity notifySchemaChangeActivity,
61-
SlackConfigActivity slackConfigActivity,
62-
ConfigFetchActivity configFetchActivity) {
61+
final SlackConfigActivity slackConfigActivity,
62+
final ConfigFetchActivity configFetchActivity) {
6363
return List.of(notifySchemaChangeActivity, slackConfigActivity, configFetchActivity);
6464
}
6565

@@ -112,10 +112,12 @@ public List<Object> syncActivities(
112112
final PersistStateActivity persistStateActivity,
113113
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
114114
final WebhookOperationActivity webhookOperationActivity,
115-
final ConfigFetchActivity configFetchActivity,
115+
/*
116+
* Temporarily disabled to address OC issue #1210 final ConfigFetchActivity configFetchActivity,
117+
*/
116118
final RefreshSchemaActivity refreshSchemaActivity) {
117119
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
118-
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
120+
webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity);
119121
}
120122

121123
@Singleton

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
99

1010
import datadog.trace.api.Trace;
11+
import io.airbyte.commons.temporal.config.WorkerMode;
1112
import io.airbyte.commons.temporal.exception.RetryableException;
1213
import io.airbyte.config.Cron;
1314
import io.airbyte.config.StandardSync;
@@ -20,6 +21,7 @@
2021
import io.airbyte.persistence.job.JobPersistence;
2122
import io.airbyte.persistence.job.models.Job;
2223
import io.airbyte.validation.json.JsonValidationException;
24+
import io.micronaut.context.annotation.Requires;
2325
import io.micronaut.context.annotation.Value;
2426
import jakarta.inject.Named;
2527
import jakarta.inject.Singleton;
@@ -39,6 +41,7 @@
3941

4042
@Slf4j
4143
@Singleton
44+
@Requires(env = WorkerMode.CONTROL_PLANE)
4245
public class ConfigFetchActivityImpl implements ConfigFetchActivity {
4346

4447
private final static long MS_PER_SECOND = 1000L;

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,16 @@
1717
import io.airbyte.config.NormalizationSummary;
1818
import io.airbyte.config.OperatorDbtInput;
1919
import io.airbyte.config.OperatorWebhookInput;
20-
import io.airbyte.config.StandardSync.Status;
2120
import io.airbyte.config.StandardSyncInput;
2221
import io.airbyte.config.StandardSyncOperation;
2322
import io.airbyte.config.StandardSyncOperation.OperatorType;
2423
import io.airbyte.config.StandardSyncOutput;
25-
import io.airbyte.config.StandardSyncSummary;
26-
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
27-
import io.airbyte.config.SyncStats;
2824
import io.airbyte.config.WebhookOperationSummary;
2925
import io.airbyte.metrics.lib.ApmTraceUtils;
3026
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
3127
import io.airbyte.persistence.job.models.JobRunConfig;
3228
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
3329
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
34-
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
3530
import io.temporal.workflow.Workflow;
3631
import java.util.Map;
3732
import java.util.Optional;
@@ -62,10 +57,11 @@ public class SyncWorkflowImpl implements SyncWorkflow {
6257
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
6358
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
6459
private WebhookOperationActivity webhookOperationActivity;
65-
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
66-
private RefreshSchemaActivity refreshSchemaActivity;
67-
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
68-
private ConfigFetchActivity configFetchActivity;
60+
// Temporarily disabled to address OC issue #1210
61+
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
62+
// private RefreshSchemaActivity refreshSchemaActivity;
63+
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
64+
// private ConfigFetchActivity configFetchActivity;
6965

7066
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
7167
@Override
@@ -84,26 +80,30 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
8480
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
8581
final String taskQueue = Workflow.getInfo().getTaskQueue();
8682

83+
// Temporarily suppressed to address OC issue #1210
84+
@SuppressWarnings("PMD.UnusedLocalVariable")
8785
final int autoDetectSchemaVersion =
8886
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);
8987

90-
if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
91-
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
92-
93-
if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
94-
LOGGER.info("Refreshing source schema...");
95-
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
96-
}
97-
98-
final Optional<Status> status = configFetchActivity.getStatus(connectionId);
99-
if (!status.isEmpty() && Status.INACTIVE == status.get()) {
100-
LOGGER.info("Connection is disabled. Cancelling run.");
101-
final StandardSyncOutput output =
102-
new StandardSyncOutput()
103-
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
104-
return output;
105-
}
106-
}
88+
// Temporarily disabled to address OC issue #1210
89+
// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
90+
// final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
91+
//
92+
// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
93+
// LOGGER.info("Refreshing source schema...");
94+
// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
95+
// }
96+
//
97+
// final Optional<Status> status = configFetchActivity.getStatus(connectionId);
98+
// if (!status.isEmpty() && Status.INACTIVE == status.get()) {
99+
// LOGGER.info("Connection is disabled. Cancelling run.");
100+
// final StandardSyncOutput output =
101+
// new StandardSyncOutput()
102+
// .withStandardSyncSummary(new
103+
// StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
104+
// return output;
105+
// }
106+
// }
107107

108108
StandardSyncOutput syncOutput =
109109
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);

airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,9 @@ void testSuccess() {
229229
verifyNormalize(normalizationActivity, normalizationInput);
230230
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
231231
operatorDbtInput);
232-
verifyShouldRefreshSchema(refreshSchemaActivity);
233-
verifyRefreshSchema(refreshSchemaActivity, sync);
232+
// Temporarily disabled to address OC issue #1210
233+
// verifyShouldRefreshSchema(refreshSchemaActivity);
234+
// verifyRefreshSchema(refreshSchemaActivity, sync);
234235
assertEquals(
235236
replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(),
236237
actualOutput.getStandardSyncSummary());
@@ -246,8 +247,9 @@ void testReplicationFailure() {
246247

247248
assertThrows(WorkflowFailedException.class, this::execute);
248249

249-
verifyShouldRefreshSchema(refreshSchemaActivity);
250-
verifyRefreshSchema(refreshSchemaActivity, sync);
250+
// Temporarily disabled to address OC issue #1210
251+
// verifyShouldRefreshSchema(refreshSchemaActivity);
252+
// verifyRefreshSchema(refreshSchemaActivity, sync);
251253
verifyReplication(replicationActivity, syncInput);
252254
verifyNoInteractions(persistStateActivity);
253255
verifyNoInteractions(normalizationActivity);
@@ -269,8 +271,9 @@ void testReplicationFailedGracefully() {
269271

270272
final StandardSyncOutput actualOutput = execute();
271273

272-
verifyShouldRefreshSchema(refreshSchemaActivity);
273-
verifyRefreshSchema(refreshSchemaActivity, sync);
274+
// Temporarily disabled to address OC issue #1210
275+
// verifyShouldRefreshSchema(refreshSchemaActivity);
276+
// verifyRefreshSchema(refreshSchemaActivity, sync);
274277
verifyReplication(replicationActivity, syncInput);
275278
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
276279
verifyNormalize(normalizationActivity, normalizationInput);
@@ -296,8 +299,9 @@ void testNormalizationFailure() {
296299

297300
assertThrows(WorkflowFailedException.class, this::execute);
298301

299-
verifyShouldRefreshSchema(refreshSchemaActivity);
300-
verifyRefreshSchema(refreshSchemaActivity, sync);
302+
// Temporarily disabled to address OC issue #1210
303+
// verifyShouldRefreshSchema(refreshSchemaActivity);
304+
// verifyRefreshSchema(refreshSchemaActivity, sync);
301305
verifyReplication(replicationActivity, syncInput);
302306
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
303307
verifyNormalize(normalizationActivity, normalizationInput);
@@ -317,8 +321,9 @@ void testCancelDuringReplication() {
317321

318322
assertThrows(WorkflowFailedException.class, this::execute);
319323

320-
verifyShouldRefreshSchema(refreshSchemaActivity);
321-
verifyRefreshSchema(refreshSchemaActivity, sync);
324+
// Temporarily disabled to address OC issue #1210
325+
// verifyShouldRefreshSchema(refreshSchemaActivity);
326+
// verifyRefreshSchema(refreshSchemaActivity, sync);
322327
verifyReplication(replicationActivity, syncInput);
323328
verifyNoInteractions(persistStateActivity);
324329
verifyNoInteractions(normalizationActivity);
@@ -343,8 +348,9 @@ void testCancelDuringNormalization() {
343348

344349
assertThrows(WorkflowFailedException.class, this::execute);
345350

346-
verifyShouldRefreshSchema(refreshSchemaActivity);
347-
verifyRefreshSchema(refreshSchemaActivity, sync);
351+
// Temporarily disabled to address OC issue #1210
352+
// verifyShouldRefreshSchema(refreshSchemaActivity);
353+
// verifyRefreshSchema(refreshSchemaActivity, sync);
348354
verifyReplication(replicationActivity, syncInput);
349355
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
350356
verifyNormalize(normalizationActivity, normalizationInput);
@@ -397,9 +403,10 @@ void testWebhookOperation() {
397403
}
398404

399405
@Test
406+
@Disabled("Temporarily disabled to address OC issue #1210")
400407
void testSkipReplicationAfterRefreshSchema() {
401408
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE));
402-
StandardSyncOutput output = execute();
409+
final StandardSyncOutput output = execute();
403410
verifyShouldRefreshSchema(refreshSchemaActivity);
404411
verifyRefreshSchema(refreshSchemaActivity, sync);
405412
verifyNoInteractions(replicationActivity);

0 commit comments

Comments
 (0)