4
4
5
5
package io.airbyte.cdk.integrations.destination.async
6
6
7
- import com.google.common.annotations.VisibleForTesting
8
7
import com.google.common.base.Preconditions
9
8
import com.google.common.base.Strings
10
9
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
11
10
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
12
11
import io.airbyte.cdk.integrations.destination.async.buffers.BufferEnqueue
13
12
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
14
- import io.airbyte.cdk.integrations.destination.async.deser.DeserializationUtil
15
- import io.airbyte.cdk.integrations.destination.async.deser.IdentityDataTransformer
16
- import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
13
+ import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer
17
14
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
18
15
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
19
16
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure
@@ -44,26 +41,23 @@ private val logger = KotlinLogging.logger {}
44
41
* memory limit governed by [GlobalMemoryManager]. Record writing is decoupled via [FlushWorkers].
45
42
* See the other linked class for more detail.
46
43
*/
47
- class AsyncStreamConsumer
48
- @VisibleForTesting
49
- constructor (
44
+ class AsyncStreamConsumer (
50
45
outputRecordCollector : Consumer <AirbyteMessage >,
51
46
private val onStart : OnStartFunction ,
52
47
private val onClose : OnCloseFunction ,
53
- flusher : DestinationFlushFunction ,
48
+ onFlush : DestinationFlushFunction ,
54
49
private val catalog : ConfiguredAirbyteCatalog ,
55
50
private val bufferManager : BufferManager ,
56
- private val flushFailure: FlushFailure ,
57
51
private val defaultNamespace : Optional <String >,
58
- workerPool : ExecutorService ,
59
- private val dataTransformer : StreamAwareDataTransformer ,
60
- private val deserializationUtil : DeserializationUtil ,
52
+ private val flushFailure : FlushFailure = FlushFailure () ,
53
+ workerPool : ExecutorService = Executors .newFixedThreadPool(5) ,
54
+ private val airbyteMessageDeserializer : AirbyteMessageDeserializer ,
61
55
) : SerializedAirbyteMessageConsumer {
62
56
private val bufferEnqueue: BufferEnqueue = bufferManager.bufferEnqueue
63
57
private val flushWorkers: FlushWorkers =
64
58
FlushWorkers (
65
59
bufferManager.bufferDequeue,
66
- flusher ,
60
+ onFlush ,
67
61
outputRecordCollector,
68
62
flushFailure,
69
63
bufferManager.stateManager,
@@ -81,73 +75,7 @@ constructor(
81
75
private var hasClosed = false
82
76
private var hasFailed = false
83
77
84
- constructor (
85
- outputRecordCollector: Consumer <AirbyteMessage >,
86
- onStart: OnStartFunction ,
87
- onClose: OnCloseFunction ,
88
- flusher: DestinationFlushFunction ,
89
- catalog: ConfiguredAirbyteCatalog ,
90
- bufferManager: BufferManager ,
91
- defaultNamespace: Optional <String >,
92
- ) : this (
93
- outputRecordCollector,
94
- onStart,
95
- onClose,
96
- flusher,
97
- catalog,
98
- bufferManager,
99
- FlushFailure (),
100
- defaultNamespace,
101
- )
102
-
103
- constructor (
104
- outputRecordCollector: Consumer <AirbyteMessage >,
105
- onStart: OnStartFunction ,
106
- onClose: OnCloseFunction ,
107
- flusher: DestinationFlushFunction ,
108
- catalog: ConfiguredAirbyteCatalog ,
109
- bufferManager: BufferManager ,
110
- defaultNamespace: Optional <String >,
111
- dataTransformer: StreamAwareDataTransformer ,
112
- ) : this (
113
- outputRecordCollector,
114
- onStart,
115
- onClose,
116
- flusher,
117
- catalog,
118
- bufferManager,
119
- FlushFailure (),
120
- defaultNamespace,
121
- Executors .newFixedThreadPool(5 ),
122
- dataTransformer,
123
- DeserializationUtil (),
124
- )
125
-
126
- constructor (
127
- outputRecordCollector: Consumer <AirbyteMessage >,
128
- onStart: OnStartFunction ,
129
- onClose: OnCloseFunction ,
130
- flusher: DestinationFlushFunction ,
131
- catalog: ConfiguredAirbyteCatalog ,
132
- bufferManager: BufferManager ,
133
- defaultNamespace: Optional <String >,
134
- workerPool: ExecutorService ,
135
- ) : this (
136
- outputRecordCollector,
137
- onStart,
138
- onClose,
139
- flusher,
140
- catalog,
141
- bufferManager,
142
- FlushFailure (),
143
- defaultNamespace,
144
- workerPool,
145
- IdentityDataTransformer (),
146
- DeserializationUtil (),
147
- )
148
-
149
- @VisibleForTesting
150
- constructor (
78
+ internal constructor (
151
79
outputRecordCollector: Consumer <AirbyteMessage >,
152
80
onStart: OnStartFunction ,
153
81
onClose: OnCloseFunction ,
@@ -163,11 +91,10 @@ constructor(
163
91
flusher,
164
92
catalog,
165
93
bufferManager,
166
- flushFailure,
167
94
defaultNamespace,
95
+ flushFailure,
168
96
Executors .newFixedThreadPool(5 ),
169
- IdentityDataTransformer (),
170
- DeserializationUtil (),
97
+ AirbyteMessageDeserializer (),
171
98
)
172
99
173
100
@Throws(Exception ::class )
@@ -183,7 +110,7 @@ constructor(
183
110
184
111
@Throws(Exception ::class )
185
112
override fun accept (
186
- messageString : String ,
113
+ message : String ,
187
114
sizeInBytes : Int ,
188
115
) {
189
116
Preconditions .checkState(hasStarted, " Cannot accept records until consumer has started" )
@@ -193,21 +120,22 @@ constructor(
193
120
* to try to use a thread pool to partially deserialize to get record type and stream name, we can
194
121
* do it without touching buffer manager.
195
122
*/
196
- val message =
197
- deserializationUtil.deserializeAirbyteMessage(
198
- messageString,
199
- dataTransformer,
123
+ val partialAirbyteMessage =
124
+ airbyteMessageDeserializer.deserializeAirbyteMessage(
125
+ message,
200
126
)
201
- if (AirbyteMessage .Type .RECORD == message .type) {
202
- if (Strings .isNullOrEmpty(message .record?.namespace)) {
203
- message .record?.namespace = defaultNamespace.getOrNull()
127
+ if (AirbyteMessage .Type .RECORD == partialAirbyteMessage .type) {
128
+ if (Strings .isNullOrEmpty(partialAirbyteMessage .record?.namespace)) {
129
+ partialAirbyteMessage .record?.namespace = defaultNamespace.getOrNull()
204
130
}
205
- validateRecord(message )
131
+ validateRecord(partialAirbyteMessage )
206
132
207
- message.record?.streamDescriptor?.let { getRecordCounter(it).incrementAndGet() }
133
+ partialAirbyteMessage.record?.streamDescriptor?.let {
134
+ getRecordCounter(it).incrementAndGet()
135
+ }
208
136
}
209
137
bufferEnqueue.addRecord(
210
- message ,
138
+ partialAirbyteMessage ,
211
139
sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES ,
212
140
defaultNamespace,
213
141
)
0 commit comments