Skip to content

Commit b7f1ed0

Browse files
committed
revert: "revert: "feat: add internal api to support field hashing"" (#13865)
1 parent 88de307 commit b7f1ed0

File tree

32 files changed

+756
-145
lines changed

32 files changed

+756
-145
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -10685,6 +10685,9 @@ components:
1068510685
selectedFields:
1068610686
description: This must be set if `fieldSelectedEnabled` is set. An empty list indicates that no properties will be included.
1068710687
$ref: "#/components/schemas/SelectedFields"
10688+
hashedFields:
10689+
description: Fields that should be hashed before being written to the destination.
10690+
$ref: "#/components/schemas/SelectedFields"
1068810691
minimumGenerationId:
1068910692
type: integer
1069010693
format: int64

airbyte-commons-converters/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies {
1616
implementation(project(":oss:airbyte-commons"))
1717
implementation(project(":oss:airbyte-config:config-models"))
1818
implementation(project(":oss:airbyte-json-validation"))
19+
implementation(project(":oss:airbyte-mappers"))
1920
implementation(libs.airbyte.protocol)
2021
implementation(libs.guava)
2122
implementation(libs.slf4j.api)

airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java

+45-12
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@
88
import com.fasterxml.jackson.databind.node.ObjectNode;
99
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
1010
import io.airbyte.api.client.model.generated.DestinationSyncMode;
11+
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
1112
import io.airbyte.api.client.model.generated.SyncMode;
1213
import io.airbyte.commons.enums.Enums;
1314
import io.airbyte.commons.text.Names;
1415
import io.airbyte.config.ConfiguredAirbyteStream;
16+
import io.airbyte.config.ConfiguredMapper;
17+
import io.airbyte.config.helpers.FieldGenerator;
18+
import io.airbyte.mappers.helpers.MapperHelperKt;
1519
import io.airbyte.validation.json.JsonValidationException;
20+
import jakarta.annotation.Nullable;
1621
import java.util.Collections;
1722
import java.util.List;
1823
import java.util.Optional;
@@ -27,6 +32,9 @@
2732
*/
2833
public class CatalogClientConverters {
2934

35+
// TODO(pedro): This should be refactored to use dependency injection.
36+
private static final FieldGenerator fieldGenerator = new FieldGenerator();
37+
3038
/**
3139
* Convert to api model to airbyte protocol model.
3240
*
@@ -56,12 +64,14 @@ public static io.airbyte.protocol.models.AirbyteCatalog toAirbyteProtocol(final
5664
* @return the protocol catalog
5765
*/
5866
@SuppressWarnings("checkstyle:LineLength") // the auto-formatter produces a format that conflicts with checkstyle
59-
public static io.airbyte.config.ConfiguredAirbyteCatalog toConfiguredAirbyteInternal(final io.airbyte.api.client.model.generated.AirbyteCatalog catalog) {
67+
public static io.airbyte.config.ConfiguredAirbyteCatalog toConfiguredAirbyteInternal(
68+
final io.airbyte.api.client.model.generated.AirbyteCatalog catalog,
69+
final boolean enableMappers) {
6070
final io.airbyte.config.ConfiguredAirbyteCatalog protoCatalog =
6171
new io.airbyte.config.ConfiguredAirbyteCatalog();
6272
final var airbyteStream = catalog.getStreams().stream().map(stream -> {
6373
try {
64-
return toConfiguredStreamInternal(stream.getStream(), stream.getConfig());
74+
return toConfiguredStreamInternal(stream.getStream(), stream.getConfig(), enableMappers);
6575
} catch (final JsonValidationException e) {
6676
return null;
6777
}
@@ -187,18 +197,40 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte
187197
.withIsResumable(stream.isResumable());
188198
}
189199

200+
private static List<ConfiguredMapper> toConfiguredHashingMappers(final @Nullable List<SelectedFieldInfo> hashedFields) {
201+
if (hashedFields == null) {
202+
return Collections.emptyList();
203+
}
204+
205+
// FIXME(pedro): See https://github.com/airbytehq/airbyte-internal-issues/issues/9718
206+
// We shouldn't have to rebuild these here, and can potentially lead to losing configuration that's
207+
// actually stored in the db.
208+
return hashedFields.stream().map(f -> MapperHelperKt.createHashingMapper(f.getFieldPath().getFirst()) // We don't support nested fields for now.
209+
).toList();
210+
}
211+
190212
private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream,
191-
final AirbyteStreamConfiguration config)
213+
final AirbyteStreamConfiguration config,
214+
final boolean enableMappers)
192215
throws JsonValidationException {
193-
return new ConfiguredAirbyteStream(
194-
toStreamInternal(stream, config),
195-
Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class),
196-
Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
197-
.withPrimaryKey(config.getPrimaryKey())
198-
.withCursorField(config.getCursorField())
199-
.withGenerationId(config.getGenerationId())
200-
.withMinimumGenerationId(config.getMinimumGenerationId())
201-
.withSyncId(config.getSyncId());
216+
final var convertedStream = toStreamInternal(stream, config);
217+
final ConfiguredAirbyteStream.Builder builder = new ConfiguredAirbyteStream.Builder()
218+
.stream(convertedStream)
219+
.syncMode(Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class))
220+
.destinationSyncMode(Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
221+
.primaryKey(config.getPrimaryKey())
222+
.cursorField(config.getCursorField())
223+
.generationId(config.getGenerationId())
224+
.minimumGenerationId(config.getMinimumGenerationId())
225+
.syncId(config.getSyncId());
226+
227+
if (enableMappers) {
228+
builder
229+
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()))
230+
.mappers(toConfiguredHashingMappers(config.getHashedFields()));
231+
}
232+
233+
return builder.build();
202234
}
203235

204236
/**
@@ -230,6 +262,7 @@ private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration
230262
null,
231263
null,
232264
null,
265+
null,
233266
null);
234267
}
235268

airbyte-commons-converters/src/test/java/io/airbyte/commons/converters/CatalogClientConvertersTest.java

+52
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,17 @@
55
package io.airbyte.commons.converters;
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.mockito.Mockito.reset;
9+
import static org.mockito.Mockito.spy;
10+
import static org.mockito.Mockito.verifyNoInteractions;
811

912
import com.google.common.collect.Lists;
1013
import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
14+
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
1115
import io.airbyte.commons.text.Names;
16+
import io.airbyte.config.ConfiguredAirbyteCatalog;
17+
import io.airbyte.config.helpers.FieldGenerator;
18+
import io.airbyte.mappers.helpers.MapperHelperKt;
1219
import io.airbyte.protocol.models.AirbyteCatalog;
1320
import io.airbyte.protocol.models.AirbyteStream;
1421
import io.airbyte.protocol.models.CatalogHelpers;
@@ -19,9 +26,12 @@
1926
import java.util.Collections;
2027
import java.util.List;
2128
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.params.ParameterizedTest;
30+
import org.junit.jupiter.params.provider.ValueSource;
2231

2332
class CatalogClientConvertersTest {
2433

34+
private static final FieldGenerator fieldGenerator = spy(new FieldGenerator());
2535
public static final String ID_FIELD_NAME = "id";
2636
private static final String STREAM_NAME = "users-data";
2737
private static final AirbyteStream STREAM = new AirbyteStream()
@@ -57,6 +67,7 @@ class CatalogClientConvertersTest {
5767
null,
5868
null,
5969
null,
70+
null,
6071
null);
6172

6273
private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
@@ -81,6 +92,46 @@ void testConvertToProtocol() {
8192
CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG));
8293
}
8394

95+
@ParameterizedTest
96+
@ValueSource(booleans = {true, false})
97+
void testConvertInternalWithMapping(final boolean enableMappers) {
98+
reset(fieldGenerator);
99+
final var streamConfig = new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration(
100+
io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
101+
io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND,
102+
List.of(ID_FIELD_NAME),
103+
List.of(),
104+
Names.toAlphanumericAndUnderscore(STREAM_NAME),
105+
true,
106+
null,
107+
null,
108+
null,
109+
List.of(new SelectedFieldInfo(List.of(ID_FIELD_NAME))),
110+
null,
111+
null,
112+
null);
113+
final io.airbyte.api.client.model.generated.AirbyteCatalog clientCatalog =
114+
new io.airbyte.api.client.model.generated.AirbyteCatalog(
115+
List.of(
116+
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration(
117+
CLIENT_STREAM,
118+
streamConfig)));
119+
120+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogClientConverters.toConfiguredAirbyteInternal(clientCatalog, enableMappers);
121+
final var stream = configuredCatalog.getStreams().getFirst();
122+
assertEquals(STREAM_NAME, stream.getStream().getName());
123+
if (enableMappers) {
124+
assertEquals(1, stream.getFields().size());
125+
assertEquals(1, stream.getMappers().size());
126+
assertEquals(fieldGenerator.getFieldsFromSchema(stream.getStream().getJsonSchema()), stream.getFields());
127+
assertEquals(MapperHelperKt.createHashingMapper(ID_FIELD_NAME), stream.getMappers().getFirst());
128+
} else {
129+
assertEquals(null, stream.getFields());
130+
assertEquals(0, stream.getMappers().size());
131+
verifyNoInteractions(fieldGenerator);
132+
}
133+
}
134+
84135
@Test
85136
void testIsResumableImport() {
86137
final List<Boolean> boolValues = new ArrayList<>(List.of(Boolean.TRUE, Boolean.FALSE));
@@ -120,6 +171,7 @@ void testIsResumableExport() {
120171
null,
121172
null,
122173
null,
174+
null,
123175
null);
124176
final var streamAndConf = new AirbyteStreamAndConfiguration(stream, conf);
125177
final List<AirbyteStreamAndConfiguration> streams = List.of(streamAndConf);

airbyte-commons-server/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ dependencies {
6161
implementation(project(":oss:airbyte-config:specs"))
6262
implementation(project(":oss:airbyte-data"))
6363
implementation(project(":oss:airbyte-featureflag"))
64+
implementation(project(":oss:airbyte-mappers"))
6465
implementation(project(":oss:airbyte-metrics:metrics-lib"))
6566
implementation(project(":oss:airbyte-db:db-lib"))
6667
implementation(project(":oss:airbyte-json-validation"))

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

+28-10
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,12 @@
120120
import io.airbyte.data.services.shared.ConnectionAutoUpdatedReason;
121121
import io.airbyte.data.services.shared.ConnectionEvent;
122122
import io.airbyte.featureflag.CheckWithCatalog;
123+
import io.airbyte.featureflag.Connection;
124+
import io.airbyte.featureflag.EnableMappers;
123125
import io.airbyte.featureflag.FeatureFlagClient;
124126
import io.airbyte.featureflag.ResetStreamsStateWhenDisabled;
125127
import io.airbyte.featureflag.Workspace;
128+
import io.airbyte.mappers.helpers.MapperHelperKt;
126129
import io.airbyte.metrics.lib.ApmTraceUtils;
127130
import io.airbyte.metrics.lib.MetricAttribute;
128131
import io.airbyte.metrics.lib.MetricClientFactory;
@@ -263,7 +266,11 @@ private void applyPatchToStandardSync(final StandardSync sync, final ConnectionU
263266
validateCatalogDoesntContainDuplicateStreamNames(patch.getSyncCatalog());
264267
validateCatalogSize(patch.getSyncCatalog(), workspaceId, "update");
265268

266-
sync.setCatalog(CatalogConverter.toConfiguredInternal(patch.getSyncCatalog()));
269+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(sync.getConnectionId()));
270+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(patch.getSyncCatalog(), shouldEnableMappers);
271+
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
272+
273+
sync.setCatalog(configuredCatalog);
267274
sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog()));
268275
}
269276

@@ -552,7 +559,11 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
552559
validateCatalogDoesntContainDuplicateStreamNames(connectionCreate.getSyncCatalog());
553560
validateCatalogSize(connectionCreate.getSyncCatalog(), workspaceId, "create");
554561

555-
standardSync.withCatalog(CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog()));
562+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(connectionId));
563+
final ConfiguredAirbyteCatalog configuredCatalog =
564+
CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog(), shouldEnableMappers);
565+
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
566+
standardSync.withCatalog(configuredCatalog);
556567
standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog()));
557568
} else {
558569
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
@@ -687,7 +698,7 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync
687698
return metadata;
688699
}
689700

690-
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, String updateReason, Boolean autoUpdate)
701+
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, final String updateReason, final Boolean autoUpdate)
691702
throws ConfigNotFoundException, IOException, JsonValidationException {
692703

693704
final UUID connectionId = connectionPatch.getConnectionId();
@@ -830,11 +841,16 @@ public ConnectionRead getConnectionForJob(final UUID connectionId, final Long jo
830841
return buildConnectionRead(connectionId, jobId);
831842
}
832843

833-
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog, final ConfiguredAirbyteCatalog configuredCatalog)
844+
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog,
845+
final AirbyteCatalog newCatalog,
846+
final ConfiguredAirbyteCatalog configuredCatalog,
847+
final UUID connectionId)
834848
throws JsonValidationException {
849+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(connectionId));
850+
835851
return new CatalogDiff().transforms(CatalogDiffHelpers.getCatalogDiff(
836-
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)),
837-
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)), configuredCatalog)
852+
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog, shouldEnableMappers)),
853+
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog, shouldEnableMappers)), configuredCatalog)
838854
.stream()
839855
.map(CatalogDiffConverters::streamTransformToApi)
840856
.toList());
@@ -843,12 +859,12 @@ public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog
843859
public CatalogDiff getDiff(final ConnectionRead connectionRead, final AirbyteCatalog discoveredCatalog)
844860
throws JsonValidationException, ConfigNotFoundException, IOException {
845861

862+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(connectionRead.getConnectionId()));
846863
final var catalogWithSelectedFieldsAnnotated = connectionRead.getSyncCatalog();
847-
final var configuredCatalog = CatalogConverter.toConfiguredInternal(catalogWithSelectedFieldsAnnotated);
848-
864+
final var configuredCatalog = CatalogConverter.toConfiguredInternal(catalogWithSelectedFieldsAnnotated, shouldEnableMappers);
849865
final var rawCatalog = getConnectionAirbyteCatalog(connectionRead.getConnectionId());
850866

851-
return getDiff(rawCatalog.orElse(catalogWithSelectedFieldsAnnotated), discoveredCatalog, configuredCatalog);
867+
return getDiff(rawCatalog.orElse(catalogWithSelectedFieldsAnnotated), discoveredCatalog, configuredCatalog, connectionRead.getConnectionId());
852868
}
853869

854870
/**
@@ -1279,10 +1295,12 @@ public ConnectionAutoPropagateResult applySchemaChange(
12791295
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog =
12801296
getConnectionAirbyteCatalog(connectionId);
12811297
final io.airbyte.api.model.generated.AirbyteCatalog currentCatalog = connection.getSyncCatalog();
1298+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(connectionId));
12821299
final CatalogDiff diffToApply = getDiff(
12831300
catalogUsedToMakeConfiguredCatalog.orElse(currentCatalog),
12841301
catalog,
1285-
CatalogConverter.toConfiguredInternal(currentCatalog));
1302+
CatalogConverter.toConfiguredInternal(currentCatalog, shouldEnableMappers),
1303+
connectionId);
12861304
final ConnectionUpdate updateObject =
12871305
new ConnectionUpdate().connectionId(connection.getConnectionId());
12881306
final UUID destinationDefinitionId =

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/SchedulerHandler.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@
8787
import io.airbyte.data.services.SecretPersistenceConfigService;
8888
import io.airbyte.data.services.WorkspaceService;
8989
import io.airbyte.data.services.shared.ConnectionAutoUpdatedReason;
90+
import io.airbyte.featureflag.Connection;
9091
import io.airbyte.featureflag.DiscoverPostprocessInTemporal;
92+
import io.airbyte.featureflag.EnableMappers;
9193
import io.airbyte.featureflag.FeatureFlagClient;
9294
import io.airbyte.featureflag.Organization;
9395
import io.airbyte.featureflag.UseRuntimeSecretPersistence;
@@ -512,10 +514,11 @@ public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAut
512514
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
513515
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog syncCatalog =
514516
connectionRead.getSyncCatalog();
517+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(connectionRead.getConnectionId()));
515518
final CatalogDiff diff =
516519
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(syncCatalog),
517520
sourceAutoPropagateChange.getCatalog(),
518-
CatalogConverter.toConfiguredInternal(syncCatalog));
521+
CatalogConverter.toConfiguredInternal(syncCatalog, shouldEnableMappers), connectionRead.getConnectionId());
519522

520523
final ConnectionUpdate updateObject =
521524
new ConnectionUpdate().connectionId(connectionRead.getConnectionId());
@@ -698,9 +701,10 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
698701
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
699702
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
700703
connectionRead.getSyncCatalog();
704+
final boolean shouldEnableMappers = featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(connectionRead.getConnectionId()));
701705
final CatalogDiff diff =
702706
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
703-
CatalogConverter.toConfiguredInternal(currentAirbyteCatalog));
707+
CatalogConverter.toConfiguredInternal(currentAirbyteCatalog, shouldEnableMappers), connectionRead.getConnectionId());
704708
final boolean containsBreakingChange = AutoPropagateSchemaChangeHelper.containsBreakingChange(diff);
705709
final ConnectionRead updatedConnection =
706710
connectionsHandler.updateSchemaChangesAndAutoDisableConnectionIfNeeded(connectionRead, containsBreakingChange, diff);

0 commit comments

Comments
 (0)