Skip to content

Commit 1844494

Browse files
CDK test failures
1 parent 97cc7a6 commit 1844494

File tree

24 files changed

+102
-109
lines changed

24 files changed

+102
-109
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
4242
*/
4343
@Deprecated("")
4444
@Throws(IOException::class)
45-
protected abstract fun writeRecord(record: AirbyteRecordMessage, generationId: Long = 0)
45+
protected abstract fun writeRecord(
46+
record: AirbyteRecordMessage,
47+
generationId: Long = 0,
48+
syncId: Long = 0
49+
)
4650

4751
/**
4852
* TODO: (ryankfu) move destination to use serialized record string instead of passing entire
@@ -80,7 +84,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
8084

8185
@Deprecated("")
8286
@Throws(Exception::class)
83-
override fun accept(record: AirbyteRecordMessage, generationId: Long): Long {
87+
override fun accept(record: AirbyteRecordMessage, generationId: Long, syncId: Long): Long {
8488
if (!isStarted) {
8589
if (useCompression) {
8690
compressedBuffer = GzipCompressorOutputStream(byteCounter)
@@ -92,7 +96,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
9296
}
9397
if (inputStream == null && !isClosed) {
9498
val startCount = byteCounter.count
95-
@Suppress("deprecation") writeRecord(record, generationId)
99+
@Suppress("deprecation") writeRecord(record, generationId, syncId)
96100
return byteCounter.count - startCount
97101
} else {
98102
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BufferingStrategy.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ interface BufferingStrategy : AutoCloseable {
3030
fun addRecord(
3131
stream: AirbyteStreamNameNamespacePair,
3232
message: AirbyteMessage,
33-
generationId: Long = 0
33+
generationId: Long = 0,
34+
syncId: Long = 0
3435
): Optional<BufferFlushType>
3536

3637
/** Flush the buffered messages from a single stream */

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class InMemoryRecordBufferingStrategy(
4242
override fun addRecord(
4343
stream: AirbyteStreamNameNamespacePair,
4444
message: AirbyteMessage,
45-
generationId: Long
45+
generationId: Long,
46+
syncId: Long
4647
): Optional<BufferFlushType> {
4748
var flushed: Optional<BufferFlushType> = Optional.empty()
4849

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ interface SerializableBuffer : AutoCloseable {
2626
*/
2727
@Deprecated("")
2828
@Throws(Exception::class)
29-
fun accept(record: AirbyteRecordMessage, generationId: Long = 0): Long
29+
fun accept(record: AirbyteRecordMessage, generationId: Long = 0, syncId: Long = 0): Long
3030

3131
/**
3232
* TODO: (ryankfu) Move all destination connectors to pass the serialized record string instead

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@ class SerializedBufferingStrategy
4747
override fun addRecord(
4848
stream: AirbyteStreamNameNamespacePair,
4949
message: AirbyteMessage,
50-
generationId: Long
50+
generationId: Long,
51+
syncId: Long
5152
): Optional<BufferFlushType> {
5253
var flushed: Optional<BufferFlushType> = Optional.empty()
5354

5455
val buffer = getOrCreateBuffer(stream)
5556

5657
@Suppress("DEPRECATION")
57-
val actualMessageSizeInBytes = buffer.accept(message.record, generationId)
58+
val actualMessageSizeInBytes = buffer.accept(message.record, generationId, syncId)
5859
totalBufferSizeInBytes += actualMessageSizeInBytes
5960
// Flushes buffer when either the buffer was completely filled or only a single stream was
6061
// filled

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test
2424
import org.mockito.ArgumentMatchers
2525
import org.mockito.Mockito
2626
import org.mockito.kotlin.any
27+
import org.mockito.kotlin.eq
2728
import org.mockito.kotlin.mock
2829

2930
class BufferedStreamConsumerTest {
@@ -361,7 +362,7 @@ class BufferedStreamConsumerTest {
361362
// The first two records that we push will not trigger any flushes, but the third record
362363
// _will_
363364
// trigger a flush
364-
Mockito.`when`(strategy.addRecord(any(), any()))
365+
Mockito.`when`(strategy.addRecord(any(), any(), eq(0)))
365366
.thenReturn(
366367
Optional.empty(),
367368
Optional.empty(),
@@ -463,7 +464,7 @@ class BufferedStreamConsumerTest {
463464
// The first two records that we push will not trigger any flushes, but the third record
464465
// _will_
465466
// trigger a flush
466-
Mockito.`when`(strategy.addRecord(any(), any()))
467+
Mockito.`when`(strategy.addRecord(any(), any(), eq(0)))
467468
.thenReturn(
468469
Optional.empty(),
469470
Optional.empty(),

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategyTest.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.junit.jupiter.api.BeforeEach
1414
import org.junit.jupiter.api.Test
1515
import org.mockito.Mockito
1616
import org.mockito.kotlin.any
17+
import org.mockito.kotlin.eq
1718

1819
class SerializedBufferingStrategyTest {
1920
private val catalog: ConfiguredAirbyteCatalog =
@@ -37,7 +38,7 @@ class SerializedBufferingStrategyTest {
3738

3839
@Throws(Exception::class)
3940
private fun setupMock(mockObject: SerializableBuffer) {
40-
Mockito.`when`(mockObject.accept(any())).thenReturn(10L)
41+
Mockito.`when`(mockObject.accept(any(), eq(0L))).thenReturn(10L)
4142
Mockito.`when`(mockObject.byteCount).thenReturn(10L)
4243
Mockito.`when`(mockObject.maxTotalBufferSizeInBytes).thenReturn(MAX_TOTAL_BUFFER_SIZE_BYTES)
4344
Mockito.`when`(mockObject.maxPerStreamBufferSizeInBytes)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+24-22
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog
3333
import io.airbyte.protocol.models.v0.AirbyteMessage
3434
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
3535
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
36-
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
3736
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
3837
import io.airbyte.protocol.models.v0.AirbyteStateMessage
3938
import io.airbyte.protocol.models.v0.AirbyteStateStats
@@ -1581,13 +1580,13 @@ abstract class DestinationAcceptanceTest(
15811580
return record
15821581
}
15831582

1584-
private fun getChanges(record: JsonNode): MutableList<AirbyteRecordMessageMetaChange> {
1583+
private fun getMeta(record: JsonNode): ObjectNode {
15851584
val meta = record.get(JavaBaseConstants.COLUMN_NAME_AB_META)
15861585

15871586
val asString = if (meta.isTextual) meta.asText() else Jsons.serialize(meta)
1588-
val asMeta = Jsons.deserialize(asString, AirbyteRecordMessageMeta::class.java)
1587+
val asMeta = Jsons.deserialize(asString)
15891588

1590-
return asMeta.changes
1589+
return asMeta as ObjectNode
15911590
}
15921591

15931592
@Test
@@ -1628,11 +1627,6 @@ abstract class DestinationAcceptanceTest(
16281627
Assertions.assertEquals(message.record.emittedAt, record.get(abTsKey).asLong())
16291628

16301629
if (useV2Fields) {
1631-
// Validate that the loadTs at at least >= the time the test started
1632-
Assertions.assertTrue(
1633-
record.get(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT).asLong() >=
1634-
preRunTime.toEpochMilli()
1635-
)
16361630
// Generation id should match the one from the catalog
16371631
Assertions.assertEquals(
16381632
generationId,
@@ -1642,18 +1636,22 @@ abstract class DestinationAcceptanceTest(
16421636
}
16431637

16441638
// Regardless of whether change failures are capatured, all v2
1645-
// destinations should pass upstream changes through.
1639+
// destinations should pass upstream changes through and set sync id.
16461640
if (useV2Fields) {
1647-
val changes = getChanges(destinationOutput[2])
1641+
val metas = destinationOutput.map { getMeta(it) }
1642+
val syncIdsAllValid = metas.map { it["sync_id"].asLong() }.all { it == 100L }
1643+
Assertions.assertTrue(syncIdsAllValid)
1644+
1645+
val changes = metas[2]["changes"].elements().asSequence().toList()
16481646
Assertions.assertEquals(changes.size, 1)
1649-
Assertions.assertEquals(changes[0].field, "name")
1647+
Assertions.assertEquals(changes[0]["field"].asText(), "name")
16501648
Assertions.assertEquals(
1651-
changes[0].change,
1652-
AirbyteRecordMessageMetaChange.Change.TRUNCATED
1649+
changes[0]["change"].asText(),
1650+
AirbyteRecordMessageMetaChange.Change.TRUNCATED.value()
16531651
)
16541652
Assertions.assertEquals(
1655-
changes[0].reason,
1656-
AirbyteRecordMessageMetaChange.Reason.SOURCE_FIELD_SIZE_LIMITATION
1653+
changes[0]["reason"].asText(),
1654+
AirbyteRecordMessageMetaChange.Reason.SOURCE_FIELD_SIZE_LIMITATION.value()
16571655
)
16581656
}
16591657

@@ -1665,14 +1663,17 @@ abstract class DestinationAcceptanceTest(
16651663
val data = getData(badRow)
16661664

16671665
Assertions.assertTrue(data["id"] == null || data["id"].isNull)
1668-
val changes = getChanges(badRow)
1666+
val changes = getMeta(badRow)["changes"].elements().asSequence().toList()
16691667

16701668
Assertions.assertEquals(1, changes.size)
1671-
Assertions.assertEquals("id", changes[0].field)
1672-
Assertions.assertEquals(AirbyteRecordMessageMetaChange.Change.NULLED, changes[0].change)
1669+
Assertions.assertEquals("id", changes[0]["field"].asText())
1670+
Assertions.assertEquals(
1671+
AirbyteRecordMessageMetaChange.Change.NULLED.value(),
1672+
changes[0]["change"].asText()
1673+
)
16731674
Assertions.assertEquals(
1674-
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR,
1675-
changes[0].reason
1675+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR.value(),
1676+
changes[0]["reason"].asText()
16761677
)
16771678

16781679
// Expect the third message to have added a new change to an old one
@@ -1681,7 +1682,8 @@ abstract class DestinationAcceptanceTest(
16811682
Assertions.assertTrue(
16821683
dataWithPreviousChange["id"] == null || dataWithPreviousChange["id"].isNull
16831684
)
1684-
val twoChanges = getChanges(badRowWithPreviousChange)
1685+
val twoChanges =
1686+
getMeta(badRowWithPreviousChange)["changes"].elements().asSequence().toList()
16851687
Assertions.assertEquals(2, twoChanges.size)
16861688
}
16871689
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/users_with_generation_id_configured_catalog.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"cursor_field": [],
1919
"destination_sync_mode": "overwrite",
2020
"primary_key": [],
21-
"generation_id": 10
21+
"generation_id": 10,
22+
"minimum_generation_id": 10,
23+
"sync_id": 100
2224
}
2325
]
2426
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,13 @@ class S3ConsumerFactory {
167167
// Buffer creation function: yields a file buffer that converts
168168
// incoming data to the correct format for the destination.
169169

170-
val generationIds =
170+
val generationAndSyncIds =
171171
catalog.streams.associate { stream ->
172172
val descriptor =
173173
StreamDescriptor()
174174
.withNamespace(stream.stream.namespace)
175175
.withName(stream.stream.name)
176-
descriptor to stream.generationId
176+
descriptor to Pair(stream.generationId, stream.syncId)
177177
}
178178

179179
val createFunction =
@@ -201,7 +201,7 @@ class S3ConsumerFactory {
201201
flushBufferFunction(storageOps, writeConfigs, catalog)
202202
)
203203
},
204-
generationIds
204+
generationAndSyncIds
205205
),
206206
catalog,
207207
// S3 has no concept of default namespace

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import java.util.stream.Stream
1818
class S3DestinationFlushFunction(
1919
override val optimalBatchSizeBytes: Long,
2020
private val strategyProvider: () -> BufferingStrategy,
21-
private val generationIds: Map<StreamDescriptor, Long> = emptyMap()
21+
private val generationAndSyncIds: Map<StreamDescriptor, Pair<Long, Long>> = emptyMap()
2222
) : DestinationFlushFunction {
2323

2424
override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
@@ -47,8 +47,8 @@ class S3DestinationFlushFunction(
4747
.withData(data)
4848
val completeMessage =
4949
AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(completeRecord)
50-
val generationId = generationIds[streamDescriptor] ?: 0
51-
strategy.addRecord(nameAndNamespace, completeMessage, generationId)
50+
val (generationId, syncId) = generationAndSyncIds[streamDescriptor] ?: Pair(0L, 0L)
51+
strategy.addRecord(nameAndNamespace, completeMessage, generationId, syncId)
5252
}
5353
strategy.flushSingleStream(nameAndNamespace)
5454
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroFieldConversionFailureListener.kt

+3-18
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,14 @@ package io.airbyte.cdk.integrations.destination.s3.avro
77
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
88
import java.lang.Exception
99
import org.apache.avro.Schema
10-
import org.apache.avro.SchemaBuilder
1110
import org.apache.avro.generic.GenericData
1211
import org.apache.avro.generic.GenericRecordBuilder
1312
import tech.allegro.schema.json2avro.converter.FieldConversionFailureListener
1413

1514
class AvroFieldConversionFailureListener : FieldConversionFailureListener() {
16-
val CHANGE_SCHEMA: Schema =
17-
SchemaBuilder.builder()
18-
.record("change")
19-
.fields()
20-
.name("field")
21-
.type()
22-
.stringType()
23-
.noDefault()
24-
.name("change")
25-
.type()
26-
.stringType()
27-
.noDefault()
28-
.name("reason")
29-
.type()
30-
.stringType()
31-
.noDefault()
32-
.endRecord()
15+
companion object {
16+
val CHANGE_SCHEMA: Schema = AvroConstants.AVRO_CHANGES_SCHEMA
17+
}
3318

3419
override fun onFieldConversionFailure(
3520
avroName: String,

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroRecordFactory.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,16 @@ class AvroRecordFactory(private val schema: Schema?, private val converter: Json
6363
fun getAvroRecordV2(
6464
id: UUID,
6565
generationId: Long,
66+
syncId: Long,
6667
recordMessage: AirbyteRecordMessage
6768
): GenericData.Record {
6869
val jsonRecord = MAPPER.createObjectNode()
6970
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString())
7071
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.emittedAt)
71-
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, System.currentTimeMillis())
7272
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
73-
jsonRecord.replace(
74-
JavaBaseConstants.COLUMN_NAME_AB_META,
75-
MAPPER.valueToTree(recordMessage.meta) as ObjectNode
76-
)
77-
73+
val meta = MAPPER.valueToTree(recordMessage.meta) as ObjectNode
74+
meta.put("sync_id", syncId)
75+
jsonRecord.replace(JavaBaseConstants.COLUMN_NAME_AB_META, meta)
7876
jsonRecord.setAll<JsonNode>(recordMessage.data as ObjectNode)
7977

8078
return converter!!.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema)

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ class AvroSerializedBuffer(
5656

5757
@Deprecated("Deprecated in Java")
5858
@Throws(IOException::class)
59-
override fun writeRecord(record: AirbyteRecordMessage, generationId: Long) {
59+
override fun writeRecord(record: AirbyteRecordMessage, generationId: Long, syncId: Long) {
6060
if (this.useV2FieldNames) {
6161
dataFileWriter!!.append(
62-
avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, record)
62+
avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, syncId, record)
6363
)
6464
} else {
6565
dataFileWriter!!.append(avroRecordFactory.getAvroRecord(UUID.randomUUID(), record))

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt

+4-22
Original file line numberDiff line numberDiff line change
@@ -121,39 +121,21 @@ class JsonToAvroSchemaConverter {
121121
.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)
122122
.type(TIMESTAMP_MILLIS_SCHEMA)
123123
.noDefault()
124-
assembler
125-
.name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)
126-
.type(TIMESTAMP_MILLIS_SCHEMA)
127-
.noDefault()
128124
assembler
129125
.name(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
130126
.type(Schema.create(Schema.Type.LONG))
131127
.noDefault()
132-
val changeSchema: Schema =
133-
SchemaBuilder.builder()
134-
.record("change")
135-
.fields()
136-
.name("field")
137-
.type()
138-
.stringType()
139-
.noDefault()
140-
.name("change")
141-
.type()
142-
.stringType()
143-
.noDefault()
144-
.name("reason")
145-
.type()
146-
.stringType()
147-
.noDefault()
148-
.endRecord()
149128
assembler
150129
.name(JavaBaseConstants.COLUMN_NAME_AB_META)
151130
.type(
152131
SchemaBuilder.builder()
153132
.record(JavaBaseConstants.COLUMN_NAME_AB_META)
154133
.fields()
155134
.name("changes")
156-
.type(Schema.createArray(changeSchema))
135+
.type(Schema.createArray(AvroConstants.AVRO_CHANGES_SCHEMA))
136+
.noDefault()
137+
.name("sync_id")
138+
.type(Schema.create(Schema.Type.LONG))
157139
.noDefault()
158140
.endRecord()
159141
)

0 commit comments

Comments
 (0)