Skip to content

Commit ddcdcbb

Browse files
gosusnpedgao
andauthored
Add normalization strict incremental feature flag (#22514)
* maybe add feature flag? * pattern matching * also add version check * formatting * refactor test also * extract test + fix method call * minor tweaks * add context to log message * put workspace id in normalization input * use non-semver tag * add flag for version of normalization * also flag old version * add test * missed part of the commit * format * add test for null workspace ID * Revert "also flag old version" This reverts commit 3be601d. * Revert "missed part of the commit" This reverts commit 47a67b4. * always apply flag, even if we're behind a version * derp * Add more logging to the normalization activity * Update charts and kustomize for the feature flag * Format --------- Co-authored-by: Edward Gao <[email protected]>
1 parent d76e18b commit ddcdcbb

File tree

17 files changed

+167
-22
lines changed

17 files changed

+167
-22
lines changed

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public class OrchestratorConstants {
7373
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA,
7474
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION,
7575
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES,
76+
EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES,
77+
EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG,
7678
FEATURE_FLAG_CLIENT,
7779
FEATURE_FLAG_PATH,
7880
EnvConfigs.LAUNCHDARKLY_KEY,

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ public Process write(final Path jobRoot,
223223

224224
private Map<String, String> getWorkerMetadata() {
225225
final Configs configs = new EnvConfigs();
226+
// We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert
227+
// back to hashmap
226228
return Maps.newHashMap(
227229
ImmutableMap.<String, String>builder()
228230
.put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName)
@@ -232,6 +234,8 @@ private Map<String, String> getWorkerMetadata() {
232234
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()))
233235
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()))
234236
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces())
237+
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces())
238+
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag())
235239
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit())
236240
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest())
237241
.put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey())

airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ class AirbyteIntegrationLauncherTest {
6666
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()))
6767
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()))
6868
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces())
69+
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces())
70+
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag())
6971
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit())
7072
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest())
7173
.put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey())

airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
2222

2323
public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";
2424

25+
public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
26+
public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";
27+
2528
@Override
2629
public boolean autoDisablesFailingConnections() {
2730
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
@@ -64,6 +67,16 @@ public String fieldSelectionWorkspaces() {
6467
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg);
6568
}
6669

70+
@Override
71+
public String strictComparisonNormalizationWorkspaces() {
72+
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg);
73+
}
74+
75+
@Override
76+
public String strictComparisonNormalizationTag() {
77+
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg);
78+
}
79+
6780
// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
6881
public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
6982
final String value = System.getenv(key);

airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,43 @@
44

55
package io.airbyte.commons.features;
66

7+
import com.google.common.annotations.VisibleForTesting;
78
import java.util.HashSet;
89
import java.util.Set;
910
import java.util.UUID;
11+
import java.util.function.Function;
1012
import lombok.extern.slf4j.Slf4j;
1113

1214
@Slf4j
1315
public class FeatureFlagHelper {
1416

1517
public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
16-
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
18+
return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, "field selection")
19+
|| featureFlags.applyFieldSelection();
20+
}
21+
22+
public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
23+
return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId,
24+
"strict comparison in normalization");
25+
}
26+
27+
@VisibleForTesting
28+
static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags,
29+
final Function<FeatureFlags, String> flagRetriever,
30+
final UUID workspaceId,
31+
final String context) {
32+
final String workspaceIdsString = flagRetriever.apply(featureFlags);
1733
final Set<UUID> workspaceIds = new HashSet<>();
1834
if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) {
1935
for (final String id : workspaceIdsString.split(",")) {
2036
try {
2137
workspaceIds.add(UUID.fromString(id));
2238
} catch (final IllegalArgumentException e) {
23-
log.warn("Malformed workspace id for field selection: {}", id);
39+
log.warn("Malformed workspace id for {}: {}", context, id);
2440
}
2541
}
2642
}
27-
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
28-
return true;
29-
}
30-
31-
return featureFlags.applyFieldSelection();
43+
return workspaceId != null && workspaceIds.contains(workspaceId);
3244
}
3345

3446
}

airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,18 @@ public interface FeatureFlags {
3737
*/
3838
String fieldSelectionWorkspaces();
3939

40+
/**
41+
* Get the workspaces allow-listed for strict incremental comparison in normalization. This takes
42+
* precedence over the normalization version in destination_definitions.yaml.
43+
*
44+
* @return a comma-separated list of workspace ids where strict incremental comparison should be
45+
* enabled in normalization.
46+
*/
47+
String strictComparisonNormalizationWorkspaces();
48+
49+
/**
50+
* @return The Docker image tag representing the normalization version with strict-comparison
51+
*/
52+
String strictComparisonNormalizationTag();
53+
4054
}

airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,40 +26,47 @@ void beforeEach() {
2626
void isFieldSelectionEnabledForWorkspaceWithEmptyString() {
2727
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");
2828

29-
assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
29+
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
30+
}
31+
32+
@Test
33+
void isFieldSelectionEnabledForNullWorkspaceWithEmptyString() {
34+
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");
35+
36+
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, null, null));
3037
}
3138

3239
@Test
3340
void isFieldSelectionEnabledForWorkspaceWithSpaceString() {
3441
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" ");
3542

36-
assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
43+
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
3744
}
3845

3946
@Test
4047
void isFieldSelectionEnabledForWorkspaceWithNullString() {
4148
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null);
4249

43-
assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
50+
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
4451
}
4552

4653
@Test
4754
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() {
4855
final UUID workspaceId = UUID.randomUUID();
4956
final UUID randomId = UUID.randomUUID();
50-
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString());
57+
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId);
5158

52-
assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
59+
assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null));
5360
}
5461

5562
@Test
5663
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() {
5764
final UUID workspaceId = UUID.randomUUID();
5865
final UUID randomId1 = UUID.randomUUID();
5966
final UUID randomId2 = UUID.randomUUID();
60-
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString());
67+
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2);
6168

62-
assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
69+
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null));
6370
}
6471

6572
}

airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,10 @@ public interface Configs {
758758

759759
String getFieldSelectionWorkspaces();
760760

761+
String getStrictComparisonNormalizationWorkspaces();
762+
763+
String getStrictComparisonNormalizationTag();
764+
761765
enum TrackingStrategy {
762766
SEGMENT,
763767
LOGGING

airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ public class EnvConfigs implements Configs {
224224
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
225225
private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";
226226

227+
private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
228+
private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";
229+
227230
public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
228231
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
229232
AIRBYTE_ROLE, EnvConfigs::getAirbyteRole,
@@ -1152,6 +1155,16 @@ public String getFieldSelectionWorkspaces() {
11521155
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "");
11531156
}
11541157

1158+
@Override
1159+
public String getStrictComparisonNormalizationWorkspaces() {
1160+
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "");
1161+
}
1162+
1163+
@Override
1164+
public String getStrictComparisonNormalizationTag() {
1165+
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison");
1166+
}
1167+
11551168
@Override
11561169
public int getActivityNumberOfAttempt() {
11571170
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));

airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ properties:
2121
type: object
2222
description: optional resource requirements to run sync workers
2323
existingJavaType: io.airbyte.config.ResourceRequirements
24+
workspaceId:
25+
description: The id of the workspace associated with this sync
26+
type: string
27+
format: uuid

airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
9696
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
9797
environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection()));
9898
environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
99+
environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES,
100+
featureFlags.strictComparisonNormalizationWorkspaces());
101+
environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag());
99102
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);
100103
environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint);
101104
environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import datadog.trace.api.Trace;
1414
import io.airbyte.api.client.AirbyteApiClient;
1515
import io.airbyte.api.client.model.generated.JobIdRequestBody;
16+
import io.airbyte.commons.features.FeatureFlagHelper;
17+
import io.airbyte.commons.features.FeatureFlags;
1618
import io.airbyte.commons.functional.CheckedSupplier;
1719
import io.airbyte.commons.json.Jsons;
1820
import io.airbyte.commons.protocol.migrations.v1.CatalogMigrationV1Helper;
@@ -50,8 +52,10 @@
5052
import java.util.Optional;
5153
import java.util.UUID;
5254
import java.util.function.Supplier;
55+
import lombok.extern.slf4j.Slf4j;
5356

5457
@Singleton
58+
@Slf4j
5559
public class NormalizationActivityImpl implements NormalizationActivity {
5660

5761
private final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig;
@@ -62,13 +66,17 @@ public class NormalizationActivityImpl implements NormalizationActivity {
6266
private final WorkerEnvironment workerEnvironment;
6367
private final LogConfigs logConfigs;
6468
private final String airbyteVersion;
69+
private final FeatureFlags featureFlags;
6570
private final Integer serverPort;
6671
private final AirbyteConfigValidator airbyteConfigValidator;
6772
private final TemporalUtils temporalUtils;
6873
private final ResourceRequirements normalizationResourceRequirements;
6974
private final AirbyteApiClient airbyteApiClient;
7075

71-
private final static Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0");
76+
// This constant is not currently in use. We'll need to bump it when we try releasing v1 again.
77+
private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0");
78+
private static final String V1_NORMALIZATION_MINOR_VERSION = "3";
79+
private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25";
7280

7381
public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
7482
@Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs,
@@ -78,6 +86,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt
7886
final WorkerEnvironment workerEnvironment,
7987
final LogConfigs logConfigs,
8088
@Value("${airbyte.version}") final String airbyteVersion,
89+
final FeatureFlags featureFlags,
8190
@Value("${micronaut.server.port}") final Integer serverPort,
8291
final AirbyteConfigValidator airbyteConfigValidator,
8392
final TemporalUtils temporalUtils,
@@ -91,6 +100,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt
91100
this.workerEnvironment = workerEnvironment;
92101
this.logConfigs = logConfigs;
93102
this.airbyteVersion = airbyteVersion;
103+
this.featureFlags = featureFlags;
94104
this.serverPort = serverPort;
95105
this.airbyteConfigValidator = airbyteConfigValidator;
96106
this.temporalUtils = temporalUtils;
@@ -111,11 +121,17 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
111121
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
112122
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);
113123

124+
if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) {
125+
log.info("Using strict comparison normalization");
126+
replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag());
127+
}
128+
114129
// Check the version of normalization
115130
// We require at least version 0.3.0 to support data types v1. Using an older version would lead to
116131
// all columns being typed as JSONB. If normalization is using an older version, fallback to using
117132
// v0 data types.
118133
if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) {
134+
log.info("Using protocol v0");
119135
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog());
120136
} else {
121137

@@ -124,6 +140,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
124140
// phase v0 out.
125141
// Performance impact should be low considering the nature of the check compared to the time to run
126142
// normalization.
143+
log.info("Using protocol v1");
127144
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog());
128145
}
129146

@@ -134,6 +151,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
134151

135152
final CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> workerFactory;
136153

154+
log.info("Using normalization: " + destinationLauncherConfig.getNormalizationDockerImage());
137155
if (containerOrchestratorConfig.isPresent()) {
138156
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig,
139157
() -> context);
@@ -162,22 +180,33 @@ public NormalizationInput generateNormalizationInput(final StandardSyncInput syn
162180
return new NormalizationInput()
163181
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
164182
.withCatalog(syncOutput.getOutputCatalog())
165-
.withResourceRequirements(normalizationResourceRequirements);
183+
.withResourceRequirements(normalizationResourceRequirements)
184+
.withWorkspaceId(syncInput.getWorkspaceId());
166185
}
167186

168187
@VisibleForTesting
169188
static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig destinationLauncherConfig) {
170189
try {
171-
final String[] normalizationImage = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2);
172-
final Version normalizationVersion = new Version(normalizationImage[1]);
173-
return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1);
190+
final Version normalizationVersion = new Version(getNormalizationImageTag(destinationLauncherConfig));
191+
return V1_NORMALIZATION_MINOR_VERSION.equals(normalizationVersion.getMinorVersion());
174192
} catch (final IllegalArgumentException e) {
175193
// IllegalArgument here means that the version isn't in a semver format.
176194
// The current behavior is to assume it supports v0 data types for dev purposes.
177195
return false;
178196
}
179197
}
180198

199+
private static String getNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig) {
200+
return destinationLauncherConfig.getNormalizationDockerImage().split(":", 2)[1];
201+
}
202+
203+
@VisibleForTesting
204+
static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) {
205+
final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2);
206+
imageComponents[1] = newTag;
207+
destinationLauncherConfig.setNormalizationDockerImage(String.join(":", imageComponents));
208+
}
209+
181210
private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> getLegacyWorkerFactory(
182211
final IntegrationLauncherConfig destinationLauncherConfig,
183212
final JobRunConfig jobRunConfig) {

0 commit comments

Comments
 (0)