Skip to content

Commit b17df3f

Browse files
Bulk Load CDK: AirbyteType & AirbyteValue, marshaling from json (#45430)
1 parent ace4e82 commit b17df3f

16 files changed

+1195
-34
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ import jakarta.inject.Singleton
1212
* Internal representation of destination streams. This is intended to be a case class specialized
1313
* for usability.
1414
*/
15-
data class DestinationCatalog(
16-
val streams: List<DestinationStream> = emptyList(),
17-
) {
15+
data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()) {
1816
private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
1917
streams.associateBy { it.descriptor }
2018

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package io.airbyte.cdk.command
66

7-
import com.fasterxml.jackson.databind.node.ObjectNode
7+
import io.airbyte.cdk.data.AirbyteType
8+
import io.airbyte.cdk.data.AirbyteTypeToJsonSchema
9+
import io.airbyte.cdk.data.JsonSchemaToAirbyteType
810
import io.airbyte.protocol.models.v0.AirbyteStream
911
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1012
import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -22,7 +24,7 @@ import jakarta.inject.Singleton
2224
data class DestinationStream(
2325
val descriptor: Descriptor,
2426
val importType: ImportType,
25-
val schema: ObjectNode,
27+
val schema: AirbyteType,
2628
val generationId: Long,
2729
val minimumGenerationId: Long,
2830
val syncId: Long,
@@ -44,7 +46,7 @@ data class DestinationStream(
4446
AirbyteStream()
4547
.withNamespace(descriptor.namespace)
4648
.withName(descriptor.name)
47-
.withJsonSchema(schema)
49+
.withJsonSchema(AirbyteTypeToJsonSchema().convert(schema))
4850
)
4951
.withGenerationId(generationId)
5052
.withMinimumGenerationId(minimumGenerationId)
@@ -83,10 +85,10 @@ class DestinationStreamFactory {
8385
DestinationSyncMode.APPEND_DEDUP ->
8486
Dedupe(primaryKey = stream.primaryKey, cursor = stream.cursorField)
8587
},
86-
schema = stream.stream.jsonSchema as ObjectNode,
8788
generationId = stream.generationId,
8889
minimumGenerationId = stream.minimumGenerationId,
8990
syncId = stream.syncId,
91+
schema = JsonSchemaToAirbyteType().convert(stream.stream.jsonSchema)
9092
)
9193
}
9294
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.data
6+
7+
/**
 * Closed set of schema types a destination stream field can have.
 *
 * Implementations are either singletons (`data object`) for types that carry no parameters, or
 * `data class`es for types that do.
 */
sealed interface AirbyteType

/** The JSON `null` type. */
data object NullType : AirbyteType

/** Plain string with no further format information. */
data object StringType : AirbyteType

/** Boolean. */
data object BooleanType : AirbyteType

/** Integer number. */
data object IntegerType : AirbyteType

/** Non-integer (general) number. */
data object NumberType : AirbyteType

/** Calendar date. */
data object DateType : AirbyteType

/**
 * Timestamp.
 *
 * @property hasTimezone whether the timestamp carries timezone information.
 */
data class TimestampType(
    val hasTimezone: Boolean,
) : AirbyteType

/**
 * Time of day.
 *
 * @property hasTimezone whether the time carries timezone information.
 */
data class TimeType(
    val hasTimezone: Boolean,
) : AirbyteType

/**
 * Array whose element type is known.
 *
 * @property items type and nullability of each element.
 */
data class ArrayType(
    val items: FieldType,
) : AirbyteType

/** Array for which no element type is available. */
data object ArrayTypeWithoutSchema : AirbyteType

/**
 * Object with a known set of properties.
 *
 * @property properties property name to field description; a [LinkedHashMap], so iteration follows
 * insertion order.
 */
data class ObjectType(
    val properties: LinkedHashMap<String, FieldType>,
) : AirbyteType

/** Object whose property list is present but empty. */
data object ObjectTypeWithEmptySchema : AirbyteType

/** Object for which no property list is available at all. */
data object ObjectTypeWithoutSchema : AirbyteType

/**
 * Type made up of several alternative types.
 *
 * @property options the alternatives, in declaration order.
 */
data class UnionType(
    val options: List<AirbyteType>,
) : AirbyteType

/**
 * Placeholder for a type that could not be understood.
 *
 * @property what free-form description of the thing that was not understood.
 */
data class UnknownType(
    val what: String,
) : AirbyteType

/**
 * An [AirbyteType] paired with its nullability, as used for object properties and array items.
 *
 * @property type the field's type.
 * @property nullable whether the field may be null.
 */
data class FieldType(
    val type: AirbyteType,
    val nullable: Boolean,
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.data
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import com.fasterxml.jackson.databind.node.JsonNodeFactory
9+
import com.fasterxml.jackson.databind.node.ObjectNode
10+
11+
/**
 * Renders an [AirbyteType] as a JSON-schema node.
 *
 * Nullability of object properties and array items is expressed by wrapping the field's type in a
 * `oneOf` that includes `{"type": "null"}`; no `required` list is emitted.
 */
class AirbyteTypeToJsonSchema {
    /** Creates a new `{"type": <typeName>}` object node. */
    private fun ofType(typeName: String): ObjectNode {
        return JsonNodeFactory.instance.objectNode().put("type", typeName)
    }

    /**
     * Converts [airbyteType] to its JSON-schema representation.
     *
     * @param airbyteType the type to render.
     * @return a freshly built JSON node describing the type.
     * @throws IllegalArgumentException if [airbyteType] is an [UnknownType], which has no schema
     * representation.
     */
    fun convert(airbyteType: AirbyteType): JsonNode {
        // No `else` branch: the hierarchy is sealed, so the compiler flags any type added later.
        return when (airbyteType) {
            is NullType -> ofType("null")
            is StringType -> ofType("string")
            is BooleanType -> ofType("boolean")
            is IntegerType -> ofType("integer")
            is NumberType -> ofType("number")
            is ArrayType -> ofType("array").set("items", fromFieldType(airbyteType.items))
            is ArrayTypeWithoutSchema -> ofType("array")
            is ObjectType -> {
                val objNode = ofType("object")
                val properties = objNode.putObject("properties")
                airbyteType.properties.forEach { (name, field) ->
                    properties.replace(name, fromFieldType(field))
                }
                objNode
            }
            is ObjectTypeWithoutSchema -> ofType("object")
            is ObjectTypeWithEmptySchema -> {
                // Distinguished from ObjectTypeWithoutSchema by an explicit, empty "properties".
                val objectNode = ofType("object")
                objectNode.putObject("properties")
                objectNode
            }
            is UnionType -> {
                val unionNode = JsonNodeFactory.instance.objectNode()
                val unionOptions = unionNode.putArray("oneOf")
                airbyteType.options.forEach { unionOptions.add(convert(it)) }
                unionNode
            }
            is DateType -> ofType("string").put("format", "date")
            is TimeType ->
                ofType("string")
                    .put("format", "time")
                    .put(
                        "airbyte_type",
                        if (airbyteType.hasTimezone) "time_with_timezone"
                        else "time_without_timezone"
                    )
            is TimestampType ->
                ofType("string")
                    .put("format", "date-time")
                    .put(
                        "airbyte_type",
                        if (airbyteType.hasTimezone) "timestamp_with_timezone"
                        else "timestamp_without_timezone"
                    )
            is UnknownType -> throw IllegalArgumentException("Unknown type: $airbyteType")
        }
    }

    /**
     * Converts a field, folding its nullability into the emitted schema.
     *
     * A nullable field becomes a union of its type (or, for a union, of its options) plus
     * [NullType]. [NullType] is added only when not already present: a repeated
     * `{"type": "null"}` inside `oneOf` would make `null` match two alternatives.
     */
    private fun fromFieldType(field: FieldType): JsonNode {
        if (!field.nullable) {
            return convert(field.type)
        }
        val type = field.type
        val options = if (type is UnionType) type.options else listOf(type)
        val withNull = if (NullType in options) options else options + NullType
        return convert(UnionType(options = withNull))
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.data
6+
7+
import java.math.BigDecimal
8+
9+
/**
 * Closed set of runtime values.
 *
 * Every implementation except [NullValue] is a single-field inline value class wrapping the
 * underlying Kotlin/Java representation.
 */
sealed interface AirbyteValue

/** The null value. */
data object NullValue : AirbyteValue

/** String value. */
@JvmInline
value class StringValue(val value: String) : AirbyteValue

/** Boolean value. */
@JvmInline
value class BooleanValue(val value: Boolean) : AirbyteValue

/** Integer value, held as a [Long]. */
@JvmInline
value class IntegerValue(val value: Long) : AirbyteValue

/** Numeric value, held as a [BigDecimal]. */
@JvmInline
value class NumberValue(val value: BigDecimal) : AirbyteValue

/** Date value, held as a string; this class performs no parsing or validation. */
@JvmInline
value class DateValue(val value: String) : AirbyteValue

/** Timestamp value, held as a string; this class performs no parsing or validation. */
@JvmInline
value class TimestampValue(val value: String) : AirbyteValue

/** Time value, held as a string; this class performs no parsing or validation. */
@JvmInline
value class TimeValue(val value: String) : AirbyteValue

/** Ordered list of values. */
@JvmInline
value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue

/** Named values; a [LinkedHashMap], so iteration follows insertion order. */
@JvmInline
value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue

/** Placeholder for a value that could not be understood; [what] describes it. */
@JvmInline
value class UnknownValue(val what: String) : AirbyteValue
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.data
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import com.fasterxml.jackson.databind.node.JsonNodeFactory
9+
10+
/** Serializes an [AirbyteValue] tree into Jackson JSON nodes. */
class AirbyteValueToJson {
    /**
     * Converts [value], recursing into arrays and objects.
     *
     * Date, time and timestamp values are written as text nodes holding their string payload
     * unchanged.
     *
     * @throws IllegalArgumentException if [value] (or any nested value) is an [UnknownValue].
     */
    fun convert(value: AirbyteValue): JsonNode {
        val nodes = JsonNodeFactory.instance
        return when (value) {
            // Scalars
            is NullValue -> nodes.nullNode()
            is BooleanValue -> nodes.booleanNode(value.value)
            is IntegerValue -> nodes.numberNode(value.value)
            is NumberValue -> nodes.numberNode(value.value)
            // String-backed values
            is StringValue -> nodes.textNode(value.value)
            is DateValue -> nodes.textNode(value.value)
            is TimeValue -> nodes.textNode(value.value)
            is TimestampValue -> nodes.textNode(value.value)
            // Containers
            is ArrayValue -> {
                val elements = value.values.map { element -> convert(element) }
                nodes.arrayNode().addAll(elements)
            }
            is ObjectValue -> {
                val result = nodes.objectNode()
                for ((key, child) in value.values) {
                    result.replace(key, convert(child))
                }
                result
            }
            is UnknownValue -> throw IllegalArgumentException("Unknown value: $value")
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.data
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import com.fasterxml.jackson.databind.node.JsonNodeFactory
9+
import com.fasterxml.jackson.databind.node.ObjectNode
10+
11+
/**
 * Parses a JSON-schema node into an [AirbyteType].
 *
 * The converter never modifies the schema it is given.
 */
class JsonSchemaToAirbyteType {
    /**
     * Converts [schema] to an [AirbyteType].
     *
     * Accepted shapes:
     * - an object with a `type` entry (a type name, or an array of type names / schemas);
     * - an object without `type` but with `oneOf`, `anyOf` or `allOf`, read as a union;
     * - any other object, read as an object schema;
     * - a bare string, read as a type name.
     *
     * Anything else yields an [UnknownType].
     *
     * @throws IllegalArgumentException for an unrecognized type name or string `format`.
     */
    fun convert(schema: JsonNode): AirbyteType =
        when {
            schema.isObject && schema.has("type") -> fromTypedSchema(schema as ObjectNode)
            schema.isObject -> fromUntypedSchema(schema as ObjectNode)
            // "<typename>"
            schema.isTextual ->
                convert(JsonNodeFactory.instance.objectNode().put("type", schema.asText()))
            else -> UnknownType("Unknown schema type: $schema")
        }

    /** Handles `{"type": ..., ...}`. */
    private fun fromTypedSchema(schema: ObjectNode): AirbyteType {
        val schemaType = schema.get("type")
        return when {
            // {"type": <string>, ...}
            schemaType.isTextual ->
                when (val typeName = schemaType.asText()) {
                    "string" -> fromString(schema)
                    "boolean" -> BooleanType
                    "integer" -> IntegerType
                    "number" -> fromNumber(schema)
                    "array" -> fromArray(schema)
                    "object" -> fromObject(schema)
                    "null" -> NullType
                    else -> throw IllegalArgumentException("Unknown type: $typeName")
                }
            // {"type": [...], ...}
            schemaType.isArray -> unionFromCombinedTypes(schemaType.toList(), schema)
            else -> UnknownType("unsupported type for 'type' field: $schemaType")
        }
    }

    /**
     * Handles an object schema with no `type` entry.
     *
     * NOTE(review): `allOf` is read as a union exactly like `oneOf`/`anyOf`, although in JSON
     * schema it means an intersection. Confirm this is intended.
     */
    private fun fromUntypedSchema(schema: ObjectNode): AirbyteType {
        val options = schema.get("oneOf") ?: schema.get("anyOf") ?: schema.get("allOf")
        return if (options != null) {
            // Options go through convert() unchanged, so a bare type name is accepted too.
            UnionType(options.map { convert(it) })
        } else {
            // Default to object if no type and not a union type. Dispatch directly rather than
            // writing "type": "object" into the caller's node.
            fromObject(schema)
        }
    }

    /**
     * Maps a `string` schema using its `format`. Time and timestamp default to "with timezone"
     * unless `airbyte_type` explicitly says otherwise.
     */
    private fun fromString(schema: ObjectNode): AirbyteType =
        when (val format = schema.get("format")?.asText()) {
            "date" -> DateType
            "time" ->
                TimeType(
                    hasTimezone = schema.get("airbyte_type")?.asText() != "time_without_timezone"
                )
            "date-time" ->
                TimestampType(
                    hasTimezone =
                        schema.get("airbyte_type")?.asText() != "timestamp_without_timezone"
                )
            null -> StringType
            else -> throw IllegalArgumentException("Unknown string format: $format")
        }

    /** Maps a `number` schema; `airbyte_type: integer` narrows it to [IntegerType]. */
    private fun fromNumber(schema: ObjectNode): AirbyteType =
        if (schema.get("airbyte_type")?.asText() == "integer") {
            IntegerType
        } else {
            NumberType
        }

    /**
     * Maps an `array` schema. A missing or empty `items` gives [ArrayTypeWithoutSchema]; an array
     * of item schemas becomes a union of those schemas.
     */
    private fun fromArray(schema: ObjectNode): AirbyteType {
        val items = schema.get("items") ?: return ArrayTypeWithoutSchema
        if (items.isArray) {
            if (items.isEmpty) {
                return ArrayTypeWithoutSchema
            }
            val itemOptions = UnionType(items.map { convert(it) })
            return ArrayType(fieldFromUnion(itemOptions))
        }
        return ArrayType(fieldFromSchema(items))
    }

    /**
     * Maps an `object` schema. No `properties` entry gives [ObjectTypeWithoutSchema]; an empty one
     * gives [ObjectTypeWithEmptySchema].
     */
    private fun fromObject(schema: ObjectNode): AirbyteType {
        val properties = schema.get("properties") ?: return ObjectTypeWithoutSchema
        if (properties.isEmpty) {
            return ObjectTypeWithEmptySchema
        }
        val requiredFields = schema.get("required")?.map { it.asText() }?.toSet() ?: emptySet()
        return objectFromProperties(properties as ObjectNode, requiredFields)
    }

    /**
     * Builds a [FieldType] from a field's schema.
     *
     * The field is non-nullable when it is on the parent's `required` list ([onRequiredList]) or
     * carries a boolean `"required": true` itself. A union containing `null` is always nullable
     * (see [fieldFromUnion]).
     */
    private fun fieldFromSchema(fieldSchema: JsonNode, onRequiredList: Boolean = false): FieldType {
        val markedRequired = fieldSchema.get("required")?.asBoolean() ?: false
        val nullable = !(onRequiredList || markedRequired)
        val airbyteType = convert(fieldSchema)
        return if (airbyteType is UnionType) {
            fieldFromUnion(airbyteType, nullable)
        } else {
            FieldType(airbyteType, nullable)
        }
    }

    /**
     * Builds a [FieldType] from a union. If the union lists [NullType], that option is removed
     * and the field is marked nullable regardless of [nullable].
     */
    private fun fieldFromUnion(unionType: UnionType, nullable: Boolean = false): FieldType {
        if (unionType.options.contains(NullType)) {
            val filtered = unionType.options.filter { it != NullType }
            return FieldType(UnionType(filtered), nullable = true)
        }
        return FieldType(unionType, nullable = nullable)
    }

    /** Converts each entry of a `properties` node, preserving declaration order. */
    private fun objectFromProperties(schema: ObjectNode, requiredFields: Set<String>): ObjectType {
        val properties =
            schema
                .fields()
                .asSequence()
                .map { (name, node) -> name to fieldFromSchema(node, name in requiredFields) }
                .toMap(LinkedHashMap())
        return ObjectType(properties)
    }

    /**
     * Expands `{"type": [a, b, ...], ...}` into a union with one option per listed type.
     *
     * Denormalizes the parent's properties across each type (the converter only checks what
     * matters per type). Each option is converted from a deep copy of [parentSchema] with `type`
     * replaced; entries that are not type names are converted as schemas in their own right.
     */
    private fun unionFromCombinedTypes(
        options: List<JsonNode>,
        parentSchema: ObjectNode
    ): UnionType {
        val unionOptions =
            options.map {
                if (it.isTextual) {
                    val schema = parentSchema.deepCopy()
                    schema.put("type", it.textValue())
                    convert(schema)
                } else {
                    convert(it)
                }
            }
        return UnionType(unionOptions)
    }
}

0 commit comments

Comments
 (0)