Skip to content

Commit f9939c7

Browse files
authored
Bmoric/right error for refresh (#22471)
* Get a better failure reason for the refresh schema error * Use share empty stats * Format * Add missing import * fix pmd
1 parent 3d2a995 commit f9939c7

File tree

6 files changed

+63
-28
lines changed

6 files changed

+63
-28
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4167,6 +4167,7 @@ components:
41674167
- config_error
41684168
- system_error
41694169
- manual_cancellation
4170+
- refresh_schema
41704171
AttemptStatus:
41714172
type: string
41724173
enum:

airbyte-config/config-models/src/main/resources/types/FailureReason.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ properties:
2626
- config_error
2727
- system_error
2828
- manual_cancellation
29+
- refresh_schema
2930
internalMessage:
3031
description: Human readable failure description for consumption by technical system operators, like Airbyte engineers or OSS users.
3132
type: string

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44

55
package io.airbyte.workers.temporal.scheduling;
66

7+
import static io.airbyte.workers.temporal.sync.SyncOutputProvider.EMPTY_FAILED_SYNC;
8+
79
import io.airbyte.config.ConnectorJobOutput;
810
import io.airbyte.config.ConnectorJobOutput.OutputType;
911
import io.airbyte.config.FailureReason;
1012
import io.airbyte.config.StandardCheckConnectionOutput;
1113
import io.airbyte.config.StandardSyncOutput;
12-
import io.airbyte.config.StandardSyncSummary;
13-
import io.airbyte.config.SyncStats;
1414
import io.airbyte.persistence.job.models.JobRunConfig;
1515
import io.airbyte.workers.helper.FailureHelper;
1616
import java.util.List;
@@ -58,19 +58,7 @@ public StandardSyncOutput buildFailureOutput() {
5858
}
5959

6060
final StandardSyncOutput syncOutput = new StandardSyncOutput()
61-
.withStandardSyncSummary(
62-
new StandardSyncSummary()
63-
.withStatus(StandardSyncSummary.ReplicationStatus.FAILED)
64-
.withStartTime(System.currentTimeMillis())
65-
.withEndTime(System.currentTimeMillis())
66-
.withRecordsSynced(0L)
67-
.withBytesSynced(0L)
68-
.withTotalStats(new SyncStats()
69-
.withRecordsEmitted(0L)
70-
.withBytesEmitted(0L)
71-
.withSourceStateMessagesEmitted(0L)
72-
.withDestinationStateMessagesEmitted(0L)
73-
.withRecordsCommitted(0L)));
61+
.withStandardSyncSummary(EMPTY_FAILED_SYNC);
7462

7563
if (failureOutput.getFailureReason() != null) {
7664
syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin)));
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.temporal.sync;
6+
7+
import io.airbyte.config.FailureReason;
8+
import io.airbyte.config.StandardSyncOutput;
9+
import io.airbyte.config.StandardSyncSummary;
10+
import io.airbyte.config.SyncStats;
11+
import java.util.List;
12+
13+
public class SyncOutputProvider {
14+
15+
public final static StandardSyncSummary EMPTY_FAILED_SYNC = new StandardSyncSummary()
16+
.withStatus(StandardSyncSummary.ReplicationStatus.FAILED)
17+
.withStartTime(System.currentTimeMillis())
18+
.withEndTime(System.currentTimeMillis())
19+
.withRecordsSynced(0L)
20+
.withBytesSynced(0L)
21+
.withTotalStats(new SyncStats()
22+
.withRecordsEmitted(0L)
23+
.withBytesEmitted(0L)
24+
.withSourceStateMessagesEmitted(0L)
25+
.withDestinationStateMessagesEmitted(0L)
26+
.withRecordsCommitted(0L));
27+
28+
public static StandardSyncOutput getRefreshSchemaFailure(final Exception e) {
29+
return new StandardSyncOutput()
30+
.withFailures(List.of(new FailureReason()
31+
.withFailureType(FailureReason.FailureType.REFRESH_SCHEMA)
32+
.withFailureOrigin(FailureReason.FailureOrigin.SOURCE)
33+
.withExternalMessage("Failed to detect if there is a schema change. If the error persist please contact the support team.")
34+
.withInternalMessage("Failed to launch the refresh schema activity because of: " + e.getMessage())
35+
.withStacktrace(e.toString())))
36+
.withStandardSyncSummary(EMPTY_FAILED_SYNC);
37+
}
38+
39+
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
9292

9393
if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
9494
LOGGER.info("Refreshing source schema...");
95-
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
95+
try {
96+
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
97+
} catch (final Exception e) {
98+
return SyncOutputProvider.getRefreshSchemaFailure(e);
99+
}
96100
}
97101

98102
final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);

airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,9 @@
2020
import io.airbyte.commons.json.Jsons;
2121
import io.airbyte.commons.temporal.TemporalUtils;
2222
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
23-
import io.airbyte.config.NormalizationInput;
24-
import io.airbyte.config.NormalizationSummary;
25-
import io.airbyte.config.OperatorDbtInput;
26-
import io.airbyte.config.OperatorWebhook;
27-
import io.airbyte.config.OperatorWebhookInput;
28-
import io.airbyte.config.ResourceRequirements;
29-
import io.airbyte.config.StandardSync;
30-
import io.airbyte.config.StandardSyncInput;
31-
import io.airbyte.config.StandardSyncOperation;
23+
import io.airbyte.config.*;
3224
import io.airbyte.config.StandardSyncOperation.OperatorType;
33-
import io.airbyte.config.StandardSyncOutput;
34-
import io.airbyte.config.StandardSyncSummary;
3525
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
36-
import io.airbyte.config.SyncStats;
3726
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
3827
import io.airbyte.persistence.job.models.JobRunConfig;
3928
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
@@ -181,6 +170,7 @@ void setUp() {
181170
.build();
182171
discoveryActivityOptions = ActivityOptions.newBuilder()
183172
.setStartToCloseTimeout(Duration.ofSeconds(360))
173+
.setRetryOptions(TemporalUtils.NO_RETRY)
184174
.build();
185175

186176
final BeanIdentifier longActivitiesBeanIdentifier = mock(BeanIdentifier.class);
@@ -418,6 +408,18 @@ void testSkipReplicationAfterRefreshSchema() {
418408
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED);
419409
}
420410

411+
@Test
412+
void testGetProperFailureIfRefreshFails() {
413+
when(refreshSchemaActivity.shouldRefreshSchema(any())).thenReturn(true);
414+
doThrow(new RuntimeException())
415+
.when(refreshSchemaActivity).refreshSchema(any(), any());
416+
final StandardSyncOutput output = execute();
417+
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.FAILED);
418+
assertEquals(output.getFailures().size(), 1);
419+
assertEquals(output.getFailures().get(0).getFailureOrigin(), FailureReason.FailureOrigin.SOURCE);
420+
assertEquals(output.getFailures().get(0).getFailureType(), FailureReason.FailureType.REFRESH_SCHEMA);
421+
}
422+
421423
@SuppressWarnings("ResultOfMethodCallIgnored")
422424
private void cancelWorkflow() {
423425
final WorkflowServiceBlockingStub temporalService = testEnv.getWorkflowService().blockingStub();

0 commit comments

Comments
 (0)