Skip to content

MySQL to adapt to new cdk #36742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ constructor(
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
): List<T> {
dataSource.connection.use { connection ->
JdbcDatabase.Companion.toUnsafeStream<T>(query.apply(connection), recordTransform)
.use { results ->
return results.toList()
}
toUnsafeStream<T>(query.apply(connection), recordTransform).use { results ->
return results.toList()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.sql.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class AdaptiveStreamingQueryConfig : JdbcStreamingQueryConfig {
open class AdaptiveStreamingQueryConfig : JdbcStreamingQueryConfig {
private val fetchSizeEstimator: FetchSizeEstimator = TwoStageSizeEstimator.Companion.instance
private var currentFetchSize: Int

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import java.io.*
import java.util.*

object SshHelpers {
@get:Throws(IOException::class)
val specAndInjectSsh: ConnectorSpecification?
get() = getSpecAndInjectSsh(Optional.empty())

@JvmStatic
@Throws(IOException::class)
fun getSpecAndInjectSsh(): ConnectorSpecification? {
return getSpecAndInjectSsh(Optional.empty())
}

@JvmStatic
@Throws(IOException::class)
fun getSpecAndInjectSsh(group: Optional<String>): ConnectorSpecification? {
val originalSpec =
Jsons.deserialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected constructor(val container: C) : AutoCloseable {
}

val isInitialized: Boolean
get() = dslContext != null
get() = this::dslContext.isInitialized

protected abstract fun inContainerBootstrapCmd(): Stream<Stream<String>>

Expand All @@ -131,7 +131,7 @@ protected constructor(val container: C) : AutoCloseable {
return name + suffix
}

val databaseName: String
open val databaseName: String
get() = withNamespace("db")

val userName: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ class AirbyteSchemaHistoryStorage(
return string.toByteArray(StandardCharsets.UTF_8).size.toDouble() / (ONE_MB)
}

@JvmStatic
fun initializeDBHistory(
schemaHistory: SchemaHistory<Optional<JsonNode>>?,
compressSchemaHistoryForState: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ abstract class DebeziumPropertiesManager(
const val NAME_KEY: String = "name"
const val TOPIC_PREFIX_KEY: String = "topic.prefix"

@JvmStatic
fun sanitizeTopicPrefix(topicName: String): String {
val sanitizedNameBuilder = StringBuilder(topicName.length)
var changed = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ object RecordWaitTimeUtil {
val DEFAULT_FIRST_RECORD_WAIT_TIME: Duration = Duration.ofMinutes(5)
val DEFAULT_SUBSEQUENT_RECORD_WAIT_TIME: Duration = Duration.ofMinutes(1)

@JvmStatic
fun checkFirstRecordWaitTime(config: JsonNode) {
// we need to skip the check because in tests, we set initial_waiting_seconds
// to 5 seconds for performance reasons, which is shorter than the minimum
Expand All @@ -41,6 +42,7 @@ object RecordWaitTimeUtil {
}
}

@JvmStatic
fun getFirstRecordWaitTime(config: JsonNode): Duration {
val isTest = config.has("is_test") && config["is_test"].asBoolean()
var firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME
Expand All @@ -67,6 +69,7 @@ object RecordWaitTimeUtil {
return firstRecordWaitTime
}

@JvmStatic
fun getSubsequentRecordWaitTime(config: JsonNode): Duration {
var subsequentRecordWaitTime = DEFAULT_SUBSEQUENT_RECORD_WAIT_TIME
val isTest = config.has("is_test") && config["is_test"].asBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ abstract class AbstractJdbcSource<Datatype>(
* @return a map by StreamName to associated list of primary keys
*/
@VisibleForTesting
@JvmStatic
fun aggregatePrimateKeys(
entries: List<PrimaryKeyAttributesFromDb>
): Map<String, MutableList<String>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ object DbSourceDiscoverUtil {
* underlying table schema changed between syncs (ii) The source connector's mapping of datatypes to
* Airbyte types changed between runs
*/
@JvmStatic
fun <DataType> logSourceSchemaChange(
fullyQualifiedTableNameToInfo: Map<String?, TableInfo<CommonField<DataType>>>,
catalog: ConfiguredAirbyteCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory
object RelationalDbQueryUtils {
private val LOGGER: Logger = LoggerFactory.getLogger(RelationalDbQueryUtils::class.java)

@JvmStatic
fun getIdentifierWithQuoting(identifier: String, quoteString: String): String {
// double-quoted values within a database name or column name should be wrapped with extra
// quoteString
Expand Down Expand Up @@ -53,11 +54,13 @@ object RelationalDbQueryUtils {
}

/** @return fully qualified table name with the schema (if a schema exists) without quotes. */
@JvmStatic
fun getFullyQualifiedTableName(schemaName: String?, tableName: String): String {
return if (schemaName != null) "$schemaName.$tableName" else tableName
}

/** @return the input identifier with quotes. */
@JvmStatic
fun enquoteIdentifier(identifier: String?, quoteString: String?): String {
return quoteString + identifier + quoteString
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ object StateGeneratorUtils {
* @Param supportedStateType the [AirbyteStateType] supported by this connector.
* @return The deserialized object representation of the state.
*/
@JvmStatic
fun deserializeInitialState(
initialStateJson: JsonNode?,
supportedStateType: AirbyteStateMessage.AirbyteStateType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import org.slf4j.LoggerFactory
abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
protected var testdb: T = createTestDatabase()

protected fun createTableSqlFmt(): String {
open protected fun createTableSqlFmt(): String {
return "CREATE TABLE %s.%s(%s);"
}

protected fun createSchemaSqlFmt(): String {
open protected fun createSchemaSqlFmt(): String {
return "CREATE SCHEMA %s;"
}

protected fun modelsSchema(): String {
open protected fun modelsSchema(): String {
return "models_schema"
}

/** The schema of a random table which is used as a new table in snapshot test */
protected fun randomSchema(): String {
open protected fun randomSchema(): String {
return "models_schema_random"
}

Expand Down Expand Up @@ -203,7 +203,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
writeRecords(recordJson, modelsSchema(), MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL)
}

protected fun writeRecords(
open protected fun writeRecords(
recordJson: JsonNode,
dbName: String?,
streamName: String?,
Expand All @@ -224,15 +224,15 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
)
}

protected fun deleteMessageOnIdCol(streamName: String?, idCol: String?, idValue: Int) {
open protected fun deleteMessageOnIdCol(streamName: String?, idCol: String?, idValue: Int) {
testdb!!.with("DELETE FROM %s.%s WHERE %s = %s", modelsSchema(), streamName, idCol, idValue)
}

protected fun deleteCommand(streamName: String?) {
open protected fun deleteCommand(streamName: String?) {
testdb!!.with("DELETE FROM %s.%s", modelsSchema(), streamName)
}

protected fun updateCommand(
open protected fun updateCommand(
streamName: String?,
modelCol: String?,
modelVal: String?,
Expand Down Expand Up @@ -1079,6 +1079,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
}
.toList()

@JvmStatic
protected fun removeDuplicates(
messages: Set<AirbyteRecordMessage>
): Set<AirbyteRecordMessage> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

@Test
@Throws(Exception::class)
fun testSpec() {
open fun testSpec() {
val actual = source()!!.spec()
val resourceString = MoreResources.readResource("spec.json")
val expected = Jsons.deserialize(resourceString, ConnectorSpecification::class.java)
Expand Down Expand Up @@ -1312,7 +1312,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
)
)

protected fun createExpectedTestMessages(
open protected fun createExpectedTestMessages(
states: List<DbStreamState>,
numRecords: Long
): List<AirbyteMessage> {
Expand Down Expand Up @@ -1342,7 +1342,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
.collect(Collectors.toList())
}

protected fun createState(states: List<DbStreamState>): List<AirbyteStateMessage> {
open protected fun createState(states: List<DbStreamState>): List<AirbyteStateMessage> {
return states
.stream()
.map { s: DbStreamState ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() {
*/
@Test
@Throws(Exception::class)
fun testDataContent() {
open fun testDataContent() {
// Class used to make easier the error reporting
class MissedRecords( // Stream that is missing any value
var streamName:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ object AutoCloseableIterators {
}

@SafeVarargs
@JvmStatic
fun <T> concatWithEagerClose(
airbyteStreamStatusConsumer: Consumer<AirbyteStreamStatusHolder>?,
vararg iterators: AutoCloseableIterator<T>
Expand All @@ -232,6 +233,7 @@ object AutoCloseableIterators {
}

@SafeVarargs
@JvmStatic
fun <T> concatWithEagerClose(vararg iterators: AutoCloseableIterator<T>): CompositeIterator<T> {
return concatWithEagerClose(java.util.List.of(*iterators), null)
}
Expand All @@ -245,13 +247,15 @@ object AutoCloseableIterators {
* @return A [CompositeIterator].
* @param <T> The type of data contained in each iterator. </T>
*/
@JvmStatic
fun <T> concatWithEagerClose(
iterators: List<AutoCloseableIterator<T>>,
airbyteStreamStatusConsumer: Consumer<AirbyteStreamStatusHolder>?
): CompositeIterator<T> {
return CompositeIterator(iterators, airbyteStreamStatusConsumer)
}

@JvmStatic
fun <T> concatWithEagerClose(iterators: List<AutoCloseableIterator<T>>): CompositeIterator<T> {
return concatWithEagerClose(iterators, null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.23.8'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ private static List<JsonNode> getTableEstimate(final JdbcDatabase database, fina
// Construct the table estimate query.
final String tableEstimateQuery =
String.format(TABLE_ESTIMATE_QUERY, TABLE_SIZE_BYTES_COL, AVG_ROW_LENGTH, namespace, name);
LOGGER.info("Querying for table size estimate: {}", tableEstimateQuery);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, quoteString);
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, getQuoteString());
final CursorBasedStreams cursorBasedStreams =
new CursorBasedStreams(MySqlInitialReadUtil.identifyStreamsForCursorBased(catalog, initialLoadStreams.streamsForInitialLoad()),
pairToCursorBasedStatus);
Expand All @@ -411,7 +411,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final

final MySqlInitialLoadStreamStateManager mySqlInitialLoadStreamStateManager =
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString));
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, getQuoteString()));
final MySqlInitialLoadHandler initialLoadHandler =
new MySqlInitialLoadHandler(sourceConfig, database, new MySqlSourceOperations(), getQuoteString(), mySqlInitialLoadStreamStateManager,
namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(convertNameNamespacePairFromV0(namespacePair))),
Expand Down Expand Up @@ -502,7 +502,7 @@ private boolean convertToBoolean(final String value) {
}

private boolean cloudDeploymentMode() {
return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(featureFlags.deploymentMode());
return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(getFeatureFlags().deploymentMode());
}

@Override
Expand Down Expand Up @@ -539,7 +539,7 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept
sourceOperations,
streamingQueryConfigProvider);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);
setQuoteString((getQuoteString() == null ? database.getMetaData().getIdentifierQuoteString() : getQuoteString()));
database.setSourceConfig(sourceConfig);
database.setDatabaseConfig(jdbcConfig);
return database;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties,

assert !offset.isEmpty();
assert Objects.nonNull(schemaHistory);
assert Objects.nonNull(schemaHistory.schema());
assert Objects.nonNull(schemaHistory.getSchema());

final JsonNode asJson = serialize(offset, schemaHistory);
LOGGER.info("Initial Debezium state constructed: {}", asJson);
Expand All @@ -296,7 +296,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
public static JsonNode serialize(final Map<String, String> offset, final SchemaHistory dbHistory) {
final Map<String, Object> state = new HashMap<>();
state.put(MysqlCdcStateConstants.MYSQL_CDC_OFFSET, offset);
state.put(MysqlCdcStateConstants.MYSQL_DB_HISTORY, dbHistory.schema());
state.put(MysqlCdcStateConstants.MYSQL_DB_HISTORY, dbHistory.getSchema());
state.put(MysqlCdcStateConstants.IS_COMPRESSED, dbHistory.isCompressed());

return Jsons.jsonNode(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

package io.airbyte.integrations.source.mysql.initialsync;

import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import com.mysql.cj.MysqlType;
Expand Down Expand Up @@ -123,7 +120,7 @@ private PreparedStatement getPkPreparedStatement(final Connection connection) {
final String tableName = pair.getName();
final String schemaName = pair.getNamespace();
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
final String fullTableName = RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
quoteString);

final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
Expand All @@ -132,7 +129,7 @@ private PreparedStatement getPkPreparedStatement(final Connection connection) {

if (pkLoadStatus == null) {
LOGGER.info("pkLoadStatus is null");
final String quotedCursorField = enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
final String quotedCursorField = RelationalDbQueryUtils.enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
final String sql;
// We cannot load in chunks for a composite key load, since each field might not have distinct
// values.
Expand All @@ -148,7 +145,7 @@ private PreparedStatement getPkPreparedStatement(final Connection connection) {
return preparedStatement;
} else {
LOGGER.info("pkLoadStatus value is : {}", pkLoadStatus.getPkVal());
final String quotedCursorField = enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
final String quotedCursorField = RelationalDbQueryUtils.enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
final String sql;
// We cannot load in chunks for a composite key load, since each field might not have distinct
// values. Furthermore, we have to issue a >=
Expand Down
Loading
Loading