Skip to content

Resumable Full Refresh sync for mssql #37451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 69 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
000d09e
poc pr
xiaohansong Mar 29, 2024
701f879
test
xiaohansong Apr 1, 2024
6961a94
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 1, 2024
3d7e013
merge to head change
xiaohansong Apr 1, 2024
41848ed
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 4, 2024
3907d9d
save work
xiaohansong Apr 4, 2024
9164bfd
save work
xiaohansong Apr 5, 2024
92b71b6
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 5, 2024
651f088
save work
xiaohansong Apr 5, 2024
8183126
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 5, 2024
566e344
cdk change for rfr
xiaohansong Apr 5, 2024
c81fa4c
some clean up
xiaohansong Apr 5, 2024
ff6ce90
update interface
xiaohansong Apr 16, 2024
908d1da
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 16, 2024
760777a
Merge branch 'master' into xiaohan/cdk-rfr-interface
xiaohansong Apr 17, 2024
653c19b
mssql rfr implementation
rodireich Apr 19, 2024
c469c99
mssql rfr implementation
rodireich Apr 19, 2024
4ffe6b2
mssql rfr implementation
rodireich Apr 19, 2024
37fd000
mssql rfr implementation
rodireich Apr 20, 2024
09488ab
mssql rfr implementation
rodireich Apr 20, 2024
4d97b2d
update jdbc
xiaohansong Apr 23, 2024
eee5db6
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 23, 2024
6bd2b75
format
xiaohansong Apr 23, 2024
12997ec
Merge branch 'master' into xiaohan/cdk-rfr-interface
xiaohansong Apr 23, 2024
137dd86
Merge branch 'xiaohan/cdk-rfr-interface' into rodi/5th-7016
rodireich Apr 23, 2024
1d60fe6
test
rodireich Apr 23, 2024
f98f02f
mssql rfr
rodireich Apr 24, 2024
470e432
mssql rfr
rodireich Apr 26, 2024
8f118ad
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 26, 2024
90b9ca8
mssql rfr
rodireich Apr 28, 2024
ed4e7fe
Merge branch 'xiaohan/cdk-rfr-interface' into rodi/5th-7016
rodireich Apr 28, 2024
7cca19e
mssql rfr
rodireich Apr 28, 2024
0d8d3f8
mssql rfr
rodireich Apr 29, 2024
266ff96
mssql rfr
rodireich Apr 29, 2024
9a118ff
mssql rfr
rodireich Apr 29, 2024
34f4d47
mssql rfr
rodireich Apr 29, 2024
da2fbe1
mssql rfr
rodireich Apr 29, 2024
7e2e66e
mssql rfr
rodireich Apr 29, 2024
730b64c
mssql rfr
rodireich Apr 29, 2024
ccf2a3c
mssql rfr
rodireich Apr 29, 2024
f22f33c
mssql rfr
rodireich Apr 29, 2024
cb24220
mssql rfr
rodireich Apr 29, 2024
886c348
mssql rfr
rodireich Apr 29, 2024
d1800e5
mssql rfr
rodireich Apr 29, 2024
6e9299b
mssql rfr
rodireich Apr 29, 2024
05f470e
mssql rfr
rodireich Apr 29, 2024
e78b56e
mssql rfr
rodireich Apr 30, 2024
984c8a4
mssql rfr
rodireich Apr 30, 2024
18843f2
mssql rfr
rodireich Apr 30, 2024
3f04061
mssql rfr
rodireich Apr 30, 2024
d325718
mssql rfr
rodireich Apr 30, 2024
b25a0d6
mssql rfr
rodireich Apr 30, 2024
22aab75
mssql rfr
rodireich Apr 30, 2024
4462470
mssql rfr
rodireich Apr 30, 2024
f3b887d
mssql rfr
rodireich Apr 30, 2024
39f4b2d
mssql rfr
rodireich Apr 30, 2024
ca257d1
mssql rfr
rodireich Apr 30, 2024
afbb193
mssql rfr
rodireich Apr 30, 2024
b94f8e1
mssql rfr
rodireich Apr 30, 2024
2868a08
code sanity
rodireich May 2, 2024
0e4ccb5
Merge branch 'master' into rodi/5th-7016
rodireich May 3, 2024
3a941b8
mssql rfr
rodireich May 3, 2024
b337bd4
Merge branch 'master' into rodi/5th-7016
rodireich May 5, 2024
9fefb7a
mssql rfr
rodireich May 5, 2024
f9ba78c
mssql rfr
rodireich May 5, 2024
468972b
mssql rfr
rodireich May 5, 2024
dd61263
mssql rfr
rodireich May 6, 2024
7e8a773
mssql rfr
rodireich May 7, 2024
8b22e62
mssql rfr
rodireich May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import io.airbyte.cdk.integrations.base.Source
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
Expand All @@ -54,6 +55,7 @@ import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.commons.util.AutoCloseableIterators
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand All @@ -62,6 +64,7 @@ import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.time.Instant
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
Expand All @@ -84,7 +87,7 @@ import org.slf4j.LoggerFactory
abstract class AbstractJdbcSource<Datatype>(
driverClass: String,
@JvmField val streamingQueryConfigProvider: Supplier<JdbcStreamingQueryConfig>,
sourceOperations: JdbcCompatibleSourceOperations<Datatype>
sourceOperations: JdbcCompatibleSourceOperations<Datatype>,
) : AbstractDbSource<Datatype, JdbcDatabase>(driverClass), Source {
@JvmField val sourceOperations: JdbcCompatibleSourceOperations<Datatype>

Expand All @@ -95,6 +98,55 @@ abstract class AbstractJdbcSource<Datatype>(
this.sourceOperations = sourceOperations
}

/**
 * Tells whether [airbyteStream] may be read with a resumable full refresh.
 *
 * The base implementation always answers `false`; the function is `open` so that a subclass
 * can opt in.
 *
 * @param airbyteStream the configured stream being considered
 * @return `true` if resumable full refresh is supported for the stream, `false` otherwise
 */
open fun supportResumableFullRefresh(airbyteStream: ConfiguredAirbyteStream): Boolean = false

/**
 * Supplies the [InitialLoadHandler] that reads [airbyteStream] on the resumable full refresh
 * path.
 *
 * The base implementation has no handler to offer and returns `null`. Note that
 * [getFullRefreshStream] throws an [IllegalStateException] when [supportResumableFullRefresh]
 * returns `true` for a full-refresh stream but this function still returns `null`.
 *
 * @param database source database
 * @param airbyteStream the configured stream to be read
 * @param catalog configured catalog of the sync, if available
 * @param stateManager state manager of the sync, if available
 * @return the handler for the stream, or `null` when none is provided
 */
open fun getInitialLoadHandler(
    database: JdbcDatabase,
    airbyteStream: ConfiguredAirbyteStream,
    catalog: ConfiguredAirbyteCatalog?,
    stateManager: StateManager?,
): InitialLoadHandler<Datatype>? = null

/**
 * Builds the message iterator for a full-refresh read of [airbyteStream].
 *
 * When [supportResumableFullRefresh] accepts the stream and [syncMode] is
 * [SyncMode.FULL_REFRESH], the read is delegated to the [InitialLoadHandler] returned by
 * [getInitialLoadHandler]. In every other case the call falls through to the superclass
 * implementation (the legacy, non-resumable refresh).
 *
 * @param database source database
 * @param airbyteStream the configured stream to read
 * @param catalog configured catalog of the sync, if available
 * @param stateManager state manager of the sync, if available
 * @param namespace namespace of the stream
 * @param selectedDatabaseFields column names selected for the read
 * @param table table metadata for the stream
 * @param emittedAt timestamp handed to whichever path ends up producing the messages
 * @param syncMode sync mode this full refresh stream is associated with
 * @param cursorField optional cursor field, forwarded to the superclass implementation
 * @return iterator over the messages of the stream
 * @throws IllegalStateException if resumable full refresh is supported for the stream but
 *   [getInitialLoadHandler] returns `null`
 */
override fun getFullRefreshStream(
    database: JdbcDatabase,
    airbyteStream: ConfiguredAirbyteStream,
    catalog: ConfiguredAirbyteCatalog?,
    stateManager: StateManager?,
    namespace: String,
    selectedDatabaseFields: List<String>,
    table: TableInfo<CommonField<Datatype>>,
    emittedAt: Instant,
    syncMode: SyncMode,
    cursorField: Optional<String>
): AutoCloseableIterator<AirbyteMessage> {
    if (supportResumableFullRefresh(airbyteStream) && syncMode == SyncMode.FULL_REFRESH) {
        // checkNotNull throws IllegalStateException with the given message, exactly like the
        // explicit throw it replaces.
        val initialLoadHandler =
            checkNotNull(getInitialLoadHandler(database, airbyteStream, catalog, stateManager)) {
                "Must provide initialLoadHandler for resumable full refresh."
            }
        // Forward the caller's emittedAt instead of taking a fresh clock reading, so the
        // resumable path receives the same timestamp as the legacy path below.
        return initialLoadHandler.getIteratorForStream(airbyteStream, table, emittedAt)
    }

    // If flag is off, fall back to legacy non-resumable refresh
    return super.getFullRefreshStream(
        database,
        airbyteStream,
        catalog,
        stateManager,
        namespace,
        selectedDatabaseFields,
        table,
        emittedAt,
        syncMode,
        cursorField,
    )
}

override fun queryTableFullRefresh(
database: JdbcDatabase,
columnNames: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ protected constructor(driverClassName: String) :
this.getAirbyteType(columnType)
}

initializeForStateManager(database, catalog, fullyQualifiedTableNameToInfo, stateManager)

val incrementalIterators =
getIncrementalIterators(
database,
Expand Down Expand Up @@ -188,6 +190,15 @@ protected constructor(driverClassName: String) :
}
}

/**
 * Optional hook for performing initialization logic before the read; for example, a source
 * connector can choose to load up its state manager here.
 *
 * The default implementation does nothing.
 *
 * @param database source database
 * @param catalog configured catalog of the sync
 * @param tableNameToTable table metadata keyed by table name
 * @param stateManager state manager of the sync
 */
protected open fun initializeForStateManager(
    database: Database,
    catalog: ConfiguredAirbyteCatalog,
    tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
    stateManager: StateManager,
) {
    // Intentionally empty: subclasses override this when they need pre-read setup.
}

@Throws(SQLException::class)
protected fun validateCursorFieldForIncrementalTables(
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
Expand Down Expand Up @@ -380,7 +391,14 @@ protected constructor(driverClassName: String) :

val table = tableNameToTable[fullyQualifiedTableName]!!
val tableReadIterator =
createReadIterator(database, airbyteStream, table, stateManager, emittedAt)
createReadIterator(
database,
airbyteStream,
catalog,
table,
stateManager,
emittedAt
)
iteratorList.add(tableReadIterator)
}
}
Expand All @@ -401,6 +419,7 @@ protected constructor(driverClassName: String) :
private fun createReadIterator(
database: Database,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
table: TableInfo<CommonField<DataType>>,
stateManager: StateManager?,
emittedAt: Instant
Expand Down Expand Up @@ -442,7 +461,9 @@ protected constructor(driverClassName: String) :
airbyteMessageIterator =
getFullRefreshStream(
database,
streamName,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
Expand Down Expand Up @@ -475,7 +496,9 @@ protected constructor(driverClassName: String) :
iterator =
getFullRefreshStream(
database,
streamName,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
Expand Down Expand Up @@ -560,18 +583,22 @@ protected constructor(driverClassName: String) :
* Creates an AirbyteMessageIterator that contains all records for a database source connection
*
* @param database Source Database
* @param streamName name of an individual stream in which a stream represents a source (e.g.
* @param airbyteStream configured stream to read, in which a stream represents a source (e.g.
* API endpoint or database table); its name is used for the emitted messages
* @param catalog List of streams (e.g. database tables or API endpoints) with settings on sync
* @param stateManager tracking the state from previous sync; used for resumable full refresh.
* @param namespace Namespace of the database (e.g. public)
* @param selectedDatabaseFields List of all interested database column names
* @param table information in tabular format
* @param emittedAt Time when data was emitted from the Source database
* @param syncMode The sync mode that this full refresh stream should be associated with.
* @return AirbyteMessageIterator with all records for a database source
*/
private fun getFullRefreshStream(
protected open fun getFullRefreshStream(
database: Database,
streamName: String,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?,
namespace: String,
selectedDatabaseFields: List<String>,
table: TableInfo<CommonField<DataType>>,
Expand All @@ -588,7 +615,12 @@ protected constructor(driverClassName: String) :
syncMode,
cursorField
)
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli())
return getMessageIterator(
queryStream,
airbyteStream.stream.name,
namespace,
emittedAt.toEpochMilli()
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb

import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.time.Instant

/**
 * Produces the message iterator for a single configured stream.
 *
 * @param T type argument of the [CommonField] column descriptors carried by the [TableInfo]
 */
interface InitialLoadHandler<T> {
    /**
     * Builds the iterator of [AirbyteMessage]s for [airbyteStream].
     *
     * @param airbyteStream the configured stream to read
     * @param table table metadata for the stream
     * @param emittedAt timestamp supplied by the caller (presumably used to stamp the emitted
     *   records; confirm against implementations)
     * @return a closeable iterator over the messages of the stream
     */
    fun getIteratorForStream(
        airbyteStream: ConfiguredAirbyteStream,
        table: TableInfo<CommonField<T>>,
        emittedAt: Instant,
    ): AutoCloseableIterator<AirbyteMessage>
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class DefaultJdbcSourceAcceptanceTest :
JdbcSourceAcceptanceTest<
DefaultJdbcSourceAcceptanceTest.PostgresTestSource, BareBonesTestDatabase>() {
override fun config(): JsonNode {
return testdb.testConfigBuilder().build()
return testdb?.testConfigBuilder()?.build()!!
}

override fun source(): PostgresTestSource {
Expand Down Expand Up @@ -181,12 +181,15 @@ internal class DefaultJdbcSourceAcceptanceTest :
fun testCustomParametersOverwriteDefaultParametersExpectException() {
val connectionPropertiesUrl = "ssl=false"
val config =
getConfigWithConnectionProperties(
PSQL_CONTAINER,
testdb.databaseName,
connectionPropertiesUrl
)
val customParameters = parseJdbcParameters(config, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
testdb?.let {
getConfigWithConnectionProperties(
PSQL_CONTAINER,
it.databaseName,
connectionPropertiesUrl
)
}
val customParameters =
parseJdbcParameters(config!!, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
val defaultParameters = mapOf("ssl" to "true", "sslmode" to "require")
Assertions.assertThrows(IllegalArgumentException::class.java) {
JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters(
Expand Down
Loading
Loading