From 90d7cfe052834ea930483e1ae5ee8e0b763489af Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 4 Feb 2022 16:29:04 -0800 Subject: [PATCH 01/11] If an activity is failing, stuck the workflow and make it queriable --- .../scheduling/ConnectionManagerWorkflow.java | 32 ++ .../ConnectionManagerWorkflowImpl.java | 226 +++++--- .../scheduling/state/WorkflowState.java | 29 + .../state/listener/TestStateListener.java | 4 + .../WorkflowStateChangedListener.java | 5 +- .../ConnectionManagerWorkflowTest.java | 513 ++++++++++++------ 6 files changed, 549 insertions(+), 260 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java index 098e0c580e28e..82d176d538589 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java @@ -9,6 +9,7 @@ import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.util.UUID; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -55,6 +56,19 @@ public interface ConnectionManagerWorkflow { @SignalMethod void resetConnection(); + /** + * If an activity fails the workflow will be stuck. This signal activity can be used to retry the + * activity. + */ + @SignalMethod + void retryFailActivity(); + + /** + * Use for testing in order to simulate a job failure. + */ + @SignalMethod + void simulateFailure(); + /** * Return the current state of the workflow. */ @@ -77,4 +91,22 @@ class JobInformation { @QueryMethod JobInformation getJobInformation(); + @Data + @NoArgsConstructor + @AllArgsConstructor + class StuckInformation { + + private UUID connectionId; + private long jobId; + private int attemptId; + private boolean isStuck; + + } + + /** + * Return if a job is stuck or not with the job information + */ + @QueryMethod + StuckInformation getStuckInformation(); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index d06a2201702e0..7e55b084c49cf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -45,6 +45,8 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -96,82 +98,89 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr Workflow.await(scheduleRetrieverOutput.getTimeToWait(), () -> skipScheduling() || connectionUpdaterInput.isFromFailure()); - if (!workflowState.isUpdated() && !workflowState.isDeleted()) { - // Job and attempt creation - maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { - final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput( - connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); - connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); - return Optional.ofNullable(jobCreationOutput.getJobId()); - }); - - maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { - final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput( - jobId)); - connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); - return attemptCreationOutput.getAttemptId(); - })); - - // Sync workflow - final SyncInput getSyncInputActivitySyncInput = new SyncInput( - maybeAttemptId.get(), - maybeJobId.get(), - workflowState.isResetConnection()); - - jobCreationAndStatusUpdateActivity.reportJobStart(new ReportJobStartInput( - maybeJobId.get())); - - final SyncOutput syncWorkflowInputs = getSyncInputActivity.getSyncWorkflowInput(getSyncInputActivitySyncInput); - - workflowState.setRunning(true); - - final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class, - ChildWorkflowOptions.newBuilder() - .setWorkflowId("sync_" + maybeJobId.get()) - .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) - // This will cancel the child workflow when the parent is terminated - .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE) - .build()); - - final UUID connectionId = connectionUpdaterInput.getConnectionId(); - - try { - standardSyncOutput = Optional.ofNullable(childSync.run( - syncWorkflowInputs.getJobRunConfig(), - syncWorkflowInputs.getSourceLauncherConfig(), - syncWorkflowInputs.getDestinationLauncherConfig(), - syncWorkflowInputs.getSyncInput(), - connectionId)); - - final StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary(); - - if (workflowState.isResetConnection()) { - workflowState.setResetConnection(false); - } - - if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { - failures.addAll(standardSyncOutput.get().getFailures()); - partialSuccess = standardSyncSummary.getTotalStats().getRecordsCommitted() > 0; - workflowState.setFailed(true); - } - } catch (final ChildWorkflowFailure childWorkflowFailure) { - if (childWorkflowFailure.getCause() instanceof CanceledFailure) { - // do nothing, cancellation handled by cancellationScope - - } else if (childWorkflowFailure.getCause() instanceof ActivityFailure) { - final ActivityFailure af = (ActivityFailure) childWorkflowFailure.getCause(); - failures.add(FailureHelper.failureReasonFromWorkflowAndActivity( - childWorkflowFailure.getWorkflowType(), - af.getActivityType(), - af.getCause(), - maybeJobId.get(), - maybeAttemptId.get())); - throw childWorkflowFailure; - } else { - failures.add( - FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), maybeJobId.get(), maybeAttemptId.get())); - throw childWorkflowFailure; - } + maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { + final JobCreationOutput jobCreationOutput = + runActivityWithOutput( + (input) -> jobCreationAndStatusUpdateActivity.createNewJob(input), + new JobCreationInput( + connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); + connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); + return Optional.ofNullable(jobCreationOutput.getJobId()); + }); + + maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { + final AttemptCreationOutput attemptCreationOutput = + runActivityWithOutput( + (input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input), + new AttemptCreationInput( + jobId)); + connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); + return attemptCreationOutput.getAttemptId(); + })); + + // Sync workflow + final SyncInput getSyncInputActivitySyncInput = new SyncInput( + maybeAttemptId.get(), + maybeJobId.get(), + workflowState.isResetConnection()); + + runActivity( + (input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input), + new ReportJobStartInput( + maybeJobId.get())); + + final SyncOutput syncWorkflowInputs = runActivityWithOutput( + (input) -> getSyncInputActivity.getSyncWorkflowInput(input), + getSyncInputActivitySyncInput); + + workflowState.setRunning(true); + + final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class, + ChildWorkflowOptions.newBuilder() + .setWorkflowId("sync_" + maybeJobId.get()) + .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + // This will cancel the child workflow when the parent is terminated + .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE) + .build()); + + final UUID connectionId = connectionUpdaterInput.getConnectionId(); + + try { + standardSyncOutput = Optional.ofNullable(childSync.run( + syncWorkflowInputs.getJobRunConfig(), + syncWorkflowInputs.getSourceLauncherConfig(), + syncWorkflowInputs.getDestinationLauncherConfig(), + syncWorkflowInputs.getSyncInput(), + connectionId)); + + final StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary(); + + if (workflowState.isResetConnection()) { + workflowState.setResetConnection(false); + } + + if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { + failures.addAll(standardSyncOutput.get().getFailures()); + partialSuccess = standardSyncSummary.getTotalStats().getRecordsCommitted() > 0; + workflowState.setFailed(true); + } + } catch (final ChildWorkflowFailure childWorkflowFailure) { + if (childWorkflowFailure.getCause() instanceof CanceledFailure) { + // do nothing, cancellation handled by cancellationScope + + } else if (childWorkflowFailure.getCause() instanceof ActivityFailure) { + final ActivityFailure af = (ActivityFailure) childWorkflowFailure.getCause(); + failures.add(FailureHelper.failureReasonFromWorkflowAndActivity( + childWorkflowFailure.getWorkflowType(), + af.getActivityType(), + af.getCause(), + maybeJobId.get(), + maybeAttemptId.get())); + throw childWorkflowFailure; + } else { + failures.add( + FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), maybeJobId.get(), maybeAttemptId.get())); + throw childWorkflowFailure; } } }); @@ -203,13 +212,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } else if (workflowState.isDeleted()) { // Stop the runs final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId()); - connectionDeletionActivity.deleteConnection(connectionDeletionInput); + runActivity((input) -> connectionDeletionActivity.deleteConnection(input), connectionDeletionInput); return; } else if (workflowState.isCancelled()) { - jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput( - maybeJobId.get(), - maybeAttemptId.get(), - failures.isEmpty() ? null : FailureHelper.failureSummary(failures, partialSuccess))); + runActivity((input) -> jobCreationAndStatusUpdateActivity.jobCancelled(input), + new JobCancelledInput( + maybeJobId.get(), + maybeAttemptId.get(), + failures.isEmpty() ? null : FailureHelper.failureSummary(failures, partialSuccess))); resetNewConnectionInput(connectionUpdaterInput); } else if (workflowState.isFailed()) { reportFailure(connectionUpdaterInput); @@ -227,7 +237,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) { - jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput( + workflowState.setSuccess(true); + runActivity((input) -> jobCreationAndStatusUpdateActivity.jobSuccess(input), new JobSuccessInput( maybeJobId.get(), maybeAttemptId.get(), standardSyncOutput.orElse(null))); @@ -236,7 +247,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) } private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) { - jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput( + runActivity((input) -> jobCreationAndStatusUpdateActivity.attemptFailure(input), new AttemptFailureInput( connectionUpdaterInput.getJobId(), connectionUpdaterInput.getAttemptId(), standardSyncOutput.orElse(null), @@ -250,7 +261,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) connectionUpdaterInput.setAttemptNumber(attemptNumber + 1); connectionUpdaterInput.setFromFailure(true); } else { - jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput( + runActivity((input) -> jobCreationAndStatusUpdateActivity.jobFailure(input), new JobFailureInput( connectionUpdaterInput.getJobId(), "Job failed after too many retries for connection " + connectionId)); @@ -305,6 +316,16 @@ public void resetConnection() { } } + @Override + public void retryFailActivity() { + workflowState.setRetryFailedActivity(true); + } + + @Override + public void simulateFailure() { + workflowState.setFailed(true); + } + @Override public WorkflowState getState() { return workflowState; @@ -317,6 +338,15 @@ public JobInformation getJobInformation() { maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID)); } + @Override + public StuckInformation getStuckInformation() { + return new StuckInformation( + connectionId, + maybeJobId.orElse(NON_RUNNING_JOB_ID), + maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID), + workflowState.isStuck()); + } + private Boolean skipScheduling() { return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated() || workflowState.isResetConnection(); } @@ -334,4 +364,32 @@ private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput) } } + private OUTPUT runActivityWithOutput(Function mapper, INPUT input) { + try { + return mapper.apply(input); + } catch (Exception e) { + log.error("Failed to run an activity for the connection " + connectionId, e); + workflowState.setStuck(true); + Workflow.await(() -> workflowState.isRetryFailedActivity()); + log.error("Retrying an activity for the connection " + connectionId, e); + workflowState.setStuck(false); + workflowState.setRetryFailedActivity(false); + return runActivityWithOutput(mapper, input); + } + } + + private void runActivity(Consumer consumer, INPUT input) { + try { + consumer.accept(input); + } catch (Exception e) { + log.error("Failed to run an activity for the connection " + connectionId, e); + workflowState.setStuck(true); + Workflow.await(() -> workflowState.isRetryFailedActivity()); + log.error("Retrying an activity for the connection " + connectionId, e); + workflowState.setStuck(false); + workflowState.setRetryFailedActivity(false); + runActivity(consumer, input); + } + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index a80a7bbec58ab..2e3965d5728e0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -30,6 +30,9 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan private boolean failed = false; private boolean resetConnection = false; private boolean continueAsReset = false; + private boolean retryFailedActivity = false; + private boolean stuck = false; + private boolean success = true; public void setRunning(final boolean running) { final ChangedStateEvent event = new ChangedStateEvent( @@ -95,6 +98,30 @@ public void setContinueAsReset(final boolean continueAsReset) { this.continueAsReset = continueAsReset; } + public void setRetryFailedActivity(final boolean retryFailedActivity) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.RETRY_FAILED_ACTIVITY, + retryFailedActivity); + stateChangedListener.addEvent(id, event); + this.retryFailedActivity = retryFailedActivity; + } + + public void setStuck(final boolean stuck) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.STUCK, + stuck); + stateChangedListener.addEvent(id, event); + this.stuck = stuck; + } + + public void setSuccess(final boolean success) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.SUCCESS, + success); + stateChangedListener.addEvent(id, event); + this.success = success; + } + public void reset() { this.setRunning(false); this.setDeleted(false); @@ -104,6 +131,8 @@ public void reset() { this.setFailed(false); this.setResetConnection(false); this.setContinueAsReset(false); + this.setRetryFailedActivity(false); + this.setSuccess(false); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java index 864510502385b..58397dbcc1e2e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java @@ -14,6 +14,10 @@ public class TestStateListener implements WorkflowStateChangedListener { private static final ConcurrentHashMap> events = new ConcurrentHashMap<>(); + public static void reset() { + events.clear(); + } + @Override public Queue events(final UUID testId) { if (!events.containsKey(testId)) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 99d4675c3542f..c0e30a0ef0b6e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -30,7 +30,10 @@ enum StateField { UPDATED, FAILED, RESET, - CONTINUE_AS_RESET + CONTINUE_AS_RESET, + RETRY_FAILED_ACTIVITY, + STUCK, + SUCCESS } @Value diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index ab67040ceef34..10f7d06d8e878 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -33,28 +33,35 @@ import io.airbyte.workers.temporal.scheduling.testsyncworkflow.SourceAndDestinationFailureSyncWorkflow; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ApplicationFailure; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.worker.Worker; import java.time.Duration; import java.util.Queue; import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; public class ConnectionManagerWorkflowTest { - private final ConfigFetchActivity mConfigFetchActivity = + private static final ConfigFetchActivity mConfigFetchActivity = Mockito.mock(ConfigFetchActivity.class, Mockito.withSettings().withoutAnnotations()); - private final ConnectionDeletionActivity mConnectionDeletionActivity = + private static final ConnectionDeletionActivity mConnectionDeletionActivity = Mockito.mock(ConnectionDeletionActivity.class, Mockito.withSettings().withoutAnnotations()); - private final GenerateInputActivityImpl mGenerateInputActivityImpl = + private static final GenerateInputActivityImpl mGenerateInputActivityImpl = Mockito.mock(GenerateInputActivityImpl.class, Mockito.withSettings().withoutAnnotations()); - private final JobCreationAndStatusUpdateActivity mJobCreationAndStatusUpdateActivity = + private static final JobCreationAndStatusUpdateActivity mJobCreationAndStatusUpdateActivity = Mockito.mock(JobCreationAndStatusUpdateActivity.class, Mockito.withSettings().withoutAnnotations()); private TestWorkflowEnvironment testEnv; @@ -90,8 +97,9 @@ public void setUp() { new StandardSyncInput())); } + @AfterEach public void tearDown() { - testEnv.shutdown(); + TestStateListener.reset(); } @Nested @@ -117,6 +125,8 @@ public void setup() { WorkflowOptions.newBuilder() .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) .build()); + + Mockito.when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(2)); } @Test @@ -352,45 +362,33 @@ public void deleteSync() { testEnv.shutdown(); } - } - - @Nested - @DisplayName("Test which with a long running child workflow") - class SynchronousWorkflow { - - @BeforeEach - public void setup() { - testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); - - client = testEnv.getWorkflowClient(); - - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, - mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); - testEnv.start(); - - workflow = client - .newWorkflowStub( - ConnectionManagerWorkflow.class, - WorkflowOptions.newBuilder() - .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) - .build()); + public static Stream getSetupFailingFailingActivityBeforeRun() { + Thread.currentThread().run(); + return Stream.of( + Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), + Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttempt(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), + Arguments.of(new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).reportJobStart(Mockito.any()))), + Arguments.of(new Thread(() -> Mockito.when(mGenerateInputActivityImpl.getSyncWorkflowInput(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", ""))))); } - @Test - @DisplayName("Test workflow which recieved a manual while running does nothing") - public void manualRun() { + @ParameterizedTest + @MethodSource("getSetupFailingFailingActivityBeforeRun") + void testGetStuckBeforeRun(Thread mockSetup) { + mockSetup.run(); final UUID testId = UUID.randomUUID(); + TestStateListener.reset(); final TestStateListener testStateListener = new TestStateListener(); final WorkflowState workflowState = new WorkflowState(testId, testStateListener); final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + null, + null, false, 1, workflowState, @@ -398,53 +396,24 @@ public void manualRun() { WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofMinutes(2L)); - workflow.submitManualSync(); testEnv.shutdown(); final Queue events = testStateListener.events(testId); Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.SKIPPED_SCHEDULING && changedStateEvent.isValue()) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) .isEmpty(); - } - - @Test - @DisplayName("Test that cancelling a running workflow cancel the sync") - public void cancelRunning() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(10L)); - workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) + .hasSize(1); } @Test - @DisplayName("Test that resetting a-non running workflow starts a reset") - public void resetStart() { + void testCanGetUnstuck() { + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .thenReturn(new JobCreationOutput(1l)); final UUID testId = UUID.randomUUID(); final TestStateListener testStateListener = new TestStateListener(); @@ -452,131 +421,313 @@ public void resetStart() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + null, + null, false, 1, workflowState, false); WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(80L)); + workflow.retryFailActivity(); testEnv.sleep(Duration.ofSeconds(30L)); - workflow.resetConnection(); - testEnv.sleep(Duration.ofSeconds(90L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); - - } - - @Test - @DisplayName("Test that resetting a running workflow starts cancel the running workflow") - public void resetCancelRunningWorkflow() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.resetConnection(); - testEnv.sleep(Duration.ofMinutes(2L)); testEnv.shutdown(); final Queue events = testStateListener.events(testId); Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) .hasSizeGreaterThanOrEqualTo(1); - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); - - } - - @Test - @DisplayName("Test that cancelling a reset don't restart a reset") - public void cancelResetDontContinueAsReset() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = Mockito.spy(new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - true)); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - Assertions.assertThat(testStateListener.events(testId)) - .filteredOn((event) -> event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) - .isEmpty(); - - Assertions.assertThat(testStateListener.events(testId)) - .filteredOn((event) -> !event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) - .hasSizeGreaterThanOrEqualTo(2); - } - - @Test - @DisplayName("Test workflow which recieved an update signal wait for the current run and report the job status") - public void updatedSignalRecievedWhileRunning() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.connectionUpdated(); - testEnv.sleep(Duration.ofMinutes(1L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) + .hasSize(1); Assertions.assertThat(events) .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) .hasSizeGreaterThanOrEqualTo(1); + } - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.UPDATED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); + @Nested + @DisplayName("Test which with a long running child workflow") + class SynchronousWorkflow { + + @BeforeEach + public void setup() { + testEnv = TestWorkflowEnvironment.newInstance(); + worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + // Register your workflow implementations + worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); + + client = testEnv.getWorkflowClient(); + + worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + testEnv.start(); + + workflow = client + .newWorkflowStub( + ConnectionManagerWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .build()); + } + + @Test + @DisplayName("Test workflow which recieved a manual while running does nothing") + public void manualRun() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofMinutes(2L)); + workflow.submitManualSync(); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.SKIPPED_SCHEDULING && changedStateEvent.isValue()) + .isEmpty(); + + } + + @Test + @DisplayName("Test that cancelling a running workflow cancel the sync") + public void cancelRunning() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(10L)); + workflow.cancelJob(); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); + } + + @Test + @DisplayName("Test that resetting a-non running workflow starts a reset") + public void resetStart() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.resetConnection(); + testEnv.sleep(Duration.ofSeconds(90L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); + + } + + @Test + @DisplayName("Test that resetting a running workflow starts cancel the running workflow") + public void resetCancelRunningWorkflow() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.resetConnection(); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); + + } + + @Test + @DisplayName("Test that cancelling a reset don't restart a reset") + public void cancelResetDontContinueAsReset() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = Mockito.spy(new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + true)); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.cancelJob(); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + Assertions.assertThat(testStateListener.events(testId)) + .filteredOn((event) -> event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) + .isEmpty(); + + Assertions.assertThat(testStateListener.events(testId)) + .filteredOn((event) -> !event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) + .hasSizeGreaterThanOrEqualTo(2); + } + + @Test + @DisplayName("Test workflow which recieved an update signal wait for the current run and report the job status") + public void updatedSignalRecievedWhileRunning() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.connectionUpdated(); + testEnv.sleep(Duration.ofMinutes(1L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.UPDATED && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); + } + + public static Stream getSetupFailingFailingActivityAfterRun() { + Thread.currentThread().run(); + return Stream.of( + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mConnectionDeletionActivity).deleteConnection(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any())))); + } + + @ParameterizedTest + @MethodSource("getSetupFailingFailingActivityAfterRun") + void testGetStuckAfterRun(Consumer signalSender, Thread mockSetup) { + mockSetup.run(); + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + null, + null, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(10L)); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(30L)); + signalSender.accept(workflow); + testEnv.sleep(Duration.ofSeconds(50L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) + .hasSize(1); + } - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); } } @@ -704,6 +855,18 @@ public void testReplicationFailureRecorded() { } + @Nested + @DisplayName("Test with failing activity") + class FailingActivityWorkflow { + + @Test + void testGetStuck() { + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenThrow(new RuntimeException()); + } + + } + private class HasFailureFromSource implements ArgumentMatcher { private final FailureOrigin expectedFailureOrigin; From d997ce1ec2c164697e08443cdd48eacaa1723a1e Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 8 Feb 2022 15:02:52 -0800 Subject: [PATCH 02/11] Allow a workflow to be stuck --- .../ConnectionManagerWorkflowImpl.java | 169 +++++++++--------- .../ConnectionManagerWorkflowTest.java | 18 +- 2 files changed, 97 insertions(+), 90 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 7e55b084c49cf..21f12faa15efe 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -97,90 +97,91 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr final ScheduleRetrieverOutput scheduleRetrieverOutput = configFetchActivity.getTimeToWait(scheduleRetrieverInput); Workflow.await(scheduleRetrieverOutput.getTimeToWait(), () -> skipScheduling() || connectionUpdaterInput.isFromFailure()); - - maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { - final JobCreationOutput jobCreationOutput = - runActivityWithOutput( - (input) -> jobCreationAndStatusUpdateActivity.createNewJob(input), - new JobCreationInput( - connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); - connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); - return Optional.ofNullable(jobCreationOutput.getJobId()); - }); - - maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { - final AttemptCreationOutput attemptCreationOutput = - runActivityWithOutput( - (input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input), - new AttemptCreationInput( - jobId)); - connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); - return attemptCreationOutput.getAttemptId(); - })); - - // Sync workflow - final SyncInput getSyncInputActivitySyncInput = new SyncInput( - maybeAttemptId.get(), - maybeJobId.get(), - workflowState.isResetConnection()); - - runActivity( - (input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input), - new ReportJobStartInput( - maybeJobId.get())); - - final SyncOutput syncWorkflowInputs = runActivityWithOutput( - (input) -> getSyncInputActivity.getSyncWorkflowInput(input), - getSyncInputActivitySyncInput); - - workflowState.setRunning(true); - - final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class, - ChildWorkflowOptions.newBuilder() - .setWorkflowId("sync_" + maybeJobId.get()) - .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) - // This will cancel the child workflow when the parent is terminated - .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE) - .build()); - - final UUID connectionId = connectionUpdaterInput.getConnectionId(); - - try { - standardSyncOutput = Optional.ofNullable(childSync.run( - syncWorkflowInputs.getJobRunConfig(), - syncWorkflowInputs.getSourceLauncherConfig(), - syncWorkflowInputs.getDestinationLauncherConfig(), - syncWorkflowInputs.getSyncInput(), - connectionId)); - - final StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary(); - - if (workflowState.isResetConnection()) { - workflowState.setResetConnection(false); - } - - if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { - failures.addAll(standardSyncOutput.get().getFailures()); - partialSuccess = standardSyncSummary.getTotalStats().getRecordsCommitted() > 0; - workflowState.setFailed(true); - } - } catch (final ChildWorkflowFailure childWorkflowFailure) { - if (childWorkflowFailure.getCause() instanceof CanceledFailure) { - // do nothing, cancellation handled by cancellationScope - - } else if (childWorkflowFailure.getCause() instanceof ActivityFailure) { - final ActivityFailure af = (ActivityFailure) childWorkflowFailure.getCause(); - failures.add(FailureHelper.failureReasonFromWorkflowAndActivity( - childWorkflowFailure.getWorkflowType(), - af.getActivityType(), - af.getCause(), - maybeJobId.get(), - maybeAttemptId.get())); - throw childWorkflowFailure; - } else { - failures.add( - FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), maybeJobId.get(), maybeAttemptId.get())); - throw childWorkflowFailure; + if (!workflowState.isUpdated() && !workflowState.isDeleted()) { + maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { + final JobCreationOutput jobCreationOutput = + runActivityWithOutput( + (input) -> jobCreationAndStatusUpdateActivity.createNewJob(input), + new JobCreationInput( + connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); + connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); + return Optional.ofNullable(jobCreationOutput.getJobId()); + }); + + maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { + final AttemptCreationOutput attemptCreationOutput = + runActivityWithOutput( + (input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input), + new AttemptCreationInput( + jobId)); + connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); + return attemptCreationOutput.getAttemptId(); + })); + + // Sync workflow + final SyncInput getSyncInputActivitySyncInput = new SyncInput( + maybeAttemptId.get(), + maybeJobId.get(), + workflowState.isResetConnection()); + + runActivity( + (input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input), + new ReportJobStartInput( + maybeJobId.get())); + + final SyncOutput syncWorkflowInputs = runActivityWithOutput( + (input) -> getSyncInputActivity.getSyncWorkflowInput(input), + getSyncInputActivitySyncInput); + + workflowState.setRunning(true); + + final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class, + ChildWorkflowOptions.newBuilder() + .setWorkflowId("sync_" + maybeJobId.get()) + .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + // This will cancel the child workflow when the parent is terminated + .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE) + .build()); + + final UUID connectionId = connectionUpdaterInput.getConnectionId(); + + try { + standardSyncOutput = Optional.ofNullable(childSync.run( + syncWorkflowInputs.getJobRunConfig(), + syncWorkflowInputs.getSourceLauncherConfig(), + syncWorkflowInputs.getDestinationLauncherConfig(), + syncWorkflowInputs.getSyncInput(), + connectionId)); + + final StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary(); + + if (workflowState.isResetConnection()) { + workflowState.setResetConnection(false); + } + + if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { + failures.addAll(standardSyncOutput.get().getFailures()); + partialSuccess = standardSyncSummary.getTotalStats().getRecordsCommitted() > 0; + workflowState.setFailed(true); + } + } catch (final ChildWorkflowFailure childWorkflowFailure) { + if (childWorkflowFailure.getCause() instanceof CanceledFailure) { + // do nothing, cancellation handled by cancellationScope + + } else if (childWorkflowFailure.getCause() instanceof ActivityFailure) { + final ActivityFailure af = (ActivityFailure) childWorkflowFailure.getCause(); + failures.add(FailureHelper.failureReasonFromWorkflowAndActivity( + childWorkflowFailure.getWorkflowType(), + af.getActivityType(), + af.getCause(), + maybeJobId.get(), + maybeAttemptId.get())); + throw childWorkflowFailure; + } else { + failures.add( + FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), maybeJobId.get(), maybeAttemptId.get())); + throw childWorkflowFailure; + } } } }); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 99cd4d016e974..d0e36b3a953b6 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -157,7 +157,8 @@ public void runSuccess() { .hasSize(2); Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() != StateField.RUNNING && changedStateEvent.isValue()) + .filteredOn(changedStateEvent -> (changedStateEvent.getField() != StateField.RUNNING && changedStateEvent.getField() != StateField.SUCCESS) + && changedStateEvent.isValue()) .isEmpty(); testEnv.shutdown(); @@ -189,7 +190,8 @@ public void retryAfterFail() { .hasSize(1); Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() != StateField.RUNNING && changedStateEvent.isValue()) + .filteredOn(changedStateEvent -> (changedStateEvent.getField() != StateField.RUNNING && changedStateEvent.getField() != StateField.SUCCESS) + && changedStateEvent.isValue()) .isEmpty(); testEnv.shutdown(); @@ -230,7 +232,8 @@ public void manualRun() { Assertions.assertThat(events) .filteredOn( changedStateEvent -> (changedStateEvent.getField() != StateField.RUNNING - && changedStateEvent.getField() != StateField.SKIPPED_SCHEDULING) + && changedStateEvent.getField() != StateField.SKIPPED_SCHEDULING + && changedStateEvent.getField() != StateField.SUCCESS) && changedStateEvent.isValue()) .isEmpty(); @@ -271,7 +274,8 @@ public void updatedSignalRecieved() { Assertions.assertThat(events) .filteredOn( - changedStateEvent -> changedStateEvent.getField() != StateField.UPDATED && changedStateEvent.isValue()) + changedStateEvent -> (changedStateEvent.getField() != StateField.UPDATED && changedStateEvent.getField() != StateField.SUCCESS) + && changedStateEvent.isValue()) .isEmpty(); Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity); @@ -313,7 +317,8 @@ public void cancelNonRunning() { Assertions.assertThat(events) .filteredOn( - changedStateEvent -> changedStateEvent.getField() != StateField.CANCELLED && changedStateEvent.isValue()) + changedStateEvent -> (changedStateEvent.getField() != StateField.CANCELLED && changedStateEvent.getField() != StateField.SUCCESS) + && changedStateEvent.isValue()) .isEmpty(); Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity); @@ -355,7 +360,8 @@ public void deleteSync() { Assertions.assertThat(events) .filteredOn( - changedStateEvent -> changedStateEvent.getField() != StateField.DELETED && changedStateEvent.isValue()) + changedStateEvent -> changedStateEvent.getField() != StateField.DELETED && changedStateEvent.getField() != StateField.SUCCESS + && changedStateEvent.isValue()) .isEmpty(); Mockito.verify(mConnectionDeletionActivity).deleteConnection(Mockito.any()); From fa3412e38a04a760676fa2d510ae257082d637da Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 8 Feb 2022 15:50:17 -0800 Subject: [PATCH 03/11] Update test with merge issues --- .../scheduling/state/WorkflowState.java | 1 + .../ConnectionManagerWorkflowTest.java | 228 ++++++++++++++++++ 2 files changed, 229 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index 2e3965d5728e0..8e70b31eeff2b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -133,6 +133,7 @@ public void reset() { this.setContinueAsReset(false); this.setRetryFailedActivity(false); this.setSuccess(false); + this.setStuck(false); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 09d815436e448..60990b6f12789 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -838,6 +838,234 @@ void testGetStuckAfterRun(Consumer signalSender, Thre } + @Nested + @DisplayName("Test which with a long running child workflow") + class SynchronousWorkflow { + + @BeforeEach + public void setup() { + testEnv = TestWorkflowEnvironment.newInstance(); + worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + // Register your workflow implementations + worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); + + client = testEnv.getWorkflowClient(); + + worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + testEnv.start(); + + workflow = client + .newWorkflowStub( + ConnectionManagerWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .build()); + } + + @Test + @DisplayName("Test workflow which recieved a manual while running does nothing") + public void manualRun() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + JOB_ID, + ATTEMPT_ID, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofMinutes(2L)); + workflow.submitManualSync(); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.SKIPPED_SCHEDULING && changedStateEvent.isValue()) + .isEmpty(); + + } + + @Disabled + @Test + @DisplayName("Test that cancelling a running workflow cancel the sync") + public void cancelRunning() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + JOB_ID, + ATTEMPT_ID, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(10L)); + workflow.cancelJob(); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.argThat(new HasCancellationFailure(JOB_ID, ATTEMPT_ID))); + } + + @Test + @DisplayName("Test that resetting a-non running workflow starts a reset") + public void resetStart() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + JOB_ID, + ATTEMPT_ID, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.resetConnection(); + testEnv.sleep(Duration.ofSeconds(90L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); + + } + + @Test + @DisplayName("Test that resetting a running workflow starts cancel the running workflow") + public void resetCancelRunningWorkflow() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + JOB_ID, + ATTEMPT_ID, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.resetConnection(); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); + + } + + @Test + @DisplayName("Test that cancelling a reset don't restart a reset") + public void cancelResetDontContinueAsReset() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = Mockito.spy(new ConnectionUpdaterInput( + UUID.randomUUID(), + 1L, + 1, + false, + 1, + workflowState, + true)); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.cancelJob(); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + Assertions.assertThat(testStateListener.events(testId)) + .filteredOn((event) -> event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) + .isEmpty(); + + Assertions.assertThat(testStateListener.events(testId)) + .filteredOn((event) -> !event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) + .hasSizeGreaterThanOrEqualTo(2); + } + + @Test + @DisplayName("Test workflow which recieved an update signal wait for the current run and report the job status") + public void updatedSignalRecievedWhileRunning() { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + JOB_ID, + ATTEMPT_ID, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(30L)); + workflow.connectionUpdated(); + testEnv.sleep(Duration.ofMinutes(1L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.UPDATED && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); + } + + } + @Nested @DisplayName("Test that sync workflow failures are recorded") class SyncWorkflowReplicationFailuresRecorded { From 6aefd6eb33854a4c38846b0eafb6e445415efd46 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 8 Feb 2022 15:57:20 -0800 Subject: [PATCH 04/11] another merge issue fix --- .../ConnectionManagerWorkflowTest.java | 357 ++++-------------- 1 file changed, 64 insertions(+), 293 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 60990b6f12789..1919d8ebff95d 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -555,287 +555,6 @@ public void updatedSignalRecievedWhileRunning() { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) .hasSizeGreaterThanOrEqualTo(1); } - - @Nested - @DisplayName("Test which with a long running child workflow") - class SynchronousWorkflow { - - @BeforeEach - public void setup() { - testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); - - client = testEnv.getWorkflowClient(); - - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, - mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); - testEnv.start(); - - workflow = client - .newWorkflowStub( - ConnectionManagerWorkflow.class, - WorkflowOptions.newBuilder() - .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) - .build()); - } - - @Test - @DisplayName("Test workflow which recieved a manual while running does nothing") - public void manualRun() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofMinutes(2L)); - workflow.submitManualSync(); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.SKIPPED_SCHEDULING && changedStateEvent.isValue()) - .isEmpty(); - - } - - @Test - @DisplayName("Test that cancelling a running workflow cancel the sync") - public void cancelRunning() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(10L)); - workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); - } - - @Test - @DisplayName("Test that resetting a-non running workflow starts a reset") - public void resetStart() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.resetConnection(); - testEnv.sleep(Duration.ofSeconds(90L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); - - } - - @Test - @DisplayName("Test that resetting a running workflow starts cancel the running workflow") - public void resetCancelRunningWorkflow() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.resetConnection(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); - - } - - @Test - @DisplayName("Test that cancelling a reset don't restart a reset") - public void cancelResetDontContinueAsReset() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = Mockito.spy(new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - true)); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - Assertions.assertThat(testStateListener.events(testId)) - .filteredOn((event) -> event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) - .isEmpty(); - - Assertions.assertThat(testStateListener.events(testId)) - .filteredOn((event) -> !event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) - .hasSizeGreaterThanOrEqualTo(2); - } - - @Test - @DisplayName("Test workflow which recieved an update signal wait for the current run and report the job status") - public void updatedSignalRecievedWhileRunning() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.connectionUpdated(); - testEnv.sleep(Duration.ofMinutes(1L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.UPDATED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); - } - - public static Stream getSetupFailingFailingActivityAfterRun() { - Thread.currentThread().run(); - return Stream.of( - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mConnectionDeletionActivity).deleteConnection(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any())))); - } - - @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityAfterRun") - void testGetStuckAfterRun(Consumer signalSender, Thread mockSetup) { - mockSetup.run(); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - null, - null, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(10L)); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(30L)); - signalSender.accept(workflow); - testEnv.sleep(Duration.ofSeconds(50L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) - .hasSize(1); - } - - } - } @Nested @@ -873,8 +592,8 @@ public void manualRun() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, + 1L, + 1, false, 1, workflowState, @@ -893,7 +612,6 @@ public void manualRun() { } - @Disabled @Test @DisplayName("Test that cancelling a running workflow cancel the sync") public void cancelRunning() { @@ -904,8 +622,8 @@ public void cancelRunning() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, + 1L, + 1, false, 1, workflowState, @@ -924,7 +642,7 @@ public void cancelRunning() { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) .hasSizeGreaterThanOrEqualTo(1); - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.argThat(new HasCancellationFailure(JOB_ID, ATTEMPT_ID))); + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); } @Test @@ -937,8 +655,8 @@ public void resetStart() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, + 1L, + 1, false, 1, workflowState, @@ -970,8 +688,8 @@ public void resetCancelRunningWorkflow() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, + 1L, + 1, false, 1, workflowState, @@ -1036,8 +754,8 @@ public void updatedSignalRecievedWhileRunning() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, + 1L, + 1, false, 1, workflowState, @@ -1064,6 +782,59 @@ public void updatedSignalRecievedWhileRunning() { Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); } + public static Stream getSetupFailingFailingActivityAfterRun() { + Thread.currentThread().run(); + return Stream.of( + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mConnectionDeletionActivity).deleteConnection(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any())))); + } + + @ParameterizedTest + @MethodSource("getSetupFailingFailingActivityAfterRun") + void testGetStuckAfterRun(Consumer signalSender, Thread mockSetup) { + mockSetup.run(); + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + null, + null, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(10L)); + workflow.submitManualSync(); + testEnv.sleep(Duration.ofSeconds(30L)); + signalSender.accept(workflow); + testEnv.sleep(Duration.ofSeconds(50L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) + .hasSize(1); + } + } @Nested From a5721879042115e1017d10494563c8bdbfa599a8 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 8 Feb 2022 16:01:24 -0800 Subject: [PATCH 05/11] More merge conflict resolution --- .../ConnectionManagerWorkflowTest.java | 98 ------------------- 1 file changed, 98 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 1919d8ebff95d..b7d33adaea96c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -421,40 +421,6 @@ void testGetStuckBeforeRun(Thread mockSetup) { .hasSize(1); } - @Disabled - @Test - @DisplayName("Test that cancelling a running workflow cancel the sync") - public void cancelRunning() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(10L)); - workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.argThat(new HasCancellationFailure(JOB_ID, ATTEMPT_ID))); - } - @Test void testCanGetUnstuck() { Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) @@ -491,70 +457,6 @@ void testCanGetUnstuck() { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) .hasSize(1); } - - @Test - @DisplayName("Test that cancelling a reset don't restart a reset") - public void cancelResetDontContinueAsReset() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = Mockito.spy(new ConnectionUpdaterInput( - UUID.randomUUID(), - 1L, - 1, - false, - 1, - workflowState, - true)); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - Assertions.assertThat(testStateListener.events(testId)) - .filteredOn((event) -> event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) - .isEmpty(); - - Assertions.assertThat(testStateListener.events(testId)) - .filteredOn((event) -> !event.isValue() && event.getField() == StateField.CONTINUE_AS_RESET) - .hasSizeGreaterThanOrEqualTo(2); - } - - @Test - @DisplayName("Test workflow which recieved an update signal wait for the current run and report the job status") - public void updatedSignalRecievedWhileRunning() { - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - JOB_ID, - ATTEMPT_ID, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(30L)); - workflow.connectionUpdated(); - testEnv.sleep(Duration.ofMinutes(1L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - } } @Nested From 6fc92d705872f68a1e4f63aef9b875a555a7669e Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 8 Feb 2022 16:07:16 -0800 Subject: [PATCH 06/11] Update merge conflicts --- .../ConnectionManagerWorkflowTest.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index b7d33adaea96c..9aa73b58e57cb 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -131,8 +131,6 @@ public void setup() { WorkflowOptions.newBuilder() .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) .build()); - - Mockito.when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(2)); } @Test @@ -494,8 +492,8 @@ public void manualRun() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + JOB_ID, + ATTEMPT_ID, false, 1, workflowState, @@ -514,6 +512,7 @@ public void manualRun() { } + @Disabled @Test @DisplayName("Test that cancelling a running workflow cancel the sync") public void cancelRunning() { @@ -524,8 +523,8 @@ public void cancelRunning() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + JOB_ID, + ATTEMPT_ID, false, 1, workflowState, @@ -544,7 +543,7 @@ public void cancelRunning() { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue()) .hasSizeGreaterThanOrEqualTo(1); - Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()); + Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.argThat(new HasCancellationFailure(JOB_ID, ATTEMPT_ID))); } @Test @@ -557,8 +556,8 @@ public void resetStart() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + JOB_ID, + ATTEMPT_ID, false, 1, workflowState, @@ -590,8 +589,8 @@ public void resetCancelRunningWorkflow() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + JOB_ID, + ATTEMPT_ID, false, 1, workflowState, @@ -624,8 +623,8 @@ public void cancelResetDontContinueAsReset() { final ConnectionUpdaterInput input = Mockito.spy(new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + JOB_ID, + ATTEMPT_ID, false, 1, workflowState, @@ -656,8 +655,8 @@ public void updatedSignalRecievedWhileRunning() { final ConnectionUpdaterInput input = new ConnectionUpdaterInput( UUID.randomUUID(), - 1L, - 1, + JOB_ID, + ATTEMPT_ID, false, 1, workflowState, From 05a4b6842e98aca90dde764fb16675d27c03f62b Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 8 Feb 2022 16:22:01 -0800 Subject: [PATCH 07/11] Finish to fix the merge conflict --- .../ConnectionManagerWorkflowTest.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 9aa73b58e57cb..b3d83918a8369 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -105,6 +105,7 @@ public void setUp() { @AfterEach public void tearDown() { + testEnv.shutdown(); TestStateListener.reset(); } @@ -455,6 +456,7 @@ void testCanGetUnstuck() { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) .hasSize(1); } + } @Nested @@ -617,6 +619,8 @@ public void resetCancelRunningWorkflow() { @DisplayName("Test that cancelling a reset don't restart a reset") public void cancelResetDontContinueAsReset() { + Mockito.when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(2)); + final UUID testId = UUID.randomUUID(); final TestStateListener testStateListener = new TestStateListener(); final WorkflowState workflowState = new WorkflowState(testId, testStateListener); @@ -861,18 +865,6 @@ public void testReplicationFailureRecorded() { } - @Nested - @DisplayName("Test with failing activity") - class FailingActivityWorkflow { - - @Test - void testGetStuck() { - Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) - .thenThrow(new RuntimeException()); - } - - } - private class HasFailureFromOrigin implements ArgumentMatcher { private final FailureOrigin expectedFailureOrigin; From e7b7f872461717799825b1410194d1f7c6dc4147 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Thu, 10 Feb 2022 15:01:39 -0800 Subject: [PATCH 08/11] Ensure that the preper flag is set --- .../temporal/scheduling/ConnectionManagerWorkflowImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 424ec99c58f4c..125f9811a8a83 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -371,6 +371,7 @@ private OUTPUT runActivityWithOutput(Function map } catch (Exception e) { log.error("Failed to run an activity for the connection " + connectionId, e); workflowState.setStuck(true); + workflowState.setRetryFailedActivity(false); Workflow.await(() -> workflowState.isRetryFailedActivity()); log.error("Retrying an activity for the connection " + connectionId, e); workflowState.setStuck(false); @@ -385,6 +386,7 @@ private void runActivity(Consumer consumer, INPUT input) { } catch (Exception e) { log.error("Failed to run an activity for the connection " + connectionId, e); workflowState.setStuck(true); + workflowState.setRetryFailedActivity(false); Workflow.await(() -> workflowState.isRetryFailedActivity()); log.error("Retrying an activity for the connection " + connectionId, e); workflowState.setStuck(false); From 9fe026f7753a6f75749a35966df36eab5a24101b Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 15 Feb 2022 11:46:48 -0800 Subject: [PATCH 09/11] fix new tests --- .../ConnectionManagerWorkflowTest.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index d6f17e6a77e82..57ec5451c6078 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -421,6 +421,8 @@ public static Stream getSetupFailingFailingActivityBeforeRun() { @MethodSource("getSetupFailingFailingActivityBeforeRun") void testGetStuckBeforeRun(Thread mockSetup) { mockSetup.run(); + Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( + Duration.ZERO)); final UUID testId = UUID.randomUUID(); TestStateListener.reset(); @@ -457,6 +459,9 @@ void testCanGetUnstuck() { .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")) .thenReturn(new JobCreationOutput(1l)); + Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( + Duration.ZERO)); + final UUID testId = UUID.randomUUID(); final TestStateListener testStateListener = new TestStateListener(); final WorkflowState workflowState = new WorkflowState(testId, testStateListener); @@ -699,8 +704,6 @@ public void resetCancelRunningWorkflow() { @DisplayName("Test that cancelling a reset doesn't restart a reset") public void cancelResetDontContinueAsReset() { - Mockito.when(mConfigFetchActivity.getMaxAttempt()).thenReturn(new GetMaxAttemptOutput(2)); - final UUID testId = UUID.randomUUID(); final TestStateListener testStateListener = new TestStateListener(); final WorkflowState workflowState = new WorkflowState(testId, testStateListener); @@ -824,11 +827,19 @@ void testGetStuckAfterRun(Consumer signalSender, Thre false); WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(10L)); + + // wait for workflow to initialize + testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofSeconds(90L)); + + // wait for workflow to initialize + testEnv.sleep(Duration.ofMinutes(1)); signalSender.accept(workflow); - testEnv.sleep(Duration.ofSeconds(60L)); + + // TODO + // For some reason this transiently fails if it is below the runtime. + // However, this should be reported almost immediately. I think this is a bug. + testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1)); testEnv.shutdown(); final Queue events = testStateListener.events(testId); From f0766159c22dd7f529bed41612cfa5b55663595a Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 15 Feb 2022 11:55:26 -0800 Subject: [PATCH 10/11] Extract stuck test --- .../ConnectionManagerWorkflowTest.java | 326 ++++++++++-------- 1 file changed, 175 insertions(+), 151 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 57ec5451c6078..8b18db4df7491 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -404,95 +404,6 @@ public void deleteSync() { testEnv.shutdown(); } - public static Stream getSetupFailingFailingActivityBeforeRun() { - Thread.currentThread().run(); - return Stream.of( - Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) - .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), - Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttempt(Mockito.any())) - .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), - Arguments.of(new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).reportJobStart(Mockito.any()))), - Arguments.of(new Thread(() -> Mockito.when(mGenerateInputActivityImpl.getSyncWorkflowInput(Mockito.any())) - .thenThrow(ApplicationFailure.newNonRetryableFailure("", ""))))); - } - - @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityBeforeRun") - void testGetStuckBeforeRun(Thread mockSetup) { - mockSetup.run(); - Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( - Duration.ZERO)); - - final UUID testId = UUID.randomUUID(); - TestStateListener.reset(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - null, - null, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofMinutes(2L)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) - .isEmpty(); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) - .hasSize(1); - } - - @Test - void testCanGetUnstuck() { - Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) - .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .thenReturn(new JobCreationOutput(1l)); - - Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( - Duration.ZERO)); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - null, - null, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - testEnv.sleep(Duration.ofSeconds(80L)); - workflow.retryFailActivity(); - testEnv.sleep(Duration.ofSeconds(30L)); - - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) - .hasSize(1); - } - } @Nested @@ -787,68 +698,6 @@ public void updatedSignalReceivedWhileRunning() { Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); } - - public static Stream getSetupFailingFailingActivityAfterRun() { - Thread.currentThread().run(); - return Stream.of( - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mConnectionDeletionActivity).deleteConnection(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any()))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any())))); - } - - @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityAfterRun") - void testGetStuckAfterRun(Consumer signalSender, Thread mockSetup) { - mockSetup.run(); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = new ConnectionUpdaterInput( - UUID.randomUUID(), - null, - null, - false, - 1, - workflowState, - false); - - WorkflowClient.start(workflow::run, input); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofMinutes(1)); - workflow.submitManualSync(); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofMinutes(1)); - signalSender.accept(workflow); - - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. - testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1)); - testEnv.shutdown(); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) - .hasSize(1); - } - } @Nested @@ -1004,6 +853,181 @@ public void testReplicationFailureRecorded() { } + @Nested + @DisplayName("Test that the workflow are properly getting stuck") + class StuckWorkflow { + @BeforeEach + public void setup() { + testEnv = TestWorkflowEnvironment.newInstance(); + worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + // Register your workflow implementations + worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); + + client = testEnv.getWorkflowClient(); + + worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + testEnv.start(); + + workflow = client + .newWorkflowStub( + ConnectionManagerWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .build()); + } + + public static Stream getSetupFailingFailingActivityBeforeRun() { + Thread.currentThread().run(); + return Stream.of( + Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), + Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttempt(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), + Arguments.of(new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).reportJobStart(Mockito.any()))), + Arguments.of(new Thread(() -> Mockito.when(mGenerateInputActivityImpl.getSyncWorkflowInput(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", ""))))); + } + + @ParameterizedTest + @MethodSource("getSetupFailingFailingActivityBeforeRun") + void testGetStuckBeforeRun(Thread mockSetup) { + mockSetup.run(); + Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( + Duration.ZERO)); + + final UUID testId = UUID.randomUUID(); + TestStateListener.reset(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + null, + null, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofMinutes(2L)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) + .isEmpty(); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) + .hasSize(1); + } + + @Test + void testCanGetUnstuck() { + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .thenReturn(new JobCreationOutput(1l)); + + Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( + Duration.ZERO)); + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + null, + null, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + testEnv.sleep(Duration.ofSeconds(80L)); + workflow.retryFailActivity(); + testEnv.sleep(Duration.ofSeconds(30L)); + + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) + .hasSize(1); + } + + public static Stream getSetupFailingFailingActivityAfterRun() { + Thread.currentThread().run(); + return Stream.of( + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mConnectionDeletionActivity).deleteConnection(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any()))), + Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.simulateFailure()), + new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) + .when(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.any())))); + } + + @ParameterizedTest + @MethodSource("getSetupFailingFailingActivityAfterRun") + void testGetStuckAfterRun(Consumer signalSender, Thread mockSetup) { + mockSetup.run(); + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = new ConnectionUpdaterInput( + UUID.randomUUID(), + null, + null, + false, + 1, + workflowState, + false); + + WorkflowClient.start(workflow::run, input); + + // wait for workflow to initialize + testEnv.sleep(Duration.ofMinutes(1)); + workflow.submitManualSync(); + + // wait for workflow to initialize + testEnv.sleep(Duration.ofMinutes(1)); + signalSender.accept(workflow); + + // TODO + // For some reason this transiently fails if it is below the runtime. + // However, this should be reported almost immediately. I think this is a bug. + testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1)); + testEnv.shutdown(); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) + .hasSize(1); + } + } + private class HasFailureFromOrigin implements ArgumentMatcher { private final FailureOrigin expectedFailureOrigin; From dc75830366741b10be151a18f1ccd52b9ca5bb11 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 15 Feb 2022 13:55:30 -0800 Subject: [PATCH 11/11] Rename and PR comments --- .../scheduling/ConnectionManagerWorkflow.java | 8 +-- .../ConnectionManagerWorkflowImpl.java | 62 ++++++++++--------- .../scheduling/state/WorkflowState.java | 10 +-- .../ConnectionManagerWorkflowTest.java | 5 +- 4 files changed, 45 insertions(+), 40 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java index 82d176d538589..2b07b7c3394f9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java @@ -61,7 +61,7 @@ public interface ConnectionManagerWorkflow { * activity. */ @SignalMethod - void retryFailActivity(); + void retryFailedActivity(); /** * Use for testing in order to simulate a job failure. @@ -94,12 +94,12 @@ class JobInformation { @Data @NoArgsConstructor @AllArgsConstructor - class StuckInformation { + class QuarantinedInformation { private UUID connectionId; private long jobId; private int attemptId; - private boolean isStuck; + private boolean isQuarantined; } @@ -107,6 +107,6 @@ class StuckInformation { * Return if a job is stuck or not with the job information */ @QueryMethod - StuckInformation getStuckInformation(); + QuarantinedInformation getQuarantinedInformation(); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 953474cfa3614..a158d1ce1a0b7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -100,7 +100,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr if (!workflowState.isUpdated() && !workflowState.isDeleted()) { maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { final JobCreationOutput jobCreationOutput = - runActivityWithOutput( + runMandatoryActivityWithOutput( (input) -> jobCreationAndStatusUpdateActivity.createNewJob(input), new JobCreationInput( connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); @@ -110,7 +110,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { final AttemptCreationOutput attemptCreationOutput = - runActivityWithOutput( + runMandatoryActivityWithOutput( (input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input), new AttemptCreationInput( jobId)); @@ -124,12 +124,12 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr maybeJobId.get(), workflowState.isResetConnection()); - runActivity( + runMandatoryActivity( (input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input), new ReportJobStartInput( maybeJobId.get())); - final SyncOutput syncWorkflowInputs = runActivityWithOutput( + final SyncOutput syncWorkflowInputs = runMandatoryActivityWithOutput( (input) -> getSyncInputActivity.getSyncWorkflowInput(input), getSyncInputActivitySyncInput); @@ -213,10 +213,10 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } else if (workflowState.isDeleted()) { // Stop the runs final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId()); - runActivity((input) -> connectionDeletionActivity.deleteConnection(input), connectionDeletionInput); + runMandatoryActivity((input) -> connectionDeletionActivity.deleteConnection(input), connectionDeletionInput); return; } else if (workflowState.isCancelled() || workflowState.isCancelledForReset()) { - runActivity((input) -> jobCreationAndStatusUpdateActivity.jobCancelled(input), + runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.jobCancelled(input), new JobCancelledInput( maybeJobId.get(), maybeAttemptId.get(), @@ -239,7 +239,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) { workflowState.setSuccess(true); - runActivity((input) -> jobCreationAndStatusUpdateActivity.jobSuccess(input), new JobSuccessInput( + runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.jobSuccess(input), new JobSuccessInput( maybeJobId.get(), maybeAttemptId.get(), standardSyncOutput.orElse(null))); @@ -248,7 +248,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) } private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) { - runActivity((input) -> jobCreationAndStatusUpdateActivity.attemptFailure(input), new AttemptFailureInput( + runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.attemptFailure(input), new AttemptFailureInput( connectionUpdaterInput.getJobId(), connectionUpdaterInput.getAttemptId(), standardSyncOutput.orElse(null), @@ -262,7 +262,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) connectionUpdaterInput.setAttemptNumber(attemptNumber + 1); connectionUpdaterInput.setFromFailure(true); } else { - runActivity((input) -> jobCreationAndStatusUpdateActivity.jobFailure(input), new JobFailureInput( + runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.jobFailure(input), new JobFailureInput( connectionUpdaterInput.getJobId(), "Job failed after too many retries for connection " + connectionId)); @@ -319,7 +319,7 @@ public void resetConnection() { } @Override - public void retryFailActivity() { + public void retryFailedActivity() { workflowState.setRetryFailedActivity(true); } @@ -341,12 +341,12 @@ public JobInformation getJobInformation() { } @Override - public StuckInformation getStuckInformation() { - return new StuckInformation( + public QuarantinedInformation getQuarantinedInformation() { + return new QuarantinedInformation( connectionId, maybeJobId.orElse(NON_RUNNING_JOB_ID), maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID), - workflowState.isStuck()); + workflowState.isQuarantined()); } private Boolean skipScheduling() { @@ -366,34 +366,36 @@ private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput) } } - private OUTPUT runActivityWithOutput(Function mapper, INPUT input) { + /** + * This is running a lambda function that takes {@param input} as an input. If the run of the lambda + * is thowing an exception, the workflow will be in a quarantined state and can then be manual + * un-quarantined or a retry of the failed lambda can be trigger through a signal method. + * + * We aimed to use this method for call of the temporal activity. + */ + private OUTPUT runMandatoryActivityWithOutput(Function mapper, INPUT input) { try { return mapper.apply(input); } catch (Exception e) { log.error("Failed to run an activity for the connection " + connectionId, e); - workflowState.setStuck(true); + workflowState.setQuarantined(true); workflowState.setRetryFailedActivity(false); Workflow.await(() -> workflowState.isRetryFailedActivity()); log.error("Retrying an activity for the connection " + connectionId, e); - workflowState.setStuck(false); + workflowState.setQuarantined(false); workflowState.setRetryFailedActivity(false); - return runActivityWithOutput(mapper, input); + return runMandatoryActivityWithOutput(mapper, input); } } - private void runActivity(Consumer consumer, INPUT input) { - try { - consumer.accept(input); - } catch (Exception e) { - log.error("Failed to run an activity for the connection " + connectionId, e); - workflowState.setStuck(true); - workflowState.setRetryFailedActivity(false); - Workflow.await(() -> workflowState.isRetryFailedActivity()); - log.error("Retrying an activity for the connection " + connectionId, e); - workflowState.setStuck(false); - workflowState.setRetryFailedActivity(false); - runActivity(consumer, input); - } + /** + * Similar to runMandatoryActivityWithOutput but for methods that don't return + */ + private void runMandatoryActivity(Consumer consumer, INPUT input) { + runMandatoryActivityWithOutput((inputInternal) -> { + consumer.accept(inputInternal); + return null; + }, input); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index d1529642e571d..95d92572cc4fd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -31,7 +31,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan private boolean resetConnection = false; private boolean continueAsReset = false; private boolean retryFailedActivity = false; - private boolean stuck = false; + private boolean quarantined = false; private boolean success = true; private boolean cancelledForReset = false; @@ -107,12 +107,12 @@ public void setRetryFailedActivity(final boolean retryFailedActivity) { this.retryFailedActivity = retryFailedActivity; } - public void setStuck(final boolean stuck) { + public void setQuarantined(final boolean quarantined) { final ChangedStateEvent event = new ChangedStateEvent( StateField.STUCK, - stuck); + quarantined); stateChangedListener.addEvent(id, event); - this.stuck = stuck; + this.quarantined = quarantined; } public void setSuccess(final boolean success) { @@ -142,7 +142,7 @@ public void reset() { this.setContinueAsReset(false); this.setRetryFailedActivity(false); this.setSuccess(false); - this.setStuck(false); + this.setQuarantined(false); this.setCancelledForReset(false); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 8b18db4df7491..477fd15a45221 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -698,6 +698,7 @@ public void updatedSignalReceivedWhileRunning() { Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any()); } + } @Nested @@ -856,6 +857,7 @@ public void testReplicationFailureRecorded() { @Nested @DisplayName("Test that the workflow are properly getting stuck") class StuckWorkflow { + @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); @@ -950,7 +952,7 @@ void testCanGetUnstuck() { WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(80L)); - workflow.retryFailActivity(); + workflow.retryFailedActivity(); testEnv.sleep(Duration.ofSeconds(30L)); testEnv.shutdown(); @@ -1026,6 +1028,7 @@ void testGetStuckAfterRun(Consumer signalSender, Thre .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.STUCK && changedStateEvent.isValue()) .hasSize(1); } + } private class HasFailureFromOrigin implements ArgumentMatcher {