Skip to content

Commit 3c35466

Browse files
authored
MySQL to adapt to new cdk (#36742)
1 parent 887a585 commit 3c35466

File tree

29 files changed

+76
-510
lines changed

29 files changed

+76
-510
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,9 @@ constructor(
3636
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
3737
): List<T> {
3838
dataSource.connection.use { connection ->
39-
JdbcDatabase.Companion.toUnsafeStream<T>(query.apply(connection), recordTransform)
40-
.use { results ->
41-
return results.toList()
42-
}
39+
toUnsafeStream<T>(query.apply(connection), recordTransform).use { results ->
40+
return results.toList()
41+
}
4342
}
4443
}
4544

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/streaming/AdaptiveStreamingQueryConfig.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import java.sql.*
77
import org.slf4j.Logger
88
import org.slf4j.LoggerFactory
99

10-
class AdaptiveStreamingQueryConfig : JdbcStreamingQueryConfig {
10+
open class AdaptiveStreamingQueryConfig : JdbcStreamingQueryConfig {
1111
private val fetchSizeEstimator: FetchSizeEstimator = TwoStageSizeEstimator.Companion.instance
1212
private var currentFetchSize: Int
1313

Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.29.3
1+
version=0.29.4

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected constructor(val container: C) : AutoCloseable {
133133
return name + suffix
134134
}
135135

136-
val databaseName: String
136+
open val databaseName: String
137137
get() = withNamespace("db")
138138

139139
val userName: String

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ abstract class DebeziumPropertiesManager(
9393
const val NAME_KEY: String = "name"
9494
const val TOPIC_PREFIX_KEY: String = "topic.prefix"
9595

96+
@JvmStatic
9697
fun sanitizeTopicPrefix(topicName: String): String {
9798
val sanitizedNameBuilder = StringBuilder(topicName.length)
9899
var changed = false

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RecordWaitTimeUtil.kt

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ object RecordWaitTimeUtil {
1717
val DEFAULT_FIRST_RECORD_WAIT_TIME: Duration = Duration.ofMinutes(5)
1818
val DEFAULT_SUBSEQUENT_RECORD_WAIT_TIME: Duration = Duration.ofMinutes(1)
1919

20+
@JvmStatic
2021
fun checkFirstRecordWaitTime(config: JsonNode) {
2122
// we need to skip the check because in tests, we set initial_waiting_seconds
2223
// to 5 seconds for performance reasons, which is shorter than the minimum

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

+1
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,7 @@ abstract class AbstractJdbcSource<Datatype>(
725725
* @return a map by StreamName to associated list of primary keys
726726
*/
727727
@VisibleForTesting
728+
@JvmStatic
728729
fun aggregatePrimateKeys(
729730
entries: List<PrimaryKeyAttributesFromDb>
730731
): Map<String, MutableList<String>> {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ object DbSourceDiscoverUtil {
2929
* underlying table schema changed between syncs (ii) The source connector's mapping of datatypes to
3030
* Airbyte types changed between runs
3131
*/
32+
@JvmStatic
3233
fun <DataType> logSourceSchemaChange(
3334
fullyQualifiedTableNameToInfo: Map<String?, TableInfo<CommonField<DataType>>>,
3435
catalog: ConfiguredAirbyteCatalog,

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ object RelationalDbQueryUtils {
5454
}
5555

5656
/** @return fully qualified table name with the schema (if a schema exists) without quotes. */
57+
@JvmStatic
5758
fun getFullyQualifiedTableName(schemaName: String?, tableName: String): String {
5859
return if (schemaName != null) "$schemaName.$tableName" else tableName
5960
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt

+1
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ object StateGeneratorUtils {
266266
* @Param supportedStateType the [AirbyteStateType] supported by this connector.
267267
* @return The deserialized object representation of the state.
268268
*/
269+
@JvmStatic
269270
fun deserializeInitialState(
270271
initialStateJson: JsonNode?,
271272
supportedStateType: AirbyteStateMessage.AirbyteStateType

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

+8-7
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,20 @@ import org.slf4j.LoggerFactory
2727
abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
2828
@JvmField protected var testdb: T = createTestDatabase()
2929

30-
protected fun createTableSqlFmt(): String {
30+
protected open fun createTableSqlFmt(): String {
3131
return "CREATE TABLE %s.%s(%s);"
3232
}
3333

34-
protected fun createSchemaSqlFmt(): String {
34+
protected open fun createSchemaSqlFmt(): String {
3535
return "CREATE SCHEMA %s;"
3636
}
3737

38-
protected fun modelsSchema(): String {
38+
protected open fun modelsSchema(): String {
3939
return "models_schema"
4040
}
4141

4242
/** The schema of a random table which is used as a new table in snapshot test */
43-
protected fun randomSchema(): String {
43+
protected open fun randomSchema(): String {
4444
return "models_schema_random"
4545
}
4646

@@ -203,7 +203,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
203203
writeRecords(recordJson, modelsSchema(), MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL)
204204
}
205205

206-
protected fun writeRecords(
206+
protected open fun writeRecords(
207207
recordJson: JsonNode,
208208
dbName: String?,
209209
streamName: String?,
@@ -224,15 +224,15 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
224224
)
225225
}
226226

227-
protected fun deleteMessageOnIdCol(streamName: String?, idCol: String?, idValue: Int) {
227+
protected open fun deleteMessageOnIdCol(streamName: String?, idCol: String?, idValue: Int) {
228228
testdb!!.with("DELETE FROM %s.%s WHERE %s = %s", modelsSchema(), streamName, idCol, idValue)
229229
}
230230

231231
protected open fun deleteCommand(streamName: String?) {
232232
testdb!!.with("DELETE FROM %s.%s", modelsSchema(), streamName)
233233
}
234234

235-
protected fun updateCommand(
235+
protected open fun updateCommand(
236236
streamName: String?,
237237
modelCol: String?,
238238
modelVal: String?,
@@ -1084,6 +1084,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
10841084
}
10851085
.toList()
10861086

1087+
@JvmStatic
10871088
protected fun removeDuplicates(
10881089
messages: Set<AirbyteRecordMessage>
10891090
): Set<AirbyteRecordMessage> {

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
190190

191191
@Test
192192
@Throws(Exception::class)
193-
fun testSpec() {
193+
open fun testSpec() {
194194
val actual = source()!!.spec()
195195
val resourceString = MoreResources.readResource("spec.json")
196196
val expected = Jsons.deserialize(resourceString, ConnectorSpecification::class.java)
@@ -1342,7 +1342,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
13421342
.collect(Collectors.toList())
13431343
}
13441344

1345-
protected fun createState(states: List<DbStreamState>): List<AirbyteStateMessage> {
1345+
protected open fun createState(states: List<DbStreamState>): List<AirbyteStateMessage> {
13461346
return states
13471347
.stream()
13481348
.map { s: DbStreamState ->

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/AutoCloseableIterators.kt

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ object AutoCloseableIterators {
225225
}
226226

227227
@SafeVarargs
228+
@JvmStatic
228229
fun <T> concatWithEagerClose(
229230
airbyteStreamStatusConsumer: Consumer<AirbyteStreamStatusHolder>?,
230231
vararg iterators: AutoCloseableIterator<T>

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.23.8'
9+
cdkVersionRequired = '0.29.4'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.3.13
12+
dockerImageTag: 3.3.14
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlQueryUtils.java

+1
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ private static List<JsonNode> getTableEstimate(final JdbcDatabase database, fina
212212
// Construct the table estimate query.
213213
final String tableEstimateQuery =
214214
String.format(TABLE_ESTIMATE_QUERY, TABLE_SIZE_BYTES_COL, AVG_ROW_LENGTH, namespace, name);
215+
LOGGER.info("Querying for table size estimate: {}", tableEstimateQuery);
215216
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
216217
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
217218
Preconditions.checkState(jsonNodes.size() == 1);

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
401401
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
402402
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
403403
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
404-
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, quoteString);
404+
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, getQuoteString());
405405
final CursorBasedStreams cursorBasedStreams =
406406
new CursorBasedStreams(MySqlInitialReadUtil.identifyStreamsForCursorBased(catalog, initialLoadStreams.streamsForInitialLoad()),
407407
pairToCursorBasedStatus);
@@ -411,7 +411,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
411411

412412
final MySqlInitialLoadStreamStateManager mySqlInitialLoadStreamStateManager =
413413
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
414-
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString));
414+
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, getQuoteString()));
415415
final MySqlInitialLoadHandler initialLoadHandler =
416416
new MySqlInitialLoadHandler(sourceConfig, database, new MySqlSourceOperations(), getQuoteString(), mySqlInitialLoadStreamStateManager,
417417
namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(convertNameNamespacePairFromV0(namespacePair))),
@@ -502,7 +502,7 @@ private boolean convertToBoolean(final String value) {
502502
}
503503

504504
private boolean cloudDeploymentMode() {
505-
return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(featureFlags.deploymentMode());
505+
return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(getFeatureFlags().deploymentMode());
506506
}
507507

508508
@Override
@@ -539,7 +539,7 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept
539539
sourceOperations,
540540
streamingQueryConfigProvider);
541541

542-
quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);
542+
setQuoteString((getQuoteString() == null ? database.getMetaData().getIdentifierQuoteString() : getQuoteString()));
543543
database.setSourceConfig(sourceConfig);
544544
database.setDatabaseConfig(jdbcConfig);
545545
return database;

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumStateUtil.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
282282

283283
assert !offset.isEmpty();
284284
assert Objects.nonNull(schemaHistory);
285-
assert Objects.nonNull(schemaHistory.schema());
285+
assert Objects.nonNull(schemaHistory.getSchema());
286286

287287
final JsonNode asJson = serialize(offset, schemaHistory);
288288
LOGGER.info("Initial Debezium state constructed: {}", asJson);
@@ -296,7 +296,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
296296
public static JsonNode serialize(final Map<String, String> offset, final SchemaHistory dbHistory) {
297297
final Map<String, Object> state = new HashMap<>();
298298
state.put(MysqlCdcStateConstants.MYSQL_CDC_OFFSET, offset);
299-
state.put(MysqlCdcStateConstants.MYSQL_DB_HISTORY, dbHistory.schema());
299+
state.put(MysqlCdcStateConstants.MYSQL_DB_HISTORY, dbHistory.getSchema());
300300
state.put(MysqlCdcStateConstants.IS_COMPRESSED, dbHistory.isCompressed());
301301

302302
return Jsons.jsonNode(state);

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@
44

55
package io.airbyte.integrations.source.mysql.initialsync;
66

7-
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
8-
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
9-
107
import com.fasterxml.jackson.databind.JsonNode;
118
import com.google.common.collect.AbstractIterator;
129
import com.mysql.cj.MysqlType;
@@ -123,7 +120,7 @@ private PreparedStatement getPkPreparedStatement(final Connection connection) {
123120
final String tableName = pair.getName();
124121
final String schemaName = pair.getNamespace();
125122
LOGGER.info("Preparing query for table: {}", tableName);
126-
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
123+
final String fullTableName = RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
127124
quoteString);
128125

129126
final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
@@ -132,7 +129,7 @@ private PreparedStatement getPkPreparedStatement(final Connection connection) {
132129

133130
if (pkLoadStatus == null) {
134131
LOGGER.info("pkLoadStatus is null");
135-
final String quotedCursorField = enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
132+
final String quotedCursorField = RelationalDbQueryUtils.enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
136133
final String sql;
137134
// We cannot load in chunks for a composite key load, since each field might not have distinct
138135
// values.
@@ -148,7 +145,7 @@ private PreparedStatement getPkPreparedStatement(final Connection connection) {
148145
return preparedStatement;
149146
} else {
150147
LOGGER.info("pkLoadStatus value is : {}", pkLoadStatus.getPkVal());
151-
final String quotedCursorField = enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
148+
final String quotedCursorField = RelationalDbQueryUtils.enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
152149
final String sql;
153150
// We cannot load in chunks for a composite key load, since each field might not have distinct
154151
// values. Furthermore, we have to issue a >=

airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshMySqlSourceAcceptanceTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
6363
.withCursorField(Lists.newArrayList("id"))
6464
.withDestinationSyncMode(DestinationSyncMode.APPEND)
6565
.withStream(CatalogHelpers.createAirbyteStream(
66-
String.format("%s.%s", config.get(JdbcUtils.DATABASE_KEY).asText(), STREAM_NAME),
66+
String.format("%s", STREAM_NAME),
6767
Field.of("id", JsonSchemaType.NUMBER),
6868
Field.of("name", JsonSchemaType.STRING))
6969
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
@@ -72,7 +72,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
7272
.withCursorField(Lists.newArrayList("id"))
7373
.withDestinationSyncMode(DestinationSyncMode.APPEND)
7474
.withStream(CatalogHelpers.createAirbyteStream(
75-
String.format("%s.%s", config.get(JdbcUtils.DATABASE_KEY).asText(), STREAM_NAME2),
75+
String.format("%s", STREAM_NAME2),
76+
config.get(JdbcUtils.DATABASE_KEY).asText(),
7677
Field.of("id", JsonSchemaType.NUMBER),
7778
Field.of("name", JsonSchemaType.STRING))
7879
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));

airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
7575
.withCursorField(Lists.newArrayList("id"))
7676
.withDestinationSyncMode(DestinationSyncMode.APPEND)
7777
.withStream(CatalogHelpers.createAirbyteStream(
78-
String.format("%s.%s", testdb.getDatabaseName(), STREAM_NAME),
78+
String.format("%s", STREAM_NAME), testdb.getDatabaseName(),
7979
Field.of("id", JsonSchemaType.NUMBER),
8080
Field.of("name", JsonSchemaType.STRING))
8181
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
@@ -84,7 +84,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
8484
.withCursorField(Lists.newArrayList("id"))
8585
.withDestinationSyncMode(DestinationSyncMode.APPEND)
8686
.withStream(CatalogHelpers.createAirbyteStream(
87-
String.format("%s.%s", testdb.getDatabaseName(), STREAM_NAME2),
87+
String.format("%s", STREAM_NAME2),
88+
testdb.getDatabaseName(),
8889
Field.of("id", JsonSchemaType.NUMBER),
8990
Field.of("name", JsonSchemaType.STRING))
9091
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));

0 commit comments

Comments
 (0)