Skip to content

Commit 63ccc32

Browse files
authored
Bmoric/remove unused code (#19188)
* Tmp * Move when the deletion is performed * Re-enable disable test * PR comments * Use cancel * rename * Fix test and version check position * remove unused temporal deletion code * Remove false todo * Rm repeated test * Rm unused import
1 parent fdb96d0 commit 63ccc32

File tree

6 files changed

+6
-175
lines changed

6 files changed

+6
-175
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java

-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity;
1212
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
1313
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
14-
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivity;
1514
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
1615
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
1716
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
@@ -70,7 +69,6 @@ public List<Object> connectionManagerActivities(
7069
final GenerateInputActivity generateInputActivity,
7170
final JobCreationAndStatusUpdateActivity jobCreationAndStatusUpdateActivity,
7271
final ConfigFetchActivity configFetchActivity,
73-
final ConnectionDeletionActivity connectionDeletionActivity,
7472
final CheckConnectionActivity checkConnectionActivity,
7573
final AutoDisableConnectionActivity autoDisableConnectionActivity,
7674
final StreamResetActivity streamResetActivity,
@@ -80,7 +78,6 @@ public List<Object> connectionManagerActivities(
8078
return List.of(generateInputActivity,
8179
jobCreationAndStatusUpdateActivity,
8280
configFetchActivity,
83-
connectionDeletionActivity,
8481
checkConnectionActivity,
8582
autoDisableConnectionActivity,
8683
streamResetActivity,

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

-22
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@
4646
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
4747
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput;
4848
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
49-
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivity;
50-
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivity.ConnectionDeletionInput;
5149
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
5250
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
5351
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput;
@@ -117,9 +115,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
117115
private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
118116
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;
119117

120-
private static final String DONT_DELETE_IN_TEMPORAL_TAG = "dont_delete_in_temporal";
121-
private static final int DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION = 1;
122-
123118
private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
124119
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
125120
private static final String RECORD_METRIC_TAG = "record_metric";
@@ -140,8 +135,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
140135
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
141136
private ConfigFetchActivity configFetchActivity;
142137
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
143-
private ConnectionDeletionActivity connectionDeletionActivity;
144-
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
145138
private AutoDisableConnectionActivity autoDisableConnectionActivity;
146139
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
147140
private CheckConnectionActivity checkActivity;
@@ -190,13 +183,6 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
190183
reportCancelled(connectionUpdaterInput.getConnectionId());
191184
}
192185

193-
final int dontDeleteInTemporal =
194-
Workflow.getVersion(DONT_DELETE_IN_TEMPORAL_TAG, Workflow.DEFAULT_VERSION, DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION);
195-
196-
if (dontDeleteInTemporal < DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION) {
197-
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
198-
deleteConnectionBeforeTerminatingTheWorkflow();
199-
}
200186
return;
201187
}
202188

@@ -902,14 +888,6 @@ private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
902888
return false;
903889
}
904890

905-
/**
906-
* Delete a connection
907-
*/
908-
private void deleteConnectionBeforeTerminatingTheWorkflow() {
909-
final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionId);
910-
runMandatoryActivity(connectionDeletionActivity::deleteConnection, connectionDeletionInput);
911-
}
912-
913891
/**
914892
* Set a job as cancel and continue to the next job if and continue as a reset if needed
915893
*/

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivity.java

-32
This file was deleted.

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java

-44
This file was deleted.

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java

+6-12
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
3636
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.GetMaxAttemptOutput;
3737
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
38-
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivity;
3938
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
4039
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
4140
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivityImpl;
@@ -114,8 +113,6 @@ class ConnectionManagerWorkflowTest {
114113
mock(ConfigFetchActivity.class, Mockito.withSettings().withoutAnnotations());
115114
private final CheckConnectionActivity mCheckConnectionActivity =
116115
mock(CheckConnectionActivity.class, Mockito.withSettings().withoutAnnotations());
117-
private static final ConnectionDeletionActivity mConnectionDeletionActivity =
118-
mock(ConnectionDeletionActivity.class, Mockito.withSettings().withoutAnnotations());
119116
private static final GenerateInputActivityImpl mGenerateInputActivityImpl =
120117
mock(GenerateInputActivityImpl.class, Mockito.withSettings().withoutAnnotations());
121118
private static final JobCreationAndStatusUpdateActivity mJobCreationAndStatusUpdateActivity =
@@ -152,7 +149,6 @@ static Stream<Arguments> getMaxAttemptForResetRetry() {
152149
void setUp() {
153150
Mockito.reset(mConfigFetchActivity);
154151
Mockito.reset(mCheckConnectionActivity);
155-
Mockito.reset(mConnectionDeletionActivity);
156152
Mockito.reset(mGenerateInputActivityImpl);
157153
Mockito.reset(mJobCreationAndStatusUpdateActivity);
158154
Mockito.reset(mAutoDisableConnectionActivity);
@@ -533,8 +529,6 @@ void deleteSync() throws InterruptedException {
533529
&& changedStateEvent.getField() != StateField.DONE_WAITING
534530
&& changedStateEvent.isValue())
535531
.isEmpty();
536-
537-
Mockito.verify(mConnectionDeletionActivity, Mockito.times(0)).deleteConnection(Mockito.any());
538532
}
539533

540534
@Test
@@ -900,8 +894,8 @@ void setup() {
900894

901895
final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name());
902896
managerWorker.registerWorkflowImplementationTypes(temporalProxyHelper.proxyWorkflowClass(ConnectionManagerWorkflowImpl.class));
903-
managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mCheckConnectionActivity, mConnectionDeletionActivity,
904-
mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity, mAutoDisableConnectionActivity, mRecordMetricActivity,
897+
managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mCheckConnectionActivity, mGenerateInputActivityImpl,
898+
mJobCreationAndStatusUpdateActivity, mAutoDisableConnectionActivity, mRecordMetricActivity,
905899
mWorkflowConfigActivity, mRouteToSyncTaskQueueActivity);
906900

907901
client = testEnv.getWorkflowClient();
@@ -998,8 +992,8 @@ void setup() {
998992

999993
final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name());
1000994
managerWorker.registerWorkflowImplementationTypes(temporalProxyHelper.proxyWorkflowClass(ConnectionManagerWorkflowImpl.class));
1001-
managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mCheckConnectionActivity, mConnectionDeletionActivity,
1002-
mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity, mAutoDisableConnectionActivity, mRecordMetricActivity,
995+
managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mCheckConnectionActivity, mGenerateInputActivityImpl,
996+
mJobCreationAndStatusUpdateActivity, mAutoDisableConnectionActivity, mRecordMetricActivity,
1003997
mWorkflowConfigActivity, mRouteToSyncTaskQueueActivity);
1004998

1005999
client = testEnv.getWorkflowClient();
@@ -1586,8 +1580,8 @@ private <T extends SyncWorkflow> void setupSpecificChildWorkflow(final Class<T>
15861580

15871581
final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name());
15881582
managerWorker.registerWorkflowImplementationTypes(temporalProxyHelper.proxyWorkflowClass(ConnectionManagerWorkflowImpl.class));
1589-
managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mCheckConnectionActivity, mConnectionDeletionActivity,
1590-
mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity, mAutoDisableConnectionActivity, mRecordMetricActivity,
1583+
managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mCheckConnectionActivity, mGenerateInputActivityImpl,
1584+
mJobCreationAndStatusUpdateActivity, mAutoDisableConnectionActivity, mRecordMetricActivity,
15911585
mWorkflowConfigActivity, mRouteToSyncTaskQueueActivity);
15921586

15931587
client = testEnv.getWorkflowClient();

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityTest.java

-62
This file was deleted.

0 commit comments

Comments
 (0)