Skip to content

Commit f574899

Browse files
authored
Fix unit test failures from env configs (#10998)
* Fix unit test failures from env configs * Default null to empty string * Format code * Fix one more unit test * Remove unused import
1 parent 5d4deb4 commit f574899

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

airbyte-commons/src/main/java/io/airbyte/commons/lang/Exceptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,12 @@ public interface Procedure {
7171

7272
}
7373

74+
public static <T> T swallowWithDefault(final Callable<T> procedure, final T defaultValue) {
75+
try {
76+
return procedure.call();
77+
} catch (Exception e) {
78+
return defaultValue;
79+
}
80+
}
81+
7482
}

airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.base.Preconditions;
88
import com.google.common.base.Splitter;
99
import com.google.common.base.Strings;
10+
import io.airbyte.commons.lang.Exceptions;
1011
import io.airbyte.commons.map.MoreMaps;
1112
import io.airbyte.commons.version.AirbyteVersion;
1213
import io.airbyte.config.helpers.LogClientSingleton;
@@ -651,9 +652,12 @@ public Map<String, String> getJobDefaultEnvMap() {
651652
final Map<String, String> jobPrefixedEnvMap = getAllEnvKeys.get().stream()
652653
.filter(key -> key.startsWith(JOB_DEFAULT_ENV_PREFIX))
653654
.collect(Collectors.toMap(key -> key.replace(JOB_DEFAULT_ENV_PREFIX, ""), getEnv));
655+
// This method assumes that these shared env variables are not critical to the execution
656+
// of the jobs, and only serve as metadata. So any exception is swallowed and default to
657+
// an empty string. Change this logic if this assumption no longer holds.
654658
final Map<String, String> jobSharedEnvMap = JOB_SHARED_ENVS.entrySet().stream().collect(Collectors.toMap(
655659
Entry::getKey,
656-
entry -> Objects.requireNonNullElse(entry.getValue().apply(this), "")));
660+
entry -> Exceptions.swallowWithDefault(() -> Objects.requireNonNullElse(entry.getValue().apply(this), ""), "")));
657661
return MoreMaps.merge(jobPrefixedEnvMap, jobSharedEnvMap);
658662
}
659663

airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.collect.ImmutableMap;
88
import com.google.common.collect.Lists;
99
import io.airbyte.config.EnvConfigs;
10+
import io.airbyte.config.WorkerEnvConstants;
1011
import io.airbyte.workers.WorkerConfigs;
1112
import io.airbyte.workers.WorkerException;
1213
import java.nio.file.Path;
@@ -31,6 +32,10 @@ class AirbyteIntegrationLauncherTest {
3132
"config", "{}",
3233
"catalog", "{}",
3334
"state", "{}");
35+
private static final Map<String, String> JOB_METADATA = Map.of(
36+
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
37+
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
38+
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT));
3439

3540
private WorkerConfigs workerConfigs;
3641
private ProcessFactory processFactory;
@@ -48,7 +53,7 @@ void spec() throws WorkerException {
4853
launcher.spec(JOB_ROOT);
4954

5055
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, Collections.emptyMap(), null,
51-
workerConfigs.getResourceRequirements(), Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB), Map.of(), Map.of(),
56+
workerConfigs.getResourceRequirements(), Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB), JOB_METADATA, Map.of(),
5257
"spec");
5358
}
5459

@@ -59,7 +64,7 @@ void check() throws WorkerException {
5964
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
6065
workerConfigs.getResourceRequirements(),
6166
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
62-
Map.of(),
67+
JOB_METADATA,
6368
Map.of(),
6469
"check",
6570
"--config", "config");
@@ -72,7 +77,7 @@ void discover() throws WorkerException {
7277
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
7378
workerConfigs.getResourceRequirements(),
7479
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
75-
Map.of(),
80+
JOB_METADATA,
7681
Map.of(),
7782
"discover",
7883
"--config", "config");
@@ -85,7 +90,7 @@ void read() throws WorkerException {
8590
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_CATALOG_STATE_FILES, null,
8691
workerConfigs.getResourceRequirements(),
8792
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.READ_STEP),
88-
Map.of(),
93+
JOB_METADATA,
8994
Map.of(),
9095
Lists.newArrayList(
9196
"read",
@@ -101,7 +106,7 @@ void write() throws WorkerException {
101106
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, true, CONFIG_CATALOG_FILES, null,
102107
workerConfigs.getResourceRequirements(),
103108
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.WRITE_STEP),
104-
Map.of(),
109+
JOB_METADATA,
105110
Map.of(),
106111
"write",
107112
"--config", "config",

0 commit comments

Comments
 (0)