Skip to content

Commit 082538c

Browse files
jbfbellaaronsteersJoe Reuteranomnacotolik0
authored andcommitted
Destination Clickhouse - 1.0, remove normalization (#34637)
Co-authored-by: Aaron ("AJ") Steers <[email protected]> Co-authored-by: Joe Reuter <[email protected]> Co-authored-by: Obioma Anomnachi <[email protected]> Co-authored-by: Anatolii Yatsuk <[email protected]> Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]> Co-authored-by: maxi297 <[email protected]> Co-authored-by: Ryan Waskewich <[email protected]> Co-authored-by: Catherine Noll <[email protected]> Co-authored-by: Marius Posta <[email protected]> Co-authored-by: Edward Gao <[email protected]> Co-authored-by: Marcos Marx <[email protected]> Co-authored-by: SatishChGit <[email protected]> Co-authored-by: evantahler <[email protected]> Co-authored-by: Rodi Reich Zilberman <[email protected]> Co-authored-by: Anton Karpets <[email protected]> Co-authored-by: Christo Grabowski <[email protected]> Co-authored-by: Akash Kulkarni <[email protected]> Co-authored-by: Akash Kulkarni <[email protected]> Co-authored-by: Gireesh Sreepathi <[email protected]> Co-authored-by: Artem Inzhyyants <[email protected]>
1 parent 0899722 commit 082538c

File tree

28 files changed

+445
-198
lines changed

28 files changed

+445
-198
lines changed

airbyte-cdk/java/airbyte-cdk/build.gradle

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1+
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
2+
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
3+
plugins {
4+
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
5+
}
6+
17
final var cdkVersion = {
28
var props = new Properties()
39
file("core/src/main/resources/version.properties").withInputStream(props::load)
410
return props.getProperty('version', 'undefined')
511
}()
612

13+
14+
715
allprojects {
816
apply plugin: 'java-library'
917
apply plugin: 'maven-publish'
1018
apply plugin: 'java-test-fixtures'
19+
apply plugin: 'org.jetbrains.kotlin.jvm'
1120

1221
group 'io.airbyte.cdk'
1322

@@ -44,6 +53,19 @@ allprojects {
4453
}
4554
}
4655
}
56+
57+
compileKotlin {
58+
compilerOptions {
59+
jvmTarget = JvmTarget.JVM_21
60+
languageVersion = KotlinVersion.KOTLIN_1_9
61+
}
62+
}
63+
compileTestKotlin {
64+
compilerOptions {
65+
jvmTarget = JvmTarget.JVM_21
66+
languageVersion = KotlinVersion.KOTLIN_1_9
67+
}
68+
}
4769
}
4870

4971
project.configurations {

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55
package io.airbyte.cdk.integrations.base.ssh;
66

7-
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.*;
7+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.CONNECTION_OPTIONS_KEY;
8+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.GLOBAL_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS;
9+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.GLOBAL_HEARTBEAT_INTERVAL_KEY;
10+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.SESSION_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS;
11+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.SESSION_HEARTBEAT_INTERVAL_KEY;
12+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.getInstance;
13+
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.sshWrap;
814

915
import com.fasterxml.jackson.databind.JsonNode;
1016
import com.fasterxml.jackson.databind.node.ObjectNode;

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/StandardNameTransformer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public String getNamespace(final String namespace) {
3131
}
3232

3333
@Override
34+
// @Deprecated see https://github.com/airbytehq/airbyte/issues/35333
3435
public String getRawTableName(final String streamName) {
3536
return convertStreamName("_airbyte_raw_" + streamName);
3637
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.airbyte.cdk.integrations.util
2+
3+
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
4+
5+
/**
6+
* For streams in [catalog] which do not have a namespace specified, explicitly set their namespace
7+
* to the [defaultNamespace]
8+
*/
9+
fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) {
10+
if (defaultNamespace == null) {
11+
return
12+
}
13+
// TODO: This logic exists in all V2 destinations.
14+
// This is sad that if we forget to add this, there will be a null pointer during parseCatalog
15+
for (catalogStream in catalog.streams) {
16+
if (catalogStream.stream.namespace.isNullOrEmpty()) {
17+
catalogStream.stream.namespace = defaultNamespace
18+
}
19+
}
20+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.21.4
1+
version=0.22.1

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.cdk.integrations.destination.jdbc;
66

77
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
8+
import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams;
89

910
import com.fasterxml.jackson.databind.JsonNode;
1011
import com.google.common.annotations.VisibleForTesting;
@@ -40,7 +41,6 @@
4041
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
4142
import io.airbyte.protocol.models.v0.AirbyteMessage;
4243
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
43-
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
4444
import java.sql.SQLException;
4545
import java.util.List;
4646
import java.util.Map;
@@ -49,7 +49,6 @@
4949
import java.util.function.Consumer;
5050
import javax.sql.DataSource;
5151
import org.apache.commons.lang3.NotImplementedException;
52-
import org.apache.commons.lang3.StringUtils;
5352
import org.slf4j.Logger;
5453
import org.slf4j.LoggerFactory;
5554

@@ -72,6 +71,10 @@ protected SqlOperations getSqlOperations() {
7271
return sqlOperations;
7372
}
7473

74+
protected String getConfigSchemaKey() {
75+
return "schema";
76+
}
77+
7578
public AbstractJdbcDestination(final String driverClass,
7679
final NamingConventionTransformer namingResolver,
7780
final SqlOperations sqlOperations) {
@@ -276,44 +279,16 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
276279
final ConfiguredAirbyteCatalog catalog,
277280
final Consumer<AirbyteMessage> outputRecordCollector)
278281
throws Exception {
279-
final DataSource dataSource = getDataSource(config);
280-
final JdbcDatabase database = getDatabase(dataSource);
282+
final JdbcDatabase database = getDatabase(getDataSource(config));
283+
final String defaultNamespace;
284+
final TyperDeduper typerDeduper;
281285
if (TypingAndDedupingFlag.isDestinationV2()) {
282-
// TODO: This logic exists in all V2 destinations.
283-
// This is sad that if we forget to add this, there will be a null pointer during parseCatalog
284-
final String defaultNamespace = config.get("schema").asText();
285-
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
286-
if (StringUtils.isEmpty(stream.getStream().getNamespace())) {
287-
stream.getStream().setNamespace(defaultNamespace);
288-
}
289-
}
290-
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
291-
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
292-
.map(override -> new CatalogParser(sqlGenerator, override))
293-
.orElse(new CatalogParser(sqlGenerator))
294-
.parseCatalog(catalog);
295-
final String databaseName = getDatabaseName(config);
296-
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
297-
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
298-
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
299-
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
300-
final TyperDeduper typerDeduper;
301-
if (disableTypeDedupe) {
302-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator,
303-
8);
304-
} else {
305-
typerDeduper =
306-
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, 8);
307-
}
308-
return JdbcBufferedConsumerFactory.createAsync(
309-
outputRecordCollector,
310-
database,
311-
sqlOperations,
312-
namingResolver,
313-
config,
314-
catalog,
315-
defaultNamespace,
316-
typerDeduper);
286+
defaultNamespace = config.get(getConfigSchemaKey()).asText();
287+
addDefaultNamespaceToStreams(catalog, defaultNamespace);
288+
typerDeduper = getV2TyperDeduper(config, catalog, database);
289+
} else {
290+
defaultNamespace = null;
291+
typerDeduper = new NoopTyperDeduper();
317292
}
318293
return JdbcBufferedConsumerFactory.createAsync(
319294
outputRecordCollector,
@@ -322,8 +297,37 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
322297
namingResolver,
323298
config,
324299
catalog,
325-
null,
326-
new NoopTyperDeduper());
300+
defaultNamespace,
301+
typerDeduper);
302+
}
303+
304+
/**
305+
* Creates the appropriate TyperDeduper class for the jdbc destination and the user's configuration
306+
*
307+
* @param config the configuration for the connection
308+
* @param catalog the catalog for the connection
309+
* @param database a database instance
310+
* @return the appropriate TyperDeduper instance for this connection.
311+
*/
312+
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
313+
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
314+
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
315+
.map(override -> new CatalogParser(sqlGenerator, override))
316+
.orElse(new CatalogParser(sqlGenerator))
317+
.parseCatalog(catalog);
318+
final String databaseName = getDatabaseName(config);
319+
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
320+
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
321+
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
322+
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
323+
final TyperDeduper typerDeduper;
324+
if (disableTypeDedupe) {
325+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
326+
} else {
327+
typerDeduper =
328+
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
329+
}
330+
return typerDeduper;
327331
}
328332

329333
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
2+
3+
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
4+
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
5+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
6+
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
7+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
8+
import org.jooq.Condition
9+
import org.jooq.DataType
10+
import org.jooq.Field
11+
import org.jooq.SQLDialect
12+
import java.util.*
13+
14+
/**
15+
* Some Destinations do not support Typing and Deduping but have the updated raw table format
16+
* SqlGenerator implementations are only for "final" tables and are a required input for
17+
* TyperDeduper classes. This implementation appeases that requirement but does not implement
18+
* any "final" table operations.
19+
*/
20+
class RawOnlySqlGenerator(private val namingTransformer: NamingConventionTransformer) :
21+
JdbcSqlGenerator(namingTransformer) {
22+
override fun getStructType(): DataType<*>? {
23+
throw NotImplementedError("This Destination does not support final tables")
24+
}
25+
26+
override fun getArrayType(): DataType<*>? {
27+
throw NotImplementedError("This Destination does not support final tables")
28+
}
29+
30+
override fun getWidestType(): DataType<*>? {
31+
throw NotImplementedError("This Destination does not support final tables")
32+
}
33+
34+
override fun getDialect(): SQLDialect? {
35+
throw NotImplementedError("This Destination does not support final tables")
36+
}
37+
38+
override fun extractRawDataFields(
39+
columns: LinkedHashMap<ColumnId, AirbyteType>,
40+
useExpensiveSaferCasting: Boolean
41+
): List<Field<*>>? {
42+
throw NotImplementedError("This Destination does not support final tables")
43+
}
44+
45+
override fun buildAirbyteMetaColumn(columns: LinkedHashMap<ColumnId, AirbyteType>): Field<*>? {
46+
throw NotImplementedError("This Destination does not support final tables")
47+
}
48+
49+
override fun cdcDeletedAtNotNullCondition(): Condition? {
50+
throw NotImplementedError("This Destination does not support final tables")
51+
}
52+
53+
override fun getRowNumber(
54+
primaryKey: List<ColumnId>,
55+
cursorField: Optional<ColumnId>
56+
): Field<Int>? {
57+
throw NotImplementedError("This Destination does not support final tables")
58+
}
59+
60+
override fun existingSchemaMatchesStreamConfig(
61+
stream: StreamConfig,
62+
existingTable: TableDefinition
63+
): Boolean {
64+
throw NotImplementedError("This Destination does not support final tables")
65+
}
66+
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ public void testIncrementalDedupeSync() throws Exception {
789789
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
790790
.collect(Collectors.toList());
791791
final JsonNode config = getConfig();
792-
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, true);
792+
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, supportsNormalization());
793793

794794
final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(
795795
new AirbyteMessage()
@@ -820,7 +820,7 @@ public void testIncrementalDedupeSync() throws Exception {
820820
.withType(Type.STATE)
821821
.withState(new AirbyteStateMessage().withData(
822822
Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));
823-
runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, true);
823+
runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, false);
824824

825825
final List<AirbyteMessage> expectedMessagesAfterSecondSync = new ArrayList<>();
826826
expectedMessagesAfterSecondSync.addAll(firstSyncMessages);
@@ -853,22 +853,11 @@ public void testIncrementalDedupeSync() throws Exception {
853853
final String defaultSchema = getDefaultSchema(config);
854854
retrieveRawRecordsAndAssertSameMessages(catalog, expectedMessagesAfterSecondSync,
855855
defaultSchema);
856-
final List<AirbyteRecordMessage> actualMessages = retrieveNormalizedRecords(catalog,
857-
defaultSchema);
858-
assertSameMessages(expectedMessages, actualMessages, true);
859-
}
860-
861-
private String generateBigString(final int addExtraCharacters) {
862-
final int length = getMaxRecordValueLimit() + addExtraCharacters;
863-
return RANDOM
864-
.ints('a', 'z' + 1)
865-
.limit(length)
866-
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
867-
.toString();
868-
}
869-
870-
protected int getGenerateBigStringAddExtraCharacters() {
871-
return 0;
856+
if (normalizationFromDefinition()) {
857+
final List<AirbyteRecordMessage> actualMessages = retrieveNormalizedRecords(catalog,
858+
defaultSchema);
859+
assertSameMessages(expectedMessages, actualMessages, true);
860+
}
872861
}
873862

874863
/**
@@ -1347,7 +1336,7 @@ private List<AirbyteMessage> runSync(
13471336

13481337
destination.close();
13491338

1350-
if (!runNormalization || (runNormalization && supportsInDestinationNormalization())) {
1339+
if (!runNormalization || (supportsInDestinationNormalization())) {
13511340
return destinationOutput;
13521341
}
13531342

@@ -1860,6 +1849,10 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
18601849

18611850
}
18621851

1852+
private boolean supportsNormalization() {
1853+
return supportsInDestinationNormalization() || normalizationFromDefinition();
1854+
}
1855+
18631856
private static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
18641857
return Jsons.object(Jsons.jsonNode(v1), klass);
18651858
}

0 commit comments

Comments
 (0)