Skip to content

Commit 06af5db

Browse files
committed
chore: cleanup rolled out flags (#13906)
1 parent b739c1b commit 06af5db

File tree

10 files changed

+39
-595
lines changed

10 files changed

+39
-595
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/ReplicationInputHydrator.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,8 @@
4444
import io.airbyte.featureflag.Connection;
4545
import io.airbyte.featureflag.EnableMappers;
4646
import io.airbyte.featureflag.FeatureFlagClient;
47-
import io.airbyte.featureflag.Multi;
4847
import io.airbyte.featureflag.Organization;
49-
import io.airbyte.featureflag.RefreshConfigBeforeSecretHydrationInitContainer;
5048
import io.airbyte.featureflag.UseRuntimeSecretPersistence;
51-
import io.airbyte.featureflag.Workspace;
5249
import io.airbyte.metrics.lib.ApmTraceUtils;
5350
import io.airbyte.persistence.job.models.ReplicationInput;
5451
import io.airbyte.workers.helper.BackfillHelper;
@@ -58,7 +55,6 @@
5855
import io.airbyte.workers.models.ReplicationActivityInput;
5956
import java.io.IOException;
6057
import java.time.Duration;
61-
import java.util.Arrays;
6258
import java.util.HashMap;
6359
import java.util.List;
6460
import java.util.Map;
@@ -87,11 +83,6 @@ public ReplicationInputHydrator(final AirbyteApiClient airbyteApiClient,
8783
this.featureFlagClient = featureFlagClient;
8884
}
8985

90-
private Boolean shouldRefreshSecretsReferences(final ReplicationActivityInput input) {
91-
final Multi context = new Multi(Arrays.asList(new Connection(input.getConnectionId()), new Workspace(input.getWorkspaceId())));
92-
return featureFlagClient.boolVariation(RefreshConfigBeforeSecretHydrationInitContainer.INSTANCE, context);
93-
}
94-
9586
private <T> T retry(final CheckedSupplier<T> supplier) {
9687
return Failsafe.with(
9788
RetryPolicy.builder()
@@ -135,9 +126,7 @@ private void refreshSecretsReferences(final ReplicationActivityInput parsed) {
135126
*/
136127
public ReplicationInput getHydratedReplicationInput(final ReplicationActivityInput replicationActivityInput) throws Exception {
137128
ApmTraceUtils.addTagsToTrace(Map.of("api_base_url", airbyteApiClient.getDestinationApi().getBaseUrl()));
138-
if (shouldRefreshSecretsReferences(replicationActivityInput)) {
139-
refreshSecretsReferences(replicationActivityInput);
140-
}
129+
refreshSecretsReferences(replicationActivityInput);
141130
final var destination =
142131
airbyteApiClient.getDestinationApi().getDestination(new DestinationIdRequestBody(replicationActivityInput.getDestinationId()));
143132
final var tag = DockerImageName.INSTANCE.extractTag(replicationActivityInput.getDestinationLauncherConfig().getDockerImage());

airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt

-9
Original file line numberDiff line numberDiff line change
@@ -174,21 +174,12 @@ object ResetStreamsStateWhenDisabled : Temporary<Boolean>(key = "reset-stream-st
174174

175175
object ConnectorSidecarFetchesInputFromInit : Temporary<Boolean>(key = "connector-sidecar-fetches-from-init", default = false)
176176

177-
object RefreshConfigBeforeSecretHydrationInitContainer : Temporary<Boolean>(
178-
key = "platform.refresh-config-before-secret-hydration-init-container",
179-
default = false,
180-
)
181-
182177
object LogStateMsgs : Temporary<Boolean>(key = "platform.log-state-msgs", default = false)
183178

184179
object EnableMappers : Temporary<Boolean>(key = "platform.enable-mappers", default = false)
185180

186-
object ReplicationMonoPod : Temporary<Boolean>(key = "replication-mono-pod", default = true)
187-
188181
object ReplicationBufferOverride : Temporary<Int>(key = "platform.replication-buffer-override", default = 0)
189182

190-
object ReplicationMonoPodMemoryTolerance : Temporary<Int>(key = "platform.replication-mono-pod-memory-tolerance-gb", default = 256)
191-
192183
object DisableAuthHeaderReplacement : Temporary<Boolean>(key = "platform.disable-auth-header-replacement", default = false)
193184

194185
object NodeSelectorOverride : Temporary<String>(key = "platform.node-selector-override", default = "")

airbyte-workload-launcher/src/main/kotlin/config/ContainerConfigBeanFactory.kt

-15
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import io.airbyte.commons.workers.config.WorkerConfigs
88
import io.airbyte.commons.workers.config.WorkerConfigsProvider
99
import io.airbyte.config.ResourceRequirements
1010
import io.airbyte.workers.process.KubeContainerInfo
11-
import io.airbyte.workers.sync.OrchestratorConstants
12-
import io.fabric8.kubernetes.api.model.ContainerPort
13-
import io.fabric8.kubernetes.api.model.ContainerPortBuilder
1411
import io.fabric8.kubernetes.api.model.LocalObjectReference
1512
import io.fabric8.kubernetes.api.model.Toleration
1613
import io.fabric8.kubernetes.api.model.TolerationBuilder
@@ -93,18 +90,6 @@ class ContainerConfigBeanFactory {
9390
)
9491
}
9592

96-
@Singleton
97-
@Named("orchestratorContainerPorts")
98-
fun orchestratorContainerPorts(): List<ContainerPort> {
99-
return listOf(
100-
ContainerPortBuilder().withContainerPort(OrchestratorConstants.SERVER_PORT).build(),
101-
ContainerPortBuilder().withContainerPort(OrchestratorConstants.PORT1).build(),
102-
ContainerPortBuilder().withContainerPort(OrchestratorConstants.PORT2).build(),
103-
ContainerPortBuilder().withContainerPort(OrchestratorConstants.PORT3).build(),
104-
ContainerPortBuilder().withContainerPort(OrchestratorConstants.PORT4).build(),
105-
)
106-
}
107-
10893
@Singleton
10994
@Named("sidecarKubeContainerInfo")
11095
fun sidecarKubeContainerInfo(

airbyte-workload-launcher/src/main/kotlin/metrics/MeterFilterFactory.kt

-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ class MeterFilterFactory(
3838
const val LAUNCH_PIPELINE_STAGE_OPERATION_NAME = "launch-pipeline-stage"
3939
const val LAUNCH_REPLICATION_OPERATION_NAME = "launch-replication"
4040
const val RESUME_CLAIMED_OPERATION_NAME = "resume_claimed"
41-
const val WAIT_DESTINATION_OPERATION_NAME = "wait-destination"
42-
const val WAIT_ORCHESTRATOR_OPERATION_NAME = "wait-orchestrator"
43-
const val WAIT_SOURCE_OPERATION_NAME = "wait-source"
4441
const val SUCCESS_STATUS = "ok"
4542
const val FAILURE_STATUS = "error"
4643
const val RUNNING_STATUS = "running"

airbyte-workload-launcher/src/main/kotlin/pods/KubeClientException.kt

-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ class KubeClientException(
88
) : Throwable(message, cause)
99

1010
enum class PodType {
11-
ORCHESTRATOR,
12-
SOURCE,
13-
DESTINATION,
1411
REPLICATION,
1512
}
1613

airbyte-workload-launcher/src/main/kotlin/pods/KubePodClient.kt

+1-144
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,20 @@ package io.airbyte.workload.launcher.pods
33
import com.google.common.annotations.VisibleForTesting
44
import datadog.trace.api.Trace
55
import io.airbyte.commons.constants.WorkerConstants.KubeConstants.FULL_POD_TIMEOUT
6-
import io.airbyte.config.ResourceRequirements
7-
import io.airbyte.featureflag.Connection
86
import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit
97
import io.airbyte.featureflag.Context
108
import io.airbyte.featureflag.FeatureFlagClient
119
import io.airbyte.featureflag.Multi
12-
import io.airbyte.featureflag.ReplicationMonoPod
13-
import io.airbyte.featureflag.ReplicationMonoPodMemoryTolerance
1410
import io.airbyte.featureflag.Workspace
1511
import io.airbyte.metrics.lib.ApmTraceUtils
1612
import io.airbyte.persistence.job.models.ReplicationInput
17-
import io.airbyte.workers.input.getDestinationResourceReqs
18-
import io.airbyte.workers.input.getOrchestratorResourceReqs
19-
import io.airbyte.workers.input.getSourceResourceReqs
20-
import io.airbyte.workers.input.setDestinationLabels
21-
import io.airbyte.workers.input.setSourceLabels
2213
import io.airbyte.workers.models.CheckConnectionInput
2314
import io.airbyte.workers.models.DiscoverCatalogInput
2415
import io.airbyte.workers.models.SpecInput
2516
import io.airbyte.workers.pod.PodLabeler
26-
import io.airbyte.workers.process.KubePodProcess
2717
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.LAUNCH_REPLICATION_OPERATION_NAME
28-
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WAIT_DESTINATION_OPERATION_NAME
29-
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WAIT_ORCHESTRATOR_OPERATION_NAME
30-
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WAIT_SOURCE_OPERATION_NAME
3118
import io.airbyte.workload.launcher.pipeline.consumer.LauncherInput
3219
import io.airbyte.workload.launcher.pods.factories.ConnectorPodFactory
33-
import io.airbyte.workload.launcher.pods.factories.OrchestratorPodFactory
3420
import io.airbyte.workload.launcher.pods.factories.ReplicationPodFactory
3521
import io.fabric8.kubernetes.api.model.Pod
3622
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException
@@ -52,7 +38,6 @@ class KubePodClient(
5238
private val kubePodLauncher: KubePodLauncher,
5339
private val labeler: PodLabeler,
5440
private val mapper: PayloadKubeInputMapper,
55-
private val orchestratorPodFactory: OrchestratorPodFactory,
5641
private val replicationPodFactory: ReplicationPodFactory,
5742
@Named("checkPodFactory") private val checkPodFactory: ConnectorPodFactory,
5843
@Named("discoverPodFactory") private val discoverPodFactory: ConnectorPodFactory,
@@ -69,59 +54,7 @@ class KubePodClient(
6954
replicationInput: ReplicationInput,
7055
launcherInput: LauncherInput,
7156
) {
72-
if (useMonoPod(replicationInput = replicationInput)) {
73-
launchReplicationMonoPod(replicationInput, launcherInput)
74-
} else {
75-
launchReplicationPodTriplet(replicationInput, launcherInput)
76-
}
77-
}
78-
79-
fun launchReplicationPodTriplet(
80-
replicationInput: ReplicationInput,
81-
launcherInput: LauncherInput,
82-
) {
83-
val sharedLabels = labeler.getSharedLabels(launcherInput.workloadId, launcherInput.mutexKey, launcherInput.labels, launcherInput.autoId)
84-
85-
val inputWithLabels =
86-
replicationInput
87-
.setSourceLabels(sharedLabels)
88-
.setDestinationLabels(sharedLabels)
89-
90-
val kubeInput = mapper.toKubeInput(launcherInput.workloadId, inputWithLabels, sharedLabels)
91-
92-
var pod =
93-
orchestratorPodFactory.create(
94-
replicationInput.connectionId,
95-
kubeInput.orchestratorLabels,
96-
kubeInput.resourceReqs,
97-
kubeInput.nodeSelectors,
98-
kubeInput.kubePodInfo,
99-
kubeInput.annotations,
100-
kubeInput.extraEnv,
101-
)
102-
try {
103-
pod =
104-
kubePodLauncher.create(pod)
105-
} catch (e: RuntimeException) {
106-
ApmTraceUtils.addExceptionToTrace(e)
107-
throw KubeClientException(
108-
"Failed to create pod ${kubeInput.kubePodInfo.name}.",
109-
e,
110-
KubeCommandType.CREATE,
111-
PodType.ORCHESTRATOR,
112-
)
113-
}
114-
115-
waitForPodInitComplete(pod, PodType.ORCHESTRATOR.toString())
116-
117-
waitForOrchestratorStart(pod)
118-
119-
// We wait for the destination first because orchestrator starts destinations first.
120-
waitDestinationReadyOrTerminalInit(kubeInput)
121-
122-
if (!replicationInput.isReset) {
123-
waitSourceReadyOrTerminalInit(kubeInput)
124-
}
57+
launchReplicationMonoPod(replicationInput, launcherInput)
12558
}
12659

12760
fun launchReplicationMonoPod(
@@ -174,52 +107,6 @@ class KubePodClient(
174107
waitForPodInitComplete(pod, PodType.REPLICATION.toString())
175108
}
176109

177-
@Trace(operationName = WAIT_ORCHESTRATOR_OPERATION_NAME)
178-
fun waitForOrchestratorStart(pod: Pod) {
179-
try {
180-
kubePodLauncher.waitForPodReadyOrTerminalByPod(pod, ORCHESTRATOR_STARTUP_TIMEOUT_VALUE)
181-
} catch (e: RuntimeException) {
182-
ApmTraceUtils.addExceptionToTrace(e)
183-
throw KubeClientException(
184-
"Main container of orchestrator pod failed to start within allotted timeout of ${ORCHESTRATOR_STARTUP_TIMEOUT_VALUE.seconds} seconds. " +
185-
"(${e.message})",
186-
e,
187-
KubeCommandType.WAIT_MAIN,
188-
PodType.ORCHESTRATOR,
189-
)
190-
}
191-
}
192-
193-
@Trace(operationName = WAIT_SOURCE_OPERATION_NAME)
194-
fun waitSourceReadyOrTerminalInit(kubeInput: OrchestratorKubeInput) {
195-
try {
196-
kubePodLauncher.waitForPodReadyOrTerminal(kubeInput.sourceLabels, REPL_CONNECTOR_STARTUP_TIMEOUT_VALUE)
197-
} catch (e: RuntimeException) {
198-
ApmTraceUtils.addExceptionToTrace(e)
199-
throw KubeClientException(
200-
"Source pod failed to start within allotted timeout of ${REPL_CONNECTOR_STARTUP_TIMEOUT_VALUE.seconds} seconds. (${e.message})",
201-
e,
202-
KubeCommandType.WAIT_MAIN,
203-
PodType.SOURCE,
204-
)
205-
}
206-
}
207-
208-
@Trace(operationName = WAIT_DESTINATION_OPERATION_NAME)
209-
fun waitDestinationReadyOrTerminalInit(kubeInput: OrchestratorKubeInput) {
210-
try {
211-
kubePodLauncher.waitForPodReadyOrTerminal(kubeInput.destinationLabels, REPL_CONNECTOR_STARTUP_TIMEOUT_VALUE)
212-
} catch (e: RuntimeException) {
213-
ApmTraceUtils.addExceptionToTrace(e)
214-
throw KubeClientException(
215-
"Destination pod failed to start within allotted timeout of ${REPL_CONNECTOR_STARTUP_TIMEOUT_VALUE.seconds} seconds. (${e.message})",
216-
e,
217-
KubeCommandType.WAIT_MAIN,
218-
PodType.DESTINATION,
219-
)
220-
}
221-
}
222-
223110
fun launchCheck(
224111
checkInput: CheckConnectionInput,
225112
launcherInput: LauncherInput,
@@ -388,39 +275,9 @@ class KubePodClient(
388275
}
389276
}
390277

391-
private fun useMonoPod(replicationInput: ReplicationInput): Boolean {
392-
val ffContext = Multi(listOf(Workspace(replicationInput.workspaceId), Connection(replicationInput.connectionId)))
393-
val memoryFootprint = getMemoryLimitTotal(replicationInput)
394-
return featureFlagClient.boolVariation(
395-
ReplicationMonoPod,
396-
ffContext,
397-
) && featureFlagClient.intVariation(ReplicationMonoPodMemoryTolerance, ffContext) >= toGigabytes(memoryFootprint)
398-
}
399-
400-
private fun getMemoryLimitTotal(replicationInput: ReplicationInput): Long {
401-
val sourceMemoryLimit = extractMemoryLimit(replicationInput.getSourceResourceReqs())
402-
val destinationMemoryLimit = extractMemoryLimit(replicationInput.getDestinationResourceReqs())
403-
val orchestratorMemoryLimit = extractMemoryLimit(replicationInput.getOrchestratorResourceReqs())
404-
return sourceMemoryLimit + destinationMemoryLimit + orchestratorMemoryLimit
405-
}
406-
407-
private fun extractMemoryLimit(resourceRequirements: ResourceRequirements?): Long {
408-
return KubePodProcess.buildResourceRequirements(resourceRequirements)?.limits?.get("memory")?.numericalAmount?.toLong() ?: 0
409-
}
410-
411278
companion object {
412279
private val TIMEOUT_SLACK: Duration = Duration.ofSeconds(5)
413-
val ORCHESTRATOR_STARTUP_TIMEOUT_VALUE: Duration = Duration.ofMinutes(2)
414280
val POD_INIT_TIMEOUT_VALUE: Duration = Duration.ofMinutes(15)
415281
val REPL_CONNECTOR_STARTUP_TIMEOUT_VALUE: Duration = FULL_POD_TIMEOUT.plus(TIMEOUT_SLACK)
416-
private const val BYTES_PER_GB = 1024 * 1024 * 1024
417-
418-
private fun toGigabytes(bytes: Long): Int {
419-
return if (bytes > 0) {
420-
(bytes / BYTES_PER_GB).toInt()
421-
} else {
422-
bytes.toInt()
423-
}
424-
}
425282
}
426283
}

airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt

-52
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import jakarta.inject.Named
4242
import jakarta.inject.Singleton
4343
import java.util.UUID
4444
import io.airbyte.commons.envvar.EnvVar as AirbyteEnvVar
45-
import io.airbyte.config.ResourceRequirements as AirbyteResourceRequirements
4645

4746
/**
4847
* Maps domain layer objects into Kube layer inputs.
@@ -62,46 +61,6 @@ class PayloadKubeInputMapper(
6261
private val featureFlagClient: FeatureFlagClient,
6362
@Named("infraFlagContexts") private val contexts: List<Context>,
6463
) {
65-
fun toKubeInput(
66-
workloadId: String,
67-
input: ReplicationInput,
68-
sharedLabels: Map<String, String>,
69-
): OrchestratorKubeInput {
70-
val jobId = input.getJobId()
71-
val attemptId = input.getAttemptId()
72-
73-
val orchestratorPodName = podNameGenerator.getReplicationOrchestratorPodName(jobId, attemptId)
74-
val orchestratorImage: String = resolveOrchestratorImageFFOverride(input.connectionId, orchestratorKubeContainerInfo.image)
75-
val orchestratorPodInfo =
76-
KubePodInfo(
77-
namespace,
78-
orchestratorPodName,
79-
KubeContainerInfo(orchestratorImage, orchestratorKubeContainerInfo.pullPolicy),
80-
)
81-
82-
val orchestratorReqs = input.getOrchestratorResourceReqs()
83-
val nodeSelectors = getNodeSelectors(input.usesCustomConnector(), replicationWorkerConfigs, input.connectionId)
84-
85-
val runtimeEnvVars =
86-
listOf(
87-
EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SYNC.toString(), null),
88-
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null),
89-
EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null),
90-
EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null),
91-
)
92-
93-
return OrchestratorKubeInput(
94-
labeler.getReplicationOrchestratorLabels(orchestratorKubeContainerInfo.image) + sharedLabels,
95-
labeler.getSourceLabels() + sharedLabels,
96-
labeler.getDestinationLabels() + sharedLabels,
97-
nodeSelectors,
98-
orchestratorPodInfo,
99-
orchestratorReqs,
100-
replicationWorkerConfigs.workerKubeAnnotations,
101-
runtimeEnvVars,
102-
)
103-
}
104-
10564
fun toReplicationKubeInput(
10665
workloadId: String,
10766
input: ReplicationInput,
@@ -400,17 +359,6 @@ class PayloadKubeInputMapper(
400359
}
401360
}
402361

403-
data class OrchestratorKubeInput(
404-
val orchestratorLabels: Map<String, String>,
405-
val sourceLabels: Map<String, String>,
406-
val destinationLabels: Map<String, String>,
407-
val nodeSelectors: Map<String, String>,
408-
val kubePodInfo: KubePodInfo,
409-
val resourceReqs: AirbyteResourceRequirements?,
410-
val annotations: Map<String, String>,
411-
val extraEnv: List<EnvVar>,
412-
)
413-
414362
data class ReplicationKubeInput(
415363
val podName: String,
416364
val labels: Map<String, String>,

0 commit comments

Comments
 (0)