Skip to content

Commit e2e12d8

Browse files
committed
throw exception on bad generation id
1 parent d45851c commit e2e12d8

File tree

3 files changed

+16
-9
lines changed

3 files changed

+16
-9
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,10 @@ constructor(
164164

165165
val streamSyncSummaries =
166166
streamNames.associate { streamDescriptor ->
167-
// If we didn't receive a stream status message, assume success.
168-
// Platform won't send us any stream status messages yet (since we're not declaring
169-
// supportsRefresh in metadata), so we will always hit this case.
167+
// If we didn't receive a stream status message, assume failure.
168+
// This is possible if e.g. the orchestrator crashes before sending us the message.
170169
val terminalStatusFromSource =
171-
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
170+
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.INCOMPLETE
172171
StreamDescriptorUtils.withDefaultNamespace(
173172
streamDescriptor,
174173
bufferManager.defaultNamespace,

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -553,8 +553,7 @@ class AsyncStreamConsumerTest {
553553
ArgumentCaptor.captor()
554554
Mockito.verify(onClose).accept(any(), capture(captor))
555555
assertEquals(
556-
// All streams have a COMPLETE status.
557-
// TODO: change this to INCOMPLETE after we switch the default behavior.
556+
// All streams have an INCOMPLETE status.
558557
mapOf(
559558
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to
560559
StreamSyncSummary(

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ package io.airbyte.integrations.base.destination.typing_deduping
66
import com.google.common.annotations.VisibleForTesting
77
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation
88
import io.airbyte.cdk.integrations.base.JavaBaseConstants
9+
import io.airbyte.commons.exceptions.ConfigErrorException
910
import io.airbyte.commons.json.Jsons
1011
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1112
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
13+
import io.airbyte.protocol.models.v0.DestinationSyncMode
1214
import java.util.Optional
1315
import java.util.function.Consumer
1416
import org.apache.commons.codec.digest.DigestUtils
@@ -31,6 +33,12 @@ constructor(
3133
if (it.stream.namespace.isNullOrEmpty()) {
3234
it.stream.namespace = defaultNamespace
3335
}
36+
// The refreshes project is the beginning of the end for OVERWRITE syncs.
37+
// The sync mode still exists, but we are fully dependent on min_generation to trigger
38+
// overwrite logic.
39+
if (it.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
40+
it.destinationSyncMode = DestinationSyncMode.APPEND
41+
}
3442
}
3543

3644
// this code is bad and I feel bad
@@ -125,9 +133,10 @@ constructor(
125133
@VisibleForTesting
126134
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
127135
if (stream.generationId == null) {
128-
stream.generationId = 0
129-
stream.minimumGenerationId = 0
130-
stream.syncId = 0
136+
// TODO set platform version
137+
throw ConfigErrorException(
138+
"You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to X.Y.Z"
139+
)
131140
}
132141
if (
133142
stream.minimumGenerationId != 0.toLong() &&

0 commit comments

Comments
 (0)