Skip to content

Commit bb7af73

Browse files
authored
Performance: Inject Socat CPU resources into the Orchestrator. (#22288)
Performance testing shows that Socat needs more than 1 CPU to get to a higher throughput. This PR lets us propagate env vars on the worker onto the orchestrator and the connector pods it creates. - Modify the worker so that the orchestrator is created with Socat CPU resource vars. - Modify the orchestrator injection mechanism so via the WorkerConfigs so it retrieves the Socat CPU vars from the env and passes it to the Kube Process Factory. All this is very ugly and hacky. This entire mechanism needs to be simplified. I am ignoring that now for the purposes of performance work.
1 parent aa5ed6d commit bb7af73

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.airbyte.commons.features.EnvVariableFeatureFlags;
2525
import io.airbyte.commons.features.FeatureFlags;
2626
import io.airbyte.config.AllowedHosts;
27+
import io.airbyte.config.Configs;
28+
import io.airbyte.config.EnvConfigs;
2729
import io.airbyte.config.ResourceRequirements;
2830
import io.airbyte.config.WorkerEnvConstants;
2931
import io.airbyte.metrics.lib.ApmTraceUtils;
@@ -219,14 +221,17 @@ public Process write(final Path jobRoot,
219221
}
220222

221223
private Map<String, String> getWorkerMetadata() {
224+
final Configs configs = new EnvConfigs();
222225
return Map.of(
223226
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
224227
WorkerEnvConstants.WORKER_JOB_ID, jobId,
225228
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
226229
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
227230
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
228231
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
229-
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
232+
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces(),
233+
EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit(),
234+
EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest());
230235
}
231236

232237
}

airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.collect.Lists;
1818
import io.airbyte.commons.features.EnvVariableFeatureFlags;
1919
import io.airbyte.commons.features.FeatureFlags;
20+
import io.airbyte.config.Configs;
2021
import io.airbyte.config.EnvConfigs;
2122
import io.airbyte.config.WorkerEnvConstants;
2223
import io.airbyte.workers.WorkerConfigs;
@@ -51,15 +52,19 @@ class AirbyteIntegrationLauncherTest {
5152
CATALOG, "{}",
5253
"state", "{}");
5354

54-
private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
55+
private static final FeatureFlags FEATURE_FLAGS = new EnvVariableFeatureFlags();
56+
private static final Configs CONFIGS = new EnvConfigs();
57+
5558
private static final Map<String, String> JOB_METADATA = Map.of(
5659
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
5760
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
5861
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
59-
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
60-
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
61-
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
62-
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
62+
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState()),
63+
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()),
64+
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()),
65+
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces(),
66+
EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest(),
67+
EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit());
6368

6469
private WorkerConfigs workerConfigs;
6570
@Mock
@@ -70,7 +75,7 @@ class AirbyteIntegrationLauncherTest {
7075
void setUp() {
7176
workerConfigs = new WorkerConfigs(new EnvConfigs());
7277
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), null, false,
73-
featureFlags);
78+
FEATURE_FLAGS);
7479
}
7580

7681
@Test

airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import io.airbyte.commons.features.EnvVariableFeatureFlags;
88
import io.airbyte.commons.features.FeatureFlags;
9+
import io.airbyte.config.Configs;
910
import io.airbyte.config.Configs.WorkerEnvironment;
11+
import io.airbyte.config.EnvConfigs;
1012
import io.airbyte.config.storage.CloudStorageConfigs;
1113
import io.airbyte.workers.ContainerOrchestratorConfig;
1214
import io.airbyte.workers.storage.DocumentStoreClient;
@@ -99,6 +101,10 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
99101
environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath);
100102
environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR, dataPlaneServiceAccountEmail);
101103

104+
final Configs configs = new EnvConfigs();
105+
environmentVariables.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit());
106+
environmentVariables.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest());
107+
102108
if (System.getenv(DD_ENV_ENV_VAR) != null) {
103109
environmentVariables.put(DD_ENV_ENV_VAR, System.getenv(DD_ENV_ENV_VAR));
104110
}

0 commit comments

Comments
 (0)