Skip to content

Commit e02cc57

Browse files
committed
track stream statuses
1 parent c0c6173 commit e02cc57

File tree

14 files changed

+380
-124
lines changed

14 files changed

+380
-124
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@
33
*/
44
package io.airbyte.cdk.integrations.destination
55

6-
import java.util.*
6+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
7+
import java.util.Optional
78

89
/**
910
* @param recordsWritten The number of records written to the stream, or empty if the caller does
1011
* not track this information. (this is primarily for backwards-compatibility with the legacy
1112
* destinations framework; new implementations should always provide this information). If this
1213
* value is empty, consumers should assume that the sync wrote nonzero records for this stream.
1314
*/
14-
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {
15-
16-
companion object {
17-
@JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
18-
}
19-
}
15+
data class StreamSyncSummary(
16+
val recordsWritten: Optional<Long>,
17+
val statusFromSource: AirbyteStreamStatus,
18+
)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
1717
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
1818
import io.airbyte.commons.json.Jsons
1919
import io.airbyte.protocol.models.v0.AirbyteMessage
20+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
2021
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
2122
import io.airbyte.protocol.models.v0.StreamDescriptor
2223
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -72,6 +73,8 @@ constructor(
7273

7374
// Note that this map will only be populated for streams with nonzero records.
7475
private val recordCounts: ConcurrentMap<StreamDescriptor, AtomicLong> = ConcurrentHashMap()
76+
private val streamStatusesFromSource: ConcurrentMap<StreamDescriptor, AirbyteStreamStatus> =
77+
ConcurrentHashMap()
7578

7679
private var hasStarted = false
7780
private var hasClosed = false
@@ -104,12 +107,43 @@ constructor(
104107
airbyteMessageDeserializer.deserializeAirbyteMessage(
105108
message,
106109
)
107-
if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) {
108-
validateRecord(partialAirbyteMessage)
109-
110-
partialAirbyteMessage.record?.streamDescriptor?.let {
111-
getRecordCounter(it).incrementAndGet()
110+
when (partialAirbyteMessage.type) {
111+
AirbyteMessage.Type.RECORD -> {
112+
validateRecord(partialAirbyteMessage)
113+
114+
partialAirbyteMessage.record?.streamDescriptor?.let {
115+
getRecordCounter(it).incrementAndGet()
116+
117+
if (streamStatusesFromSource.containsKey(it)) {
118+
throw IllegalStateException(
119+
"Received a record message after a terminal stream status for stream ${it.namespace}.${it.name}"
120+
)
121+
}
122+
}
123+
}
124+
AirbyteMessage.Type.TRACE -> {
125+
// There are many types of trace messages, but we only care about stream status
126+
// messages with status=COMPLETE or INCOMPLETE.
127+
// INCOMPLETE is a slightly misleading name - it actually means "Stream has stopped
128+
// due to an interruption or error", i.e. failure
129+
partialAirbyteMessage.trace?.streamStatus?.let {
130+
val isTerminalStatus =
131+
it.status == AirbyteStreamStatus.COMPLETE ||
132+
it.status == AirbyteStreamStatus.INCOMPLETE
133+
if (isTerminalStatus) {
134+
val conflictsWithExistingStatus =
135+
streamStatusesFromSource.containsKey(it.streamDescriptor) &&
136+
streamStatusesFromSource[it.streamDescriptor] != it.status
137+
if (conflictsWithExistingStatus) {
138+
throw IllegalStateException(
139+
"Received conflicting stream statuses for stream ${it.streamDescriptor.namespace}.${it.streamDescriptor.name}"
140+
)
141+
}
142+
streamStatusesFromSource[it.streamDescriptor] = it.status
143+
}
144+
}
112145
}
146+
else -> {}
113147
}
114148
bufferEnqueue.addRecord(
115149
partialAirbyteMessage,
@@ -132,12 +166,18 @@ constructor(
132166

133167
val streamSyncSummaries =
134168
streamNames.associate { streamDescriptor ->
169+
// If we didn't receive a stream status message, assume success.
170+
// Platform won't send us any stream status messages yet (since we're not declaring
171+
// supportsRefresh in metadata), so we will always hit this case.
172+
val streamStatusFromSource =
173+
streamStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
135174
StreamDescriptorUtils.withDefaultNamespace(
136175
streamDescriptor,
137176
defaultNamespace,
138177
) to
139178
StreamSyncSummary(
140179
Optional.of(getRecordCounter(streamDescriptor).get()),
180+
streamStatusFromSource,
141181
)
142182
}
143183
onClose.accept(hasFailed, streamSyncSummaries)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,22 @@ class BufferEnqueue(
3333
message: PartialAirbyteMessage,
3434
sizeInBytes: Int,
3535
) {
36-
if (message.type == AirbyteMessage.Type.RECORD) {
37-
handleRecord(message, sizeInBytes)
38-
} else if (message.type == AirbyteMessage.Type.STATE) {
39-
stateManager.trackState(message, sizeInBytes.toLong())
36+
when (message.type) {
37+
AirbyteMessage.Type.RECORD -> {
38+
handleRecord(message, sizeInBytes)
39+
}
40+
AirbyteMessage.Type.STATE -> {
41+
stateManager.trackState(message, sizeInBytes.toLong())
42+
}
43+
else -> {}
4044
}
4145
}
4246

4347
private fun handleRecord(
4448
message: PartialAirbyteMessage,
4549
sizeInBytes: Int,
4650
) {
47-
val streamDescriptor = extractStateFromRecord(message)
51+
val streamDescriptor = extractStreamDescriptorFromRecord(message)
4852
val queue =
4953
buffers.computeIfAbsent(
5054
streamDescriptor,
@@ -87,7 +91,9 @@ class BufferEnqueue(
8791
}
8892

8993
companion object {
90-
private fun extractStateFromRecord(message: PartialAirbyteMessage): StreamDescriptor {
94+
private fun extractStreamDescriptorFromRecord(
95+
message: PartialAirbyteMessage
96+
): StreamDescriptor {
9197
return StreamDescriptor()
9298
.withNamespace(message.record?.namespace)
9399
.withName(message.record?.stream)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class AirbyteMessageDeserializer(
5353
partial.record?.data = null
5454
} else if (AirbyteMessage.Type.STATE == msgType) {
5555
partial.withSerialized(message)
56-
} else {
56+
} else if (AirbyteMessage.Type.TRACE != msgType) {
5757
throw RuntimeException(String.format("Unsupported message type: %s", msgType))
5858
}
5959

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteMessage.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package io.airbyte.cdk.integrations.destination.async.model
77
import com.fasterxml.jackson.annotation.JsonProperty
88
import com.fasterxml.jackson.annotation.JsonPropertyDescription
99
import io.airbyte.protocol.models.v0.AirbyteMessage
10+
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
1011
import java.util.Objects
1112

1213
class PartialAirbyteMessage {
@@ -26,6 +27,12 @@ class PartialAirbyteMessage {
2627
@JsonProperty("state")
2728
var state: PartialAirbyteStateMessage? = null
2829

30+
@get:JsonProperty("trace")
31+
@set:JsonProperty("trace")
32+
@JsonProperty("trace")
33+
// These messages don't contain arbitrary blobs, so just directly reference the protocol struct.
34+
var trace: AirbyteTraceMessage? = null
35+
2936
/**
3037
* For record messages, this stores the serialized data blob (i.e.
3138
* `Jsons.serialize(message.getRecord().getData())`). For state messages, this stores the

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ internal constructor(
288288
* hasFailed=false, then it could be full success. if hasFailed=true, then going for partial
289289
* success.
290290
*/
291-
onClose.accept(false, null)
291+
// TODO what to do here?
292+
onClose.accept(false, HashMap())
292293
}
293294

294295
stateManager.listCommitted()!!.forEach(outputRecordCollector)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,21 @@
55
package io.airbyte.cdk.integrations.destination.buffered_stream_consumer
66

77
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8-
import io.airbyte.commons.functional.CheckedBiConsumer
98
import io.airbyte.protocol.models.v0.StreamDescriptor
109

1110
/**
1211
* Interface allowing destination to specify clean up logic that must be executed after all
1312
* record-related logic has finished.
1413
*
15-
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
16-
* will be treated as equivalent to [StreamSyncSummary.DEFAULT].
17-
*
1814
* The @JvmSuppressWildcards is here so that the 2nd parameter of accept stays a java
1915
* Map<StreamDescriptor, StreamSyncSummary> rather than becoming a Map<StreamDescriptor, ? extends
2016
* StreamSyncSummary>
2117
*/
22-
fun interface OnCloseFunction :
23-
CheckedBiConsumer<
24-
Boolean, @JvmSuppressWildcards Map<StreamDescriptor, StreamSyncSummary>, Exception>
18+
fun interface OnCloseFunction {
19+
@JvmSuppressWildcards
20+
@Throws(Exception::class)
21+
fun accept(
22+
hasFailed: Boolean,
23+
streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>,
24+
)
25+
}

0 commit comments

Comments
 (0)