Skip to content

Commit e32215f

Browse files
implement PerfBackgroundJsonValidation feature-flag (#21569)
builds on add perf feature-flag; ensure compatability with LaunchDarkly (LDv5) users #21534 add support for checking the PerfBackgroundJsonValidation feature-flag within the RecordSchemaValidator class I spent a lot of time with trying to get the Kotlin beans injectable into the java code. What should just work doesn't and it well documented on the Micronaut side as to what exactly is required (or at least I never found the documentation). After many attempts I and looking at the auto-generated kotlin Micronaut projects I landed on kapt being necessary in kotlin projects. Even this isn't entirely true. If the Kotlin project is defined with the following gradle setup, the beans will not be visible to java projects (and possibly other projects as well): plugins { kotlin("jvm") version "1.8.0" } dependencies { annotationProcessor(platform(libs.micronaut.bom)) annotationProcessor(libs.bundles.micronaut.annotation.processor) } But they will be visible with the following setup: plugins { kotlin("jvm") version "1.8.0" id("io.micronaut.minimal.application") version "3.7.0" } dependencies { annotationProcessor(platform(libs.micronaut.bom)) annotationProcessor(libs.bundles.micronaut.annotation.processor) } micronaut { version("3.8.2") } and also visible with the following setup: plugins { kotlin("jvm") version "1.8.0" kotlin("kapt") version "1.8.0" } dependencies { kapt(platform(libs.micronaut.bom)) kapt(libs.bundles.micronaut.annotation.processor) } I went with the kapt solution. It's worth noting that kapt is deprecated and is being replaced with kps in Micronaut 4 (this is a good thing). --------- Co-authored-by: Davin Chia <[email protected]>
1 parent e9d8b7a commit e32215f

File tree

23 files changed

+228
-125
lines changed

23 files changed

+228
-125
lines changed

airbyte-commons-worker/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ dependencies {
1212
implementation 'io.fabric8:kubernetes-client:5.12.2'
1313
implementation 'com.auth0:java-jwt:3.19.2'
1414
implementation libs.guava
15-
implementation (libs.temporal.sdk) {
15+
implementation(libs.temporal.sdk) {
1616
exclude module: 'guava'
1717
}
1818
implementation 'org.apache.ant:ant:1.10.10'
@@ -24,6 +24,7 @@ dependencies {
2424
implementation project(':airbyte-commons-temporal')
2525
implementation project(':airbyte-config:config-models')
2626
implementation project(':airbyte-config:config-persistence')
27+
implementation project(':airbyte-featureflag')
2728
implementation project(':airbyte-json-validation')
2829
implementation project(':airbyte-metrics:metrics-lib')
2930
implementation project(':airbyte-persistence:job-persistence')

airbyte-commons-worker/src/main/java/io/airbyte/workers/RecordSchemaValidator.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,41 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
9+
import io.airbyte.featureflag.FeatureFlagClient;
10+
import io.airbyte.featureflag.PerfBackgroundJsonValidation;
11+
import io.airbyte.featureflag.Workspace;
912
import io.airbyte.protocol.models.AirbyteRecordMessage;
1013
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
1114
import io.airbyte.validation.json.JsonSchemaValidator;
1215
import io.airbyte.validation.json.JsonValidationException;
1316
import io.airbyte.workers.exception.RecordSchemaValidationException;
17+
import java.lang.invoke.MethodHandles;
1418
import java.util.HashSet;
1519
import java.util.List;
1620
import java.util.Map;
1721
import java.util.Set;
22+
import java.util.UUID;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
1825

1926
/**
2027
* Validates that AirbyteRecordMessage data conforms to the JSON schema defined by the source's
2128
* configured catalog
2229
*/
23-
2430
public class RecordSchemaValidator {
2531

26-
private static final JsonSchemaValidator validator = new JsonSchemaValidator();
32+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
2733

34+
private final FeatureFlagClient featureFlagClient;
35+
private final UUID workspaceId;
36+
private static final JsonSchemaValidator validator = new JsonSchemaValidator();
2837
private final Map<AirbyteStreamNameNamespacePair, JsonNode> streams;
2938

30-
public RecordSchemaValidator(final Map<AirbyteStreamNameNamespacePair, JsonNode> streamNamesToSchemas) {
39+
public RecordSchemaValidator(final FeatureFlagClient featureFlagClient,
40+
final UUID workspaceId,
41+
final Map<AirbyteStreamNameNamespacePair, JsonNode> streamNamesToSchemas) {
42+
this.featureFlagClient = featureFlagClient;
43+
this.workspaceId = workspaceId;
3144
// streams is Map of a stream source namespace + name mapped to the stream schema
3245
// for easy access when we check each record's schema
3346
this.streams = streamNamesToSchemas;
@@ -55,6 +68,16 @@ public void validateSchema(final AirbyteRecordMessage message, final AirbyteStre
5568
final JsonNode messageData = message.getData();
5669
final JsonNode matchingSchema = streams.get(messageStream);
5770

71+
if (workspaceId != null) {
72+
if (featureFlagClient.enabled(PerfBackgroundJsonValidation.INSTANCE, new Workspace(workspaceId))) {
73+
log.info("feature flag enabled for workspace {}", workspaceId);
74+
} else {
75+
log.info("feature flag disabled for workspace {}", workspaceId);
76+
}
77+
} else {
78+
log.info("workspace id is null");
79+
}
80+
5881
try {
5982
validator.ensureInitializedSchema(messageStream.toString(), messageData);
6083
} catch (final JsonValidationException e) {

airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public static ImmutablePair<StandardSync, StandardSyncInput> createSyncConfig(fi
134134
.withCatalog(standardSync.getCatalog())
135135
.withSourceConfiguration(sourceConnectionConfig.getConfiguration())
136136
.withState(state)
137-
.withOperationSequence(List.of(normalizationOperation, customDbtOperation));
137+
.withOperationSequence(List.of(normalizationOperation, customDbtOperation))
138+
.withWorkspaceId(workspaceId);
138139

139140
return new ImmutablePair<>(standardSync, syncInput);
140141
}

airbyte-commons-worker/src/test/java/io/airbyte/workers/RecordSchemaValidatorTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import io.airbyte.config.StandardSync;
1010
import io.airbyte.config.StandardSyncInput;
11+
import io.airbyte.featureflag.TestClient;
1112
import io.airbyte.protocol.models.AirbyteMessage;
1213
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
1314
import io.airbyte.workers.exception.RecordSchemaValidationException;
@@ -34,13 +35,17 @@ void setup() throws Exception {
3435

3536
@Test
3637
void testValidateValidSchema() throws Exception {
37-
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
38+
final var featureFlagClient = new TestClient();
39+
final var recordSchemaValidator = new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(),
40+
WorkerUtils.mapStreamNamesToSchemas(syncInput));
3841
recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), AirbyteStreamNameNamespacePair.fromRecordMessage(VALID_RECORD.getRecord()));
3942
}
4043

4144
@Test
4245
void testValidateInvalidSchema() throws Exception {
43-
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
46+
final var featureFlagClient = new TestClient();
47+
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(),
48+
WorkerUtils.mapStreamNamesToSchemas(syncInput));
4449
assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(),
4550
AirbyteStreamNameNamespacePair.fromRecordMessage(INVALID_RECORD.getRecord())));
4651
}

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.airbyte.config.WorkerSourceConfig;
4141
import io.airbyte.config.helpers.LogClientSingleton;
4242
import io.airbyte.config.helpers.LogConfigs;
43+
import io.airbyte.featureflag.TestClient;
4344
import io.airbyte.metrics.lib.MetricClient;
4445
import io.airbyte.metrics.lib.MetricClientFactory;
4546
import io.airbyte.protocol.models.AirbyteLogMessage.Level;
@@ -48,7 +49,9 @@
4849
import io.airbyte.protocol.models.AirbyteTraceMessage;
4950
import io.airbyte.protocol.models.Config;
5051
import io.airbyte.validation.json.JsonSchemaValidator;
51-
import io.airbyte.workers.*;
52+
import io.airbyte.workers.RecordSchemaValidator;
53+
import io.airbyte.workers.WorkerMetricReporter;
54+
import io.airbyte.workers.WorkerUtils;
5255
import io.airbyte.workers.exception.WorkerException;
5356
import io.airbyte.workers.helper.ConnectorConfigUpdater;
5457
import io.airbyte.workers.helper.FailureHelper;
@@ -456,8 +459,9 @@ void testOnlySelectedFieldsDeliveredToDestinationWithFieldSelectionEnabled() thr
456459
// Use a real schema validator to make sure validation doesn't affect this.
457460
final String streamName = sourceConfig.getCatalog().getStreams().get(0).getStream().getName();
458461
final String streamNamespace = sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace();
459-
recordSchemaValidator = new RecordSchemaValidator(Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
460-
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
462+
recordSchemaValidator = new RecordSchemaValidator(new TestClient(), syncInput.getWorkspaceId(),
463+
Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
464+
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
461465
final ReplicationWorker worker = new DefaultReplicationWorker(
462466
JOB_ID,
463467
JOB_ATTEMPT,
@@ -487,8 +491,9 @@ void testAllFieldsDeliveredWithFieldSelectionDisabled() throws Exception {
487491
// Use a real schema validator to make sure validation doesn't affect this.
488492
final String streamName = sourceConfig.getCatalog().getStreams().get(0).getStream().getName();
489493
final String streamNamespace = sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace();
490-
recordSchemaValidator = new RecordSchemaValidator(Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
491-
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
494+
recordSchemaValidator = new RecordSchemaValidator(new TestClient(), syncInput.getWorkspaceId(),
495+
Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
496+
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
492497
final ReplicationWorker worker = new DefaultReplicationWorker(
493498
JOB_ID,
494499
JOB_ATTEMPT,

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
1818
import io.airbyte.config.ReplicationOutput;
1919
import io.airbyte.config.StandardSyncInput;
20+
import io.airbyte.featureflag.TestClient;
2021
import io.airbyte.metrics.lib.NotImplementedMetricClient;
2122
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
2223
import io.airbyte.protocol.models.CatalogHelpers;
@@ -36,6 +37,7 @@
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.Optional;
40+
import java.util.UUID;
3941
import java.util.concurrent.atomic.AtomicReference;
4042
import lombok.extern.slf4j.Slf4j;
4143
import org.mockito.Mockito;
@@ -80,7 +82,8 @@ public void executeOneSync() throws InterruptedException {
8082
final var connectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class);
8183
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01");
8284
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", "");
83-
final var validator = new RecordSchemaValidator(Map.of(
85+
final var workspaceID = UUID.randomUUID();
86+
final var validator = new RecordSchemaValidator(new TestClient(), workspaceID, Map.of(
8487
new AirbyteStreamNameNamespacePair("s1", null),
8588
CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING))));
8689

airbyte-container-orchestrator/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dependencies {
3434
implementation project(':airbyte-commons-temporal')
3535
implementation project(':airbyte-commons-worker')
3636
implementation project(':airbyte-config:init')
37+
implementation project(':airbyte-featureflag')
3738
implementation project(':airbyte-json-validation')
3839
implementation project(':airbyte-protocol:protocol-models')
3940
implementation project(':airbyte-metrics:metrics-lib')

airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.airbyte.container_orchestrator.orchestrator.NoOpOrchestrator;
1818
import io.airbyte.container_orchestrator.orchestrator.NormalizationJobOrchestrator;
1919
import io.airbyte.container_orchestrator.orchestrator.ReplicationJobOrchestrator;
20+
import io.airbyte.featureflag.FeatureFlagClient;
2021
import io.airbyte.persistence.job.models.JobRunConfig;
2122
import io.airbyte.workers.WorkerConfigs;
2223
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
@@ -99,14 +100,16 @@ JobOrchestrator<?> jobOrchestrator(
99100
final EnvConfigs envConfigs,
100101
final ProcessFactory processFactory,
101102
final FeatureFlags featureFlags,
103+
final FeatureFlagClient featureFlagClient,
102104
final WorkerConfigs workerConfigs,
103105
final AirbyteMessageSerDeProvider serdeProvider,
104106
final AirbyteProtocolVersionedMigratorFactory migratorFactory,
105107
final JobRunConfig jobRunConfig,
106108
final SourceApi sourceApi,
107109
final DestinationApi destinationApi) {
108110
return switch (application) {
109-
case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(envConfigs, processFactory, featureFlags, serdeProvider,
111+
case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(envConfigs, processFactory, featureFlags, featureFlagClient,
112+
serdeProvider,
110113
migratorFactory, jobRunConfig, sourceApi, destinationApi);
111114
case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(envConfigs, processFactory, jobRunConfig);
112115
case DbtLauncherWorker.DBT -> new DbtJobOrchestrator(envConfigs, workerConfigs, processFactory, jobRunConfig);

airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.airbyte.config.Configs;
2424
import io.airbyte.config.ReplicationOutput;
2525
import io.airbyte.config.StandardSyncInput;
26+
import io.airbyte.featureflag.FeatureFlagClient;
2627
import io.airbyte.metrics.lib.ApmTraceUtils;
2728
import io.airbyte.metrics.lib.MetricClientFactory;
2829
import io.airbyte.metrics.lib.MetricEmittingApps;
@@ -61,6 +62,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator<StandardSyncI
6162
private final ProcessFactory processFactory;
6263
private final Configs configs;
6364
private final FeatureFlags featureFlags;
65+
private final FeatureFlagClient featureFlagClient;
6466
private final AirbyteMessageSerDeProvider serDeProvider;
6567
private final AirbyteProtocolVersionedMigratorFactory migratorFactory;
6668
private final JobRunConfig jobRunConfig;
@@ -70,6 +72,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator<StandardSyncI
7072
public ReplicationJobOrchestrator(final Configs configs,
7173
final ProcessFactory processFactory,
7274
final FeatureFlags featureFlags,
75+
final FeatureFlagClient featureFlagClient,
7376
final AirbyteMessageSerDeProvider serDeProvider,
7477
final AirbyteProtocolVersionedMigratorFactory migratorFactory,
7578
final JobRunConfig jobRunConfig,
@@ -78,6 +81,7 @@ public ReplicationJobOrchestrator(final Configs configs,
7881
this.configs = configs;
7982
this.processFactory = processFactory;
8083
this.featureFlags = featureFlags;
84+
this.featureFlagClient = featureFlagClient;
8185
this.serDeProvider = serDeProvider;
8286
this.migratorFactory = migratorFactory;
8387
this.jobRunConfig = jobRunConfig;
@@ -166,7 +170,7 @@ public Optional<String> runJob() throws Exception {
166170
Optional.of(syncInput.getCatalog())),
167171
migratorFactory.getProtocolSerializer(destinationLauncherConfig.getProtocolVersion())),
168172
new AirbyteMessageTracker(featureFlags),
169-
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
173+
new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(), WorkerUtils.mapStreamNamesToSchemas(syncInput)),
170174
metricReporter,
171175
new ConnectorConfigUpdater(sourceApi, destinationApi),
172176
FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId()));

airbyte-container-orchestrator/src/main/resources/application.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ airbyte:
77
acceptance:
88
test:
99
enabled: ${ACCEPTANCE_TEST_ENABLED:false}
10-
config-dir: /config
1110
control:
1211
plane:
1312
auth-endpoint: ${CONTROL_PLANE_AUTH_ENDPOINT:}
@@ -16,6 +15,10 @@ airbyte:
1615
service-account:
1716
credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:}
1817
email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:}
18+
feature-flag:
19+
client: ${FEATURE_FLAG_CLIENT:config}
20+
path: ${FEATURE_FLAG_PATH:/flags}
21+
api-key: ${LAUNCHDARKLY_KEY:}
1922
internal:
2023
api:
2124
auth-header:

airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactoryTest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
1616
import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory;
1717
import io.airbyte.config.EnvConfigs;
18+
import io.airbyte.featureflag.FeatureFlagClient;
19+
import io.airbyte.featureflag.TestClient;
1820
import io.airbyte.persistence.job.models.JobRunConfig;
1921
import io.airbyte.workers.WorkerConfigs;
2022
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
@@ -23,8 +25,11 @@
2325
import io.airbyte.workers.sync.DbtLauncherWorker;
2426
import io.airbyte.workers.sync.NormalizationLauncherWorker;
2527
import io.airbyte.workers.sync.ReplicationLauncherWorker;
28+
import io.micronaut.context.annotation.Bean;
29+
import io.micronaut.context.annotation.Replaces;
2630
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
2731
import jakarta.inject.Inject;
32+
import java.util.Map;
2833
import org.junit.jupiter.api.Test;
2934

3035
@MicronautTest
@@ -33,6 +38,10 @@ class ContainerOrchestratorFactoryTest {
3338
@Inject
3439
FeatureFlags featureFlags;
3540

41+
@Bean
42+
@Replaces(FeatureFlagClient.class)
43+
FeatureFlagClient featureFlagClient = new TestClient(Map.of());
44+
3645
@Inject
3746
EnvConfigs envConfigs;
3847

@@ -94,29 +103,29 @@ void jobOrchestrator() {
94103
final var factory = new ContainerOrchestratorFactory();
95104

96105
final var repl = factory.jobOrchestrator(
97-
ReplicationLauncherWorker.REPLICATION, envConfigs, processFactory, featureFlags, workerConfigs,
106+
ReplicationLauncherWorker.REPLICATION, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
98107
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
99108
assertEquals("Replication", repl.getOrchestratorName());
100109

101110
final var norm = factory.jobOrchestrator(
102-
NormalizationLauncherWorker.NORMALIZATION, envConfigs, processFactory, featureFlags, workerConfigs,
111+
NormalizationLauncherWorker.NORMALIZATION, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
103112
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
104113
assertEquals("Normalization", norm.getOrchestratorName());
105114

106115
final var dbt = factory.jobOrchestrator(
107-
DbtLauncherWorker.DBT, envConfigs, processFactory, featureFlags, workerConfigs,
116+
DbtLauncherWorker.DBT, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
108117
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
109118
assertEquals("DBT Transformation", dbt.getOrchestratorName());
110119

111120
final var noop = factory.jobOrchestrator(
112-
AsyncOrchestratorPodProcess.NO_OP, envConfigs, processFactory, featureFlags, workerConfigs,
121+
AsyncOrchestratorPodProcess.NO_OP, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
113122
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
114123
assertEquals("NO_OP", noop.getOrchestratorName());
115124

116125
var caught = false;
117126
try {
118127
factory.jobOrchestrator(
119-
"does not exist", envConfigs, processFactory, featureFlags, workerConfigs,
128+
"does not exist", envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
120129
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
121130
} catch (final Exception e) {
122131
caught = true;

airbyte-featureflag/build.gradle.kts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
44
plugins {
55
`java-library`
66
kotlin("jvm") version "1.8.0"
7+
kotlin("kapt") version "1.8.0"
78
}
89

910
dependencies {
10-
annotationProcessor(platform(libs.micronaut.bom))
11-
annotationProcessor(libs.bundles.micronaut.annotation.processor)
11+
kapt(platform(libs.micronaut.bom))
12+
kapt(libs.bundles.micronaut.annotation.processor)
1213

1314
implementation(platform(libs.micronaut.bom))
1415
implementation(libs.micronaut.inject)
@@ -17,9 +18,8 @@ dependencies {
1718
implementation(libs.jackson.dataformat)
1819
implementation(libs.jackson.kotlin)
1920

20-
testAnnotationProcessor(platform(libs.micronaut.bom))
21-
testAnnotationProcessor(libs.micronaut.inject)
22-
testAnnotationProcessor(libs.bundles.micronaut.test.annotation.processor)
21+
kaptTest(platform(libs.micronaut.bom))
22+
kaptTest(libs.bundles.micronaut.test.annotation.processor)
2323

2424
testImplementation(kotlin("test"))
2525
testImplementation(kotlin("test-junit5"))

0 commit comments

Comments
 (0)