Skip to content

Commit 9cb852a

Browse files
committed
set default namespace in CatalogParser
1 parent e0225c1 commit 9cb852a

File tree

16 files changed

+129
-185
lines changed

16 files changed

+129
-185
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+10-42
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.integrations.destination.async
66

77
import com.google.common.base.Preconditions
8-
import com.google.common.base.Strings
98
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
109
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
1110
import io.airbyte.cdk.integrations.destination.async.buffers.BufferEnqueue
@@ -28,8 +27,6 @@ import java.util.concurrent.ExecutorService
2827
import java.util.concurrent.Executors
2928
import java.util.concurrent.atomic.AtomicLong
3029
import java.util.function.Consumer
31-
import java.util.stream.Collectors
32-
import kotlin.jvm.optionals.getOrNull
3330
import org.jetbrains.annotations.VisibleForTesting
3431

3532
private val logger = KotlinLogging.logger {}
@@ -52,7 +49,7 @@ constructor(
5249
onFlush: DestinationFlushFunction,
5350
private val catalog: ConfiguredAirbyteCatalog,
5451
private val bufferManager: BufferManager,
55-
private val defaultNamespace: Optional<String>,
52+
private val defaultNamespace: String,
5653
private val flushFailure: FlushFailure = FlushFailure(),
5754
workerPool: ExecutorService = Executors.newFixedThreadPool(5),
5855
private val airbyteMessageDeserializer: AirbyteMessageDeserializer =
@@ -80,28 +77,6 @@ constructor(
8077
private var hasClosed = false
8178
private var hasFailed = false
8279

83-
internal constructor(
84-
outputRecordCollector: Consumer<AirbyteMessage>,
85-
onStart: OnStartFunction,
86-
onClose: OnCloseFunction,
87-
flusher: DestinationFlushFunction,
88-
catalog: ConfiguredAirbyteCatalog,
89-
bufferManager: BufferManager,
90-
flushFailure: FlushFailure,
91-
defaultNamespace: Optional<String>,
92-
) : this(
93-
outputRecordCollector,
94-
onStart,
95-
onClose,
96-
flusher,
97-
catalog,
98-
bufferManager,
99-
defaultNamespace,
100-
flushFailure,
101-
Executors.newFixedThreadPool(5),
102-
AirbyteMessageDeserializer(),
103-
)
104-
10580
@Throws(Exception::class)
10681
override fun start() {
10782
Preconditions.checkState(!hasStarted, "Consumer has already been started.")
@@ -130,9 +105,6 @@ constructor(
130105
message,
131106
)
132107
if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) {
133-
if (Strings.isNullOrEmpty(partialAirbyteMessage.record?.namespace)) {
134-
partialAirbyteMessage.record?.namespace = defaultNamespace.getOrNull()
135-
}
136108
validateRecord(partialAirbyteMessage)
137109

138110
partialAirbyteMessage.record?.streamDescriptor?.let {
@@ -142,7 +114,6 @@ constructor(
142114
bufferEnqueue.addRecord(
143115
partialAirbyteMessage,
144116
sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES,
145-
defaultNamespace,
146117
)
147118
}
148119

@@ -160,18 +131,15 @@ constructor(
160131
bufferManager.close()
161132

162133
val streamSyncSummaries =
163-
streamNames
164-
.stream()
165-
.collect(
166-
Collectors.toMap(
167-
{ streamDescriptor: StreamDescriptor -> streamDescriptor },
168-
{ streamDescriptor: StreamDescriptor ->
169-
StreamSyncSummary(
170-
Optional.of(getRecordCounter(streamDescriptor).get()),
171-
)
172-
},
173-
),
174-
)
134+
streamNames.associate { streamDescriptor ->
135+
StreamDescriptorUtils.withDefaultNamespace(
136+
streamDescriptor,
137+
defaultNamespace,
138+
) to
139+
StreamSyncSummary(
140+
Optional.of(getRecordCounter(streamDescriptor).get()),
141+
)
142+
}
175143
onClose.accept(hasFailed, streamSyncSummaries)
176144

177145
// as this throws an exception, we need to be after all other close functions.

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt

+7
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ object StreamDescriptorUtils {
3434

3535
return pairs
3636
}
37+
38+
/**
 * Fills in [defaultNamespace] when [sd] has no namespace of its own.
 *
 * @param sd the stream descriptor to inspect; it is never modified
 * @param defaultNamespace the namespace to use when [sd]'s namespace is null or empty
 * @return [sd] itself when its namespace is non-empty; otherwise a new [StreamDescriptor] with
 * the same name and [defaultNamespace] as its namespace
 */
fun withDefaultNamespace(sd: StreamDescriptor, defaultNamespace: String): StreamDescriptor =
    if (sd.namespace.isNullOrEmpty()) {
        // Build a fresh descriptor rather than mutating the caller's instance.
        StreamDescriptor().withName(sd.name).withNamespace(defaultNamespace)
    } else {
        sd
    }
3744
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt

+24-5
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ package io.airbyte.cdk.integrations.destination.async.buffers
66

77
import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager
88
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
9+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
910
import io.airbyte.cdk.integrations.destination.async.state.GlobalAsyncStateManager
1011
import io.airbyte.protocol.models.v0.AirbyteMessage
1112
import io.airbyte.protocol.models.v0.StreamDescriptor
12-
import java.util.Optional
1313
import java.util.concurrent.ConcurrentMap
1414

1515
/**
@@ -20,6 +20,7 @@ class BufferEnqueue(
2020
private val memoryManager: GlobalMemoryManager,
2121
private val buffers: ConcurrentMap<StreamDescriptor, StreamAwareQueue>,
2222
private val stateManager: GlobalAsyncStateManager,
23+
private val defaultNamespace: String,
2324
) {
2425
/**
2526
* Buffer a record. Contains memory management logic to dynamically adjust queue size based via
@@ -31,12 +32,11 @@ class BufferEnqueue(
3132
fun addRecord(
3233
message: PartialAirbyteMessage,
3334
sizeInBytes: Int,
34-
defaultNamespace: Optional<String>,
3535
) {
3636
if (message.type == AirbyteMessage.Type.RECORD) {
3737
handleRecord(message, sizeInBytes)
3838
} else if (message.type == AirbyteMessage.Type.STATE) {
39-
stateManager.trackState(message, sizeInBytes.toLong(), defaultNamespace.orElse(""))
39+
stateManager.trackState(message, sizeInBytes.toLong())
4040
}
4141
}
4242

@@ -53,15 +53,34 @@ class BufferEnqueue(
5353
}
5454
val stateId = stateManager.getStateIdAndIncrementCounter(streamDescriptor)
5555

56-
var addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId)
56+
// We don't set the default namespace until after putting this message into the state
57+
// manager/etc.
58+
// All our internal handling is on the true (null) namespace;
59+
// we only set the default namespace when handing off to destination-specific code.
60+
val mangledMessage =
    if (message.record!!.namespace.isNullOrEmpty()) {
        // Rebuild the message so the caller's instance keeps its original (null/empty)
        // namespace. Everything except the namespace is copied over, including the
        // envelope fields, so the queued message is otherwise identical to the input.
        PartialAirbyteMessage()
            .withRecord(
                PartialAirbyteRecordMessage()
                    .withData(message.record!!.data)
                    .withEmittedAt(message.record!!.emittedAt)
                    .withMeta(message.record!!.meta)
                    .withStream(message.record!!.stream)
                    .withNamespace(defaultNamespace),
            )
            .withSerialized(message.serialized)
            .withState(message.state)
            .withType(message.type)
    } else {
        // Namespace already present: hand the original message through untouched.
        message
    }
74+
75+
var addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId)
5776

5877
var i = 0
5978
while (!addedToQueue) {
6079
val newlyAllocatedMemory = memoryManager.requestMemory()
6180
if (newlyAllocatedMemory > 0) {
6281
queue.addMaxMemory(newlyAllocatedMemory)
6382
}
64-
addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId)
83+
addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId)
6584
i++
6685
if (i > 5) {
6786
try {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ private val logger = KotlinLogging.logger {}
2222
class BufferManager
2323
@JvmOverloads
2424
constructor(
25+
defaultNamespace: String,
2526
maxMemory: Long = (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO).toLong(),
2627
) {
2728
@get:VisibleForTesting val buffers: ConcurrentMap<StreamDescriptor, StreamAwareQueue>
@@ -46,7 +47,7 @@ constructor(
4647
memoryManager = GlobalMemoryManager(maxMemory)
4748
this.stateManager = GlobalAsyncStateManager(memoryManager)
4849
buffers = ConcurrentHashMap()
49-
bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager)
50+
bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager, defaultNamespace)
5051
bufferDequeue = BufferDequeue(memoryManager, buffers, stateManager)
5152
debugLoop = Executors.newSingleThreadScheduledExecutor()
5253
debugLoop.scheduleAtFixedRate(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt

+2-29
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.integrations.destination.async.state
66

77
import com.google.common.base.Preconditions
8-
import com.google.common.base.Strings
98
import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager
109
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
1110
import io.airbyte.commons.json.Jsons
@@ -104,7 +103,6 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
104103
fun trackState(
105104
message: PartialAirbyteMessage,
106105
sizeInBytes: Long,
107-
defaultNamespace: String,
108106
) {
109107
if (preState) {
110108
convertToGlobalIfNeeded(message)
@@ -113,7 +111,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
113111
// stateType should not change after a conversion.
114112
Preconditions.checkArgument(stateType == extractStateType(message))
115113

116-
closeState(message, sizeInBytes, defaultNamespace)
114+
closeState(message, sizeInBytes)
117115
}
118116

119117
/**
@@ -333,10 +331,9 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
333331
private fun closeState(
334332
message: PartialAirbyteMessage,
335333
sizeInBytes: Long,
336-
defaultNamespace: String,
337334
) {
338335
val resolvedDescriptor: StreamDescriptor =
339-
extractStream(message, defaultNamespace)
336+
extractStream(message)
340337
.orElse(
341338
SENTINEL_GLOBAL_DESC,
342339
)
@@ -434,38 +431,14 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
434431
UUID.randomUUID().toString(),
435432
)
436433

437-
/**
438-
* If the user has selected the Destination Namespace as the Destination default while
439-
* setting up the connector, the platform sets the namespace as null in the StreamDescriptor
440-
* in the AirbyteMessages (both record and state messages). The destination checks that if
441-
* the namespace is empty or null, if yes then re-populates it with the defaultNamespace.
442-
* See [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept] But
443-
* destination only does this for the record messages. So when state messages arrive without
444-
* a namespace and since the destination doesn't repopulate it with the default namespace,
445-
* there is a mismatch between the StreamDescriptor from record messages and state messages.
446-
* That breaks the logic of the state management class as [descToStateIdQ] needs to have
447-
* consistent StreamDescriptor. This is why while trying to extract the StreamDescriptor
448-
* from state messages, we check if the namespace is null, if yes then replace it with
449-
* defaultNamespace to keep it consistent with the record messages.
450-
*/
451434
private fun extractStream(
452435
message: PartialAirbyteMessage,
453-
defaultNamespace: String,
454436
): Optional<StreamDescriptor> {
455437
if (
456438
message.state?.type != null &&
457439
message.state?.type == AirbyteStateMessage.AirbyteStateType.STREAM
458440
) {
459441
val streamDescriptor: StreamDescriptor? = message.state?.stream?.streamDescriptor
460-
if (Strings.isNullOrEmpty(streamDescriptor?.namespace)) {
461-
return Optional.of(
462-
StreamDescriptor()
463-
.withName(
464-
streamDescriptor?.name,
465-
)
466-
.withNamespace(defaultNamespace),
467-
)
468-
}
469442
return streamDescriptor?.let { Optional.of(it) } ?: Optional.empty()
470443
}
471444
return Optional.empty()

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt

-24
This file was deleted.

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
3131
import java.io.IOException
3232
import java.math.BigDecimal
3333
import java.time.Instant
34-
import java.util.Optional
3534
import java.util.concurrent.Executors
3635
import java.util.concurrent.TimeUnit
3736
import java.util.concurrent.TimeoutException
@@ -61,7 +60,7 @@ class AsyncStreamConsumerTest {
6160
private val CATALOG: ConfiguredAirbyteCatalog =
6261
ConfiguredAirbyteCatalog()
6362
.withStreams(
64-
java.util.List.of(
63+
listOf(
6564
CatalogHelpers.createConfiguredAirbyteStream(
6665
STREAM_NAME,
6766
SCHEMA_NAME,
@@ -146,9 +145,9 @@ class AsyncStreamConsumerTest {
146145
onClose = onClose,
147146
onFlush = flushFunction,
148147
catalog = CATALOG,
149-
bufferManager = BufferManager(),
148+
bufferManager = BufferManager("default_ns"),
150149
flushFailure = flushFailure,
151-
defaultNamespace = Optional.of("default_ns"),
150+
defaultNamespace = "default_ns",
152151
airbyteMessageDeserializer = airbyteMessageDeserializer,
153152
workerPool = Executors.newFixedThreadPool(5),
154153
)
@@ -265,9 +264,9 @@ class AsyncStreamConsumerTest {
265264
Mockito.mock(OnCloseFunction::class.java),
266265
flushFunction,
267266
CATALOG,
268-
BufferManager((1024 * 10).toLong()),
267+
BufferManager("default_ns", (1024 * 10).toLong()),
268+
"default_ns",
269269
flushFailure,
270-
Optional.of("default_ns"),
271270
)
272271
Mockito.`when`(flushFunction.optimalBatchSizeBytes).thenReturn(0L)
273272

0 commit comments

Comments
 (0)