Skip to content

Commit 53d7c23

Browse files
committed
start implementing refreshes orchestration
1 parent 0b3a0a5 commit 53d7c23

File tree

11 files changed

+612
-147
lines changed

11 files changed

+612
-147
lines changed

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler<DestinationState>(
325325
streamConfig,
326326
finalTableDefinition.isPresent,
327327
initialRawTableState,
328+
// TODO fix this
329+
// for now, no JDBC destinations actually do refreshes
330+
// so this is just to make our code compile
331+
InitialRawTableStatus(false, false, Optional.empty()),
328332
isSchemaMismatch,
329333
isFinalTableEmpty,
330334
destinationState

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt

Lines changed: 141 additions & 61 deletions
Large diffs are not rendered by default.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ interface StorageOperation<Data> {
1818
* Prepare staging area which cloud be creating any object storage, temp tables or file storage.
1919
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in conjunction
2020
* with [overwriteStage].
21+
*
22+
* @param replace If true, then replace existing resources with empty e.g. tables. If false,
23+
* then leave existing resources untouched.
2124
*/
2225
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)
2326

@@ -40,8 +43,10 @@ interface StorageOperation<Data> {
4043
*
4144
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
4245
* always contains exactly one generation.
46+
*
47+
* @return The generation ID of a record in the stage, or `null` if the stage is empty.
4348
*/
44-
fun getStageGeneration(streamId: StreamId, suffix: String): Long
49+
fun getStageGeneration(streamId: StreamId, suffix: String): Long?
4550

4651
/** Delete previously staged data, using deterministic information from streamId. */
4752
fun cleanupStage(streamId: StreamId)

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStatus.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@ package io.airbyte.integrations.base.destination.typing_deduping
77
data class DestinationInitialStatus<DestinationState>(
88
val streamConfig: StreamConfig,
99
val isFinalTablePresent: Boolean,
10+
// TODO we should probably make this nullable, then delete InitialRawTableStatus.rawTableExists
1011
val initialRawTableStatus: InitialRawTableStatus,
12+
/**
13+
* The state of the temp raw table, or null if there is no temp raw table
14+
* at the start of the sync.
15+
*/
16+
val initialTempRawTableStatus: InitialRawTableStatus,
1117
val isSchemaMismatch: Boolean,
1218
val isFinalTableEmpty: Boolean,
1319
val destinationState: DestinationState,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,19 @@ import java.util.*
88

99
data class InitialRawTableStatus(
1010
val rawTableExists: Boolean,
11+
/**
12+
* Whether there were any records with null `_airbyte_loaded_at`, at the time
13+
* that this status was fetched.
14+
*/
1115
val hasUnprocessedRecords: Boolean,
16+
// TODO Make maxProcessedTimestamp just `Instant?` instead of Optional
17+
/**
18+
* The highest timestamp such that all records in
19+
* `SELECT * FROM raw_table WHERE _airbyte_extracted_at <= ?`
20+
* have a nonnull `_airbyte_loaded_at`.
21+
*
22+
* Destinations MAY use this value to only run T+D on records with
23+
* `_airbyte_extracted_at > ?` (note the strictly-greater comparison).
24+
*/
1225
val maxProcessedTimestamp: Optional<Instant>
1326
)

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ data class StreamId(
4242
return "$quote$finalNamespace$quote.$quote$finalName$suffix$quote"
4343
}
4444

45-
fun rawTableId(quote: String): String {
46-
return "$quote$rawNamespace$quote.$quote$rawName$quote"
45+
@JvmOverloads
46+
fun rawTableId(quote: String, suffix: String = ""): String {
47+
return "$quote$rawNamespace$quote.$quote$rawName$suffix$quote"
4748
}
4849

4950
fun finalName(quote: String): String {

0 commit comments

Comments
 (0)