Skip to content

Commit bf799dd

Browse files
committed
feat: add internal api to support field hashing (#13798)
1 parent aff5a15 commit bf799dd

File tree

28 files changed

+617
-83
lines changed

28 files changed

+617
-83
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -10538,6 +10538,9 @@ components:
1053810538
selectedFields:
1053910539
description: This must be set if `fieldSelectionEnabled` is set. An empty list indicates that no properties will be included.
1054010540
$ref: "#/components/schemas/SelectedFields"
10541+
hashedFields:
10542+
description: Fields that should be hashed before being written to the destination.
10543+
$ref: "#/components/schemas/SelectedFields"
1054110544
minimumGenerationId:
1054210545
type: integer
1054310546
format: int64

airbyte-commons-converters/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies {
1616
implementation(project(":oss:airbyte-commons"))
1717
implementation(project(":oss:airbyte-config:config-models"))
1818
implementation(project(":oss:airbyte-json-validation"))
19+
implementation(project(":oss:airbyte-mappers"))
1920
implementation(libs.airbyte.protocol)
2021
implementation(libs.guava)
2122
implementation(libs.slf4j.api)

airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@
88
import com.fasterxml.jackson.databind.node.ObjectNode;
99
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
1010
import io.airbyte.api.client.model.generated.DestinationSyncMode;
11+
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
1112
import io.airbyte.api.client.model.generated.SyncMode;
1213
import io.airbyte.commons.enums.Enums;
1314
import io.airbyte.commons.text.Names;
1415
import io.airbyte.config.ConfiguredAirbyteStream;
16+
import io.airbyte.config.ConfiguredMapper;
17+
import io.airbyte.config.helpers.FieldGenerator;
18+
import io.airbyte.mappers.helpers.MapperHelperKt;
1519
import io.airbyte.validation.json.JsonValidationException;
20+
import jakarta.annotation.Nullable;
1621
import java.util.Collections;
1722
import java.util.List;
1823
import java.util.Optional;
@@ -27,6 +32,9 @@
2732
*/
2833
public class CatalogClientConverters {
2934

35+
// TODO(pedro): This should be refactored to use dependency injection.
36+
private static final FieldGenerator fieldGenerator = new FieldGenerator();
37+
3038
/**
3139
* Convert the api model to the airbyte protocol model.
3240
*
@@ -187,18 +195,34 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte
187195
.withIsResumable(stream.isResumable());
188196
}
189197

198+
private static List<ConfiguredMapper> toConfiguredHashingMappers(final @Nullable List<SelectedFieldInfo> hashedFields) {
199+
if (hashedFields == null) {
200+
return Collections.emptyList();
201+
}
202+
203+
// FIXME(pedro): See https://github.com/airbytehq/airbyte-internal-issues/issues/9718
204+
// We shouldn't have to rebuild these here; doing so can potentially lead to losing configuration that's
205+
// actually stored in the db.
206+
return hashedFields.stream().map(f -> MapperHelperKt.createHashingMapper(f.getFieldPath().getFirst()) // We don't support nested fields for now.
207+
).toList();
208+
}
209+
190210
private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream,
191211
final AirbyteStreamConfiguration config)
192212
throws JsonValidationException {
193-
return new ConfiguredAirbyteStream(
194-
toStreamInternal(stream, config),
195-
Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class),
196-
Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
197-
.withPrimaryKey(config.getPrimaryKey())
198-
.withCursorField(config.getCursorField())
199-
.withGenerationId(config.getGenerationId())
200-
.withMinimumGenerationId(config.getMinimumGenerationId())
201-
.withSyncId(config.getSyncId());
213+
final var convertedStream = toStreamInternal(stream, config);
214+
return new ConfiguredAirbyteStream.Builder()
215+
.stream(convertedStream)
216+
.syncMode(Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class))
217+
.destinationSyncMode(Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
218+
.primaryKey(config.getPrimaryKey())
219+
.cursorField(config.getCursorField())
220+
.generationId(config.getGenerationId())
221+
.minimumGenerationId(config.getMinimumGenerationId())
222+
.syncId(config.getSyncId())
223+
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()))
224+
.mappers(toConfiguredHashingMappers(config.getHashedFields()))
225+
.build();
202226
}
203227

204228
/**
@@ -230,6 +254,7 @@ private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration
230254
null,
231255
null,
232256
null,
257+
null,
233258
null);
234259
}
235260

airbyte-commons-converters/src/test/java/io/airbyte/commons/converters/CatalogClientConvertersTest.java

+39
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
import com.google.common.collect.Lists;
1010
import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
11+
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
1112
import io.airbyte.commons.text.Names;
13+
import io.airbyte.config.ConfiguredAirbyteCatalog;
14+
import io.airbyte.config.helpers.FieldGenerator;
15+
import io.airbyte.mappers.helpers.MapperHelperKt;
1216
import io.airbyte.protocol.models.AirbyteCatalog;
1317
import io.airbyte.protocol.models.AirbyteStream;
1418
import io.airbyte.protocol.models.CatalogHelpers;
@@ -22,6 +26,7 @@
2226

2327
class CatalogClientConvertersTest {
2428

29+
private static final FieldGenerator fieldGenerator = new FieldGenerator();
2530
public static final String ID_FIELD_NAME = "id";
2631
private static final String STREAM_NAME = "users-data";
2732
private static final AirbyteStream STREAM = new AirbyteStream()
@@ -57,6 +62,7 @@ class CatalogClientConvertersTest {
5762
null,
5863
null,
5964
null,
65+
null,
6066
null);
6167

6268
private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
@@ -81,6 +87,38 @@ void testConvertToProtocol() {
8187
CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG));
8288
}
8389

90+
@Test
91+
void testConvertInternalWithMapping() {
92+
final var streamConfig = new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration(
93+
io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
94+
io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND,
95+
List.of(ID_FIELD_NAME),
96+
List.of(),
97+
Names.toAlphanumericAndUnderscore(STREAM_NAME),
98+
true,
99+
null,
100+
null,
101+
null,
102+
List.of(new SelectedFieldInfo(List.of(ID_FIELD_NAME))),
103+
null,
104+
null,
105+
null);
106+
final io.airbyte.api.client.model.generated.AirbyteCatalog clientCatalog =
107+
new io.airbyte.api.client.model.generated.AirbyteCatalog(
108+
List.of(
109+
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration(
110+
CLIENT_STREAM,
111+
streamConfig)));
112+
113+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogClientConverters.toConfiguredAirbyteInternal(clientCatalog);
114+
final var stream = configuredCatalog.getStreams().getFirst();
115+
assertEquals(STREAM_NAME, stream.getStream().getName());
116+
assertEquals(1, stream.getFields().size());
117+
assertEquals(fieldGenerator.getFieldsFromSchema(stream.getStream().getJsonSchema()), stream.getFields());
118+
assertEquals(1, stream.getMappers().size());
119+
assertEquals(MapperHelperKt.createHashingMapper(ID_FIELD_NAME), stream.getMappers().getFirst());
120+
}
121+
84122
@Test
85123
void testIsResumableImport() {
86124
final List<Boolean> boolValues = new ArrayList<>(List.of(Boolean.TRUE, Boolean.FALSE));
@@ -120,6 +158,7 @@ void testIsResumableExport() {
120158
null,
121159
null,
122160
null,
161+
null,
123162
null);
124163
final var streamAndConf = new AirbyteStreamAndConfiguration(stream, conf);
125164
final List<AirbyteStreamAndConfiguration> streams = List.of(streamAndConf);

airbyte-commons-server/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ dependencies {
6161
implementation(project(":oss:airbyte-config:specs"))
6262
implementation(project(":oss:airbyte-data"))
6363
implementation(project(":oss:airbyte-featureflag"))
64+
implementation(project(":oss:airbyte-mappers"))
6465
implementation(project(":oss:airbyte-metrics:metrics-lib"))
6566
implementation(project(":oss:airbyte-db:db-lib"))
6667
implementation(project(":oss:airbyte-json-validation"))

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
import io.airbyte.featureflag.FeatureFlagClient;
124124
import io.airbyte.featureflag.ResetStreamsStateWhenDisabled;
125125
import io.airbyte.featureflag.Workspace;
126+
import io.airbyte.mappers.helpers.MapperHelperKt;
126127
import io.airbyte.metrics.lib.ApmTraceUtils;
127128
import io.airbyte.metrics.lib.MetricAttribute;
128129
import io.airbyte.metrics.lib.MetricClientFactory;
@@ -263,7 +264,10 @@ private void applyPatchToStandardSync(final StandardSync sync, final ConnectionU
263264
validateCatalogDoesntContainDuplicateStreamNames(patch.getSyncCatalog());
264265
validateCatalogSize(patch.getSyncCatalog(), workspaceId, "update");
265266

266-
sync.setCatalog(CatalogConverter.toConfiguredInternal(patch.getSyncCatalog()));
267+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(patch.getSyncCatalog());
268+
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
269+
270+
sync.setCatalog(configuredCatalog);
267271
sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog()));
268272
}
269273

@@ -552,7 +556,9 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
552556
validateCatalogDoesntContainDuplicateStreamNames(connectionCreate.getSyncCatalog());
553557
validateCatalogSize(connectionCreate.getSyncCatalog(), workspaceId, "create");
554558

555-
standardSync.withCatalog(CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog()));
559+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog());
560+
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
561+
standardSync.withCatalog(configuredCatalog);
556562
standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog()));
557563
} else {
558564
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
@@ -687,7 +693,7 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync
687693
return metadata;
688694
}
689695

690-
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, String updateReason, Boolean autoUpdate)
696+
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, final String updateReason, final Boolean autoUpdate)
691697
throws ConfigNotFoundException, IOException, JsonValidationException {
692698

693699
final UUID connectionId = connectionPatch.getConnectionId();

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@
5959
import io.airbyte.config.ActorCatalogFetchEvent;
6060
import io.airbyte.config.ActorDefinitionVersion;
6161
import io.airbyte.config.ConfiguredAirbyteCatalog;
62+
import io.airbyte.config.Field;
6263
import io.airbyte.config.JobStatusSummary;
6364
import io.airbyte.config.RefreshStream.RefreshType;
6465
import io.airbyte.config.StandardSourceDefinition;
6566
import io.airbyte.config.StandardSync;
67+
import io.airbyte.config.helpers.FieldGenerator;
6668
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
6769
import io.airbyte.config.persistence.ConfigNotFoundException;
6870
import io.airbyte.config.persistence.ConfigRepository;
@@ -110,6 +112,7 @@ public class WebBackendConnectionsHandler {
110112
private final ConnectionService connectionService;
111113
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
112114
private final FeatureFlagClient featureFlagClient;
115+
private final FieldGenerator fieldGenerator;
113116

114117
public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDefinitionVersionHandler,
115118
final ConnectionsHandler connectionsHandler,
@@ -123,6 +126,7 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
123126
final ConfigRepository configRepositoryDoNotUse,
124127
final ConnectionService connectionService,
125128
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
129+
final FieldGenerator fieldGenerator,
126130
final FeatureFlagClient featureFlagClient) {
127131
this.actorDefinitionVersionHandler = actorDefinitionVersionHandler;
128132
this.connectionsHandler = connectionsHandler;
@@ -136,6 +140,7 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
136140
this.configRepositoryDoNotUse = configRepositoryDoNotUse;
137141
this.connectionService = connectionService;
138142
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
143+
this.fieldGenerator = fieldGenerator;
139144
this.featureFlagClient = featureFlagClient;
140145
}
141146

@@ -479,9 +484,9 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
479484
* catalog
480485
*/
481486
@VisibleForTesting
482-
protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
483-
final AirbyteCatalog originalDiscovered,
484-
final AirbyteCatalog discovered) {
487+
protected AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
488+
final AirbyteCatalog originalDiscovered,
489+
final AirbyteCatalog discovered) {
485490
/*
486491
* We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so
487492
* we just define a quick-and-dirty record class.
@@ -536,6 +541,17 @@ protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final
536541
outputStreamConfig.setSuggested(originalConfiguredStream.getConfig().getSuggested());
537542
outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled());
538543

544+
// Add hashed field configs that are still present in the schema
545+
if (originalStreamConfig.getHashedFields() != null && !originalStreamConfig.getHashedFields().isEmpty()) {
546+
final List<String> discoveredFields =
547+
fieldGenerator.getFieldsFromSchema(stream.getJsonSchema()).stream().map(Field::getName).toList();
548+
for (final SelectedFieldInfo hashedField : originalStreamConfig.getHashedFields()) {
549+
if (discoveredFields.contains(hashedField.getFieldPath().getFirst())) {
550+
outputStreamConfig.addHashedFieldsItem(hashedField);
551+
}
552+
}
553+
}
554+
539555
if (outputStreamConfig.getFieldSelectionEnabled()) {
540556
// TODO(mfsiega-airbyte): support nested fields.
541557
// If field selection is enabled, populate the selected fields.

0 commit comments

Comments
 (0)