Skip to content

Destination S3 (avro/parquet format): handle empty schemas #44933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ class JsonRecordAvroPreprocessor : JsonRecordIdentityMapper() {
override fun mapArrayWithoutItems(record: JsonNode?, schema: ObjectNode): JsonNode? {
return serializeToJsonNode(record)
}

override fun mapJson(record: JsonNode?, schema: ObjectNode): JsonNode? {
return serializeToJsonNode(record)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ class JsonSchemaAvroPreprocessor : JsonSchemaIdentityMapper() {

return super.mapArrayWithItem(schema)
}

override fun mapJson(schema: ObjectNode): ObjectNode {
return STRING_TYPE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ enum class AirbyteJsonSchemaType {
OBJECT_WITHOUT_PROPERTIES,
OBJECT_WITH_PROPERTIES,
UNION,
COMBINED;
COMBINED,
JSON;

fun matchesValue(tree: JsonNode): Boolean {
return when (this) {
Expand All @@ -54,6 +55,7 @@ enum class AirbyteJsonSchemaType {
OBJECT_WITH_PROPERTIES -> tree.isObject
UNION,
COMBINED -> throw IllegalArgumentException("Union type cannot be matched")
JSON -> true
}
}

Expand Down Expand Up @@ -175,7 +177,7 @@ enum class AirbyteJsonSchemaType {
// Usually the root node
return OBJECT_WITH_PROPERTIES
} else {
throw IllegalArgumentException("Unspecified schema type")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be explicit about the fact that we're matching an empty {}? Or do you want to use this as a catch-all

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more a catchall. So, I guess I should rename it to unknown in that case 🤔

return JSON
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,8 @@ open class JsonRecordIdentityMapper : JsonRecordMapper<JsonNode?>() {
val match = AirbyteJsonSchemaType.getMatchingValueForType(record, options)
return mapRecordWithSchema(record, match)
}

override fun mapJson(record: JsonNode?, schema: ObjectNode): JsonNode? {
return record?.deepCopy()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ abstract class JsonRecordMapper<R> {
AirbyteJsonSchemaType.OBJECT_WITH_PROPERTIES -> mapObjectWithProperties(record, schema)
AirbyteJsonSchemaType.UNION -> mapUnion(record, schema)
AirbyteJsonSchemaType.COMBINED -> mapCombined(record, schema)
AirbyteJsonSchemaType.JSON -> mapJson(record, schema)
}
}

Expand All @@ -53,4 +54,5 @@ abstract class JsonRecordMapper<R> {
abstract fun mapObjectWithProperties(record: JsonNode?, schema: ObjectNode): R
abstract fun mapUnion(record: JsonNode?, schema: ObjectNode): R
abstract fun mapCombined(record: JsonNode?, schema: ObjectNode): R
abstract fun mapJson(record: JsonNode?, schema: ObjectNode): R
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.s3.jsonschema

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons

open class JsonSchemaIdentityMapper : JsonSchemaMapper() {
private fun makeType(
Expand Down Expand Up @@ -146,4 +147,8 @@ open class JsonSchemaIdentityMapper : JsonSchemaMapper() {

return newUnionSchema
}

override fun mapJson(schema: ObjectNode): ObjectNode {
return Jsons.emptyObject() as ObjectNode
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ abstract class JsonSchemaMapper {
AirbyteJsonSchemaType.NUMBER -> mapNumber(schema)
AirbyteJsonSchemaType.COMBINED -> mapCombined(schema)
AirbyteJsonSchemaType.UNION -> mapUnion(schema)
AirbyteJsonSchemaType.JSON -> mapJson(schema)
}
}

Expand All @@ -50,4 +51,5 @@ abstract class JsonSchemaMapper {
abstract fun mapNumber(schema: ObjectNode): ObjectNode
abstract fun mapCombined(schema: ObjectNode): ObjectNode
abstract fun mapUnion(schema: ObjectNode): ObjectNode
abstract fun mapJson(schema: ObjectNode): ObjectNode
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class JsonSchemaParquetPreprocessor : JsonSchemaIdentityMapper() {
AirbyteJsonSchemaType.UNION,
AirbyteJsonSchemaType.COMBINED ->
throw IllegalStateException("Nested unions are not supported")
// TODO is this true?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true in the sense that Parquet doesn't have a Json type per se. It has a json-logical type that can be applied to strings, which we don't use, because there's no way to represent it in the Avro schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. I'll update the comment to reflect that

AirbyteJsonSchemaType.JSON ->
throw IllegalStateException(
"JSON fields should be converted to string upstream of this processor"
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityMapper
import io.airbyte.cdk.integrations.destination.s3.parquet.JsonRecordParquetPreprocessor
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

Expand Down Expand Up @@ -162,4 +163,50 @@ class JsonRecordTransformerTest {
Assertions.assertEquals(jsonExpectedOut[index], transformedRecord)
}
}

@Test
fun testStringifyJsonValues() {
val inputSchema =
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"foo": {},
"bar": {
"type": "array",
"items": {}
}
}
}
""".trimIndent()
) as ObjectNode
val inputRecord =
Jsons.deserialize(
"""
{
"foo": {"a": 42},
"bar": [1, null, {}]
}
""".trimIndent()
)

val avroMappedRecord =
JsonRecordAvroPreprocessor().mapRecordWithSchema(inputRecord, inputSchema)

Assertions.assertEquals(
Jsons.deserialize(
// The "foo" field is completely serialized
// and the "bar" field has each entry serialized,
// except for the null entry, which is unchanged.
"""
{
"foo": "{\"a\":42}",
"bar": ["1", null, "{}"]
}
""".trimIndent()
),
avroMappedRecord
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonSchemaIdentityM
import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonSchemaUnionMerger
import io.airbyte.cdk.integrations.destination.s3.parquet.JsonSchemaParquetPreprocessor
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons
import kotlin.test.assertEquals
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

Expand Down Expand Up @@ -139,4 +141,42 @@ class JsonSchemaTransformerTest {
Assertions.assertEquals(nullType, properties["redundant_null"])
Assertions.assertEquals(stringType, properties["combined_null_string"])
}

@Test
fun testJsonType() {
val inputSchema =
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"foo": {},
"bar": {
"type": "array",
"items": {}
}
}
}
""".trimIndent()
) as ObjectNode
val mapped = JsonSchemaAvroPreprocessor().mapSchema(inputSchema)

assertEquals(
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"foo": {"type": "string"},
"bar": {
"type": "array",
"items": {"type": "string"}
}
}
}
""".trimIndent()
),
mapped
)
}
}
Loading