Skip to content

Commit b3db914

Browse files
authored
Change where a connection is deleted (#19096)
* Tmp * Move when the deletion is performed * Re-enable disable test * PR comments * Use cancel * rename * Fix test and version check position * Log exception
1 parent e73f535 commit b3db914

File tree

13 files changed

+79
-76
lines changed

13 files changed

+79
-76
lines changed

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,22 @@
3030
@Slf4j
3131
public class ConnectionManagerUtils {
3232

33+
/**
34+
* Send a cancellation to the workflow. It will swallow any exception and won't check if the
35+
* workflow is already deleted when being cancel.
36+
*/
37+
public void deleteWorkflowIfItExist(final WorkflowClient client,
38+
final UUID connectionId) {
39+
try {
40+
final ConnectionManagerWorkflow connectionManagerWorkflow =
41+
client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
42+
connectionManagerWorkflow.deleteConnection();
43+
} catch (final Exception e) {
44+
log.warn("The workflow is not reachable when trying to cancel it", e);
45+
}
46+
47+
}
48+
3349
/**
3450
* Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection.
3551
*

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -476,13 +476,13 @@ public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connect
476476
return connectionManagerWorkflow;
477477
}
478478

479-
public void deleteConnection(final UUID connectionId) {
480-
try {
481-
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId,
482-
connectionManagerWorkflow -> connectionManagerWorkflow::deleteConnection);
483-
} catch (final DeletedWorkflowException e) {
484-
log.info("Connection {} has already been deleted.", connectionId);
485-
}
479+
/**
480+
* This will cancel a workflow even if the connection is deleted already
481+
*
482+
* @param connectionId - connectionId to cancel
483+
*/
484+
public void forceDeleteWorkflow(final UUID connectionId) {
485+
connectionManagerUtils.deleteWorkflowIfItExist(client, connectionId);
486486
}
487487

488488
public void update(final UUID connectionId) {

airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -326,12 +326,12 @@ void migrateCalled() {
326326

327327
@Nested
328328
@DisplayName("Test delete connection method.")
329-
class DeleteConnection {
329+
class ForceCancelConnection {
330330

331331
@Test
332332
@SuppressWarnings(UNCHECKED)
333333
@DisplayName("Test delete connection method when workflow is in a running state.")
334-
void testDeleteConnection() {
334+
void testforceCancelConnection() {
335335
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
336336
final WorkflowState mWorkflowState = mock(WorkflowState.class);
337337
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
@@ -349,54 +349,9 @@ void testDeleteConnection() {
349349
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());
350350

351351
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
352-
temporalClient.deleteConnection(CONNECTION_ID);
352+
temporalClient.forceDeleteWorkflow(CONNECTION_ID);
353353

354-
verify(workflowClient, Mockito.never()).newSignalWithStartRequest();
355-
verify(mConnectionManagerWorkflow).deleteConnection();
356-
}
357-
358-
@Test
359-
@SuppressWarnings(UNCHECKED)
360-
@DisplayName("Test delete connection method when workflow is in an unexpected state")
361-
void testDeleteConnectionInUnexpectedState() {
362-
final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
363-
when(mTerminatedConnectionManagerWorkflow.getState())
364-
.thenThrow(new IllegalStateException(EXCEPTION_MESSAGE));
365-
when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow);
366-
367-
final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
368-
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
369-
final BatchRequest mBatchRequest = mock(BatchRequest.class);
370-
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);
371-
372-
temporalClient.deleteConnection(CONNECTION_ID);
373-
verify(workflowClient).signalWithStart(mBatchRequest);
374-
375-
// Verify that the deleteConnection signal was passed to the batch request by capturing the
376-
// argument,
377-
// executing the signal, and verifying that the desired signal was executed
378-
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
379-
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
380-
final Proc signal = batchRequestAddArgCaptor.getValue();
381-
signal.apply();
382-
verify(mNewConnectionManagerWorkflow).deleteConnection();
383-
}
384-
385-
@Test
386-
@SuppressWarnings(UNCHECKED)
387-
@DisplayName("Test delete connection method when workflow has already been deleted")
388-
void testDeleteConnectionOnDeletedWorkflow() {
389-
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
390-
final WorkflowState mWorkflowState = mock(WorkflowState.class);
391-
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
392-
when(mWorkflowState.isDeleted()).thenReturn(true);
393-
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
394-
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
395-
396-
temporalClient.deleteConnection(CONNECTION_ID);
397-
398-
verify(temporalClient).deleteConnection(CONNECTION_ID);
399-
verifyNoMoreInteractions(temporalClient);
354+
verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);
400355
}
401356

402357
}

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import io.airbyte.server.scheduler.EventRunner;
7171
import io.airbyte.server.scheduler.TemporalEventRunner;
7272
import io.airbyte.validation.json.JsonSchemaValidator;
73+
import io.airbyte.workers.helper.ConnectionHelper;
7374
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
7475
import io.temporal.serviceclient.WorkflowServiceStubs;
7576
import java.net.http.HttpClient;
@@ -258,11 +259,14 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
258259

259260
final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);
260261

262+
final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper);
263+
261264
final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
262265
configRepository,
263266
workspaceHelper,
264267
trackingClient,
265-
eventRunner);
268+
eventRunner,
269+
connectionHelper);
266270

267271
final DestinationHandler destinationHandler = new DestinationHandler(
268272
configRepository,

airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,29 +78,34 @@ public class ConnectionsHandler {
7878
private final WorkspaceHelper workspaceHelper;
7979
private final TrackingClient trackingClient;
8080
private final EventRunner eventRunner;
81+
private final ConnectionHelper connectionHelper;
8182

8283
@VisibleForTesting
8384
ConnectionsHandler(final ConfigRepository configRepository,
8485
final Supplier<UUID> uuidGenerator,
8586
final WorkspaceHelper workspaceHelper,
8687
final TrackingClient trackingClient,
87-
final EventRunner eventRunner) {
88+
final EventRunner eventRunner,
89+
final ConnectionHelper connectionHelper) {
8890
this.configRepository = configRepository;
8991
this.uuidGenerator = uuidGenerator;
9092
this.workspaceHelper = workspaceHelper;
9193
this.trackingClient = trackingClient;
9294
this.eventRunner = eventRunner;
95+
this.connectionHelper = connectionHelper;
9396
}
9497

9598
public ConnectionsHandler(final ConfigRepository configRepository,
9699
final WorkspaceHelper workspaceHelper,
97100
final TrackingClient trackingClient,
98-
final EventRunner eventRunner) {
101+
final EventRunner eventRunner,
102+
final ConnectionHelper connectionHelper) {
99103
this(configRepository,
100104
UUID::randomUUID,
101105
workspaceHelper,
102106
trackingClient,
103-
eventRunner);
107+
eventRunner,
108+
connectionHelper);
104109

105110
}
106111

@@ -545,8 +550,9 @@ public boolean matchSearch(final DestinationSearch destinationSearch, final Dest
545550
return (destinationReadFromSearch == null || destinationReadFromSearch.equals(destinationRead));
546551
}
547552

548-
public void deleteConnection(final UUID connectionId) {
549-
eventRunner.deleteConnection(connectionId);
553+
public void deleteConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
554+
connectionHelper.deleteConnection(connectionId);
555+
eventRunner.forceDeleteConnection(connectionId);
550556
}
551557

552558
private ConnectionRead buildConnectionRead(final UUID connectionId)

airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,11 +213,15 @@ public void deleteSource(final SourceRead source)
213213
final var workspaceIdRequestBody = new WorkspaceIdRequestBody()
214214
.workspaceId(source.getWorkspaceId());
215215

216-
connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
216+
final List<UUID> uuidsToDelete = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
217217
.getConnections().stream()
218218
.filter(con -> con.getSourceId().equals(source.getSourceId()))
219219
.map(ConnectionRead::getConnectionId)
220-
.forEach(connectionsHandler::deleteConnection);
220+
.toList();
221+
222+
for (final UUID uuidToDelete : uuidsToDelete) {
223+
connectionsHandler.deleteConnection(uuidToDelete);
224+
}
221225

222226
final var spec = getSpecFromSourceId(source.getSourceId());
223227
final var fullConfig = secretsRepositoryReader.getSourceConnectionWithSecrets(source.getSourceId()).getConfiguration();

airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ public interface EventRunner {
2020

2121
ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset, final boolean runSyncImmediately);
2222

23-
void deleteConnection(final UUID connectionId);
23+
void forceDeleteConnection(final UUID connectionId);
2424

25+
// TODO: Delete
26+
@Deprecated(forRemoval = true)
2527
void migrateSyncIfNeeded(final Set<UUID> connectionIds);
2628

2729
void update(final UUID connectionId);

airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public ManualOperationResult resetConnection(final UUID connectionId,
4040
}
4141

4242
@Override
43-
public void deleteConnection(final UUID connectionId) {
44-
temporalClient.deleteConnection(connectionId);
43+
public void forceDeleteConnection(final UUID connectionId) {
44+
temporalClient.forceDeleteWorkflow(connectionId);
4545
}
4646

4747
@Override

airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import io.airbyte.server.helpers.ConnectionHelpers;
6767
import io.airbyte.server.scheduler.EventRunner;
6868
import io.airbyte.validation.json.JsonValidationException;
69+
import io.airbyte.workers.helper.ConnectionHelper;
6970
import java.io.IOException;
7071
import java.util.Collections;
7172
import java.util.List;
@@ -98,6 +99,7 @@ class ConnectionsHandlerTest {
9899
private WorkspaceHelper workspaceHelper;
99100
private TrackingClient trackingClient;
100101
private EventRunner eventRunner;
102+
private ConnectionHelper connectionHelper;
101103

102104
private static final String PRESTO_TO_HUDI = "presto to hudi";
103105
private static final String PRESTO_TO_HUDI_PREFIX = "presto_to_hudi";
@@ -173,7 +175,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
173175
workspaceHelper = mock(WorkspaceHelper.class);
174176
trackingClient = mock(TrackingClient.class);
175177
eventRunner = mock(EventRunner.class);
176-
178+
connectionHelper = mock(ConnectionHelper.class);
177179
when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId);
178180
when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(workspaceId);
179181
when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(workspaceId);
@@ -190,7 +192,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio
190192
uuidGenerator,
191193
workspaceHelper,
192194
trackingClient,
193-
eventRunner);
195+
eventRunner,
196+
connectionHelper);
194197

195198
when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
196199
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
@@ -831,10 +834,10 @@ void testSearchConnections() throws JsonValidationException, ConfigNotFoundExcep
831834
}
832835

833836
@Test
834-
void testDeleteConnection() {
837+
void testDeleteConnection() throws JsonValidationException, ConfigNotFoundException, IOException {
835838
connectionsHandler.deleteConnection(connectionId);
836839

837-
verify(eventRunner).deleteConnection(connectionId);
840+
verify(connectionHelper).deleteConnection(connectionId);
838841
}
839842

840843
@Test
@@ -904,7 +907,8 @@ void setUp() {
904907
uuidGenerator,
905908
workspaceHelper,
906909
trackingClient,
907-
eventRunner);
910+
eventRunner,
911+
connectionHelper);
908912
}
909913

910914
@Test

airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,6 @@ void testIncrementalSync() throws Exception {
658658

659659
}
660660

661-
@Disabled
662661
@Test
663662
@Order(14)
664663
void testDeleteConnection() throws Exception {

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
116116
private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
117117
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;
118118

119+
private static final String DONT_DELETE_IN_TEMPORAL_TAG = "dont_delete_in_temporal";
120+
private static final int DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION = 1;
121+
119122
private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
120123
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
121124
private static final String RECORD_METRIC_TAG = "record_metric";
@@ -182,10 +185,17 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
182185
if (workflowState.isDeleted()) {
183186
if (workflowState.isRunning()) {
184187
log.info("Cancelling the current running job because a connection deletion was requested");
188+
// This call is not needed anymore since this will be cancel using the the cancellation state
185189
reportCancelled(connectionUpdaterInput.getConnectionId());
186190
}
187-
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
188-
deleteConnectionBeforeTerminatingTheWorkflow();
191+
192+
final int dontDeleteInTemporal =
193+
Workflow.getVersion(DONT_DELETE_IN_TEMPORAL_TAG, Workflow.DEFAULT_VERSION, DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION);
194+
195+
if (dontDeleteInTemporal < DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION) {
196+
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
197+
deleteConnectionBeforeTerminatingTheWorkflow();
198+
}
189199
return;
190200
}
191201

@@ -503,6 +513,7 @@ public void cancelJob() {
503513
cancellableSyncWorkflow.cancel();
504514
}
505515

516+
// TODO: Delete when the don't delete in temporal is removed
506517
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
507518
@Override
508519
public void deleteConnection() {

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.util.Map;
2121

22+
// TODO: Deleted when version is removed
2223
@Singleton
2324
@Requires(env = WorkerMode.CONTROL_PLANE)
2425
public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity {

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ void cancelNonRunning() throws InterruptedException {
486486
Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity);
487487
}
488488

489+
// TODO: delete when the signal method can be removed
489490
@Test
490491
@Timeout(value = 10,
491492
unit = TimeUnit.SECONDS)
@@ -533,7 +534,7 @@ void deleteSync() throws InterruptedException {
533534
&& changedStateEvent.isValue())
534535
.isEmpty();
535536

536-
Mockito.verify(mConnectionDeletionActivity, Mockito.times(1)).deleteConnection(Mockito.any());
537+
Mockito.verify(mConnectionDeletionActivity, Mockito.times(0)).deleteConnection(Mockito.any());
537538
}
538539

539540
@Test

0 commit comments

Comments
 (0)