Skip to content

Commit b9dc205

Browse files
Mysql to emit stream initial loader (#36932)
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
1 parent 7c0a6c5 commit b9dc205

File tree

20 files changed

+1029
-276
lines changed

20 files changed

+1029
-276
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
177+
| 0.31.8 | 2024-05-03 | [\#36932](https://github.com/airbytehq/airbyte/pull/36932) | CDK changes on resumable full refresh |
177178
| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake |
178179
| 0.31.6 | 2024-05-02 | [\#37746](https://github.com/airbytehq/airbyte/pull/37746) | debuggability improvements. |
179180
| 0.31.5 | 2024-04-30 | [\#37758](https://github.com/airbytehq/airbyte/pull/37758) | Set debezium max retries to zero |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.31.7
1+
version=0.31.8

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

+59-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import io.airbyte.cdk.integrations.base.Source
4242
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
4343
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
4444
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
45+
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler
4546
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
4647
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
4748
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
@@ -54,6 +55,7 @@ import io.airbyte.commons.util.AutoCloseableIterator
5455
import io.airbyte.commons.util.AutoCloseableIterators
5556
import io.airbyte.protocol.models.CommonField
5657
import io.airbyte.protocol.models.JsonSchemaType
58+
import io.airbyte.protocol.models.v0.AirbyteMessage
5759
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
5860
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
5961
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
@@ -62,6 +64,7 @@ import java.sql.Connection
6264
import java.sql.PreparedStatement
6365
import java.sql.ResultSet
6466
import java.sql.SQLException
67+
import java.time.Instant
6568
import java.util.*
6669
import java.util.function.Consumer
6770
import java.util.function.Function
@@ -84,7 +87,7 @@ import org.slf4j.LoggerFactory
8487
abstract class AbstractJdbcSource<Datatype>(
8588
driverClass: String,
8689
@JvmField val streamingQueryConfigProvider: Supplier<JdbcStreamingQueryConfig>,
87-
sourceOperations: JdbcCompatibleSourceOperations<Datatype>
90+
sourceOperations: JdbcCompatibleSourceOperations<Datatype>,
8891
) : AbstractDbSource<Datatype, JdbcDatabase>(driverClass), Source {
8992
@JvmField val sourceOperations: JdbcCompatibleSourceOperations<Datatype>
9093

@@ -95,6 +98,61 @@ abstract class AbstractJdbcSource<Datatype>(
9598
this.sourceOperations = sourceOperations
9699
}
97100

101+
/**
 * Reports whether [airbyteStream] should be read using resumable full refresh.
 *
 * This base implementation always answers `false`. The function is `open`, so a subclass can
 * override it. [getFullRefreshStream] checks this result (together with the sync mode) before it
 * requests an initial load handler.
 *
 * @param database JDBC database handle; not used by this default implementation
 * @param airbyteStream configured stream under consideration; not used by this default
 *   implementation
 * @return `false`
 */
open fun supportResumableFullRefresh(
    database: JdbcDatabase,
    airbyteStream: ConfiguredAirbyteStream,
): Boolean = false
107+
108+
open fun getInitialLoadHandler(
109+
database: JdbcDatabase,
110+
airbyteStream: ConfiguredAirbyteStream,
111+
catalog: ConfiguredAirbyteCatalog?,
112+
stateManager: StateManager?
113+
): InitialLoadHandler<Datatype>? {
114+
return null
115+
}
116+
117+
override fun getFullRefreshStream(
118+
database: JdbcDatabase,
119+
airbyteStream: ConfiguredAirbyteStream,
120+
catalog: ConfiguredAirbyteCatalog?,
121+
stateManager: StateManager?,
122+
namespace: String,
123+
selectedDatabaseFields: List<String>,
124+
table: TableInfo<CommonField<Datatype>>,
125+
emittedAt: Instant,
126+
syncMode: SyncMode,
127+
cursorField: Optional<String>
128+
): AutoCloseableIterator<AirbyteMessage> {
129+
if (
130+
supportResumableFullRefresh(database, airbyteStream) &&
131+
syncMode == SyncMode.FULL_REFRESH
132+
) {
133+
val initialLoadHandler =
134+
getInitialLoadHandler(database, airbyteStream, catalog, stateManager)
135+
?: throw IllegalStateException(
136+
"Must provide initialLoadHandler for resumable full refresh."
137+
)
138+
return initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now())
139+
}
140+
141+
// If flag is off, fall back to legacy non-resumable refresh
142+
return super.getFullRefreshStream(
143+
database,
144+
airbyteStream,
145+
catalog,
146+
stateManager,
147+
namespace,
148+
selectedDatabaseFields,
149+
table,
150+
emittedAt,
151+
syncMode,
152+
cursorField,
153+
)
154+
}
155+
98156
override fun queryTableFullRefresh(
99157
database: JdbcDatabase,
100158
columnNames: List<String>,

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt

+39-7
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ protected constructor(driverClassName: String) :
155155
this.getAirbyteType(columnType)
156156
}
157157

158+
initializeForStateManager(database, catalog, fullyQualifiedTableNameToInfo, stateManager)
159+
158160
val incrementalIterators =
159161
getIncrementalIterators(
160162
database,
@@ -188,6 +190,15 @@ protected constructor(driverClassName: String) :
188190
}
189191
}
190192

193+
/**
 * Optional hook for initialization logic that has to run before reading; for example, a source
 * connector can choose to load up its state manager here.
 *
 * The default implementation does nothing.
 *
 * @param database database the read runs against
 * @param catalog configured catalog of the read
 * @param tableNameToTable table information keyed by table name
 * @param stateManager state manager of the read
 */
protected open fun initializeForStateManager(
    database: Database,
    catalog: ConfiguredAirbyteCatalog,
    tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
    stateManager: StateManager,
) {
    // Intentionally empty: overriding is optional.
}
201+
191202
@Throws(SQLException::class)
192203
protected fun validateCursorFieldForIncrementalTables(
193204
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
@@ -380,7 +391,14 @@ protected constructor(driverClassName: String) :
380391

381392
val table = tableNameToTable[fullyQualifiedTableName]!!
382393
val tableReadIterator =
383-
createReadIterator(database, airbyteStream, table, stateManager, emittedAt)
394+
createReadIterator(
395+
database,
396+
airbyteStream,
397+
catalog,
398+
table,
399+
stateManager,
400+
emittedAt
401+
)
384402
iteratorList.add(tableReadIterator)
385403
}
386404
}
@@ -401,6 +419,7 @@ protected constructor(driverClassName: String) :
401419
private fun createReadIterator(
402420
database: Database,
403421
airbyteStream: ConfiguredAirbyteStream,
422+
catalog: ConfiguredAirbyteCatalog?,
404423
table: TableInfo<CommonField<DataType>>,
405424
stateManager: StateManager?,
406425
emittedAt: Instant
@@ -442,7 +461,9 @@ protected constructor(driverClassName: String) :
442461
airbyteMessageIterator =
443462
getFullRefreshStream(
444463
database,
445-
streamName,
464+
airbyteStream,
465+
catalog,
466+
stateManager,
446467
namespace,
447468
selectedDatabaseFields,
448469
table,
@@ -475,7 +496,9 @@ protected constructor(driverClassName: String) :
475496
iterator =
476497
getFullRefreshStream(
477498
database,
478-
streamName,
499+
airbyteStream,
500+
catalog,
501+
stateManager,
479502
namespace,
480503
selectedDatabaseFields,
481504
table,
@@ -560,18 +583,22 @@ protected constructor(driverClassName: String) :
560583
* Creates an AirbyteMessageIterator that contains all records for a database source connection
561584
*
562585
* @param database Source Database
563-
* @param streamName name of an individual stream in which a stream represents a source (e.g.
586+
* @param airbyteStream configured stream to read, where a stream represents a source (e.g.
564587
* API endpoint or database table)
588+
* @param catalog List of streams (e.g. database tables or API endpoints) with settings on sync
589+
* @param stateManager tracking the state from previous sync; used for resumable full refresh.
565590
* @param namespace Namespace of the database (e.g. public)
566591
* @param selectedDatabaseFields List of all interested database column names
567592
* @param table information in tabular format
568593
* @param emittedAt Time when data was emitted from the Source database
569594
* @param syncMode The sync mode that this full refresh stream should be associated with.
570595
* @return AirbyteMessageIterator with all records for a database source
571596
*/
572-
private fun getFullRefreshStream(
597+
protected open fun getFullRefreshStream(
573598
database: Database,
574-
streamName: String,
599+
airbyteStream: ConfiguredAirbyteStream,
600+
catalog: ConfiguredAirbyteCatalog?,
601+
stateManager: StateManager?,
575602
namespace: String,
576603
selectedDatabaseFields: List<String>,
577604
table: TableInfo<CommonField<DataType>>,
@@ -588,7 +615,12 @@ protected constructor(driverClassName: String) :
588615
syncMode,
589616
cursorField
590617
)
591-
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli())
618+
return getMessageIterator(
619+
queryStream,
620+
airbyteStream.stream.name,
621+
namespace,
622+
emittedAt.toEpochMilli()
623+
)
592624
}
593625

594626
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.source.relationaldb
6+
7+
import io.airbyte.commons.util.AutoCloseableIterator
8+
import io.airbyte.protocol.models.CommonField
9+
import io.airbyte.protocol.models.v0.AirbyteMessage
10+
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
11+
import java.time.Instant
12+
13+
/**
 * Contract for a component that produces the message iterator for one stream.
 *
 * @param T datatype parameter of the [CommonField]s describing the table's columns
 */
interface InitialLoadHandler<T> {
    /**
     * Creates the iterator that yields the messages for [airbyteStream].
     *
     * @param airbyteStream configured stream to read
     * @param table table information backing the stream
     * @param emittedAt timestamp supplied by the caller (presumably stamped on emitted records;
     *   confirm against implementations)
     * @return closeable iterator over the stream's messages
     */
    fun getIteratorForStream(
        airbyteStream: ConfiguredAirbyteStream,
        table: TableInfo<CommonField<T>>,
        emittedAt: Instant,
    ): AutoCloseableIterator<AirbyteMessage>
}

0 commit comments

Comments
 (0)