Skip to content

Commit 85e6792

Browse files
authored
More comprehensive temporal error message (#18608)
* To be remove * Remove the signal for waiting after a failed activity and ensure we are waiting the expected time * Revert "To be remove" This reverts commit 3a5f7b4. * Remove unused and move failure reason to the helper * Avoid repetitive new Set()
1 parent 5e5a5e3 commit 85e6792

File tree

6 files changed

+38
-70
lines changed

6 files changed

+38
-70
lines changed

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,6 @@ public interface ConnectionManagerWorkflow {
6262
@SignalMethod
6363
void resetConnectionAndSkipNextScheduling();
6464

65-
/**
66-
* If an activity fails the workflow will be stuck. This signal activity can be used to retry the
67-
* activity.
68-
*/
69-
@SignalMethod
70-
void retryFailedActivity();
71-
7265
/**
7366
* Return the current state of the workflow.
7467
*/

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
3232
private final boolean resetConnection = false;
3333
@Deprecated
3434
private final boolean continueAsReset = false;
35-
private boolean retryFailedActivity = false;
3635
private boolean quarantined = false;
3736
private boolean success = true;
3837
private boolean cancelledForReset = false;
@@ -89,14 +88,6 @@ public void setFailed(final boolean failed) {
8988
this.failed = failed;
9089
}
9190

92-
public void setRetryFailedActivity(final boolean retryFailedActivity) {
93-
final ChangedStateEvent event = new ChangedStateEvent(
94-
StateField.RETRY_FAILED_ACTIVITY,
95-
retryFailedActivity);
96-
stateChangedListener.addEvent(id, event);
97-
this.retryFailedActivity = retryFailedActivity;
98-
}
99-
10091
public void setQuarantined(final boolean quarantined) {
10192
final ChangedStateEvent event = new ChangedStateEvent(
10293
StateField.QUARANTINED,
@@ -146,7 +137,6 @@ public void reset() {
146137
this.setUpdated(false);
147138
this.setCancelled(false);
148139
this.setFailed(false);
149-
this.setRetryFailedActivity(false);
150140
this.setSuccess(false);
151141
this.setQuarantined(false);
152142
this.setDoneWaiting(false);

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ enum StateField {
3131
FAILED,
3232
RESET,
3333
CONTINUE_AS_RESET,
34-
RETRY_FAILED_ACTIVITY,
3534
QUARANTINED,
3635
SUCCESS,
3736
CANCELLED_FOR_RESET,

airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/FailureHelper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,12 @@ public static FailureReason failureReasonFromWorkflowAndActivity(
227227
}
228228
}
229229

230+
public static FailureReason platformFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
231+
return genericFailure(t, jobId, attemptNumber)
232+
.withFailureOrigin(FailureOrigin.AIRBYTE_PLATFORM)
233+
.withExternalMessage("Something went wrong within the airbyte platform");
234+
}
235+
230236
private static Metadata jobAndAttemptMetadata(final Long jobId, final Integer attemptNumber) {
231237
return new Metadata()
232238
.withAdditionalProperty(JOB_ID_METADATA_KEY, jobId)

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import io.temporal.workflow.Workflow;
8686
import java.time.Duration;
8787
import java.time.Instant;
88+
import java.util.HashSet;
8889
import java.util.Map;
8990
import java.util.Optional;
9091
import java.util.Set;
@@ -327,23 +328,31 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput,
327328
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
328329
final StandardSyncOutput standardSyncOutput,
329330
final FailureCause failureCause) {
331+
reportFailure(connectionUpdaterInput, standardSyncOutput, failureCause, new HashSet<>());
332+
}
333+
334+
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
335+
final StandardSyncOutput standardSyncOutput,
336+
final FailureCause failureCause,
337+
final Set<FailureReason> failureReasonsOverride) {
330338
final int attemptCreationVersion =
331339
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);
332340

341+
final Set<FailureReason> failureReasons = failureReasonsOverride.isEmpty() ? workflowInternalState.getFailures() : failureReasonsOverride;
333342
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
334343
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
335344
workflowInternalState.getJobId(),
336345
workflowInternalState.getAttemptNumber(),
337346
connectionUpdaterInput.getConnectionId(),
338347
standardSyncOutput,
339-
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
348+
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));
340349
} else {
341350
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
342351
workflowInternalState.getJobId(),
343352
workflowInternalState.getAttemptNumber(),
344353
connectionUpdaterInput.getConnectionId(),
345354
standardSyncOutput,
346-
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
355+
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));
347356
}
348357

349358
final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
@@ -539,13 +548,6 @@ public void resetConnectionAndSkipNextScheduling() {
539548
}
540549
}
541550

542-
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
543-
@Override
544-
public void retryFailedActivity() {
545-
traceConnectionId();
546-
workflowState.setRetryFailedActivity(true);
547-
}
548-
549551
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
550552
@Override
551553
public WorkflowState getState() {
@@ -625,13 +627,14 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INP
625627
// overwhelming temporal.
626628
log.info("Waiting {} before restarting the workflow for connection {}, to prevent spamming temporal with restarts.", workflowDelay,
627629
connectionId);
628-
Workflow.await(workflowDelay, () -> workflowState.isRetryFailedActivity());
629-
630-
// Accept a manual signal to retry the failed activity during this window
631-
if (workflowState.isRetryFailedActivity()) {
632-
log.info("Received RetryFailedActivity signal for connection {}. Retrying activity.", connectionId);
633-
workflowState.setRetryFailedActivity(false);
634-
return runMandatoryActivityWithOutput(mapper, input);
630+
Workflow.sleep(workflowDelay);
631+
632+
// If a jobId exist set the failure reason
633+
if (workflowInternalState.getJobId() != null) {
634+
final ConnectionUpdaterInput connectionUpdaterInput = connectionUpdaterInputFromState();
635+
final FailureReason failureReason =
636+
FailureHelper.platformFailure(e, workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber());
637+
reportFailure(connectionUpdaterInput, null, FailureCause.ACTIVITY, Set.of(failureReason));
635638
}
636639

637640
log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId);
@@ -645,6 +648,16 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INP
645648
}
646649
}
647650

651+
private ConnectionUpdaterInput connectionUpdaterInputFromState() {
652+
return ConnectionUpdaterInput.builder()
653+
.connectionId(connectionId)
654+
.jobId(workflowInternalState.getJobId())
655+
.attemptNumber(workflowInternalState.getAttemptNumber())
656+
.fromFailure(false)
657+
.build();
658+
659+
}
660+
648661
/**
649662
* Similar to runMandatoryActivityWithOutput but for methods that don't return
650663
*/

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ void runSuccess() throws InterruptedException {
264264
returnTrueForLastJobOrAttemptFailure();
265265
when(mConfigFetchActivity.getTimeToWait(Mockito.any()))
266266
.thenReturn(new ScheduleRetrieverOutput(SCHEDULE_WAIT));
267+
when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(1));
267268

268269
final UUID testId = UUID.randomUUID();
269270
final TestStateListener testStateListener = new TestStateListener();
@@ -308,6 +309,7 @@ void retryAfterFail() throws InterruptedException {
308309
returnTrueForLastJobOrAttemptFailure();
309310
when(mConfigFetchActivity.getTimeToWait(Mockito.any()))
310311
.thenReturn(new ScheduleRetrieverOutput(SCHEDULE_WAIT));
312+
when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(1));
311313

312314
final UUID testId = UUID.randomUUID();
313315
final TestStateListener testStateListener = new TestStateListener();
@@ -1475,6 +1477,7 @@ void testWorkflowRestartedAfterFailedActivity(final Thread mockSetup) throws Int
14751477
mockSetup.run();
14761478
when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput(
14771479
Duration.ZERO));
1480+
when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(1));
14781481

14791482
final UUID testId = UUID.randomUUID();
14801483
TestStateListener.reset();
@@ -1505,42 +1508,6 @@ void testWorkflowRestartedAfterFailedActivity(final Thread mockSetup) throws Int
15051508
assertWorkflowWasContinuedAsNew();
15061509
}
15071510

1508-
@Test
1509-
void testCanRetryFailedActivity() throws InterruptedException {
1510-
returnTrueForLastJobOrAttemptFailure();
1511-
when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any()))
1512-
.thenThrow(ApplicationFailure.newNonRetryableFailure("", ""))
1513-
.thenReturn(new JobCreationOutput(1l));
1514-
1515-
when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput(
1516-
Duration.ZERO));
1517-
1518-
final UUID testId = UUID.randomUUID();
1519-
final TestStateListener testStateListener = new TestStateListener();
1520-
final WorkflowState workflowState = new WorkflowState(testId, testStateListener);
1521-
1522-
final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder()
1523-
.connectionId(UUID.randomUUID())
1524-
.jobId(null)
1525-
.attemptId(null)
1526-
.fromFailure(false)
1527-
.attemptNumber(1)
1528-
.workflowState(workflowState)
1529-
.build();
1530-
1531-
startWorkflowAndWaitUntilReady(workflow, input);
1532-
1533-
// Sleep test env for half of restart delay, so that we know we are in the middle of the delay
1534-
testEnv.sleep(WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2));
1535-
workflow.retryFailedActivity();
1536-
Thread.sleep(500); // any time after no-waiting manual run
1537-
final Queue<ChangedStateEvent> events = testStateListener.events(testId);
1538-
1539-
Assertions.assertThat(events)
1540-
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue())
1541-
.hasSize(1);
1542-
}
1543-
15441511
}
15451512

15461513
private class HasFailureFromOrigin implements ArgumentMatcher<AttemptNumberFailureInput> {

0 commit comments

Comments
 (0)