Skip to content

Specify namespace when creating pod #19399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public AsyncKubePodStatus getDocStoreStatus() {
public void create(final Map<String, String> allLabels,
final ResourceRequirements resourceRequirements,
final Map<String, String> fileMap,
final Map<Integer, Integer> portMap) {
final Map<Integer, Integer> portMap,
final Map<String, String> nodeSelectors) {
final List<Volume> volumes = new ArrayList<>();
final List<VolumeMount> volumeMounts = new ArrayList<>();
final List<EnvVar> envVars = new ArrayList<>();
Expand Down Expand Up @@ -352,6 +353,7 @@ public void create(final Map<String, String> allLabels,
.withContainers(mainContainer)
.withInitContainers(initContainer)
.withVolumes(volumes)
.withNodeSelector(nodeSelectors)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should include tolerations as well like in KubePodProcess

final Pod pod = podBuilder.withTolerations(buildPodTolerations(tolerations))

However, our current node pools don't actually use taints, so we don't need tolerations for this to work for our own cluster. In general though, we should include it (maybe in a separate followup PR. buildPodTolerations looks like a private method though it should be easy to extract out of KubePodProcess.)

.endSpec()
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public DbtLauncherWorker(final UUID connectionId,
Void.class,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AsyncKubePodStatus;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {
private final Supplier<ActivityExecutionContext> activityContext;
private final Integer serverPort;
private final TemporalUtils temporalUtils;
private final WorkerConfigs workerConfigs;

private final AtomicBoolean cancelled = new AtomicBoolean(false);
private AsyncOrchestratorPodProcess process;
Expand All @@ -84,7 +86,8 @@ public LauncherWorker(final UUID connectionId,
final Class<OUTPUT> outputClass,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils) {
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs) {

this.connectionId = connectionId;
this.application = application;
Expand All @@ -97,6 +100,7 @@ public LauncherWorker(final UUID connectionId,
this.activityContext = activityContext;
this.serverPort = serverPort;
this.temporalUtils = temporalUtils;
this.workerConfigs = workerConfigs;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -174,7 +178,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
allLabels,
resourceRequirements,
fileMap,
portMap);
portMap,
workerConfigs.getworkerKubeNodeSelectors());
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public NormalizationLauncherWorker(final UUID connectionId,
NormalizationSummary.class,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.WorkerConfigs;
import io.temporal.activity.ActivityExecutionContext;
import java.util.Map;
import java.util.UUID;
Expand All @@ -37,7 +38,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
final ResourceRequirements resourceRequirements,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils) {
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs) {
super(
connectionId,
REPLICATION,
Expand All @@ -51,7 +53,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
ReplicationOutput.class,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class ReplicationActivityImpl implements ReplicationActivity {
private final AirbyteApiClient airbyteApiClient;
private final AirbyteMessageSerDeProvider serDeProvider;
private final AirbyteMessageVersionedMigratorFactory migratorFactory;
private final WorkerConfigs workerConfigs;

public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
@Named("replicationProcessFactory") final ProcessFactory processFactory,
Expand All @@ -111,7 +113,8 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio
final TemporalUtils temporalUtils,
final AirbyteApiClient airbyteApiClient,
final AirbyteMessageSerDeProvider serDeProvider,
final AirbyteMessageVersionedMigratorFactory migratorFactory) {
final AirbyteMessageVersionedMigratorFactory migratorFactory,
@Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs) {
this.containerOrchestratorConfig = containerOrchestratorConfig;
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
Expand All @@ -126,6 +129,7 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio
this.airbyteApiClient = airbyteApiClient;
this.serDeProvider = serDeProvider;
this.migratorFactory = migratorFactory;
this.workerConfigs = workerConfigs;
}

// Marking task queue as nullable because we changed activity signature; thus runs started before
Expand Down Expand Up @@ -166,7 +170,8 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
destinationLauncherConfig,
jobRunConfig,
syncInput.getResourceRequirements(),
() -> context);
() -> context,
workerConfigs);
} else {
workerFactory =
getLegacyWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput);
Expand Down Expand Up @@ -295,7 +300,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
final ResourceRequirements resourceRequirements,
final Supplier<ActivityExecutionContext> activityContext)
final Supplier<ActivityExecutionContext> activityContext,
final WorkerConfigs workerConfigs)
throws ApiException {
final JobIdRequestBody id = new JobIdRequestBody();
id.setId(Long.valueOf(jobRunConfig.getJobId()));
Expand All @@ -313,7 +319,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
resourceRequirements,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ public void testAsyncOrchestratorPodProcess(final String pullPolicy) throws Inte
.filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final WorkerConfigs workerConfigs = new WorkerConfigs(new EnvConfigs());

asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP,
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap);
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors());

// a final activity waits until there is output from the kube pod process
asyncProcess.waitFor(10, TimeUnit.SECONDS);
Expand Down