Skip to content

Commit faad484

Browse files
authored
Destination mysql: upgrade to latest cdk (#36926)
1 parent 46b977a commit faad484

File tree

18 files changed

+121
-27
lines changed

18 files changed

+121
-27
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th
144144

145145
| Version | Date | Pull Request | Subject |
146146
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
147+
| 0.30.2 | 2024-04-12 | [\#36926](https://github.com/airbytehq/airbyte/pull/36926) | Destinations: Remove `JdbcSqlOperations#formatData`; misc changes for java interop |
147148
| 0.30.1 | 2024-04-11 | [\#36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix regression in sources conversion of null values |
148149
| 0.30.0 | 2024-04-11 | [\#36974](https://github.com/airbytehq/airbyte/pull/36974) | Destinations: Pass config to jdbc sqlgenerator; allow cascade drop |
149150
| 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources : Emit analytics for data type serialization errors. |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
9+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
10+
import io.airbyte.protocol.models.v0.StreamDescriptor
11+
12+
/** @see StandardNameTransformer.formatJsonPath for details on what this class does. */
13+
class PropertyNameSimplifyingDataTransformer : StreamAwareDataTransformer {
14+
override fun transform(
15+
streamDescriptor: StreamDescriptor?,
16+
data: JsonNode?,
17+
meta: AirbyteRecordMessageMeta?,
18+
): Pair<JsonNode?, AirbyteRecordMessageMeta?> {
19+
if (data == null) {
20+
return Pair(null, meta)
21+
}
22+
return Pair(StandardNameTransformer.formatJsonPath(data), meta)
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.30.1
1+
version=0.30.2

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+1
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
368368
* This method is deprecated. It verifies table creation, but not insert right to a newly
369369
* created table. Use attemptTableOperations with the attemptInsert argument instead.
370370
*/
371+
@JvmStatic
371372
@Deprecated("")
372373
@Throws(Exception::class)
373374
fun attemptSQLCreateAndDropTableOperations(

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

-4
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,6 @@ abstract class JdbcSqlOperations : SqlOperations {
155155
}
156156
}
157157

158-
protected fun formatData(data: JsonNode): JsonNode {
159-
return data
160-
}
161-
162158
override fun truncateTableQuery(
163159
database: JdbcDatabase?,
164160
schemaName: String?,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/WriteConfig.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ constructor(
1919
*
2020
* @return
2121
*/
22-
val namespace: String,
22+
val namespace: String?,
2323
val outputSchemaName: String,
2424
val tmpTableName: String?,
2525
val outputTableName: String?,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+8-4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import java.nio.file.Files
5959
import java.nio.file.Path
6060
import java.time.Instant
6161
import java.util.*
62+
import java.util.concurrent.TimeUnit
6263
import java.util.concurrent.atomic.AtomicInteger
6364
import java.util.function.Consumer
6465
import java.util.stream.Collectors
@@ -604,7 +605,7 @@ abstract class DestinationAcceptanceTest {
604605
*/
605606
@Test
606607
@Throws(Exception::class)
607-
fun testLineBreakCharacters() {
608+
open fun testLineBreakCharacters() {
608609
val catalog =
609610
Jsons.deserialize<AirbyteCatalog>(
610611
MoreResources.readResource(
@@ -867,6 +868,8 @@ abstract class DestinationAcceptanceTest {
867868
@ParameterizedTest
868869
@ArgumentsSource(DataArgumentsProvider::class)
869870
@Throws(Exception::class)
871+
// Normalization is a pretty slow process. Increase our test timeout.
872+
@Timeout(value = 300, unit = TimeUnit.SECONDS)
870873
open fun testSyncWithNormalization(messagesFilename: String, catalogFilename: String) {
871874
if (!normalizationFromDefinition()) {
872875
return
@@ -879,7 +882,7 @@ abstract class DestinationAcceptanceTest {
879882
)
880883
val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog)
881884
val messages =
882-
MoreResources.readResource(messagesFilename).lines().map {
885+
MoreResources.readResource(messagesFilename).trim().lines().map {
883886
Jsons.deserialize(it, AirbyteMessage::class.java)
884887
}
885888

@@ -1212,6 +1215,7 @@ abstract class DestinationAcceptanceTest {
12121215
getProtocolVersion()
12131216
)
12141217
)
1218+
.trim()
12151219
.lines()
12161220
.map { Jsons.deserialize(it, AirbyteMessage::class.java) }
12171221
val messagesWithNewNamespace = getRecordMessagesWithNewNamespace(messages, namespace)
@@ -1260,12 +1264,12 @@ abstract class DestinationAcceptanceTest {
12601264
val messageFile: String =
12611265
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())
12621266
val ns1Messages =
1263-
MoreResources.readResource(messageFile).lines().map {
1267+
MoreResources.readResource(messageFile).trim().lines().map {
12641268
Jsons.deserialize(it, AirbyteMessage::class.java)
12651269
}
12661270
val ns1MessagesAtNamespace1 = getRecordMessagesWithNewNamespace(ns1Messages, namespace1)
12671271
val ns2Messages: List<io.airbyte.protocol.models.v0.AirbyteMessage> =
1268-
MoreResources.readResource(messageFile).lines().map {
1272+
MoreResources.readResource(messageFile).trim().lines().map {
12691273
Jsons.deserialize(it, AirbyteMessage::class.java)
12701274
}
12711275
val ns2MessagesAtNamespace2 = getRecordMessagesWithNewNamespace(ns2Messages, namespace2)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
136136
}
137137
}
138138

139-
protected fun compareBooleanValues(
139+
protected open fun compareBooleanValues(
140140
firstBooleanValue: String,
141141
secondBooleanValue: String
142142
): Boolean {

airbyte-integrations/connectors/destination-mysql-strict-encrypt/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.8.0'
7+
cdkVersionRequired = '0.30.2'
88
features = ['db-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}

airbyte-integrations/connectors/destination-mysql-strict-encrypt/metadata.yaml

+1-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
data:
22
registries:
33
cloud:
4-
dockerImageTag: 0.2.0
54
enabled: false # strict encrypt connectors are deployed to Cloud by their non strict encrypt sibling.
65
oss:
7-
dockerImageTag: 0.2.0
86
enabled: false # strict encrypt connectors are not used on OSS.
97
connectorSubtype: database
108
connectorType: destination
119
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
12-
dockerImageTag: 0.3.0
10+
dockerImageTag: 0.3.1
1311
dockerRepository: airbyte/destination-mysql-strict-encrypt
1412
githubIssueLabel: destination-mysql
1513
icon: mysql.svg

airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
1818
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
1919
import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
20+
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider;
2021
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator;
2122
import io.airbyte.cdk.integrations.util.HostPortResolver;
2223
import io.airbyte.commons.json.Jsons;
@@ -34,6 +35,7 @@
3435
import java.util.stream.Collectors;
3536
import org.jooq.DSLContext;
3637
import org.jooq.SQLDialect;
38+
import org.junit.jupiter.api.Disabled;
3739
import org.junit.jupiter.api.Test;
3840
import org.testcontainers.containers.MySQLContainer;
3941

@@ -239,6 +241,20 @@ public void testLineBreakCharacters() {
239241
// overrides test with a no-op until we handle full UTF-8 in the destination
240242
}
241243

244+
/**
245+
* Legacy mysql normalization is broken, and uses the FLOAT type for numbers. This rounds off e.g.
246+
* 12345.678 to 12345.7. We can fix this in DV2, but will not fix legacy normalization. As such,
247+
* disabling the test case.
248+
*/
249+
@Override
250+
@Disabled("MySQL normalization uses the wrong datatype for numbers. This will not be fixed, because we intend to replace normalization with DV2.")
251+
public void testDataTypeTestWithNormalization(final String messagesFilename,
252+
final String catalogFilename,
253+
final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
254+
throws Exception {
255+
super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
256+
}
257+
242258
protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) {
243259
if (expectedValue.isBoolean()) {
244260
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here

airbyte-integrations/connectors/destination-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.8.0'
7+
cdkVersionRequired = '0.30.2'
88
features = ['db-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}

airbyte-integrations/connectors/destination-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
5-
dockerImageTag: 0.3.0
5+
dockerImageTag: 0.3.1
66
dockerRepository: airbyte/destination-mysql
77
githubIssueLabel: destination-mysql
88
icon: mysql.svg

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java

+35-2
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,31 @@
1616
import io.airbyte.cdk.integrations.base.Destination;
1717
import io.airbyte.cdk.integrations.base.IntegrationRunner;
1818
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
19+
import io.airbyte.cdk.integrations.destination.PropertyNameSimplifyingDataTransformer;
20+
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer;
1921
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
22+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2023
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2124
import io.airbyte.commons.exceptions.ConnectionErrorException;
2225
import io.airbyte.commons.json.Jsons;
2326
import io.airbyte.commons.map.MoreMaps;
27+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
28+
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
29+
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
30+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
31+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
2432
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
2533
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
2634
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
35+
import java.util.Collections;
36+
import java.util.List;
2737
import java.util.Map;
2838
import javax.sql.DataSource;
39+
import org.jetbrains.annotations.NotNull;
2940
import org.slf4j.Logger;
3041
import org.slf4j.LoggerFactory;
3142

32-
public class MySQLDestination extends AbstractJdbcDestination implements Destination {
43+
public class MySQLDestination extends AbstractJdbcDestination<MinimumDestinationState> implements Destination {
3344

3445
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestination.class);
3546
public static final String DRIVER_CLASS = DatabaseDriver.MYSQL.getDriverClassName();
@@ -129,15 +140,37 @@ public JsonNode toJdbcConfig(final JsonNode config) {
129140
}
130141

131142
@Override
132-
protected JdbcSqlGenerator getSqlGenerator() {
143+
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
133144
throw new UnsupportedOperationException("mysql does not yet support DV2");
134145
}
135146

147+
@Override
148+
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
149+
return new PropertyNameSimplifyingDataTransformer();
150+
}
151+
136152
public static void main(final String[] args) throws Exception {
137153
final Destination destination = MySQLDestination.sshWrappedDestination();
138154
LOGGER.info("starting destination: {}", MySQLDestination.class);
139155
new IntegrationRunner(destination).run(args);
140156
LOGGER.info("completed destination: {}", MySQLDestination.class);
141157
}
142158

159+
@NotNull
160+
@Override
161+
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
162+
@NotNull JdbcDatabase database,
163+
@NotNull String rawTableSchema) {
164+
throw new UnsupportedOperationException("Mysql does not yet support DV2");
165+
}
166+
167+
@NotNull
168+
@Override
169+
protected List<Migration<MinimumDestinationState>> getMigrations(@NotNull JdbcDatabase database,
170+
@NotNull String databaseName,
171+
@NotNull SqlGenerator sqlGenerator,
172+
@NotNull DestinationHandler<MinimumDestinationState> destinationHandler) {
173+
return Collections.emptyList();
174+
}
175+
143176
}

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@
44

55
package io.airbyte.integrations.destination.mysql;
66

7-
import com.fasterxml.jackson.databind.JsonNode;
87
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
98
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
109
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
11-
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
10+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
1211
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
13-
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
1412
import java.io.File;
1513
import java.io.IOException;
1614
import java.nio.file.Files;
@@ -86,11 +84,6 @@ private void loadDataIntoTable(final JdbcDatabase database,
8684
});
8785
}
8886

89-
@Override
90-
protected JsonNode formatData(final JsonNode data) {
91-
return StandardNameTransformer.formatJsonPath(data);
92-
}
93-
9487
void verifyLocalFileEnabled(final JdbcDatabase database) throws SQLException {
9588
final boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database);
9689
if (!localFileEnabled) {

airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.destination.mysql;
66

7+
import static java.util.concurrent.TimeUnit.SECONDS;
78
import static org.junit.jupiter.api.Assertions.assertEquals;
89
import static org.junit.jupiter.api.Assertions.assertTrue;
910

@@ -39,6 +40,7 @@
3940
import org.jooq.SQLDialect;
4041
import org.junit.jupiter.api.Disabled;
4142
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.api.Timeout;
4244
import org.testcontainers.containers.MySQLContainer;
4345

4446
public class MySQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {
@@ -267,6 +269,10 @@ protected void assertSameValue(final JsonNode expectedValue, final JsonNode actu
267269
}
268270
}
269271

272+
// Something is very weird in our connection check code. A wrong password takes >1 minute to return.
273+
// TODO investigate why invalid creds take so long to detect
274+
@Timeout(value = 300,
275+
unit = SECONDS)
270276
@Test
271277
void testCheckIncorrectPasswordFailure() {
272278
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.PASSWORD_KEY, "fake");
@@ -276,6 +282,8 @@ void testCheckIncorrectPasswordFailure() {
276282
assertStringContains(status.getMessage(), "State code: 28000; Error code: 1045;");
277283
}
278284

285+
@Timeout(value = 300,
286+
unit = SECONDS)
279287
@Test
280288
public void testCheckIncorrectUsernameFailure() {
281289
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.USERNAME_KEY, "fake");
@@ -285,6 +293,8 @@ public void testCheckIncorrectUsernameFailure() {
285293
assertStringContains(status.getMessage(), "State code: 28000; Error code: 1045;");
286294
}
287295

296+
@Timeout(value = 300,
297+
unit = SECONDS)
288298
@Test
289299
public void testCheckIncorrectHostFailure() {
290300
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.HOST_KEY, "localhost2");
@@ -294,6 +304,8 @@ public void testCheckIncorrectHostFailure() {
294304
assertStringContains(status.getMessage(), "State code: 08S01;");
295305
}
296306

307+
@Timeout(value = 300,
308+
unit = SECONDS)
297309
@Test
298310
public void testCheckIncorrectPortFailure() {
299311
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.PORT_KEY, "0000");
@@ -303,6 +315,8 @@ public void testCheckIncorrectPortFailure() {
303315
assertStringContains(status.getMessage(), "State code: 08S01;");
304316
}
305317

318+
@Timeout(value = 300,
319+
unit = SECONDS)
306320
@Test
307321
public void testCheckIncorrectDataBaseFailure() {
308322
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.DATABASE_KEY, "wrongdatabase");
@@ -312,6 +326,8 @@ public void testCheckIncorrectDataBaseFailure() {
312326
assertStringContains(status.getMessage(), "State code: 42000; Error code: 1049;");
313327
}
314328

329+
@Timeout(value = 300,
330+
unit = SECONDS)
315331
@Test
316332
public void testUserHasNoPermissionToDataBase() {
317333
executeQuery("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n");

0 commit comments

Comments
 (0)