Skip to content

Commit a586537

Browse files
authored
Specify namespace when creating pod (#19399)
* Specify namespace when creating pof * PR comments * rm new line * Fix micronaut injection
1 parent 89b7740 commit a586537

File tree

7 files changed

+33
-12
lines changed

7 files changed

+33
-12
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ public AsyncKubePodStatus getDocStoreStatus() {
260260
public void create(final Map<String, String> allLabels,
261261
final ResourceRequirements resourceRequirements,
262262
final Map<String, String> fileMap,
263-
final Map<Integer, Integer> portMap) {
263+
final Map<Integer, Integer> portMap,
264+
final Map<String, String> nodeSelectors) {
264265
final List<Volume> volumes = new ArrayList<>();
265266
final List<VolumeMount> volumeMounts = new ArrayList<>();
266267
final List<EnvVar> envVars = new ArrayList<>();
@@ -352,6 +353,7 @@ public void create(final Map<String, String> allLabels,
352353
.withContainers(mainContainer)
353354
.withInitContainers(initContainer)
354355
.withVolumes(volumes)
356+
.withNodeSelector(nodeSelectors)
355357
.endSpec()
356358
.build();
357359

airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/DbtLauncherWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public DbtLauncherWorker(final UUID connectionId,
4242
Void.class,
4343
activityContext,
4444
serverPort,
45-
temporalUtils);
45+
temporalUtils,
46+
workerConfigs);
4647
}
4748

4849
}

airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.airbyte.persistence.job.models.JobRunConfig;
2222
import io.airbyte.workers.ContainerOrchestratorConfig;
2323
import io.airbyte.workers.Worker;
24+
import io.airbyte.workers.WorkerConfigs;
2425
import io.airbyte.workers.WorkerConstants;
2526
import io.airbyte.workers.exception.WorkerException;
2627
import io.airbyte.workers.process.AsyncKubePodStatus;
@@ -70,6 +71,7 @@ public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {
7071
private final Supplier<ActivityExecutionContext> activityContext;
7172
private final Integer serverPort;
7273
private final TemporalUtils temporalUtils;
74+
private final WorkerConfigs workerConfigs;
7375

7476
private final AtomicBoolean cancelled = new AtomicBoolean(false);
7577
private AsyncOrchestratorPodProcess process;
@@ -84,7 +86,8 @@ public LauncherWorker(final UUID connectionId,
8486
final Class<OUTPUT> outputClass,
8587
final Supplier<ActivityExecutionContext> activityContext,
8688
final Integer serverPort,
87-
final TemporalUtils temporalUtils) {
89+
final TemporalUtils temporalUtils,
90+
final WorkerConfigs workerConfigs) {
8891

8992
this.connectionId = connectionId;
9093
this.application = application;
@@ -97,6 +100,7 @@ public LauncherWorker(final UUID connectionId,
97100
this.activityContext = activityContext;
98101
this.serverPort = serverPort;
99102
this.temporalUtils = temporalUtils;
103+
this.workerConfigs = workerConfigs;
100104
}
101105

102106
@Trace(operationName = WORKER_OPERATION_NAME)
@@ -174,7 +178,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
174178
allLabels,
175179
resourceRequirements,
176180
fileMap,
177-
portMap);
181+
portMap,
182+
workerConfigs.getworkerKubeNodeSelectors());
178183
} catch (final KubernetesClientException e) {
179184
ApmTraceUtils.addExceptionToTrace(e);
180185
throw new WorkerException(

airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/NormalizationLauncherWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public NormalizationLauncherWorker(final UUID connectionId,
4343
NormalizationSummary.class,
4444
activityContext,
4545
serverPort,
46-
temporalUtils);
46+
temporalUtils,
47+
workerConfigs);
4748

4849
}
4950

airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/ReplicationLauncherWorker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
1313
import io.airbyte.persistence.job.models.JobRunConfig;
1414
import io.airbyte.workers.ContainerOrchestratorConfig;
15+
import io.airbyte.workers.WorkerConfigs;
1516
import io.temporal.activity.ActivityExecutionContext;
1617
import java.util.Map;
1718
import java.util.UUID;
@@ -37,7 +38,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
3738
final ResourceRequirements resourceRequirements,
3839
final Supplier<ActivityExecutionContext> activityContext,
3940
final Integer serverPort,
40-
final TemporalUtils temporalUtils) {
41+
final TemporalUtils temporalUtils,
42+
final WorkerConfigs workerConfigs) {
4143
super(
4244
connectionId,
4345
REPLICATION,
@@ -51,7 +53,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
5153
ReplicationOutput.class,
5254
activityContext,
5355
serverPort,
54-
temporalUtils);
56+
temporalUtils,
57+
workerConfigs);
5558
}
5659

5760
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.airbyte.workers.ContainerOrchestratorConfig;
4747
import io.airbyte.workers.RecordSchemaValidator;
4848
import io.airbyte.workers.Worker;
49+
import io.airbyte.workers.WorkerConfigs;
4950
import io.airbyte.workers.WorkerConstants;
5051
import io.airbyte.workers.WorkerMetricReporter;
5152
import io.airbyte.workers.WorkerUtils;
@@ -97,6 +98,7 @@ public class ReplicationActivityImpl implements ReplicationActivity {
9798
private final AirbyteApiClient airbyteApiClient;
9899
private final AirbyteMessageSerDeProvider serDeProvider;
99100
private final AirbyteMessageVersionedMigratorFactory migratorFactory;
101+
private final WorkerConfigs workerConfigs;
100102

101103
public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
102104
@Named("replicationProcessFactory") final ProcessFactory processFactory,
@@ -111,7 +113,8 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio
111113
final TemporalUtils temporalUtils,
112114
final AirbyteApiClient airbyteApiClient,
113115
final AirbyteMessageSerDeProvider serDeProvider,
114-
final AirbyteMessageVersionedMigratorFactory migratorFactory) {
116+
final AirbyteMessageVersionedMigratorFactory migratorFactory,
117+
@Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs) {
115118
this.containerOrchestratorConfig = containerOrchestratorConfig;
116119
this.processFactory = processFactory;
117120
this.secretsHydrator = secretsHydrator;
@@ -126,6 +129,7 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio
126129
this.airbyteApiClient = airbyteApiClient;
127130
this.serDeProvider = serDeProvider;
128131
this.migratorFactory = migratorFactory;
132+
this.workerConfigs = workerConfigs;
129133
}
130134

131135
// Marking task queue as nullable because we changed activity signature; thus runs started before
@@ -166,7 +170,8 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
166170
destinationLauncherConfig,
167171
jobRunConfig,
168172
syncInput.getResourceRequirements(),
169-
() -> context);
173+
() -> context,
174+
workerConfigs);
170175
} else {
171176
workerFactory =
172177
getLegacyWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput);
@@ -295,7 +300,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
295300
final IntegrationLauncherConfig destinationLauncherConfig,
296301
final JobRunConfig jobRunConfig,
297302
final ResourceRequirements resourceRequirements,
298-
final Supplier<ActivityExecutionContext> activityContext)
303+
final Supplier<ActivityExecutionContext> activityContext,
304+
final WorkerConfigs workerConfigs)
299305
throws ApiException {
300306
final JobIdRequestBody id = new JobIdRequestBody();
301307
id.setId(Long.valueOf(jobRunConfig.getJobId()));
@@ -313,7 +319,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
313319
resourceRequirements,
314320
activityContext,
315321
serverPort,
316-
temporalUtils);
322+
temporalUtils,
323+
workerConfigs);
317324
}
318325

319326
}

airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ public void testAsyncOrchestratorPodProcess(final String pullPolicy) throws Inte
128128
.filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey()))
129129
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
130130

131+
final WorkerConfigs workerConfigs = new WorkerConfigs(new EnvConfigs());
132+
131133
asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(
132134
OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP,
133-
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap);
135+
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors());
134136

135137
// a final activity waits until there is output from the kube pod process
136138
asyncProcess.waitFor(10, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)