Skip to content

Commit 3c97cdd

Browse files
authored
🐛Destination-clickhouse-strict-encrypt: enabled normalization tests (#15069)
* [14858] Destination-clickhouse-strict-encrypt: enabled normalization tests
1 parent dbe2c25 commit 3c97cdd

File tree

4 files changed

+174
-49
lines changed

4 files changed

+174
-49
lines changed

airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ dependencies {
2626
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-clickhouse')
2727
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
2828
integrationTestJavaImplementation libs.connectors.destination.testcontainers.clickhouse
29+
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
2930
}

airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.airbyte.db.jdbc.JdbcUtils;
1818
import io.airbyte.integrations.base.JavaBaseConstants;
1919
import io.airbyte.integrations.destination.ExtendedNameTransformer;
20+
import io.airbyte.integrations.standardtest.destination.DataTypeTestArgumentProvider;
2021
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
2122
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
2223
import io.airbyte.integrations.util.HostPortResolver;
@@ -25,7 +26,8 @@
2526
import java.util.ArrayList;
2627
import java.util.List;
2728
import java.util.stream.Collectors;
28-
import org.junit.jupiter.api.Disabled;
29+
import org.junit.jupiter.params.ParameterizedTest;
30+
import org.junit.jupiter.params.provider.ArgumentsSource;
2931
import org.slf4j.Logger;
3032
import org.slf4j.LoggerFactory;
3133
import org.testcontainers.containers.BindMode;
@@ -56,7 +58,7 @@ private static JdbcDatabase getDatabase(final JsonNode config) {
5658
config.get(JdbcUtils.USERNAME_KEY).asText(),
5759
config.has(JdbcUtils.PASSWORD_KEY) ? config.get(JdbcUtils.PASSWORD_KEY).asText() : null,
5860
ClickhouseDestination.DRIVER_CLASS,
59-
jdbcStr));
61+
jdbcStr), new ClickhouseTestSourceOperations());
6062
}
6163

6264
@Override
@@ -66,7 +68,7 @@ protected String getImageName() {
6668

6769
@Override
6870
protected boolean supportsNormalization() {
69-
return false;
71+
return true;
7072
}
7173

7274
@Override
@@ -109,16 +111,15 @@ protected String getDefaultSchema(final JsonNode config) {
109111

110112
@Override
111113
protected JsonNode getConfig() {
112-
// Note: ClickHouse official JDBC driver uses HTTP protocol, its default port is 8123
113-
// dbt clickhouse adapter uses native protocol, its default port is 9000
114-
// Since we disabled normalization and dbt test, we only use the JDBC port here.
115114
return Jsons.jsonNode(ImmutableMap.builder()
116115
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(db))
116+
.put(JdbcUtils.TCP_PORT_KEY, NATIVE_SECURE_PORT)
117117
.put(JdbcUtils.PORT_KEY, HTTPS_PORT)
118118
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
119119
.put(JdbcUtils.USERNAME_KEY, USER_NAME)
120120
.put(JdbcUtils.PASSWORD_KEY, "")
121121
.put(JdbcUtils.SCHEMA_KEY, DB_NAME)
122+
.put(JdbcUtils.SSL_KEY, true)
122123
.build());
123124
}
124125

@@ -194,49 +195,20 @@ protected void tearDown(final TestDestinationEnv testEnv) {
194195
db.close();
195196
}
196197

197-
/**
198-
* The SQL script generated by old version of dbt in 'test' step isn't compatible with ClickHouse,
199-
* so we skip this test for now.
200-
*
201-
* Ref: https://github.com/dbt-labs/dbt-core/issues/3905
202-
*
203-
* @throws Exception
204-
*/
205-
@Disabled
206-
public void testCustomDbtTransformations() throws Exception {
207-
super.testCustomDbtTransformations();
208-
}
209-
210-
@Disabled
211-
public void testCustomDbtTransformationsFailure() throws Exception {}
212-
213-
/**
214-
* The normalization container needs native port, while destination container needs HTTP port, we
215-
* can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this
216-
* test, so we skip it.
217-
*
218-
* @throws Exception
219-
*/
220-
@Disabled
221-
public void testIncrementalDedupeSync() throws Exception {
222-
super.testIncrementalDedupeSync();
223-
}
224-
225-
/**
226-
* The normalization container needs native port, while destination container needs HTTP port, we
227-
* can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this
228-
* test, so we skip it.
229-
*
230-
* @throws Exception
231-
*/
232-
@Disabled
233-
public void testSyncWithNormalization(final String messagesFilename, final String catalogFilename) throws Exception {
234-
super.testSyncWithNormalization(messagesFilename, catalogFilename);
235-
}
236-
237-
@Disabled
238-
public void specNormalizationValueShouldBeCorrect() throws Exception {
239-
super.specNormalizationValueShouldBeCorrect();
198+
@ParameterizedTest
199+
@ArgumentsSource(DataTypeTestArgumentProvider.class)
200+
public void testDataTypeTestWithNormalization(final String messagesFilename,
201+
final String catalogFilename,
202+
final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
203+
throws Exception {
204+
205+
// arrays are not fully supported yet in jdbc driver
206+
// https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java
207+
if (messagesFilename.contains("array")) {
208+
return;
209+
}
210+
211+
super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
240212
}
241213

242214
}

airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestDataComparator.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,32 @@
66

77
import io.airbyte.integrations.destination.ExtendedNameTransformer;
88
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
9+
import java.time.LocalDate;
10+
import java.time.LocalDateTime;
11+
import java.time.ZoneOffset;
12+
import java.time.ZonedDateTime;
13+
import java.time.format.DateTimeFormatter;
14+
import java.time.format.DateTimeParseException;
915
import java.util.ArrayList;
1016
import java.util.List;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
1119

1220
public class ClickhouseTestDataComparator extends AdvancedTestDataComparator {
1321

22+
private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseTestDataComparator.class);
1423
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
1524

25+
private static final String CLICKHOUSE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
26+
27+
// https://clickhouse.com/docs/en/sql-reference/data-types/date32/
28+
private final LocalDate minSupportedDate = LocalDate.parse("1970-01-01");
29+
private final LocalDate maxSupportedDate = LocalDate.parse("2149-06-06");
30+
private final ZonedDateTime minSupportedDateTime = ZonedDateTime.parse(
31+
"1925-01-01T00:00:00.000Z");
32+
private final ZonedDateTime maxSupportedDateTime = ZonedDateTime.parse(
33+
"2283-11-10T20:23:45.000Z");
34+
1635
@Override
1736
protected List<String> resolveIdentifier(final String identifier) {
1837
final List<String> result = new ArrayList<>();
@@ -26,4 +45,103 @@ protected List<String> resolveIdentifier(final String identifier) {
2645
return result;
2746
}
2847

48+
@Override
49+
protected boolean compareNumericValues(final String firstNumericValue,
50+
final String secondNumericValue) {
51+
// clickhouse stores double 1.14 as 1.1400000000000001
52+
// https://clickhouse.com/docs/en/sql-reference/data-types/float/
53+
double epsilon = 0.000000000000001d;
54+
55+
double firstValue = Double.parseDouble(firstNumericValue);
56+
double secondValue = Double.parseDouble(secondNumericValue);
57+
58+
return Math.abs(firstValue - secondValue) < epsilon;
59+
}
60+
61+
@Override
62+
protected boolean compareBooleanValues(final String firstValue, final String secondValue) {
63+
return parseBool(firstValue) == parseBool(secondValue);
64+
}
65+
66+
@Override
67+
protected boolean compareDateValues(final String airbyteMessageValue,
68+
final String destinationValue) {
69+
final LocalDate expectedDate = LocalDate.parse(airbyteMessageValue);
70+
final LocalDate actualDate = LocalDate.parse(destinationValue);
71+
72+
if (expectedDate.isBefore(minSupportedDate) || expectedDate.isAfter(maxSupportedDate)) {
73+
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
74+
// so actually you end up with unpredicted values, more
75+
// https://clickhouse.com/docs/en/sql-reference/data-types/date32
76+
LOGGER.warn(
77+
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
78+
return true;
79+
}
80+
81+
return actualDate.equals(expectedDate);
82+
}
83+
84+
@Override
85+
protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue,
86+
final String destinationValue) {
87+
try {
88+
ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue,
89+
getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC);
90+
ZonedDateTime destinationDate = parseDestinationDateWithTz(destinationValue);
91+
92+
if (airbyteDate.isBefore(minSupportedDateTime) || airbyteDate.isAfter(maxSupportedDateTime)) {
93+
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
94+
// so actually you end up with unpredicted values, more
95+
// https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
96+
LOGGER.warn(
97+
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
98+
return true;
99+
}
100+
return airbyteDate.equals(destinationDate);
101+
} catch (DateTimeParseException e) {
102+
LOGGER.warn(
103+
"Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}",
104+
airbyteMessageValue, destinationValue, e);
105+
return compareTextValues(airbyteMessageValue, destinationValue);
106+
}
107+
}
108+
109+
@Override
110+
protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) {
111+
return ZonedDateTime.parse(destinationValue,
112+
DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(
113+
ZoneOffset.UTC);
114+
}
115+
116+
@Override
117+
protected boolean compareDateTimeValues(final String airbyteMessageValue,
118+
final String destinationValue) {
119+
final LocalDateTime expectedDateTime = LocalDateTime.parse(airbyteMessageValue);
120+
final LocalDateTime actualDateTime = LocalDateTime.parse(destinationValue,
121+
DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT));
122+
123+
if (expectedDateTime.isBefore(minSupportedDateTime.toLocalDateTime())
124+
|| expectedDateTime.isAfter(maxSupportedDateTime.toLocalDateTime())) {
125+
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
126+
// so actually you end up with unpredicted values, more
127+
// https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
128+
LOGGER.warn(
129+
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
130+
return true;
131+
}
132+
133+
return expectedDateTime.equals(actualDateTime);
134+
}
135+
136+
private boolean parseBool(final String valueAsString) {
137+
// boolen as a String may be returned as true\false and as 0\1
138+
// https://clickhouse.com/docs/en/sql-reference/data-types/boolean
139+
try {
140+
return Integer.parseInt(valueAsString) > 0;
141+
} catch (final NumberFormatException ex) {
142+
return Boolean.parseBoolean(valueAsString);
143+
}
144+
145+
}
146+
29147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.clickhouse;
6+
7+
import com.fasterxml.jackson.databind.node.ObjectNode;
8+
import io.airbyte.db.DataTypeUtils;
9+
import io.airbyte.db.jdbc.JdbcSourceOperations;
10+
import java.sql.ResultSet;
11+
import java.sql.SQLException;
12+
import java.time.LocalDate;
13+
import java.time.LocalDateTime;
14+
import java.time.format.DateTimeFormatter;
15+
16+
public class ClickhouseTestSourceOperations extends JdbcSourceOperations {
17+
18+
@Override
19+
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
20+
node.put(columnName, DateTimeFormatter.ISO_DATE.format(resultSet.getTimestamp(index).toLocalDateTime()));
21+
}
22+
23+
@Override
24+
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
25+
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
26+
final LocalDate date = timestamp.toLocalDate();
27+
28+
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
29+
DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN);
30+
31+
node.put(columnName, resolveEra(date, timestamp.format(dateTimeFormatter)));
32+
}
33+
34+
}

0 commit comments

Comments
 (0)