Skip to content

Commit 2991827

Browse files
committed
interface changes
1 parent e2e12d8 commit 2991827

File tree

5 files changed

+51
-18
lines changed

5 files changed

+51
-18
lines changed

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
3333
) {
3434

3535
private val log = KotlinLogging.logger {}
36-
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
36+
override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
3737
val writeBuffer =
3838
StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)
3939

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
2424
private val log = KotlinLogging.logger {}
2525

2626
// State maintained to make decisions between async calls
27+
private val rawTableSuffix: String
2728
private val finalTmpTableSuffix: String
2829
private val initialRawTableStatus: InitialRawTableStatus =
2930
destinationInitialStatus.initialRawTableStatus
@@ -36,7 +37,8 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
3637

3738
init {
3839
val stream = destinationInitialStatus.streamConfig
39-
storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
40+
rawTableSuffix = NO_SUFFIX
41+
storageOperation.prepareStage(stream.id, NO_SUFFIX)
4042
if (!disableTypeDedupe) {
4143
// Prepare final tables based on sync mode.
4244
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
@@ -114,8 +116,16 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
114116
return NO_SUFFIX
115117
}
116118

119+
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
120+
// redirect to the appropriate raw table (potentially the temp raw table).
121+
writeRecordsImpl(
122+
streamConfig.copy(id = streamConfig.id.copy(rawName = streamConfig.id.rawName + rawTableSuffix)),
123+
stream,
124+
)
125+
}
126+
117127
/** Writing records is destination-type specific: insert vs. staging, based on format. */
118-
abstract override fun writeRecords(
128+
abstract fun writeRecordsImpl(
119129
streamConfig: StreamConfig,
120130
stream: Stream<PartialAirbyteMessage>
121131
)

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
2424
destinationInitialStatus,
2525
disableTypeDedupe
2626
) {
27-
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
27+
override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
2828
storageOperation.writeToStage(streamConfig, stream)
2929
}
3030
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package io.airbyte.integrations.base.destination.operation
66

77
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
88
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
9-
import io.airbyte.protocol.models.v0.DestinationSyncMode
109
import java.time.Instant
1110
import java.util.Optional
1211

@@ -16,9 +15,33 @@ interface StorageOperation<Data> {
1615
*/
1716

1817
/**
19-
* Prepare staging area which cloud be creating any object storage, temp tables or file storage
18+
* Prepare staging area, which could involve creating object storage, temp tables or file storage.
19+
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in conjunction
20+
* with [overwriteStage].
2021
*/
21-
fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
22+
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)
23+
24+
/**
25+
* Swap the "temporary" stage into the "real" stage. For example,
26+
* `DROP TABLE airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`.
27+
*/
28+
fun overwriteStage(streamId: StreamId, suffix: String)
29+
30+
/**
31+
* Copy all records from the temporary stage into the real stage, then drop the temporary stage.
32+
* For example `INSERT INTO airbyte_internal.foo SELECT * FROM airbyte_internal.foo_tmp; DROP
33+
* TABLE airbyte_internal.foo_tmp`.
34+
*/
35+
fun transferFromTempStage(streamId: StreamId, suffix: String)
36+
37+
/**
38+
* Get the generation of a single record in the stage. Not necessarily the min or max generation,
39+
* just _any_ record.
40+
*
41+
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
42+
* always contains exactly one generation.
43+
*/
44+
fun getStageGeneration(streamId: StreamId, suffix: String): Long
2245

2346
/** Delete previously staged data, using deterministic information from streamId. */
2447
fun cleanupStage(streamId: StreamId)

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class AbstractStreamOperationTest {
5252
storageOperation,
5353
destinationInitialStatus,
5454
) {
55-
override fun writeRecords(
55+
override fun writeRecordsImpl(
5656
streamConfig: StreamConfig,
5757
stream: Stream<PartialAirbyteMessage>
5858
) {
@@ -97,7 +97,7 @@ class AbstractStreamOperationTest {
9797
val streamOperations = TestStreamOperation(storageOperation, initialState)
9898

9999
verifySequence {
100-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
100+
storageOperation.prepareStage(streamId, "")
101101
storageOperation.createFinalTable(streamConfig, "", false)
102102
}
103103
confirmVerified(storageOperation)
@@ -144,7 +144,7 @@ class AbstractStreamOperationTest {
144144
val streamOperations = TestStreamOperation(storageOperation, initialState)
145145

146146
verifySequence {
147-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
147+
storageOperation.prepareStage(streamId, "")
148148
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
149149
}
150150
confirmVerified(storageOperation)
@@ -189,7 +189,7 @@ class AbstractStreamOperationTest {
189189
val streamOperations = TestStreamOperation(storageOperation, initialState)
190190

191191
verifySequence {
192-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
192+
storageOperation.prepareStage(streamId, "")
193193
// No table creation - we can just reuse the existing table.
194194
}
195195
confirmVerified(storageOperation)
@@ -232,7 +232,7 @@ class AbstractStreamOperationTest {
232232
val streamOperations = TestStreamOperation(storageOperation, initialState)
233233

234234
verifySequence {
235-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
235+
storageOperation.prepareStage(streamId, "")
236236
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
237237
}
238238
confirmVerified(storageOperation)
@@ -276,7 +276,7 @@ class AbstractStreamOperationTest {
276276
val streamOperations = TestStreamOperation(storageOperation, initialState)
277277

278278
verifySequence {
279-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
279+
storageOperation.prepareStage(streamId, "")
280280
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
281281
}
282282
confirmVerified(storageOperation)
@@ -320,7 +320,7 @@ class AbstractStreamOperationTest {
320320
val streamOperations = TestStreamOperation(storageOperation, initialState)
321321

322322
verifySequence {
323-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
323+
storageOperation.prepareStage(streamId, "")
324324
storageOperation.createFinalTable(streamConfig, "", false)
325325
}
326326
confirmVerified(storageOperation)
@@ -366,7 +366,7 @@ class AbstractStreamOperationTest {
366366
val streamOperations = TestStreamOperation(storageOperation, initialState)
367367

368368
verifySequence {
369-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
369+
storageOperation.prepareStage(streamId, "")
370370
storageOperation.softResetFinalTable(streamConfig)
371371
}
372372
confirmVerified(storageOperation)
@@ -410,7 +410,7 @@ class AbstractStreamOperationTest {
410410
val streamOperations = TestStreamOperation(storageOperation, initialState)
411411

412412
verifySequence {
413-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
413+
storageOperation.prepareStage(streamId, "")
414414
// No soft reset - we can just reuse the existing table.
415415
}
416416
confirmVerified(storageOperation)
@@ -450,7 +450,7 @@ class AbstractStreamOperationTest {
450450
val streamOperations = TestStreamOperation(storageOperation, initialState)
451451

452452
verifySequence {
453-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
453+
storageOperation.prepareStage(streamId, "")
454454
storageOperation.softResetFinalTable(streamConfig)
455455
}
456456
confirmVerified(storageOperation)
@@ -505,7 +505,7 @@ class AbstractStreamOperationTest {
505505
val streamOperations = TestStreamOperation(storageOperation, initialState)
506506

507507
verifySequence {
508-
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
508+
storageOperation.prepareStage(streamId, "")
509509
}
510510
confirmVerified(storageOperation)
511511

0 commit comments

Comments
 (0)