Skip to content

Commit dedd1ea

Browse files
authored
Use worker config resource requirements by default in jobs (#10231)
* use WorkerConfigs resource requirements by default for Jobs * format * fix test * set default resource requirements in EnvConfigs * Revert "set default resource requirements in EnvConfigs" This reverts commit 39d21f0. * format
1 parent efbb624 commit dedd1ea

File tree

6 files changed

+63
-19
lines changed

6 files changed

+63
-19
lines changed

airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
1818
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
1919
import io.airbyte.validation.json.JsonValidationException;
20+
import io.airbyte.workers.WorkerConfigs;
2021
import java.io.IOException;
2122
import java.time.Instant;
2223
import java.util.List;
@@ -48,13 +49,14 @@ public class JobScheduler implements Runnable {
4849

4950
public JobScheduler(final JobPersistence jobPersistence,
5051
final ConfigRepository configRepository,
51-
final TrackingClient trackingClient) {
52+
final TrackingClient trackingClient,
53+
final WorkerConfigs workerConfigs) {
5254
this(
5355
jobPersistence,
5456
configRepository,
5557
new ScheduleJobPredicate(Instant::now),
5658
new DefaultSyncJobFactory(
57-
new DefaultJobCreator(jobPersistence, configRepository),
59+
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
5860
configRepository,
5961
new OAuthConfigSupplier(configRepository, trackingClient)));
6062
}

airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.airbyte.scheduler.persistence.JobPersistence;
3838
import io.airbyte.scheduler.persistence.WorkspaceHelper;
3939
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
40+
import io.airbyte.workers.WorkerConfigs;
4041
import io.airbyte.workers.temporal.TemporalClient;
4142
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
4243
import java.io.IOException;
@@ -113,6 +114,8 @@ public SchedulerApp(final Path workspaceRoot,
113114
}
114115

115116
public void start() throws IOException {
117+
final Configs configs = new EnvConfigs();
118+
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
116119
final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
117120
if (!featureFlags.usesNewScheduler()) {
118121
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(submitterNumThreads, THREAD_FACTORY);
@@ -126,7 +129,7 @@ public void start() throws IOException {
126129
featureFlags);
127130
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts);
128131
final TrackingClient trackingClient = TrackingClientSingleton.get();
129-
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient);
132+
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient, workerConfigs);
130133
final JobSubmitter jobSubmitter = new JobSubmitter(
131134
workerThreadPool,
132135
jobPersistence,

airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java

+30-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.airbyte.config.JobConfig.ConfigType;
1010
import io.airbyte.config.JobResetConnectionConfig;
1111
import io.airbyte.config.JobSyncConfig;
12+
import io.airbyte.config.ResourceRequirements;
1213
import io.airbyte.config.SourceConnection;
1314
import io.airbyte.config.StandardSync;
1415
import io.airbyte.config.StandardSyncOperation;
@@ -24,10 +25,14 @@ public class DefaultJobCreator implements JobCreator {
2425

2526
private final JobPersistence jobPersistence;
2627
private final ConfigRepository configRepository;
28+
private final ResourceRequirements workerResourceRequirements;
2729

28-
public DefaultJobCreator(final JobPersistence jobPersistence, final ConfigRepository configRepository) {
30+
public DefaultJobCreator(final JobPersistence jobPersistence,
31+
final ConfigRepository configRepository,
32+
final ResourceRequirements workerResourceRequirements) {
2933
this.jobPersistence = jobPersistence;
3034
this.configRepository = configRepository;
35+
this.workerResourceRequirements = workerResourceRequirements;
3136
}
3237

3338
@Override
@@ -50,7 +55,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
5055
.withOperationSequence(standardSyncOperations)
5156
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
5257
.withState(null)
53-
.withResourceRequirements(standardSync.getResourceRequirements());
58+
.withResourceRequirements(getJobResourceRequirements(standardSync));
5459

5560
configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);
5661

@@ -86,12 +91,34 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
8691
.withDestinationConfiguration(destination.getConfiguration())
8792
.withOperationSequence(standardSyncOperations)
8893
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog)
89-
.withResourceRequirements(standardSync.getResourceRequirements());
94+
.withResourceRequirements(getJobResourceRequirements(standardSync));
9095

9196
final JobConfig jobConfig = new JobConfig()
9297
.withConfigType(ConfigType.RESET_CONNECTION)
9398
.withResetConnection(resetConnectionConfig);
9499
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
95100
}
96101

102+
private ResourceRequirements getJobResourceRequirements(final StandardSync standardSync) {
103+
if (standardSync.getResourceRequirements() == null) {
104+
return workerResourceRequirements;
105+
}
106+
107+
final ResourceRequirements jobResourceRequirements = standardSync.getResourceRequirements();
108+
if (jobResourceRequirements.getCpuRequest() == null) {
109+
jobResourceRequirements.setCpuRequest(workerResourceRequirements.getCpuRequest());
110+
}
111+
if (jobResourceRequirements.getCpuLimit() == null) {
112+
jobResourceRequirements.setCpuLimit(workerResourceRequirements.getCpuLimit());
113+
}
114+
if (jobResourceRequirements.getMemoryRequest() == null) {
115+
jobResourceRequirements.setMemoryRequest(workerResourceRequirements.getMemoryRequest());
116+
}
117+
if (jobResourceRequirements.getMemoryLimit() == null) {
118+
jobResourceRequirements.setMemoryLimit(workerResourceRequirements.getMemoryLimit());
119+
}
120+
121+
return jobResourceRequirements;
122+
}
123+
97124
}

airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
2121
import io.airbyte.config.OperatorNormalization;
2222
import io.airbyte.config.OperatorNormalization.Option;
23+
import io.airbyte.config.ResourceRequirements;
2324
import io.airbyte.config.SourceConnection;
2425
import io.airbyte.config.StandardSync;
2526
import io.airbyte.config.StandardSyncOperation;
@@ -55,6 +56,7 @@ public class DefaultJobCreatorTest {
5556
private JobPersistence jobPersistence;
5657
private ConfigRepository configRepository;
5758
private JobCreator jobCreator;
59+
private ResourceRequirements workerResourceRequirements;
5860

5961
static {
6062
final UUID workspaceId = UUID.randomUUID();
@@ -114,7 +116,12 @@ public class DefaultJobCreatorTest {
114116
void setup() {
115117
jobPersistence = mock(JobPersistence.class);
116118
configRepository = mock(ConfigRepository.class);
117-
jobCreator = new DefaultJobCreator(jobPersistence, configRepository);
119+
workerResourceRequirements = new ResourceRequirements()
120+
.withCpuLimit("0.2")
121+
.withCpuRequest("0.2")
122+
.withMemoryLimit("200Mi")
123+
.withMemoryRequest("200Mi");
124+
jobCreator = new DefaultJobCreator(jobPersistence, configRepository, workerResourceRequirements);
118125
}
119126

120127
@Test
@@ -128,7 +135,8 @@ void testCreateSyncJob() throws IOException {
128135
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
129136
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
130137
.withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog())
131-
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
138+
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
139+
.withResourceRequirements(workerResourceRequirements);
132140

133141
final JobConfig jobConfig = new JobConfig()
134142
.withConfigType(JobConfig.ConfigType.SYNC)
@@ -158,7 +166,8 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
158166
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
159167
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
160168
.withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog())
161-
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
169+
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
170+
.withResourceRequirements(workerResourceRequirements);
162171

163172
final JobConfig jobConfig = new JobConfig()
164173
.withConfigType(JobConfig.ConfigType.SYNC)
@@ -192,7 +201,8 @@ void testCreateResetConnectionJob() throws IOException {
192201
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
193202
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
194203
.withConfiguredAirbyteCatalog(expectedCatalog)
195-
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
204+
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
205+
.withResourceRequirements(workerResourceRequirements);
196206

197207
final JobConfig jobConfig = new JobConfig()
198208
.withConfigType(ConfigType.RESET_CONNECTION)
@@ -225,7 +235,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
225235
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
226236
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
227237
.withConfiguredAirbyteCatalog(expectedCatalog)
228-
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
238+
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
239+
.withResourceRequirements(workerResourceRequirements);
229240

230241
final JobConfig jobConfig = new JobConfig()
231242
.withConfigType(ConfigType.RESET_CONNECTION)

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ private static void assertDatabasesReady(final Configs configs,
141141

142142
public static ServerRunnable getServer(final ServerFactory apiFactory, final ConfigPersistence seed) throws Exception {
143143
final Configs configs = new EnvConfigs();
144+
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
144145

145146
LogClientSingleton.getInstance().setWorkspaceMdc(
146147
configs.getWorkerEnvironment(),
@@ -184,7 +185,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
184185
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs);
185186
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient);
186187
final SchedulerJobClient schedulerJobClient =
187-
new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence, configRepository));
188+
new DefaultSchedulerJobClient(jobPersistence,
189+
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()));
188190
final DefaultSynchronousSchedulerClient syncSchedulerClient =
189191
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
190192
final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
@@ -209,7 +211,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
209211
trackingClient,
210212
configs.getWorkerEnvironment(),
211213
configs.getLogConfigs(),
212-
new WorkerConfigs(configs),
214+
workerConfigs,
213215
configs.getWebappUrl(),
214216
configs.getAirbyteVersion(),
215217
configs.getWorkspaceRoot(),

airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public void start() {
183183

184184
syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);
185185

186-
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository);
186+
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements());
187187

188188
final Worker connectionUpdaterWorker =
189189
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
@@ -272,7 +272,7 @@ public static record ContainerOrchestratorConfig(
272272
String containerOrchestratorImage,
273273
String googleApplicationCredentials) {}
274274

275-
static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Configs configs) {
275+
static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(final Configs configs) {
276276
if (configs.getContainerOrchestratorEnabled()) {
277277
final var kubernetesClient = new DefaultKubernetesClient();
278278

@@ -295,6 +295,7 @@ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Conf
295295

296296
private static void launchWorkerApp() throws IOException {
297297
final Configs configs = new EnvConfigs();
298+
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
298299

299300
LogClientSingleton.getInstance().setWorkspaceMdc(configs.getWorkerEnvironment(), configs.getLogConfigs(),
300301
LogClientSingleton.getInstance().getSchedulerLogsRoot(configs.getWorkspaceRoot()));
@@ -342,7 +343,7 @@ private static void launchWorkerApp() throws IOException {
342343
configRepository);
343344
final TrackingClient trackingClient = TrackingClientSingleton.get();
344345
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
345-
new DefaultJobCreator(jobPersistence, configRepository),
346+
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
346347
configRepository,
347348
new OAuthConfigSupplier(configRepository, trackingClient));
348349

@@ -360,8 +361,6 @@ private static void launchWorkerApp() throws IOException {
360361
configRepository,
361362
jobPersistence);
362363

363-
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
364-
365364
final ConnectionHelper connectionHelper = new ConnectionHelper(
366365
configRepository,
367366
workspaceHelper,
@@ -401,7 +400,7 @@ private static void launchWorkerApp() throws IOException {
401400
public static void main(final String[] args) {
402401
try {
403402
launchWorkerApp();
404-
} catch (Throwable t) {
403+
} catch (final Throwable t) {
405404
LOGGER.error("Worker app failed", t);
406405
System.exit(1);
407406
}

0 commit comments

Comments
 (0)