Skip to content

Commit c4b1a9a

Browse files
committed
fix: handle invalid mapper configs (#13825)
1 parent 03cce50 commit c4b1a9a

File tree

13 files changed

+234
-48
lines changed

13 files changed

+234
-48
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java

+14 -6
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import io.airbyte.featureflag.SourceType;
2828
import io.airbyte.featureflag.Workspace;
2929
import io.airbyte.mappers.application.RecordMapper;
30+
import io.airbyte.mappers.transformations.DestinationCatalogGenerator;
3031
import io.airbyte.metrics.lib.MetricClient;
3132
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
3233
import io.airbyte.persistence.job.models.JobRunConfig;
@@ -90,6 +91,7 @@ public class ReplicationWorkerFactory {
9091
private final StreamStatusTrackerFactory streamStatusTrackerFactory;
9192
private final Clock clock;
9293
private final RecordMapper recordMapper;
94+
private final DestinationCatalogGenerator destinationCatalogGenerator;
9395

9496
public ReplicationWorkerFactory(
9597
final AirbyteIntegrationLauncherFactory airbyteIntegrationLauncherFactory,
@@ -104,7 +106,8 @@ public ReplicationWorkerFactory(
104106
final StreamStatusCompletionTracker streamStatusCompletionTracker,
105107
final StreamStatusTrackerFactory streamStatusTrackerFactory,
106108
final Clock clock,
107-
final RecordMapper recordMapper) {
109+
final RecordMapper recordMapper,
110+
final DestinationCatalogGenerator destinationCatalogGenerator) {
108111
this.airbyteIntegrationLauncherFactory = airbyteIntegrationLauncherFactory;
109112
this.airbyteApiClient = airbyteApiClient;
110113
this.syncPersistenceFactory = syncPersistenceFactory;
@@ -119,6 +122,7 @@ public ReplicationWorkerFactory(
119122
this.streamStatusTrackerFactory = streamStatusTrackerFactory;
120123
this.clock = clock;
121124
this.recordMapper = recordMapper;
125+
this.destinationCatalogGenerator = destinationCatalogGenerator;
122126
}
123127

124128
/**
@@ -168,7 +172,8 @@ public ReplicationWorker create(final ReplicationInput replicationInput,
168172
syncPersistence, recordSchemaValidator, fieldSelector, heartbeatTimeoutChaperone,
169173
featureFlagClient, jobRunConfig, replicationInput, replicationAirbyteMessageEventPublishingHelper,
170174
onReplicationRunning, destinationTimeout, workloadApiClient, workloadEnabled, analyticsMessageTracker,
171-
workloadId, airbyteApiClient, streamStatusCompletionTracker, streamStatusTrackerFactory, metricClient, recordMapper);
175+
workloadId, airbyteApiClient, streamStatusCompletionTracker, streamStatusTrackerFactory, metricClient, recordMapper,
176+
destinationCatalogGenerator);
172177
}
173178

174179
/**
@@ -316,7 +321,8 @@ private static ReplicationWorker createReplicationWorker(final AirbyteSource sou
316321
final StreamStatusCompletionTracker streamStatusCompletionTracker,
317322
final StreamStatusTrackerFactory streamStatusTrackerFactory,
318323
final MetricClient metricClient,
319-
final RecordMapper recordMapper) {
324+
final RecordMapper recordMapper,
325+
final DestinationCatalogGenerator destinationCatalogGenerator) {
320326
final Context flagContext = getFeatureFlagContext(replicationInput);
321327

322328
final int bufferSize = featureFlagClient.intVariation(ReplicationBufferOverride.INSTANCE, flagContext);
@@ -352,7 +358,8 @@ private static ReplicationWorker createReplicationWorker(final AirbyteSource sou
352358
metricClient,
353359
replicationInput,
354360
recordMapper,
355-
featureFlagClient);
361+
featureFlagClient,
362+
destinationCatalogGenerator);
356363
}
357364

358365
private static Context getFeatureFlagContext(final ReplicationInput replicationInput) {
@@ -402,12 +409,13 @@ private static ReplicationWorker buildReplicationWorkerInstance(final String job
402409
final MetricClient metricClient,
403410
final ReplicationInput replicationInput,
404411
final RecordMapper recordMapper,
405-
final FeatureFlagClient featureFlagClient) {
412+
final FeatureFlagClient featureFlagClient,
413+
final DestinationCatalogGenerator destinationCatalogGenerator) {
406414
final ReplicationWorkerHelper replicationWorkerHelper =
407415
new ReplicationWorkerHelper(fieldSelector, mapper, messageTracker, syncPersistence,
408416
messageEventPublishingHelper, new ThreadedTimeTracker(), onReplicationRunning, workloadApiClient,
409417
workloadEnabled, analyticsMessageTracker, workloadId, airbyteApiClient, streamStatusCompletionTracker,
410-
streamStatusTrackerFactory, recordMapper, featureFlagClient);
418+
streamStatusTrackerFactory, recordMapper, featureFlagClient, destinationCatalogGenerator);
411419

412420
return new BufferedReplicationWorker(jobId, attempt, source, destination, syncPersistence, recordSchemaValidator,
413421
srcHeartbeatTimeoutChaperone, replicationFeatureFlagReader, replicationWorkerHelper, destinationTimeout, streamStatusCompletionTracker,

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java

+33 -3
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,10 @@
2727
import io.airbyte.featureflag.EnableMappers;
2828
import io.airbyte.featureflag.FeatureFlagClient;
2929
import io.airbyte.mappers.transformations.DestinationCatalogGenerator;
30+
import io.airbyte.metrics.lib.MetricAttribute;
3031
import io.airbyte.metrics.lib.MetricClient;
32+
import io.airbyte.metrics.lib.MetricTags;
33+
import io.airbyte.metrics.lib.OssMetricsRegistry;
3134
import io.airbyte.protocol.models.AirbyteMessage;
3235
import io.airbyte.protocol.models.AirbyteMessage.Type;
3336
import io.airbyte.workers.WorkerUtils;
@@ -78,6 +81,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
7881
private final DestinationTimeoutMonitor destinationTimeoutMonitor;
7982
private final DestinationCatalogGenerator destinationCatalogGenerator;
8083
private final FeatureFlagClient featureFlagClient;
84+
private final MetricClient metricClient;
8185

8286
@VisibleForTesting
8387
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
@@ -116,6 +120,7 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
116120
this.messageMetricsTracker = new MessageMetricsTracker(metricClient);
117121
this.destinationCatalogGenerator = destinationCatalogGenerator;
118122
this.featureFlagClient = featureFlagClient;
123+
this.metricClient = metricClient;
119124
}
120125

121126
@Override
@@ -131,9 +136,34 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo
131136
if (usesMapper) {
132137
LOGGER.info("Applying mapper transformation to destination catalog.");
133138
}
134-
final ConfiguredAirbyteCatalog catalogToSendToDestination =
135-
usesMapper ? destinationCatalogGenerator.generateDestinationCatalog(destinationConfig.getCatalog())
136-
: destinationConfig.getCatalog();
139+
final ConfiguredAirbyteCatalog catalogToSendToDestination;
140+
141+
if (usesMapper) {
142+
DestinationCatalogGenerator.CatalogGenerationResult transformedCatalog =
143+
destinationCatalogGenerator.generateDestinationCatalog(destinationConfig.getCatalog(),
144+
destinationConfig.getConnectionId());
145+
146+
transformedCatalog.getErrors().entrySet().stream().forEach(error -> {
147+
error.getValue().values().forEach(errorType -> {
148+
switch (errorType) {
149+
case DestinationCatalogGenerator.MapperError.MISSING_MAPPER:
150+
metricClient.count(OssMetricsRegistry.MISSING_MAPPER, 1, new MetricAttribute(MetricTags.CONNECTION_ID,
151+
destinationConfig.getConnectionId().toString()));
152+
break;
153+
case DestinationCatalogGenerator.MapperError.INVALID_MAPPER_CONFIG:
154+
metricClient.count(OssMetricsRegistry.INVALID_MAPPER_CONFIG, 1, new MetricAttribute(MetricTags.CONNECTION_ID,
155+
destinationConfig.getConnectionId().toString()));
156+
break;
157+
default:
158+
break;
159+
}
160+
});
161+
});
162+
163+
catalogToSendToDestination = transformedCatalog.getCatalog();
164+
} else {
165+
catalogToSendToDestination = destinationConfig.getCatalog();
166+
}
137167

138168
LOGGER.info("Running destination...");
139169
destinationProcess = integrationLauncher.write(

airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/ReplicationWorkerHelper.kt

+5 -1
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ import io.airbyte.featureflag.Connection
3333
import io.airbyte.featureflag.EnableMappers
3434
import io.airbyte.featureflag.FeatureFlagClient
3535
import io.airbyte.mappers.application.RecordMapper
36+
import io.airbyte.mappers.transformations.DestinationCatalogGenerator
3637
import io.airbyte.metrics.lib.ApmTraceUtils
3738
import io.airbyte.metrics.lib.MetricAttribute
3839
import io.airbyte.metrics.lib.MetricClient
@@ -107,6 +108,7 @@ class ReplicationWorkerHelper(
107108
private val streamStatusTrackerFactory: StreamStatusTrackerFactory,
108109
private val recordMapper: RecordMapper,
109110
private val featureFlagClient: FeatureFlagClient,
111+
private val destinationCatalogGenerator: DestinationCatalogGenerator,
110112
) {
111113
private val metricClient = MetricClientFactory.getMetricClient()
112114
private val metricAttrs: MutableList<MetricAttribute> = mutableListOf()
@@ -240,8 +242,10 @@ class ReplicationWorkerHelper(
240242
metricClient.count(OssMetricsRegistry.SYNC_WITH_EMPTY_CATALOG, 1, *metricAttrs.toTypedArray())
241243
}
242244

245+
val catalogWithoutInvalidMappers = destinationCatalogGenerator.generateDestinationCatalog(configuredAirbyteCatalog, ctx.connectionId)
246+
243247
mappersPerStreamDescriptor =
244-
configuredAirbyteCatalog.streams.map { stream ->
248+
catalogWithoutInvalidMappers.catalog.streams.map { stream ->
245249
stream.streamDescriptor to stream.mappers
246250
}.toMap()
247251

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/BufferedReplicationWorkerTest.java

+1 -1
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ ReplicationWorker getDefaultReplicationWorker(final boolean fieldSelectionEnable
2929
replicationWorkerHelper = spy(new ReplicationWorkerHelper(fieldSelector, mapper, messageTracker, syncPersistence,
3030
replicationAirbyteMessageEventPublishingHelper, new ThreadedTimeTracker(), onReplicationRunning, workloadApiClient, false,
3131
analyticsMessageTracker, Optional.empty(), airbyteApiClient, streamStatusCompletionTracker, streamStatusTrackerFactory,
32-
recordMapper, featureFlagClient));
32+
recordMapper, featureFlagClient, destinationCatalogGenerator));
3333
return new BufferedReplicationWorker(
3434
JOB_ID,
3535
JOB_ATTEMPT,

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java

+27 -7
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import io.airbyte.featureflag.FeatureFlagClient;
3939
import io.airbyte.featureflag.TestClient;
4040
import io.airbyte.mappers.application.RecordMapper;
41+
import io.airbyte.mappers.transformations.DestinationCatalogGenerator;
4142
import io.airbyte.persistence.job.models.ReplicationInput;
4243
import io.airbyte.protocol.models.AirbyteAnalyticsTraceMessage;
4344
import io.airbyte.protocol.models.AirbyteLogMessage;
@@ -93,6 +94,7 @@ class ReplicationWorkerHelperTest {
9394
private ReplicationAirbyteMessageEventPublishingHelper replicationAirbyteMessageEventPublishingHelper;
9495
private RecordMapper recordMapper;
9596
private FeatureFlagClient featureFlagClient;
97+
private DestinationCatalogGenerator destinationCatalogGenerator;
9698

9799
private final ReplicationContext replicationContext = new ReplicationContext(true, UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), 0L,
98100
1, UUID.randomUUID(), SOURCE_IMAGE, DESTINATION_IMAGE, UUID.randomUUID(), UUID.randomUUID());
@@ -120,6 +122,7 @@ void setUp() {
120122
recordMapper = mock(RecordMapper.class);
121123
featureFlagClient = mock(TestClient.class);
122124
when(featureFlagClient.boolVariation(eq(EnableMappers.INSTANCE), any())).thenReturn(false);
125+
destinationCatalogGenerator = mock(DestinationCatalogGenerator.class);
123126
replicationWorkerHelper = spy(new ReplicationWorkerHelper(
124127
mock(FieldSelector.class),
125128
mapper,
@@ -136,7 +139,8 @@ void setUp() {
136139
streamStatusCompletionTracker,
137140
streamStatusTrackerFactory,
138141
recordMapper,
139-
featureFlagClient));
142+
featureFlagClient,
143+
destinationCatalogGenerator));
140144
}
141145

142146
@AfterEach
@@ -150,6 +154,8 @@ void testGetReplicationOutput(final boolean supportRefreshes) throws IOException
150154
mockSupportRefreshes(supportRefreshes);
151155
// Need to pass in a replication context
152156
final ConfiguredAirbyteCatalog catalog = buildConfiguredAirbyteCatalog();
157+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
158+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
153159
replicationWorkerHelper.initialize(
154160
replicationContext,
155161
mock(ReplicationFeatureFlags.class),
@@ -177,11 +183,14 @@ void testGetReplicationOutput(final boolean supportRefreshes) throws IOException
177183
void testAnalyticsMessageHandling() throws IOException {
178184
mockSupportRefreshes(false);
179185
// Need to pass in a replication context
186+
final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
187+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
188+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
180189
replicationWorkerHelper.initialize(
181190
replicationContext,
182191
mock(ReplicationFeatureFlags.class),
183192
mock(Path.class),
184-
mock(ConfiguredAirbyteCatalog.class),
193+
catalog,
185194
mock(State.class));
186195
// Need to have a configured catalog for getReplicationOutput
187196
replicationWorkerHelper.startDestination(
@@ -242,12 +251,15 @@ void testMessageMapIsRevertedBeforeProcessing() {
242251
@Test
243252
void callsStreamStatusTrackerOnSourceMessage() throws IOException {
244253
mockSupportRefreshes(true);
254+
final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
255+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
256+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
245257

246258
replicationWorkerHelper.initialize(
247259
replicationContext,
248260
mock(ReplicationFeatureFlags.class),
249261
mock(Path.class),
250-
mock(ConfiguredAirbyteCatalog.class),
262+
catalog,
251263
mock(State.class));
252264

253265
final AirbyteMessage message = mock(AirbyteMessage.class);
@@ -260,12 +272,14 @@ void callsStreamStatusTrackerOnSourceMessage() throws IOException {
260272
@Test
261273
void callsStreamStatusTrackerOnDestinationMessage() throws IOException {
262274
mockSupportRefreshes(true);
263-
275+
final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
276+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
277+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
264278
replicationWorkerHelper.initialize(
265279
replicationContext,
266280
mock(ReplicationFeatureFlags.class),
267281
mock(Path.class),
268-
mock(ConfiguredAirbyteCatalog.class),
282+
catalog,
269283
mock(State.class));
270284

271285
final AirbyteMessage message = mock(AirbyteMessage.class);
@@ -282,6 +296,8 @@ void testSupportRefreshesIsPassed(final boolean supportRefreshes) throws Excepti
282296
mockSupportRefreshes(supportRefreshes);
283297
// Need to pass in a replication context
284298
final ConfiguredAirbyteCatalog catalog = buildConfiguredAirbyteCatalog();
299+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
300+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
285301
replicationWorkerHelper.initialize(
286302
replicationContext,
287303
mock(ReplicationFeatureFlags.class),
@@ -303,12 +319,15 @@ void testSupportRefreshesIsPassed(final boolean supportRefreshes) throws Excepti
303319
@ValueSource(booleans = {true, false})
304320
void testApplyTransformationFlagDisableOrNoMapper(final boolean mappersEnabled) throws IOException {
305321
mockSupportRefreshes(false);
322+
ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
323+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
324+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
306325
// Need to pass in a replication context
307326
replicationWorkerHelper.initialize(
308327
replicationContext,
309328
mock(ReplicationFeatureFlags.class),
310329
mock(Path.class),
311-
mock(ConfiguredAirbyteCatalog.class),
330+
catalog,
312331
mock(State.class));
313332

314333
final AirbyteMessage recordMessage = new AirbyteMessage().withType(Type.RECORD)
@@ -335,7 +354,8 @@ void testApplyTransformationMapper() throws IOException {
335354
when(stream.getMappers()).thenReturn(mappers);
336355
when(catalog.getStreams()).thenReturn(List.of(stream));
337356
when(featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(replicationContext.getConnectionId()))).thenReturn(true);
338-
357+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
358+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(catalog, Map.of()));
339359
// Need to pass in a replication context
340360
replicationWorkerHelper.initialize(
341361
replicationContext,

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java

+5
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@
6565
import io.airbyte.featureflag.FeatureFlagClient;
6666
import io.airbyte.featureflag.TestClient;
6767
import io.airbyte.mappers.application.RecordMapper;
68+
import io.airbyte.mappers.transformations.DestinationCatalogGenerator;
6869
import io.airbyte.metrics.lib.MetricAttribute;
6970
import io.airbyte.metrics.lib.MetricClient;
7071
import io.airbyte.metrics.lib.MetricTags;
@@ -203,6 +204,7 @@ abstract class ReplicationWorkerTest {
203204
protected StreamStatusTrackerFactory streamStatusTrackerFactory;
204205
protected RecordMapper recordMapper;
205206
protected FeatureFlagClient featureFlagClient;
207+
protected DestinationCatalogGenerator destinationCatalogGenerator;
206208

207209
ReplicationWorker getDefaultReplicationWorker() {
208210
return getDefaultReplicationWorker(false);
@@ -308,6 +310,9 @@ void setup() throws Exception {
308310
recordMapper = mock(RecordMapper.class);
309311
featureFlagClient = mock(TestClient.class);
310312
when(featureFlagClient.boolVariation(eq(EnableMappers.INSTANCE), any())).thenReturn(false);
313+
destinationCatalogGenerator = mock(DestinationCatalogGenerator.class);
314+
when(destinationCatalogGenerator.generateDestinationCatalog(any(), any()))
315+
.thenReturn(new DestinationCatalogGenerator.CatalogGenerationResult(destinationConfig.getCatalog(), Map.of()));
311316
}
312317

313318
@AfterEach

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/performance/ReplicationWorkerPerformanceTest.java

+2 -1
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import io.airbyte.featureflag.FeatureFlagClient;
2828
import io.airbyte.featureflag.TestClient;
2929
import io.airbyte.mappers.application.RecordMapper;
30+
import io.airbyte.mappers.transformations.DestinationCatalogGenerator;
3031
import io.airbyte.metrics.lib.MetricClient;
3132
import io.airbyte.metrics.lib.NotImplementedMetricClient;
3233
import io.airbyte.persistence.job.models.ReplicationInput;
@@ -203,7 +204,7 @@ public void executeOneSync() throws InterruptedException {
203204
new ReplicationWorkerHelper(fieldSelector, dstNamespaceMapper, messageTracker, syncPersistence,
204205
replicationAirbyteMessageEventPublishingHelper, new ThreadedTimeTracker(), () -> {}, workloadApiClient, false, analyticsMessageTracker,
205206
Optional.empty(), airbyteApiClient, mock(StreamStatusCompletionTracker.class), streamStatusTrackerFactory,
206-
recordMapper, featureFlagClient);
207+
recordMapper, featureFlagClient, mock(DestinationCatalogGenerator.class));
207208
final StreamStatusCompletionTracker streamStatusCompletionTracker = mock(StreamStatusCompletionTracker.class);
208209

209210
final var worker = getReplicationWorker("1", 0,

0 commit comments

Comments
 (0)