Skip to content

Commit e7c26d6

Browse files
authored
Add logic to recover from quarantined and completed states (#13071)
* add logic to recover from quarantined and completed states * move status retrieval into try-catch * fix typo in log * add one more test * move isWorkflowStateRunning into ConnectionManagerUtils to be more direct * format
1 parent 8b9fa33 commit e7c26d6

File tree

5 files changed

+173
-24
lines changed

5 files changed

+173
-24
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java

+61-5
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,10 @@
1010
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl;
1111
import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput;
1212
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
13+
import io.temporal.api.common.v1.WorkflowExecution;
14+
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
15+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
16+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
1317
import io.temporal.client.BatchRequest;
1418
import io.temporal.client.WorkflowClient;
1519
import io.temporal.workflow.Functions.Proc;
@@ -99,6 +103,10 @@ private static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(
99103
connectionId),
100104
e);
101105

106+
// in case there is an existing workflow in a bad state, attempt to terminate it first before
107+
// starting a new workflow
108+
safeTerminateWorkflow(client, connectionId, "Terminating workflow in unreachable state before starting a new workflow for this connection");
109+
102110
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
103111
final ConnectionUpdaterInput startWorkflowInput = buildStartWorkflowInput(connectionId);
104112

@@ -121,6 +129,18 @@ private static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(
121129
}
122130
}
123131

132+
static void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
133+
log.info("Attempting to terminate existing workflow for connection {}.", connectionId);
134+
try {
135+
client.newUntypedWorkflowStub(getConnectionManagerName(connectionId)).terminate(reason);
136+
} catch (final Exception e) {
137+
log.warn(
138+
"Could not terminate temporal workflow due to the following error; "
139+
+ "this may be because there is currently no running workflow for this connection.",
140+
e);
141+
}
142+
}
143+
124144
static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
125145
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
126146
final ConnectionUpdaterInput input = buildStartWorkflowInput(connectionId);
@@ -136,30 +156,66 @@ static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowCl
136156
* @param connectionId the ID of the connection whose workflow should be retrieved
137157
* @return the healthy ConnectionManagerWorkflow
138158
* @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state
139-
* @throws UnreachableWorkflowException if the workflow is unreachable
159+
* @throws UnreachableWorkflowException if the workflow is in an unreachable state
140160
*/
141161
static ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId)
142162
throws DeletedWorkflowException, UnreachableWorkflowException {
163+
143164
final ConnectionManagerWorkflow connectionManagerWorkflow;
144165
final WorkflowState workflowState;
166+
final WorkflowExecutionStatus workflowExecutionStatus;
145167
try {
146168
connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
147169
workflowState = connectionManagerWorkflow.getState();
170+
workflowExecutionStatus = getConnectionManagerWorkflowStatus(client, connectionId);
148171
} catch (final Exception e) {
149172
throw new UnreachableWorkflowException(
150173
String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s due to the following error:", connectionId),
151174
e);
152175
}
153176

154-
if (workflowState.isDeleted()) {
155-
throw new DeletedWorkflowException(String.format(
156-
"The connection manager workflow for connection %s is deleted, so no further operations cannot be performed on it.",
157-
connectionId));
177+
if (WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED.equals(workflowExecutionStatus)) {
178+
if (workflowState.isDeleted()) {
179+
throw new DeletedWorkflowException(String.format(
180+
"The connection manager workflow for connection %s is deleted, so no further operations cannot be performed on it.",
181+
connectionId));
182+
}
183+
184+
// A non-deleted workflow being in a COMPLETED state is unexpected, and should be corrected
185+
throw new UnreachableWorkflowException(
186+
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId));
187+
}
188+
189+
if (workflowState.isQuarantined()) {
190+
throw new UnreachableWorkflowException(
191+
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId));
158192
}
159193

160194
return connectionManagerWorkflow;
161195
}
162196

197+
static boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
198+
try {
199+
final ConnectionManagerWorkflow connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class,
200+
getConnectionManagerName(connectionId));
201+
return connectionManagerWorkflow.getState().isRunning();
202+
} catch (final Exception e) {
203+
return false;
204+
}
205+
}
206+
207+
static WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
208+
final DescribeWorkflowExecutionRequest describeWorkflowExecutionRequest = DescribeWorkflowExecutionRequest.newBuilder()
209+
.setExecution(WorkflowExecution.newBuilder().setWorkflowId(getConnectionManagerName(connectionId)).build())
210+
.setNamespace(workflowClient.getOptions().getNamespace())
211+
.build();
212+
213+
final DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse = workflowClient.getWorkflowServiceStubs().blockingStub()
214+
.describeWorkflowExecution(describeWorkflowExecutionRequest);
215+
216+
return describeWorkflowExecutionResponse.getWorkflowExecutionInfo().getStatus();
217+
}
218+
163219
static long getCurrentJobId(final WorkflowClient client, final UUID connectionId) {
164220
try {
165221
final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java

+2-17
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
import io.airbyte.workers.temporal.exception.DeletedWorkflowException;
2727
import io.airbyte.workers.temporal.exception.UnreachableWorkflowException;
2828
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow;
29-
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
3029
import io.airbyte.workers.temporal.spec.SpecWorkflow;
3130
import io.airbyte.workers.temporal.sync.SyncWorkflow;
3231
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
@@ -276,7 +275,7 @@ public static class ManualOperationResult {
276275
public ManualOperationResult startNewManualSync(final UUID connectionId) {
277276
log.info("Manual sync request");
278277

279-
if (isWorkflowStateRunning(connectionId)) {
278+
if (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId)) {
280279
// TODO Bmoric: Error is running
281280
return new ManualOperationResult(
282281
Optional.of("A sync is already running for: " + connectionId),
@@ -335,7 +334,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
335334
Optional.of("Didn't manage to cancel a sync for: " + connectionId),
336335
Optional.empty());
337336
}
338-
} while (isWorkflowStateRunning(connectionId));
337+
} while (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId));
339338

340339
log.info("end of manual cancellation");
341340

@@ -463,18 +462,4 @@ boolean isWorkflowReachable(final UUID connectionId) {
463462
}
464463
}
465464

466-
/**
467-
* Check if a workflow is reachable and has state {@link WorkflowState#isRunning()}
468-
*/
469-
@VisibleForTesting
470-
boolean isWorkflowStateRunning(final UUID connectionId) {
471-
try {
472-
final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId);
473-
474-
return connectionManagerWorkflow.getState().isRunning();
475-
} catch (final Exception e) {
476-
return false;
477-
}
478-
}
479-
480465
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/UnreachableWorkflowException.java

+4
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,10 @@
66

77
public class UnreachableWorkflowException extends Exception {
88

9+
/**
 * Creates the exception with a detail message and no cause.
 *
 * @param message the detail message handed to the superclass constructor
 */
public UnreachableWorkflowException(final String message) {
10+
super(message);
11+
}
12+
913
/**
 * Creates the exception with a detail message and the underlying cause.
 *
 * @param message the detail message handed to the superclass constructor
 * @param t the cause handed to the superclass constructor
 */
public UnreachableWorkflowException(final String message, final Throwable t) {
1014
super(message, t);
1115
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -316,14 +316,14 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
316316
}
317317
}
318318

319-
private SyncCheckConnectionFailure checkConnections(GenerateInputActivity.GeneratedJobInput jobInputs) {
319+
private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.GeneratedJobInput jobInputs) {
320320
final JobRunConfig jobRunConfig = jobInputs.getJobRunConfig();
321321
final StandardSyncInput syncInput = jobInputs.getSyncInput();
322322
final JsonNode sourceConfig = syncInput.getSourceConfiguration();
323323
final JsonNode destinationConfig = syncInput.getDestinationConfiguration();
324324
final IntegrationLauncherConfig sourceLauncherConfig = jobInputs.getSourceLauncherConfig();
325325
final IntegrationLauncherConfig destinationLauncherConfig = jobInputs.getDestinationLauncherConfig();
326-
SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure(jobRunConfig);
326+
final SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure(jobRunConfig);
327327

328328
final int attemptCreationVersion =
329329
Workflow.getVersion(CHECK_BEFORE_SYNC_TAG, Workflow.DEFAULT_VERSION, CHECK_BEFORE_SYNC_CURRENT_VERSION);

airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java

+104
Original file line number | Diff line number | Diff line change
@@ -42,9 +42,15 @@
4242
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
4343
import io.airbyte.workers.temporal.spec.SpecWorkflow;
4444
import io.airbyte.workers.temporal.sync.SyncWorkflow;
45+
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
46+
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
47+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
48+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc.WorkflowServiceBlockingStub;
4549
import io.temporal.client.BatchRequest;
4650
import io.temporal.client.WorkflowClient;
51+
import io.temporal.client.WorkflowClientOptions;
4752
import io.temporal.client.WorkflowOptions;
53+
import io.temporal.client.WorkflowStub;
4854
import io.temporal.serviceclient.WorkflowServiceStubs;
4955
import io.temporal.workflow.Functions.Proc;
5056
import java.io.IOException;
@@ -79,19 +85,26 @@ class TemporalClientTest {
7985
.withJobId(String.valueOf(JOB_ID))
8086
.withAttemptId((long) ATTEMPT_ID)
8187
.withDockerImage(IMAGE_NAME1);
88+
private static final String NAMESPACE = "namespace";
8289

8390
private WorkflowClient workflowClient;
8491
private TemporalClient temporalClient;
8592
private Path logPath;
8693
private WorkflowServiceStubs workflowServiceStubs;
94+
private WorkflowServiceBlockingStub workflowServiceBlockingStub;
8795
private Configs configs;
8896

8997
@BeforeEach
9098
void setup() throws IOException {
9199
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_client_test");
92100
logPath = workspaceRoot.resolve(String.valueOf(JOB_ID)).resolve(String.valueOf(ATTEMPT_ID)).resolve(LogClientSingleton.LOG_FILENAME);
93101
workflowClient = mock(WorkflowClient.class);
102+
when(workflowClient.getOptions()).thenReturn(WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
94103
workflowServiceStubs = mock(WorkflowServiceStubs.class);
104+
when(workflowClient.getWorkflowServiceStubs()).thenReturn(workflowServiceStubs);
105+
workflowServiceBlockingStub = mock(WorkflowServiceBlockingStub.class);
106+
when(workflowServiceStubs.blockingStub()).thenReturn(workflowServiceBlockingStub);
107+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
95108
temporalClient = spy(new TemporalClient(workflowClient, workspaceRoot, workflowServiceStubs, configs));
96109
}
97110

@@ -336,6 +349,7 @@ void testDeleteConnectionOnDeletedWorkflow() {
336349
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
337350
when(mWorkflowState.isDeleted()).thenReturn(true);
338351
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
352+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
339353

340354
temporalClient.deleteConnection(CONNECTION_ID);
341355

@@ -393,6 +407,7 @@ void testUpdateConnectionDeletedWorkflow() {
393407
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
394408
when(mWorkflowState.isDeleted()).thenReturn(true);
395409
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
410+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
396411

397412
temporalClient.update(CONNECTION_ID);
398413

@@ -490,6 +505,7 @@ void testStartNewManualSyncDeletedWorkflow() {
490505
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
491506
when(mWorkflowState.isDeleted()).thenReturn(true);
492507
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
508+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
493509

494510
final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);
495511

@@ -568,6 +584,7 @@ void testStartNewCancellationDeletedWorkflow() {
568584
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
569585
when(mWorkflowState.isDeleted()).thenReturn(true);
570586
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
587+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
571588

572589
final ManualOperationResult result = temporalClient.startNewCancellation(CONNECTION_ID);
573590

@@ -656,6 +673,7 @@ void testResetConnectionDeletedWorkflow() {
656673
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
657674
when(mWorkflowState.isDeleted()).thenReturn(true);
658675
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
676+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
659677

660678
final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID);
661679

@@ -667,4 +685,90 @@ void testResetConnectionDeletedWorkflow() {
667685

668686
}
669687

688+
@Test
689+
@DisplayName("Test manual operation on quarantined workflow causes a restart")
690+
void testManualOperationOnQuarantinedWorkflow() {
691+
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
692+
final WorkflowState mWorkflowState = mock(WorkflowState.class);
693+
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
694+
when(mWorkflowState.isQuarantined()).thenReturn(true);
695+
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
696+
697+
final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
698+
final WorkflowState mNewWorkflowState = mock(WorkflowState.class);
699+
when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState);
700+
when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true);
701+
when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID));
702+
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
703+
final BatchRequest mBatchRequest = mock(BatchRequest.class);
704+
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);
705+
706+
final WorkflowStub mWorkflowStub = mock(WorkflowStub.class);
707+
when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub);
708+
709+
final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);
710+
711+
assertTrue(result.getJobId().isPresent());
712+
assertEquals(JOB_ID, result.getJobId().get());
713+
assertFalse(result.getFailingReason().isPresent());
714+
verify(workflowClient).signalWithStart(mBatchRequest);
715+
verify(mWorkflowStub).terminate(anyString());
716+
717+
// Verify that the submitManualSync signal was passed to the batch request by capturing the
718+
// argument,
719+
// executing the signal, and verifying that the desired signal was executed
720+
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
721+
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
722+
final Proc signal = batchRequestAddArgCaptor.getValue();
723+
signal.apply();
724+
verify(mNewConnectionManagerWorkflow).submitManualSync();
725+
}
726+
727+
@Test
728+
@DisplayName("Test manual operation on completed workflow causes a restart")
729+
void testManualOperationOnCompletedWorkflow() {
730+
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
731+
final WorkflowState mWorkflowState = mock(WorkflowState.class);
732+
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
733+
when(mWorkflowState.isQuarantined()).thenReturn(false);
734+
when(mWorkflowState.isDeleted()).thenReturn(false);
735+
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
736+
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
737+
738+
final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
739+
final WorkflowState mNewWorkflowState = mock(WorkflowState.class);
740+
when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState);
741+
when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true);
742+
when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID));
743+
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
744+
final BatchRequest mBatchRequest = mock(BatchRequest.class);
745+
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);
746+
747+
final WorkflowStub mWorkflowStub = mock(WorkflowStub.class);
748+
when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub);
749+
750+
final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);
751+
752+
assertTrue(result.getJobId().isPresent());
753+
assertEquals(JOB_ID, result.getJobId().get());
754+
assertFalse(result.getFailingReason().isPresent());
755+
verify(workflowClient).signalWithStart(mBatchRequest);
756+
verify(mWorkflowStub).terminate(anyString());
757+
758+
// Verify that the submitManualSync signal was passed to the batch request by capturing the
759+
// argument,
760+
// executing the signal, and verifying that the desired signal was executed
761+
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
762+
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
763+
final Proc signal = batchRequestAddArgCaptor.getValue();
764+
signal.apply();
765+
verify(mNewConnectionManagerWorkflow).submitManualSync();
766+
}
767+
768+
private void mockWorkflowStatus(final WorkflowExecutionStatus status) {
769+
when(workflowServiceBlockingStub.describeWorkflowExecution(any())).thenReturn(
770+
DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo(
771+
WorkflowExecutionInfo.newBuilder().setStatus(status).buildPartial()).build());
772+
}
773+
670774
}

0 commit comments

Comments
 (0)