Skip to content

Use worker config resource requirements by default in jobs #10231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public class EnvConfigs implements Configs {
// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
private static final String DEFAULT_JOB_CPU_REQUIREMENT = null;
private static final String DEFAULT_JOB_MEMORY_REQUIREMENT = null;
private static final String DEFAULT_JOB_CPU_REQUIREMENT = "0.2";
private static final String DEFAULT_JOB_MEMORY_REQUIREMENT = "500Mi";
private static final String DEFAULT_JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent";
private static final String SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID";
private static final String SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.WorkerConfigs;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -48,13 +49,14 @@ public class JobScheduler implements Runnable {

public JobScheduler(final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final TrackingClient trackingClient) {
final TrackingClient trackingClient,
final WorkerConfigs workerConfigs) {
this(
jobPersistence,
configRepository,
new ScheduleJobPredicate(Instant::now),
new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence, configRepository),
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import java.io.IOException;
Expand Down Expand Up @@ -113,6 +114,8 @@ public SchedulerApp(final Path workspaceRoot,
}

public void start() throws IOException {
final Configs configs = new EnvConfigs();
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
Copy link
Contributor Author

@lmossman lmossman Feb 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this should be safe to do here in the SchedulerApp, because I see that the resource requirement env variables exist in each of these environments:

  • Docker compose:
    - JOB_MAIN_CONTAINER_CPU_LIMIT=${JOB_MAIN_CONTAINER_CPU_LIMIT}
    - JOB_MAIN_CONTAINER_CPU_REQUEST=${JOB_MAIN_CONTAINER_CPU_REQUEST}
    - JOB_MAIN_CONTAINER_MEMORY_LIMIT=${JOB_MAIN_CONTAINER_MEMORY_LIMIT}
    - JOB_MAIN_CONTAINER_MEMORY_REQUEST=${JOB_MAIN_CONTAINER_MEMORY_REQUEST}
  • Kube overlays:
    JOB_MAIN_CONTAINER_CPU_REQUEST=
    JOB_MAIN_CONTAINER_CPU_LIMIT=
    JOB_MAIN_CONTAINER_MEMORY_REQUEST=
    JOB_MAIN_CONTAINER_MEMORY_LIMIT=
  • Helm charts:
    - name: JOB_MAIN_CONTAINER_CPU_REQUEST
    valueFrom:
    configMapKeyRef:
    name: airbyte-env
    key: JOB_MAIN_CONTAINER_CPU_REQUEST
    - name: JOB_MAIN_CONTAINER_CPU_LIMIT
    valueFrom:
    configMapKeyRef:
    name: airbyte-env
    key: JOB_MAIN_CONTAINER_CPU_LIMIT
    - name: JOB_MAIN_CONTAINER_MEMORY_REQUEST
    valueFrom:
    configMapKeyRef:
    name: airbyte-env
    key: JOB_MAIN_CONTAINER_MEMORY_REQUEST
    - name: JOB_MAIN_CONTAINER_MEMORY_LIMIT
    valueFrom:
    configMapKeyRef:
    name: airbyte-env
    key: JOB_MAIN_CONTAINER_MEMORY_LIMIT

final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
if (!featureFlags.usesNewScheduler()) {
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(submitterNumThreads, THREAD_FACTORY);
Expand All @@ -126,7 +129,7 @@ public void start() throws IOException {
featureFlags);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient, workerConfigs);
final JobSubmitter jobSubmitter = new JobSubmitter(
workerThreadPool,
jobPersistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
Expand All @@ -24,10 +25,14 @@ public class DefaultJobCreator implements JobCreator {

private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final ResourceRequirements workerResourceRequirements;

public DefaultJobCreator(final JobPersistence jobPersistence, final ConfigRepository configRepository) {
public DefaultJobCreator(final JobPersistence jobPersistence,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the class with the main changes of using the WorkerConfig resource requirements if they are not set on the StandardSync

final ConfigRepository configRepository,
final ResourceRequirements workerResourceRequirements) {
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.workerResourceRequirements = workerResourceRequirements;
}

@Override
Expand All @@ -50,7 +55,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withOperationSequence(standardSyncOperations)
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
.withState(null)
.withResourceRequirements(standardSync.getResourceRequirements());
.withResourceRequirements(getJobResourceRequirements(standardSync));

configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

Expand Down Expand Up @@ -86,12 +91,34 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
.withDestinationConfiguration(destination.getConfiguration())
.withOperationSequence(standardSyncOperations)
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog)
.withResourceRequirements(standardSync.getResourceRequirements());
.withResourceRequirements(getJobResourceRequirements(standardSync));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(resetConnectionConfig);
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}

private ResourceRequirements getJobResourceRequirements(final StandardSync standardSync) {
if (standardSync.getResourceRequirements() == null) {
return workerResourceRequirements;
}

final ResourceRequirements jobResourceRequirements = standardSync.getResourceRequirements();
if (jobResourceRequirements.getCpuRequest() == null) {
jobResourceRequirements.setCpuRequest(workerResourceRequirements.getCpuRequest());
}
if (jobResourceRequirements.getCpuLimit() == null) {
jobResourceRequirements.setCpuLimit(workerResourceRequirements.getCpuLimit());
}
if (jobResourceRequirements.getMemoryRequest() == null) {
jobResourceRequirements.setMemoryRequest(workerResourceRequirements.getMemoryRequest());
}
if (jobResourceRequirements.getMemoryLimit() == null) {
jobResourceRequirements.setMemoryLimit(workerResourceRequirements.getMemoryLimit());
}

return jobResourceRequirements;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class DefaultJobCreatorTest {
private JobPersistence jobPersistence;
private ConfigRepository configRepository;
private JobCreator jobCreator;
private ResourceRequirements workerResourceRequirements;

static {
final UUID workspaceId = UUID.randomUUID();
Expand Down Expand Up @@ -114,7 +116,8 @@ public class DefaultJobCreatorTest {
void setup() {
jobPersistence = mock(JobPersistence.class);
configRepository = mock(ConfigRepository.class);
jobCreator = new DefaultJobCreator(jobPersistence, configRepository);
workerResourceRequirements = new ResourceRequirements().withCpuLimit("0.2").withCpuRequest("0.2").withMemoryLimit("200Mi").withMemoryRequest("200Mi");
jobCreator = new DefaultJobCreator(jobPersistence, configRepository, workerResourceRequirements);
}

@Test
Expand All @@ -128,7 +131,8 @@ void testCreateSyncJob() throws IOException {
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog())
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);

final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down Expand Up @@ -158,7 +162,8 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog())
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);

final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down Expand Up @@ -192,7 +197,8 @@ void testCreateResetConnectionJob() throws IOException {
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand Down Expand Up @@ -225,7 +231,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION));
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand Down
6 changes: 4 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ private static void assertDatabasesReady(final Configs configs,

public static ServerRunnable getServer(final ServerFactory apiFactory, final ConfigPersistence seed) throws Exception {
final Configs configs = new EnvConfigs();
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);

LogClientSingleton.getInstance().setWorkspaceMdc(
configs.getWorkerEnvironment(),
Expand Down Expand Up @@ -184,7 +185,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs);
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient);
final SchedulerJobClient schedulerJobClient =
new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence, configRepository));
new DefaultSchedulerJobClient(jobPersistence,
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()));
final DefaultSynchronousSchedulerClient syncSchedulerClient =
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
Expand All @@ -209,7 +211,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
trackingClient,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
new WorkerConfigs(configs),
workerConfigs,
configs.getWebappUrl(),
configs.getAirbyteVersion(),
configs.getWorkspaceRoot(),
Expand Down
11 changes: 5 additions & 6 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void start() {

syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);

final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository);
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements());

final Worker connectionUpdaterWorker =
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
Expand Down Expand Up @@ -272,7 +272,7 @@ public static record ContainerOrchestratorConfig(
String containerOrchestratorImage,
String googleApplicationCredentials) {}

static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Configs configs) {
static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(final Configs configs) {
if (configs.getContainerOrchestratorEnabled()) {
final var kubernetesClient = new DefaultKubernetesClient();

Expand All @@ -295,6 +295,7 @@ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Conf

private static void launchWorkerApp() throws IOException {
final Configs configs = new EnvConfigs();
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);

LogClientSingleton.getInstance().setWorkspaceMdc(configs.getWorkerEnvironment(), configs.getLogConfigs(),
LogClientSingleton.getInstance().getSchedulerLogsRoot(configs.getWorkspaceRoot()));
Expand Down Expand Up @@ -342,7 +343,7 @@ private static void launchWorkerApp() throws IOException {
configRepository);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence, configRepository),
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient));

Expand All @@ -360,8 +361,6 @@ private static void launchWorkerApp() throws IOException {
configRepository,
jobPersistence);

final WorkerConfigs workerConfigs = new WorkerConfigs(configs);

final ConnectionHelper connectionHelper = new ConnectionHelper(
configRepository,
workspaceHelper,
Expand Down Expand Up @@ -401,7 +400,7 @@ private static void launchWorkerApp() throws IOException {
public static void main(final String[] args) {
try {
launchWorkerApp();
} catch (Throwable t) {
} catch (final Throwable t) {
LOGGER.error("Worker app failed", t);
System.exit(1);
}
Expand Down