Skip to content

Commit efbb624

Browse files
authored
separate replication orchestrator resource requests (#10227)
1 parent 02f382d commit efbb624

File tree

4 files changed

+60
-1
lines changed

4 files changed

+60
-1
lines changed

airbyte-config/models/src/main/java/io/airbyte/config/Configs.java

+20
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,26 @@ public interface Configs {
353353
*/
354354
String getContainerOrchestratorImage();
355355

356+
/**
357+
* Define the replication orchestrator's minimum CPU usage. Defaults to none.
358+
*/
359+
String getReplicationOrchestratorCpuRequest();
360+
361+
/**
362+
* Define the replication orchestrator's maximum CPU usage. Defaults to none.
363+
*/
364+
String getReplicationOrchestratorCpuLimit();
365+
366+
/**
367+
* Define the replication orchestrator's minimum RAM usage. Defaults to none.
368+
*/
369+
String getReplicationOrchestratorMemoryRequest();
370+
371+
/**
372+
* Define the replication orchestrator's maximum RAM usage. Defaults to none.
373+
*/
374+
String getReplicationOrchestratorMemoryLimit();
375+
356376
/**
357377
* Get the longest duration of non long running activity
358378
*/

airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java

+24
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ public class EnvConfigs implements Configs {
129129
public static final long DEFAULT_MAX_SYNC_WORKERS = 5;
130130

131131
public static final String DEFAULT_NETWORK = "host";
132+
private static final String REPLICATION_ORCHESTRATOR_CPU_REQUEST = "REPLICATION_ORCHESTRATOR_CPU_REQUEST";
133+
private static final String REPLICATION_ORCHESTRATOR_CPU_LIMIT = "REPLICATION_ORCHESTRATOR_CPU_LIMIT";
134+
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
135+
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";
132136

133137
private final Function<String, String> getEnv;
134138
private final Supplier<Set<String>> getAllEnvKeys;
@@ -594,6 +598,26 @@ public String getContainerOrchestratorImage() {
594598
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_IMAGE, "airbyte/container-orchestrator:" + getAirbyteVersion().serialize());
595599
}
596600

601+
@Override
602+
public String getReplicationOrchestratorCpuRequest() {
603+
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_CPU_REQUEST, null);
604+
}
605+
606+
@Override
607+
public String getReplicationOrchestratorCpuLimit() {
608+
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_CPU_LIMIT, null);
609+
}
610+
611+
@Override
612+
public String getReplicationOrchestratorMemoryRequest() {
613+
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_MEMORY_REQUEST, null);
614+
}
615+
616+
@Override
617+
public String getReplicationOrchestratorMemoryLimit() {
618+
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_MEMORY_LIMIT, null);
619+
}
620+
597621
@Override
598622
public int getMaxActivityTimeoutSecond() {
599623
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "120"));

airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java

+15
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,13 @@
1313
public class WorkerConfigs {
1414

1515
private final Configs.WorkerEnvironment workerEnvironment;
16+
17+
// for running source, destination, normalization, dbt, normalization orchestrator, and dbt
18+
// orchestrator pods
1619
private final ResourceRequirements resourceRequirements;
20+
// for running replication orchestrator pods
21+
private final ResourceRequirements replicationOrchestratorResourceRequirements;
22+
1723
private final List<TolerationPOJO> workerKubeTolerations;
1824
private final Map<String, String> workerKubeNodeSelectors;
1925
private final String jobImagePullSecret;
@@ -30,6 +36,11 @@ public WorkerConfigs(final Configs configs) {
3036
.withCpuLimit(configs.getJobMainContainerCpuLimit())
3137
.withMemoryRequest(configs.getJobMainContainerMemoryRequest())
3238
.withMemoryLimit(configs.getJobMainContainerMemoryLimit());
39+
this.replicationOrchestratorResourceRequirements = new ResourceRequirements()
40+
.withCpuRequest(configs.getReplicationOrchestratorCpuRequest())
41+
.withCpuLimit(configs.getReplicationOrchestratorCpuLimit())
42+
.withMemoryRequest(configs.getReplicationOrchestratorMemoryRequest())
43+
.withMemoryLimit(configs.getReplicationOrchestratorMemoryLimit());
3344
this.workerKubeTolerations = configs.getJobKubeTolerations();
3445
this.workerKubeNodeSelectors = configs.getJobKubeNodeSelectors();
3546
this.jobImagePullSecret = configs.getJobKubeMainContainerImagePullSecret();
@@ -80,4 +91,8 @@ public Map<String, String> getEnvMap() {
8091
return envMap;
8192
}
8293

94+
public ResourceRequirements getReplicationOrchestratorResourceRequirements() {
95+
return replicationOrchestratorResourceRequirements;
96+
}
97+
8398
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public ReplicationLauncherWorker(final UUID connectionId,
4141
INIT_FILE_SOURCE_LAUNCHER_CONFIG, Jsons.serialize(sourceLauncherConfig),
4242
INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)),
4343
containerOrchestratorConfig,
44-
workerConfigs.getResourceRequirements(),
44+
workerConfigs.getReplicationOrchestratorResourceRequirements(),
4545
ReplicationOutput.class);
4646
}
4747

0 commit comments

Comments
 (0)