Skip to content

Commit 3369237

Browse files
authored
🎉 Add labels to Kube jobs. Allow injection of Kube Image Pull Secrets. (#6368)
As title. The labeling is intended to help one differentiate between a pod created for check, spec, discover, sync and normalise. The main impetus for this change is Cloud cost tracking, though this can help with operational debugging too. The injection of Kube image pull secret enables pulling jobs from private docker repositories that have docker authentication.
1 parent 802a818 commit 3369237

File tree

15 files changed

+80
-20
lines changed

15 files changed

+80
-20
lines changed

‎airbyte-config/models/src/main/java/io/airbyte/config/Configs.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public interface Configs {
8383

8484
String getSubmitterNumThreads();
8585

86+
String getJobsImagePullSecret();
87+
8688
// Resources
8789
String getCpuRequest();
8890

‎airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class EnvConfigs implements Configs {
6767
private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT";
6868
private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST";
6969
private static final String RESOURCE_MEMORY_LIMIT = "RESOURCE_MEMORY_LIMIT";
70+
private static final String JOBS_IMAGE_PULL_SECRET = "JOBS_IMAGE_PULL_SECRET";
7071

7172
// defaults
7273
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
@@ -382,6 +383,16 @@ public String getMemoryLimit() {
382383
return getEnvOrDefault(RESOURCE_MEMORY_LIMIT, DEFAULT_RESOURCE_REQUIREMENT_MEMORY);
383384
}
384385

386+
/**
387+
* Returns the name of the secret to be used when pulling down docker images for jobs. Automatically
388+
* injected in the KubePodProcess class and used in the job pod templates. The empty string is a
389+
* no-op value.
390+
*/
391+
@Override
392+
public String getJobsImagePullSecret() {
393+
return getEnvOrDefault(JOBS_IMAGE_PULL_SECRET, "");
394+
}
395+
385396
@Override
386397
public String getS3LogBucket() {
387398
return getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET, "");

‎airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.airbyte.config.OperatorDbt;
1313
import io.airbyte.config.ResourceRequirements;
1414
import io.airbyte.workers.normalization.NormalizationRunner;
15+
import io.airbyte.workers.process.KubeProcessFactory;
1516
import io.airbyte.workers.process.ProcessFactory;
1617
import java.nio.file.Path;
1718
import java.util.ArrayList;
@@ -72,7 +73,9 @@ public boolean transform(String jobId, int attempt, Path jobRoot, JsonNode confi
7273
}
7374
Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments()));
7475
process =
75-
processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), false, files, "/bin/bash", resourceRequirements, dbtArguments);
76+
processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), false, files, "/bin/bash", resourceRequirements,
77+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP),
78+
dbtArguments);
7679

7780
LineGobbler.gobble(process.getInputStream(), LOGGER::info);
7881
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

‎airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class WorkerUtils {
3131
public static final List<WorkerPodToleration> DEFAULT_WORKER_POD_TOLERATIONS = initWorkerPodTolerations();
3232
public static final Map<String, String> DEFAULT_WORKER_POD_NODE_SELECTORS = initWorkerPodNodeSelectors();
3333
public static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = initResourceRequirements();
34+
public static final String DEFAULT_JOBS_IMAGE_PULL_SECRET = new EnvConfigs().getJobsImagePullSecret();
3435

3536
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerUtils.class);
3637

‎airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.airbyte.workers.WorkerConstants;
1717
import io.airbyte.workers.WorkerException;
1818
import io.airbyte.workers.WorkerUtils;
19+
import io.airbyte.workers.process.KubeProcessFactory;
1920
import io.airbyte.workers.process.ProcessFactory;
2021
import java.nio.file.Path;
2122
import java.util.Map;
@@ -102,7 +103,8 @@ private boolean runProcess(final String jobId,
102103
final String... args)
103104
throws Exception {
104105
try {
105-
process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, false, files, null, resourceRequirements, args);
106+
process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, false, files, null, resourceRequirements,
107+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP), args);
106108

107109
LineGobbler.gobble(process.getInputStream(), LOGGER::info);
108110
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

‎airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
5858
Collections.emptyMap(),
5959
null,
6060
resourceRequirement,
61+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB),
6162
"spec");
6263
}
6364

@@ -72,6 +73,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
7273
ImmutableMap.of(configFilename, configContents),
7374
null,
7475
resourceRequirement,
76+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
7577
"check",
7678
"--config", configFilename);
7779
}
@@ -87,6 +89,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
8789
ImmutableMap.of(configFilename, configContents),
8890
null,
8991
resourceRequirement,
92+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
9093
"discover",
9194
"--config", configFilename);
9295
}
@@ -126,6 +129,7 @@ public Process read(final Path jobRoot,
126129
files,
127130
null,
128131
resourceRequirement,
132+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.READ_STEP),
129133
arguments);
130134
}
131135

@@ -149,6 +153,7 @@ public Process write(final Path jobRoot,
149153
files,
150154
null,
151155
resourceRequirement,
156+
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.WRITE_STEP),
152157
"write",
153158
"--config", configFilename,
154159
"--catalog", catalogFilename);

‎airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public Process create(String jobId,
6969
final Map<String, String> files,
7070
final String entrypoint,
7171
final ResourceRequirements resourceRequirements,
72+
final Map<String, String> labels,
7273
final String... args)
7374
throws WorkerException {
7475
try {

‎airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.fabric8.kubernetes.api.model.Container;
1212
import io.fabric8.kubernetes.api.model.ContainerBuilder;
1313
import io.fabric8.kubernetes.api.model.DeletionPropagation;
14+
import io.fabric8.kubernetes.api.model.LocalObjectReference;
1415
import io.fabric8.kubernetes.api.model.Pod;
1516
import io.fabric8.kubernetes.api.model.PodBuilder;
1617
import io.fabric8.kubernetes.api.model.Quantity;
@@ -179,6 +180,7 @@ private static Container getMain(String image,
179180
.withCommand("sh", "-c", mainCommand)
180181
.withWorkingDir(CONFIG_DIR)
181182
.withVolumeMounts(mainVolumeMounts);
183+
182184
final ResourceRequirementsBuilder resourceRequirementsBuilder = getResourceRequirementsBuilder(resourceRequirements);
183185
if (resourceRequirementsBuilder != null) {
184186
containerBuilder.withResources(resourceRequirementsBuilder.build());
@@ -251,6 +253,7 @@ public KubePodProcess(String processRunnerHost,
251253
final Map<String, String> files,
252254
final String entrypointOverride,
253255
ResourceRequirements resourceRequirements,
256+
String imagePullSecret,
254257
List<WorkerPodToleration> tolerations,
255258
Map<String, String> nodeSelectors,
256259
Map<String, String> labels,
@@ -359,6 +362,7 @@ public KubePodProcess(String processRunnerHost,
359362
.endMetadata()
360363
.withNewSpec()
361364
.withTolerations(buildPodTolerations(tolerations))
365+
.withImagePullSecrets(new LocalObjectReference(imagePullSecret)) // An empty string turns this into a no-op setting.
362366
.withNodeSelector(nodeSelectors.isEmpty() ? null : nodeSelectors)
363367
.withRestartPolicy("Never")
364368
.withInitContainers(init)

‎airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kubernetes.client.openapi.ApiClient;
1414
import java.net.InetAddress;
1515
import java.nio.file.Path;
16+
import java.util.HashMap;
1617
import java.util.Map;
1718
import java.util.regex.Matcher;
1819
import java.util.regex.Pattern;
@@ -24,6 +25,19 @@ public class KubeProcessFactory implements ProcessFactory {
2425

2526
private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);
2627

28+
public static final String JOB_TYPE = "job_type";
29+
public static final String SYNC_JOB = "sync";
30+
public static final String SPEC_JOB = "spec";
31+
public static final String CHECK_JOB = "check";
32+
public static final String DISCOVER_JOB = "discover";
33+
public static final String NORMALIZATION_JOB = "normalize";
34+
35+
public static final String SYNC_STEP = "sync_step";
36+
public static final String READ_STEP = "read";
37+
public static final String WRITE_STEP = "write";
38+
public static final String NORMALISE_STEP = "normalise";
39+
public static final String CUSTOM_STEP = "custom";
40+
2741
private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");;
2842
private static final String JOB_LABEL_KEY = "job_id";
2943
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
@@ -77,11 +91,11 @@ public Process create(String jobId,
7791
final Map<String, String> files,
7892
final String entrypoint,
7993
final ResourceRequirements resourceRequirements,
94+
final Map<String, String> customLabels,
8095
final String... args)
8196
throws WorkerException {
8297
try {
8398
// used to differentiate source and destination processes with the same id and attempt
84-
8599
final String podName = createPodName(imageName, jobId, attempt);
86100

87101
final int stdoutLocalPort = KubePortManagerSingleton.take();
@@ -90,6 +104,13 @@ public Process create(String jobId,
90104
final int stderrLocalPort = KubePortManagerSingleton.take();
91105
LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort);
92106

107+
var allLabels = new HashMap<>(customLabels);
108+
var generalKubeLabels = Map.of(
109+
JOB_LABEL_KEY, jobId,
110+
ATTEMPT_LABEL_KEY, String.valueOf(attempt),
111+
WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE);
112+
allLabels.putAll(generalKubeLabels);
113+
93114
return new KubePodProcess(
94115
processRunnerHost,
95116
officialClient,
@@ -104,11 +125,10 @@ public Process create(String jobId,
104125
files,
105126
entrypoint,
106127
resourceRequirements,
128+
WorkerUtils.DEFAULT_JOBS_IMAGE_PULL_SECRET,
107129
WorkerUtils.DEFAULT_WORKER_POD_TOLERATIONS,
108130
WorkerUtils.DEFAULT_WORKER_POD_NODE_SELECTORS,
109-
Map.of(JOB_LABEL_KEY, jobId,
110-
ATTEMPT_LABEL_KEY, String.valueOf(attempt),
111-
WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE),
131+
allLabels,
112132
args);
113133
} catch (Exception e) {
114134
throw new WorkerException(e.getMessage(), e);

‎airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ public interface ProcessFactory {
1717
*
1818
* @param jobId job Id
1919
* @param attempt attempt Id
20-
* @param jobPath Workspace directory to run the process from
21-
* @param imageName Docker image name to start the process from
22-
* @param files file name to contents map that will be written into the working dir of the process
23-
* prior to execution
20+
* @param jobPath Workspace directory to run the process from.
21+
* @param imageName Docker image name to start the process from.
22+
* @param files File name to contents map that will be written into the working dir of the process
23+
* prior to execution.
2424
* @param entrypoint If not null, the default entrypoint program of the docker image can be changed
25-
* by this argument
26-
* @param args arguments to pass to the docker image being run in the new process
27-
* @return the ProcessBuilder object to run the process
25+
* by this argument.
26+
* @param resourceRequirements CPU and RAM to assign to the created process.
27+
* @param labels Labels to assign to the created Kube pod, if any. Ignore for docker.
28+
* @param args Arguments to pass to the docker image being run in the new process.
29+
* @return ProcessBuilder object to run the process.
2830
* @throws WorkerException
2931
*/
3032
Process create(String jobId,
@@ -35,6 +37,7 @@ Process create(String jobId,
3537
final Map<String, String> files,
3638
final String entrypoint,
3739
final ResourceRequirements resourceRequirements,
40+
final Map<String, String> labels,
3841
final String... args)
3942
throws WorkerException;
4043

@@ -46,9 +49,10 @@ default Process create(String jobId,
4649
final Map<String, String> files,
4750
final String entrypoint,
4851
final ResourceRequirements resourceRequirements,
52+
final Map<String, String> labels,
4953
final List<String> args)
5054
throws WorkerException {
51-
return create(jobId, attempt, jobPath, imageName, usesStdin, files, entrypoint, resourceRequirements, args.toArray(new String[0]));
55+
return create(jobId, attempt, jobPath, imageName, usesStdin, files, entrypoint, resourceRequirements, labels, args.toArray(new String[0]));
5256
}
5357

5458
}

0 commit comments

Comments
 (0)