4
4
5
5
package io.airbyte.integrations.destination.s3_v2
6
6
7
+ import io.airbyte.cdk.load.command.Append
8
+ import io.airbyte.cdk.load.command.DestinationStream
7
9
import io.airbyte.cdk.load.command.aws.asMicronautProperties
10
+ import io.airbyte.cdk.load.data.*
8
11
import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper
12
+ import io.airbyte.cdk.load.message.InputRecord
9
13
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
10
14
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
15
+ import io.airbyte.cdk.load.test.util.OutputRecord
11
16
import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper
17
+ import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException
12
18
import io.airbyte.cdk.load.write.AllTypesBehavior
13
19
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
14
20
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
15
21
import io.airbyte.cdk.load.write.StronglyTyped
16
22
import io.airbyte.cdk.load.write.UnionBehavior
17
23
import io.airbyte.cdk.load.write.Untyped
18
24
import java.util.concurrent.TimeUnit
25
+ import org.junit.jupiter.api.Assumptions
19
26
import org.junit.jupiter.api.Disabled
20
27
import org.junit.jupiter.api.Test
21
28
import org.junit.jupiter.api.Timeout
29
+ import org.junit.jupiter.api.assertThrows
22
30
23
31
@Timeout(60 , unit = TimeUnit .MINUTES )
24
32
abstract class S3V2WriteTest (
@@ -34,6 +42,8 @@ abstract class S3V2WriteTest(
34
42
allTypesBehavior : AllTypesBehavior ,
35
43
nullEqualsUnset : Boolean = false ,
36
44
nullUnknownTypes : Boolean = false ,
45
+ envVars : Map <String , String > = emptyMap(),
46
+ private val mergesUnions : Boolean = false
37
47
) :
38
48
BasicFunctionalityIntegrationTest (
39
49
S3V2TestUtils .getConfig(path),
@@ -67,6 +77,233 @@ abstract class S3V2WriteTest(
67
77
override fun testBasicWriteFile () {
68
78
super .testBasicWriteFile()
69
79
}
80
+
81
+ @Test
82
+ fun testMergeUnions () {
83
+ Assumptions .assumeTrue(mergesUnions)
84
+ // Avro and parquet merges unions, merging schemas. Validate the behavior by ensuring
85
+ // that fields not matching the schema are dropped.
86
+ val streamName = " stream"
87
+ val stream =
88
+ DestinationStream (
89
+ descriptor = DestinationStream .Descriptor (randomizedNamespace, streamName),
90
+ importType = Append ,
91
+ generationId = 1L ,
92
+ minimumGenerationId = 0L ,
93
+ syncId = 101L ,
94
+ schema =
95
+ ObjectType (
96
+ linkedMapOf(
97
+ " id" to FieldType (IntegerType , nullable = true ),
98
+ " union_of_objects" to
99
+ FieldType (
100
+ type =
101
+ UnionType .of(
102
+ ObjectType (
103
+ linkedMapOf(
104
+ " field1" to FieldType (StringType , true ),
105
+ " field2" to FieldType (IntegerType , true ),
106
+ " field4" to FieldType (StringType , true )
107
+ )
108
+ ),
109
+ ObjectType (
110
+ linkedMapOf(
111
+ " field1" to
112
+ FieldType (
113
+ StringType ,
114
+ true
115
+ ), // merges to String
116
+ " field3" to FieldType (NumberType , true ),
117
+ " field4" to
118
+ FieldType (
119
+ BooleanType ,
120
+ true
121
+ ) // merges to String|Boolean
122
+ )
123
+ )
124
+ ),
125
+ nullable = true
126
+ )
127
+ )
128
+ )
129
+ )
130
+ runSync(
131
+ configContents,
132
+ stream,
133
+ listOf (
134
+ """ {"id": 1, "union_of_objects": {"field1": "a", "field2": 1, "field3": 3.14, "field4": "boo", "field5": "extra"}}""" ,
135
+ """ {"id": 2, "union_of_objects": {"field1": "b", "field2": 2, "field3": 2.71, "field4": true, "field5": "extra"}}"""
136
+ )
137
+ .map { InputRecord (randomizedNamespace, streamName, it, 1L ) }
138
+ )
139
+ val field4a: Any =
140
+ if (unionBehavior == UnionBehavior .PROMOTE_TO_OBJECT ) {
141
+ mapOf (" type" to " string" , " string" to " boo" )
142
+ } else {
143
+ " boo"
144
+ }
145
+ val field4b: Any =
146
+ if (unionBehavior == UnionBehavior .PROMOTE_TO_OBJECT ) {
147
+ mapOf (" type" to " boolean" , " boolean" to true )
148
+ } else {
149
+ true
150
+ }
151
+ dumpAndDiffRecords(
152
+ config = parsedConfig,
153
+ canonicalExpectedRecords =
154
+ listOf (
155
+ mapOf (
156
+ " id" to 1 ,
157
+ " union_of_objects" to
158
+ mutableMapOf (
159
+ " field1" to " a" ,
160
+ " field2" to 1 ,
161
+ " field3" to 3.14 ,
162
+ " field4" to field4a
163
+ )
164
+ ),
165
+ mapOf (
166
+ " id" to 2 ,
167
+ " union_of_objects" to
168
+ mapOf (
169
+ " field1" to " b" ,
170
+ " field2" to 2 ,
171
+ " field3" to 2.71 ,
172
+ " field4" to field4b
173
+ )
174
+ )
175
+ )
176
+ .map {
177
+ OutputRecord (
178
+ extractedAt = 1L ,
179
+ generationId = 1L ,
180
+ data = it,
181
+ airbyteMeta = OutputRecord .Meta (syncId = 101L )
182
+ )
183
+ },
184
+ stream,
185
+ primaryKey = listOf (listOf (" id" )),
186
+ cursor = listOf (" id" )
187
+ )
188
+ }
189
+
190
+ @Test
191
+ fun conflictingTypesInMappedUnions () {
192
+ Assumptions .assumeTrue(unionBehavior == UnionBehavior .PROMOTE_TO_OBJECT )
193
+ val stream =
194
+ DestinationStream (
195
+ descriptor = DestinationStream .Descriptor (randomizedNamespace, " stream" ),
196
+ importType = Append ,
197
+ generationId = 1L ,
198
+ minimumGenerationId = 0L ,
199
+ syncId = 101L ,
200
+ schema =
201
+ ObjectType (
202
+ linkedMapOf(
203
+ " id" to FieldType (IntegerType , nullable = true ),
204
+ " union_of_objects" to
205
+ FieldType (
206
+ type =
207
+ UnionType .of(
208
+ ObjectType (
209
+ linkedMapOf(
210
+ " field1" to FieldType (StringType , true ),
211
+ )
212
+ ),
213
+ ObjectTypeWithoutSchema
214
+ ),
215
+ nullable = true
216
+ )
217
+ )
218
+ )
219
+ )
220
+
221
+ assertThrows<DestinationUncleanExitException > {
222
+ runSync(
223
+ configContents,
224
+ stream,
225
+ listOf (
226
+ """ {"id": 1, "union_of_objects": {"field1": "a"}}""" ,
227
+ """ {"id": 2, "union_of_objects": {"undeclared": "field"}}"""
228
+ )
229
+ .map { InputRecord (randomizedNamespace, " stream" , it, 1L ) }
230
+ )
231
+ }
232
+ }
233
+
234
+ @Test
235
+ fun testMappableTypesNestedInUnions () {
236
+ // Avro and parquet both merge unions and map complex types to other types. Validate
237
+ // that the behavior still works as expected when nested within a union.
238
+ Assumptions .assumeTrue(mergesUnions)
239
+ val stream =
240
+ DestinationStream (
241
+ descriptor = DestinationStream .Descriptor (randomizedNamespace, " stream" ),
242
+ importType = Append ,
243
+ generationId = 1L ,
244
+ minimumGenerationId = 0L ,
245
+ syncId = 101L ,
246
+ schema =
247
+ ObjectType (
248
+ linkedMapOf(
249
+ " id" to FieldType (IntegerType , nullable = true ),
250
+ " union_of_objects" to
251
+ FieldType (
252
+ type =
253
+ UnionType .of(
254
+ ObjectType (
255
+ linkedMapOf(
256
+ " field1" to FieldType (StringType , true ),
257
+ " field2" to
258
+ FieldType (
259
+ ObjectType (
260
+ linkedMapOf(
261
+ " nested_schemaless" to
262
+ FieldType (
263
+ ObjectTypeWithoutSchema ,
264
+ true
265
+ ),
266
+ " nested_union" to
267
+ FieldType (
268
+ UnionType .of(
269
+ StringType ,
270
+ BooleanType
271
+ ),
272
+ true
273
+ )
274
+ )
275
+ ),
276
+ true
277
+ )
278
+ )
279
+ ),
280
+ StringType
281
+ ),
282
+ nullable = true
283
+ ),
284
+ )
285
+ )
286
+ )
287
+
288
+ val expectedRecords =
289
+ if (unionBehavior == UnionBehavior .PROMOTE_TO_OBJECT ) {
290
+ listOf (
291
+ """ {"id": 1, "union_of_objects": {"field1": "a", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": {"type": "string", "string": "string"}}}}""" ,
292
+ """ {"id": 2, "union_of_objects": {"field1": "b", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": {"type": "boolean", "boolean": true}}}}"""
293
+ )
294
+ } else {
295
+ listOf (
296
+ """ {"id": 1, "union_of_objects": {"field1": "a", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": "string"}}}""" ,
297
+ """ {"id": 2, "union_of_objects": {"field1": "b", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": true}}}"""
298
+ )
299
+ }
300
+
301
+ runSync(
302
+ configContents,
303
+ stream,
304
+ expectedRecords.map { InputRecord (randomizedNamespace, " stream" , it, 1L ) }
305
+ )
306
+ }
70
307
}
71
308
72
309
class S3V2WriteTestJsonUncompressed :
@@ -160,17 +397,8 @@ class S3V2WriteTestAvroUncompressed :
160
397
allTypesBehavior = StronglyTyped (integerCanBeLarge = false ),
161
398
nullEqualsUnset = true ,
162
399
nullUnknownTypes = true ,
163
- ) {
164
- @Test
165
- override fun testUnknownTypes () {
166
- super .testUnknownTypes()
167
- }
168
-
169
- @Test
170
- override fun testFunkyCharacters () {
171
- super .testFunkyCharacters()
172
- }
173
- }
400
+ mergesUnions = true
401
+ )
174
402
175
403
class S3V2WriteTestAvroBzip2 :
176
404
S3V2WriteTest (
@@ -184,6 +412,7 @@ class S3V2WriteTestAvroBzip2 :
184
412
allTypesBehavior = StronglyTyped (integerCanBeLarge = false ),
185
413
nullEqualsUnset = true ,
186
414
nullUnknownTypes = true ,
415
+ mergesUnions = true
187
416
)
188
417
189
418
class S3V2WriteTestParquetUncompressed :
@@ -198,6 +427,7 @@ class S3V2WriteTestParquetUncompressed :
198
427
allTypesBehavior = StronglyTyped (integerCanBeLarge = false ),
199
428
nullEqualsUnset = true ,
200
429
nullUnknownTypes = true ,
430
+ mergesUnions = true
201
431
)
202
432
203
433
class S3V2WriteTestParquetSnappy :
@@ -212,6 +442,7 @@ class S3V2WriteTestParquetSnappy :
212
442
allTypesBehavior = StronglyTyped (integerCanBeLarge = false ),
213
443
nullEqualsUnset = true ,
214
444
nullUnknownTypes = true ,
445
+ mergesUnions = true
215
446
)
216
447
217
448
class S3V2WriteTestEndpointURL :
0 commit comments