5
5
package io.airbyte.cdk.load.data
6
6
7
7
import com.fasterxml.jackson.databind.JsonNode
8
+ import io.airbyte.cdk.load.util.Jsons
9
+ import io.github.oshai.kotlinlogging.KotlinLogging
10
+
11
// File-private logger shared by the type declarations in this file.
private val logger = KotlinLogging.logger {}
12
+
13
/**
 * Root of the type hierarchy declared in this file.
 *
 * Subclasses inherit the defaults below (not an object, not an array, no columns) and override
 * only the members that differ for them.
 */
sealed class AirbyteType {
    /** `true` only for subclasses that override it to mark themselves as object-like. */
    open val isObject: Boolean = false

    /** `true` only for subclasses that override it to mark themselves as array-like. */
    open val isArray: Boolean = false

    /**
     * Utility method for database/warehouse destinations, which assume that the top-level schema
     * is an object.
     *
     * @return a fresh, empty map in this base implementation; subclasses may override it to
     * expose their fields
     */
    open fun asColumns(): LinkedHashMap<String, FieldType> = linkedMapOf()
}
10
25
11
// Leaf types that carry no additional data. Each one is a singleton that keeps every default
// inherited from AirbyteType (isObject = false, isArray = false, empty asColumns()).

/** Singleton [AirbyteType] for string values. */
data object StringType : AirbyteType()

/** Singleton [AirbyteType] for boolean values. */
data object BooleanType : AirbyteType()

/** Singleton [AirbyteType] for integer values. */
data object IntegerType : AirbyteType()

/** Singleton [AirbyteType] for number values. */
data object NumberType : AirbyteType()

/** Singleton [AirbyteType] for date values. */
data object DateType : AirbyteType()

/** Singleton [AirbyteType] for timestamps that carry a timezone. */
data object TimestampTypeWithTimezone : AirbyteType()

/** Singleton [AirbyteType] for timestamps that carry no timezone. */
data object TimestampTypeWithoutTimezone : AirbyteType()

/** Singleton [AirbyteType] for times that carry a timezone. */
data object TimeTypeWithTimezone : AirbyteType()

/** Singleton [AirbyteType] for times that carry no timezone. */
data object TimeTypeWithoutTimezone : AirbyteType()
28
43
29
/**
 * Array type whose element type is known.
 *
 * @property items the type (and nullability) of the array's elements
 */
data class ArrayType(
    val items: FieldType,
) : AirbyteType() {
    /** Always `true`: this type is an array. */
    override val isArray: Boolean = true
}
47
+
48
/** Singleton array type that carries no element type information. */
data object ArrayTypeWithoutSchema : AirbyteType() {
    /** Always `true`: this type is an array. */
    override val isArray: Boolean = true
}
52
/**
 * Object type with a known set of fields.
 *
 * @property properties the object's fields, keyed by field name
 */
data class ObjectType(
    val properties: LinkedHashMap<String, FieldType>,
) : AirbyteType() {
    /** Always `true`: this type is an object. */
    override val isObject: Boolean = true

    /**
     * Exposes the object's fields as columns.
     *
     * @return the very same map instance held in [properties] (not a copy)
     */
    override fun asColumns(): LinkedHashMap<String, FieldType> = properties
}
34
59
35
/**
 * Singleton object type with an empty schema. It reports itself as an object but inherits the
 * base `asColumns()`, so it yields no columns.
 */
data object ObjectTypeWithEmptySchema : AirbyteType() {
    /** Always `true`: this type is an object. */
    override val isObject: Boolean = true
}
36
63
37
/**
 * Singleton object type with no schema at all. It reports itself as an object but inherits the
 * base `asColumns()`, so it yields no columns.
 */
data object ObjectTypeWithoutSchema : AirbyteType() {
    /** Always `true`: this type is an object. */
    override val isObject: Boolean = true
}
38
67
39
68
data class UnionType (
40
69
val options : Set <AirbyteType >,
41
70
val isLegacyUnion : Boolean ,
42
- ) : AirbyteType {
71
+ ) : AirbyteType() {
72
/**
 * This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level
 * schema looks like this, we still want to be able to extract the object properties (i.e. treat
 * it as though the string option didn't exist).
 *
 * This method does not throw when columns cannot be extracted. If the union has no object
 * option, or more than one, the problem is logged at error level and an empty map is returned.
 *
 * @return the columns of the union's single object option, or a new empty map when there is
 * not exactly one object option
 */
override fun asColumns(): LinkedHashMap<String, FieldType> {
    logger.warn { " asColumns options=$options " }

    // Collect the object options once; the size tells us which of the three cases applies.
    val objectOptions = options.filter { it.isObject }
    if (objectOptions.size > 1) {
        logger.error { " Can't extract columns from a schema with multiple object options" }
        return LinkedHashMap()
    }

    // firstOrNull() instead of first { } + catch: the "no object option" case is handled
    // explicitly, and an exception raised inside the nested asColumns() is no longer swallowed.
    val onlyObject = objectOptions.firstOrNull()
    val retVal: LinkedHashMap<String, FieldType> =
        if (onlyObject != null) {
            onlyObject.asColumns()
        } else {
            logger.error { " Can't extract columns from a schema with no object options" }
            LinkedHashMap()
        }
    logger.warn { " Union.asColumns retVal=$retVal " }
    return retVal
}
97
+
98
/**
 * Collapses a legacy union into a single type, matching legacy behavior: some destinations
 * handle legacy unions by picking the "best" option. Not pretty, but painful to change.
 *
 * Each option is given a rank and the option with the lowest rank is returned. When several
 * options share the lowest rank, the first one in the iteration order of `options` is chosen.
 *
 * @return the lowest-ranked option, or an [UnknownType] wrapping an empty JSON object when the
 * union has no options
 * @throws IllegalStateException if this union is not a legacy union
 */
fun chooseType(): AirbyteType {
    check(isLegacyUnion) { " Cannot chooseType for a non-legacy union type" }

    // Lower rank = preferred. Arrays beat objects, objects beat strings, and so on down to
    // nested unions, which are only chosen when nothing else is available.
    fun rank(candidate: AirbyteType): Int =
        when (candidate) {
            is ArrayType,
            ArrayTypeWithoutSchema -> -2
            is ObjectType,
            ObjectTypeWithEmptySchema,
            ObjectTypeWithoutSchema -> -1
            StringType -> 0
            DateType -> 1
            TimeTypeWithoutTimezone -> 2
            TimeTypeWithTimezone -> 3
            TimestampTypeWithoutTimezone -> 4
            TimestampTypeWithTimezone -> 5
            NumberType -> 6
            IntegerType -> 7
            BooleanType -> 8
            is UnknownType -> 9
            is UnionType -> Int.MAX_VALUE
        }

    // minByOrNull yields null only for an empty set, which is the fallback case.
    return options.minByOrNull { rank(it) } ?: UnknownType(Jsons.createObjectNode())
}
128
+
43
129
companion object {
44
130
fun of (options : Set <AirbyteType >, isLegacyUnion : Boolean = false): AirbyteType {
45
131
if (options.size == 1 ) {
@@ -56,6 +142,6 @@ data class UnionType(
56
142
}
57
143
}
58
144
59
/**
 * Type used when a schema could not be mapped to any other [AirbyteType].
 *
 * @property schema the JSON schema node this type wraps
 */
data class UnknownType(
    val schema: JsonNode,
) : AirbyteType()
60
146
61
147
/**
 * Pairs an [AirbyteType] with its nullability.
 *
 * @property type the type of the field
 * @property nullable the field's nullability flag
 */
data class FieldType(
    val type: AirbyteType,
    val nullable: Boolean,
)
0 commit comments