Skip to content

Commit 0651902

Browse files
committed
generation id / sync id in cdk
1 parent ae04fdc commit 0651902

File tree

14 files changed

+82
-19
lines changed

14 files changed

+82
-19
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
package io.airbyte.cdk.integrations.base
55

6-
import java.util.*
6+
import java.util.Locale
77
import org.apache.commons.lang3.StringUtils
88

99
fun upperQuoted(column: String): String {
@@ -24,41 +24,60 @@ object JavaBaseConstants {
2424
const val COLUMN_NAME_DATA: String = "_airbyte_data"
2525
@JvmField
2626
val LEGACY_RAW_TABLE_COLUMNS: List<String> =
27-
java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
27+
listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
2828

2929
// destination v2
3030
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
3131
const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at"
3232
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
3333
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
34+
const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"
35+
36+
const val AIRBYTE_META_SYNC_ID_KEY = "sync_id"
3437

3538
// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
3639
// use this column list.
3740
@JvmField
3841
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
39-
java.util.List.of(
42+
listOf(
4043
COLUMN_NAME_AB_RAW_ID,
4144
COLUMN_NAME_AB_EXTRACTED_AT,
4245
COLUMN_NAME_AB_LOADED_AT,
4346
COLUMN_NAME_DATA,
4447
)
4548
@JvmField
4649
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
47-
java.util.List.of(
50+
listOf(
51+
COLUMN_NAME_AB_RAW_ID,
52+
COLUMN_NAME_AB_EXTRACTED_AT,
53+
COLUMN_NAME_AB_LOADED_AT,
54+
COLUMN_NAME_DATA,
55+
COLUMN_NAME_AB_META,
56+
)
57+
@JvmField
58+
val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List<String> =
59+
listOf(
4860
COLUMN_NAME_AB_RAW_ID,
4961
COLUMN_NAME_AB_EXTRACTED_AT,
5062
COLUMN_NAME_AB_LOADED_AT,
5163
COLUMN_NAME_DATA,
5264
COLUMN_NAME_AB_META,
65+
COLUMN_NAME_AB_GENERATION_ID,
5366
)
5467
@JvmField
5568
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
56-
java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)
69+
listOf(
70+
COLUMN_NAME_AB_RAW_ID,
71+
COLUMN_NAME_AB_EXTRACTED_AT,
72+
COLUMN_NAME_AB_META,
73+
COLUMN_NAME_AB_GENERATION_ID
74+
)
5775

5876
const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
5977
enum class DestinationColumns(val rawColumns: List<String>) {
6078
V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
6179
V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
80+
V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION),
6281
LEGACY(LEGACY_RAW_TABLE_COLUMNS)
6382
}
6483
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
5757
protected abstract fun writeRecord(
5858
recordString: String,
5959
airbyteMetaString: String,
60-
emittedAt: Long
60+
generationId: Long,
61+
emittedAt: Long,
6162
)
6263

6364
/**
@@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
99100
}
100101

101102
@Throws(Exception::class)
102-
override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
103+
override fun accept(
104+
recordString: String,
105+
airbyteMetaString: String,
106+
generationId: Long,
107+
emittedAt: Long
108+
): Long {
103109
if (!isStarted) {
104110
if (useCompression) {
105111
compressedBuffer = GzipCompressorOutputStream(byteCounter)
@@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
111117
}
112118
if (inputStream == null && !isClosed) {
113119
val startCount = byteCounter.count
114-
writeRecord(recordString, airbyteMetaString, emittedAt)
120+
writeRecord(recordString, airbyteMetaString, generationId, emittedAt)
115121
return byteCounter.count - startCount
116122
} else {
117123
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ interface SerializableBuffer : AutoCloseable {
3737
* @throws Exception
3838
*/
3939
@Throws(Exception::class)
40-
fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long
40+
fun accept(recordString: String, airbyteMetaString: String, generationId: Long, emittedAt: Long): Long
4141

4242
/** Flush a buffer implementation. */
4343
@Throws(Exception::class) fun flush()

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class AvroSerializedBuffer(
5858

5959
@Throws(IOException::class)
6060
@Suppress("DEPRECATION")
61-
override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
61+
override fun writeRecord(recordString: String, airbyteMetaString: String, generationId: Long, emittedAt: Long) {
6262
// TODO Remove this double deserialization when S3 Destinations moves to Async.
6363
writeRecord(
6464
Jsons.deserialize(

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ abstract class BaseSheetGenerator : CsvSheetGenerator {
2828
id: UUID,
2929
formattedString: String,
3030
emittedAt: Long,
31-
formattedAirbyteMetaString: String
31+
formattedAirbyteMetaString: String,
32+
generationId: Long,
3233
): List<Any> {
3334
// TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or
3435
// RootLevelFlatteningSheetGenerator

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ class CsvSerializedBuffer(
6868
}
6969

7070
@Throws(IOException::class)
71-
override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
71+
override fun writeRecord(
72+
recordString: String,
73+
airbyteMetaString: String,
74+
generationId: Long,
75+
emittedAt: Long
76+
) {
7277
csvPrinter!!.printRecord(
7378
csvSheetGenerator.getDataRow(
7479
UUID.randomUUID(),

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ interface CsvSheetGenerator {
2626
id: UUID,
2727
formattedString: String,
2828
emittedAt: Long,
29-
formattedAirbyteMetaString: String
29+
formattedAirbyteMetaString: String,
30+
generationId: Long = 0,
3031
): List<Any>
3132

3233
object Factory {

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class StagingDatabaseCsvSheetGenerator
2929
@JvmOverloads
3030
constructor(
3131
private val destinationColumns: JavaBaseConstants.DestinationColumns =
32-
JavaBaseConstants.DestinationColumns.LEGACY
32+
JavaBaseConstants.DestinationColumns.LEGACY,
3333
) : CsvSheetGenerator {
3434
override fun getHeaderRow(): List<String> {
3535
return destinationColumns.rawColumns
@@ -52,7 +52,8 @@ constructor(
5252
id: UUID,
5353
formattedString: String,
5454
emittedAt: Long,
55-
formattedAirbyteMetaString: String
55+
formattedAirbyteMetaString: String,
56+
generationId: Long,
5657
): List<Any> {
5758
return when (destinationColumns) {
5859
JavaBaseConstants.DestinationColumns.LEGACY ->
@@ -67,6 +68,16 @@ constructor(
6768
)
6869
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META ->
6970
listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString)
71+
72+
JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION ->
73+
listOf(
74+
id,
75+
Instant.ofEpochMilli(emittedAt),
76+
"",
77+
formattedString,
78+
formattedAirbyteMetaString,
79+
generationId
80+
)
7081
}
7182
}
7283
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class JsonLSerializedBuffer(
6060
}
6161

6262
@Suppress("DEPRECATION")
63-
override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
63+
override fun writeRecord(recordString: String, airbyteMetaString: String, generationId: Long, emittedAt: Long) {
6464
// TODO Remove this double deserialization when S3 Destinations moves to Async.
6565
writeRecord(
6666
Jsons.deserialize(

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class ParquetSerializedBuffer(
122122
}
123123

124124
@Throws(Exception::class)
125-
override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
125+
override fun accept(recordString: String, airbyteMetaString: String, generationId: Long, emittedAt: Long): Long {
126126
throw UnsupportedOperationException(
127127
"This method is not supported for ParquetSerializedBuffer"
128128
)

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ internal class AsyncFlush(
6161
writer.accept(
6262
record!!.serialized!!,
6363
Jsons.serialize(record.record!!.meta),
64+
// Destinations that want to use generations should switch to the new
65+
// structure (e.g. StagingStreamOperations)
66+
0,
6467
record.record!!.emittedAt
6568
)
6669
} catch (e: Exception) {

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ private constructor(
277277
val tableName: String
278278
when (destinationColumns) {
279279
JavaBaseConstants.DestinationColumns.V2_WITH_META,
280-
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> {
280+
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META,
281+
JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION-> {
281282
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
282283
outputSchema = streamId.rawNamespace
283284
tableName = streamId.rawName

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
4242
it.accept(
4343
record.serialized!!,
4444
Jsons.serialize(record.record!!.meta),
45+
streamConfig.generationId,
4546
record.record!!.emittedAt
4647
)
4748
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandl
1313
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
1414
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1515
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
16+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY
1617
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
1718
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
19+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
1820
import io.airbyte.protocol.models.v0.StreamDescriptor
1921
import io.github.oshai.kotlinlogging.KotlinLogging
2022
import java.util.concurrent.CompletableFuture
@@ -102,7 +104,20 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
102104
override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
103105
val streamConfig =
104106
parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
105-
streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
107+
streamOpsMap[streamConfig.id]?.writeRecords(
108+
streamConfig,
109+
stream.map { record ->
110+
if (record.record!!.meta == null) {
111+
record.record!!.meta = AirbyteRecordMessageMeta()
112+
}
113+
record.also {
114+
it.record!!.meta!!.setAdditionalProperty(
115+
AIRBYTE_META_SYNC_ID_KEY,
116+
streamConfig.syncId,
117+
)
118+
}
119+
},
120+
)
106121
}
107122

108123
override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {

0 commit comments

Comments
 (0)