Skip to content

Commit 6cca28f

Browse files
committed
revert: "feat: add internal api to support field hashing" (#13859)
1 parent bf799dd commit 6cca28f

File tree

28 files changed

+83
-617
lines changed

28 files changed

+83
-617
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

-3
Original file line numberDiff line numberDiff line change
@@ -10538,9 +10538,6 @@ components:
1053810538
selectedFields:
1053910539
description: This must be set if `fieldSelectedEnabled` is set. An empty list indicates that no properties will be included.
1054010540
$ref: "#/components/schemas/SelectedFields"
10541-
hashedFields:
10542-
description: Fields that should be hashed before being written to the destination.
10543-
$ref: "#/components/schemas/SelectedFields"
1054410541
minimumGenerationId:
1054510542
type: integer
1054610543
format: int64

airbyte-commons-converters/build.gradle.kts

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ dependencies {
1616
implementation(project(":oss:airbyte-commons"))
1717
implementation(project(":oss:airbyte-config:config-models"))
1818
implementation(project(":oss:airbyte-json-validation"))
19-
implementation(project(":oss:airbyte-mappers"))
2019
implementation(libs.airbyte.protocol)
2120
implementation(libs.guava)
2221
implementation(libs.slf4j.api)

airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java

+9-34
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,11 @@
88
import com.fasterxml.jackson.databind.node.ObjectNode;
99
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
1010
import io.airbyte.api.client.model.generated.DestinationSyncMode;
11-
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
1211
import io.airbyte.api.client.model.generated.SyncMode;
1312
import io.airbyte.commons.enums.Enums;
1413
import io.airbyte.commons.text.Names;
1514
import io.airbyte.config.ConfiguredAirbyteStream;
16-
import io.airbyte.config.ConfiguredMapper;
17-
import io.airbyte.config.helpers.FieldGenerator;
18-
import io.airbyte.mappers.helpers.MapperHelperKt;
1915
import io.airbyte.validation.json.JsonValidationException;
20-
import jakarta.annotation.Nullable;
2116
import java.util.Collections;
2217
import java.util.List;
2318
import java.util.Optional;
@@ -32,9 +27,6 @@
3227
*/
3328
public class CatalogClientConverters {
3429

35-
// TODO(pedro): This should be refactored to use dependency injection.
36-
private static final FieldGenerator fieldGenerator = new FieldGenerator();
37-
3830
/**
3931
* Convert to api model to airbyte protocol model.
4032
*
@@ -195,34 +187,18 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte
195187
.withIsResumable(stream.isResumable());
196188
}
197189

198-
private static List<ConfiguredMapper> toConfiguredHashingMappers(final @Nullable List<SelectedFieldInfo> hashedFields) {
199-
if (hashedFields == null) {
200-
return Collections.emptyList();
201-
}
202-
203-
// FIXME(pedro): See https://github.com/airbytehq/airbyte-internal-issues/issues/9718
204-
// We shouldn't have to rebuild these here, and can potentially lead to losing configuration that's
205-
// actually stored in the db.
206-
return hashedFields.stream().map(f -> MapperHelperKt.createHashingMapper(f.getFieldPath().getFirst()) // We don't support nested fields for now.
207-
).toList();
208-
}
209-
210190
private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream,
211191
final AirbyteStreamConfiguration config)
212192
throws JsonValidationException {
213-
final var convertedStream = toStreamInternal(stream, config);
214-
return new ConfiguredAirbyteStream.Builder()
215-
.stream(convertedStream)
216-
.syncMode(Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class))
217-
.destinationSyncMode(Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
218-
.primaryKey(config.getPrimaryKey())
219-
.cursorField(config.getCursorField())
220-
.generationId(config.getGenerationId())
221-
.minimumGenerationId(config.getMinimumGenerationId())
222-
.syncId(config.getSyncId())
223-
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()))
224-
.mappers(toConfiguredHashingMappers(config.getHashedFields()))
225-
.build();
193+
return new ConfiguredAirbyteStream(
194+
toStreamInternal(stream, config),
195+
Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class),
196+
Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
197+
.withPrimaryKey(config.getPrimaryKey())
198+
.withCursorField(config.getCursorField())
199+
.withGenerationId(config.getGenerationId())
200+
.withMinimumGenerationId(config.getMinimumGenerationId())
201+
.withSyncId(config.getSyncId());
226202
}
227203

228204
/**
@@ -254,7 +230,6 @@ private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration
254230
null,
255231
null,
256232
null,
257-
null,
258233
null);
259234
}
260235

airbyte-commons-converters/src/test/java/io/airbyte/commons/converters/CatalogClientConvertersTest.java

-39
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,7 @@
88

99
import com.google.common.collect.Lists;
1010
import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
11-
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
1211
import io.airbyte.commons.text.Names;
13-
import io.airbyte.config.ConfiguredAirbyteCatalog;
14-
import io.airbyte.config.helpers.FieldGenerator;
15-
import io.airbyte.mappers.helpers.MapperHelperKt;
1612
import io.airbyte.protocol.models.AirbyteCatalog;
1713
import io.airbyte.protocol.models.AirbyteStream;
1814
import io.airbyte.protocol.models.CatalogHelpers;
@@ -26,7 +22,6 @@
2622

2723
class CatalogClientConvertersTest {
2824

29-
private static final FieldGenerator fieldGenerator = new FieldGenerator();
3025
public static final String ID_FIELD_NAME = "id";
3126
private static final String STREAM_NAME = "users-data";
3227
private static final AirbyteStream STREAM = new AirbyteStream()
@@ -62,7 +57,6 @@ class CatalogClientConvertersTest {
6257
null,
6358
null,
6459
null,
65-
null,
6660
null);
6761

6862
private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
@@ -87,38 +81,6 @@ void testConvertToProtocol() {
8781
CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG));
8882
}
8983

90-
@Test
91-
void testConvertInternalWithMapping() {
92-
final var streamConfig = new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration(
93-
io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
94-
io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND,
95-
List.of(ID_FIELD_NAME),
96-
List.of(),
97-
Names.toAlphanumericAndUnderscore(STREAM_NAME),
98-
true,
99-
null,
100-
null,
101-
null,
102-
List.of(new SelectedFieldInfo(List.of(ID_FIELD_NAME))),
103-
null,
104-
null,
105-
null);
106-
final io.airbyte.api.client.model.generated.AirbyteCatalog clientCatalog =
107-
new io.airbyte.api.client.model.generated.AirbyteCatalog(
108-
List.of(
109-
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration(
110-
CLIENT_STREAM,
111-
streamConfig)));
112-
113-
final ConfiguredAirbyteCatalog configuredCatalog = CatalogClientConverters.toConfiguredAirbyteInternal(clientCatalog);
114-
final var stream = configuredCatalog.getStreams().getFirst();
115-
assertEquals(STREAM_NAME, stream.getStream().getName());
116-
assertEquals(1, stream.getFields().size());
117-
assertEquals(fieldGenerator.getFieldsFromSchema(stream.getStream().getJsonSchema()), stream.getFields());
118-
assertEquals(1, stream.getMappers().size());
119-
assertEquals(MapperHelperKt.createHashingMapper(ID_FIELD_NAME), stream.getMappers().getFirst());
120-
}
121-
12284
@Test
12385
void testIsResumableImport() {
12486
final List<Boolean> boolValues = new ArrayList<>(List.of(Boolean.TRUE, Boolean.FALSE));
@@ -158,7 +120,6 @@ void testIsResumableExport() {
158120
null,
159121
null,
160122
null,
161-
null,
162123
null);
163124
final var streamAndConf = new AirbyteStreamAndConfiguration(stream, conf);
164125
final List<AirbyteStreamAndConfiguration> streams = List.of(streamAndConf);

airbyte-commons-server/build.gradle.kts

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ dependencies {
6161
implementation(project(":oss:airbyte-config:specs"))
6262
implementation(project(":oss:airbyte-data"))
6363
implementation(project(":oss:airbyte-featureflag"))
64-
implementation(project(":oss:airbyte-mappers"))
6564
implementation(project(":oss:airbyte-metrics:metrics-lib"))
6665
implementation(project(":oss:airbyte-db:db-lib"))
6766
implementation(project(":oss:airbyte-json-validation"))

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@
123123
import io.airbyte.featureflag.FeatureFlagClient;
124124
import io.airbyte.featureflag.ResetStreamsStateWhenDisabled;
125125
import io.airbyte.featureflag.Workspace;
126-
import io.airbyte.mappers.helpers.MapperHelperKt;
127126
import io.airbyte.metrics.lib.ApmTraceUtils;
128127
import io.airbyte.metrics.lib.MetricAttribute;
129128
import io.airbyte.metrics.lib.MetricClientFactory;
@@ -264,10 +263,7 @@ private void applyPatchToStandardSync(final StandardSync sync, final ConnectionU
264263
validateCatalogDoesntContainDuplicateStreamNames(patch.getSyncCatalog());
265264
validateCatalogSize(patch.getSyncCatalog(), workspaceId, "update");
266265

267-
final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(patch.getSyncCatalog());
268-
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
269-
270-
sync.setCatalog(configuredCatalog);
266+
sync.setCatalog(CatalogConverter.toConfiguredInternal(patch.getSyncCatalog()));
271267
sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog()));
272268
}
273269

@@ -556,9 +552,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
556552
validateCatalogDoesntContainDuplicateStreamNames(connectionCreate.getSyncCatalog());
557553
validateCatalogSize(connectionCreate.getSyncCatalog(), workspaceId, "create");
558554

559-
final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog());
560-
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
561-
standardSync.withCatalog(configuredCatalog);
555+
standardSync.withCatalog(CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog()));
562556
standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog()));
563557
} else {
564558
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
@@ -693,7 +687,7 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync
693687
return metadata;
694688
}
695689

696-
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, final String updateReason, final Boolean autoUpdate)
690+
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, String updateReason, Boolean autoUpdate)
697691
throws ConfigNotFoundException, IOException, JsonValidationException {
698692

699693
final UUID connectionId = connectionPatch.getConnectionId();

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java

+3-19
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,10 @@
5959
import io.airbyte.config.ActorCatalogFetchEvent;
6060
import io.airbyte.config.ActorDefinitionVersion;
6161
import io.airbyte.config.ConfiguredAirbyteCatalog;
62-
import io.airbyte.config.Field;
6362
import io.airbyte.config.JobStatusSummary;
6463
import io.airbyte.config.RefreshStream.RefreshType;
6564
import io.airbyte.config.StandardSourceDefinition;
6665
import io.airbyte.config.StandardSync;
67-
import io.airbyte.config.helpers.FieldGenerator;
6866
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
6967
import io.airbyte.config.persistence.ConfigNotFoundException;
7068
import io.airbyte.config.persistence.ConfigRepository;
@@ -112,7 +110,6 @@ public class WebBackendConnectionsHandler {
112110
private final ConnectionService connectionService;
113111
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
114112
private final FeatureFlagClient featureFlagClient;
115-
private final FieldGenerator fieldGenerator;
116113

117114
public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDefinitionVersionHandler,
118115
final ConnectionsHandler connectionsHandler,
@@ -126,7 +123,6 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
126123
final ConfigRepository configRepositoryDoNotUse,
127124
final ConnectionService connectionService,
128125
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
129-
final FieldGenerator fieldGenerator,
130126
final FeatureFlagClient featureFlagClient) {
131127
this.actorDefinitionVersionHandler = actorDefinitionVersionHandler;
132128
this.connectionsHandler = connectionsHandler;
@@ -140,7 +136,6 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
140136
this.configRepositoryDoNotUse = configRepositoryDoNotUse;
141137
this.connectionService = connectionService;
142138
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
143-
this.fieldGenerator = fieldGenerator;
144139
this.featureFlagClient = featureFlagClient;
145140
}
146141

@@ -484,9 +479,9 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
484479
* catalog
485480
*/
486481
@VisibleForTesting
487-
protected AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
488-
final AirbyteCatalog originalDiscovered,
489-
final AirbyteCatalog discovered) {
482+
protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
483+
final AirbyteCatalog originalDiscovered,
484+
final AirbyteCatalog discovered) {
490485
/*
491486
* We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so
492487
* we just define a quick-and-dirty record class.
@@ -541,17 +536,6 @@ protected AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final Airbyt
541536
outputStreamConfig.setSuggested(originalConfiguredStream.getConfig().getSuggested());
542537
outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled());
543538

544-
// Add hashed field configs that are still present in the schema
545-
if (originalStreamConfig.getHashedFields() != null && !originalStreamConfig.getHashedFields().isEmpty()) {
546-
final List<String> discoveredFields =
547-
fieldGenerator.getFieldsFromSchema(stream.getJsonSchema()).stream().map(Field::getName).toList();
548-
for (final SelectedFieldInfo hashedField : originalStreamConfig.getHashedFields()) {
549-
if (discoveredFields.contains(hashedField.getFieldPath().getFirst())) {
550-
outputStreamConfig.addHashedFieldsItem(hashedField);
551-
}
552-
}
553-
}
554-
555539
if (outputStreamConfig.getFieldSelectionEnabled()) {
556540
// TODO(mfsiega-airbyte): support nested fields.
557541
// If field selection is enabled, populate the selected fields.

0 commit comments

Comments
 (0)