Skip to content

Commit 0050274

Browse files
authored
Destination S3 (avro/parquet format): handle empty schemas (#44933)
1 parent 9934979 commit 0050274

File tree

15 files changed

+130
-9
lines changed

15 files changed

+130
-9
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,12 @@ corresponds to that version.
172172

173173
### Java CDK
174174

175-
| Version | Date | Pull Request | Subject |
176-
| :--------- | :--------- | :---------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
177-
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
175+
| Version | Date | Pull Request | Subject |
176+
| :--------- | :--------- | :----------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
177+
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |
178+
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
178179
| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events |
179-
| 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. |
180+
| 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. |
180181
| 0.44.16 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Destinations: add sqlgenerator testing for mixed-case stream name |
181182
| 0.44.15 | ?????????? | [\#?????](https://github.com/airbytehq/airbyte/pull/?????) | ????? |
182183
| 0.44.14 | 2024-08-19 | [\#42503](https://github.com/airbytehq/airbyte/pull/42503) | Destinations (refreshes) - correctly detect existing raw/final table of the correct generation during truncate sync |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.44.19
1+
version=0.44.20

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonRecordAvroPreprocessor.kt

+4
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ class JsonRecordAvroPreprocessor : JsonRecordIdentityMapper() {
2929
override fun mapArrayWithoutItems(record: JsonNode?, schema: ObjectNode): JsonNode? {
3030
return serializeToJsonNode(record)
3131
}
32+
33+
override fun mapUnknown(record: JsonNode?, schema: ObjectNode): JsonNode? {
34+
return serializeToJsonNode(record)
35+
}
3236
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaAvroPreprocessor.kt

+4
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,8 @@ class JsonSchemaAvroPreprocessor : JsonSchemaIdentityMapper() {
5050

5151
return super.mapArrayWithItem(schema)
5252
}
53+
54+
override fun mapUnknown(schema: ObjectNode): ObjectNode {
55+
return STRING_TYPE
56+
}
5357
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonschema/AirbyteJsonSchemaType.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ enum class AirbyteJsonSchemaType {
2727
OBJECT_WITHOUT_PROPERTIES,
2828
OBJECT_WITH_PROPERTIES,
2929
UNION,
30-
COMBINED;
30+
COMBINED,
31+
UNKNOWN;
3132

3233
fun matchesValue(tree: JsonNode): Boolean {
3334
return when (this) {
@@ -54,6 +55,7 @@ enum class AirbyteJsonSchemaType {
5455
OBJECT_WITH_PROPERTIES -> tree.isObject
5556
UNION,
5657
COMBINED -> throw IllegalArgumentException("Union type cannot be matched")
58+
UNKNOWN -> true
5759
}
5860
}
5961

@@ -175,7 +177,7 @@ enum class AirbyteJsonSchemaType {
175177
// Usually the root node
176178
return OBJECT_WITH_PROPERTIES
177179
} else {
178-
throw IllegalArgumentException("Unspecified schema type")
180+
return UNKNOWN
179181
}
180182
}
181183

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonschema/JsonRecordIdentityMapper.kt

+4
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,8 @@ open class JsonRecordIdentityMapper : JsonRecordMapper<JsonNode?>() {
144144
val match = AirbyteJsonSchemaType.getMatchingValueForType(record, options)
145145
return mapRecordWithSchema(record, match)
146146
}
147+
148+
override fun mapUnknown(record: JsonNode?, schema: ObjectNode): JsonNode? {
149+
return record?.deepCopy()
150+
}
147151
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonschema/JsonRecordMapper.kt

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ abstract class JsonRecordMapper<R> {
3232
AirbyteJsonSchemaType.OBJECT_WITH_PROPERTIES -> mapObjectWithProperties(record, schema)
3333
AirbyteJsonSchemaType.UNION -> mapUnion(record, schema)
3434
AirbyteJsonSchemaType.COMBINED -> mapCombined(record, schema)
35+
AirbyteJsonSchemaType.UNKNOWN -> mapUnknown(record, schema)
3536
}
3637
}
3738

@@ -53,4 +54,5 @@ abstract class JsonRecordMapper<R> {
5354
abstract fun mapObjectWithProperties(record: JsonNode?, schema: ObjectNode): R
5455
abstract fun mapUnion(record: JsonNode?, schema: ObjectNode): R
5556
abstract fun mapCombined(record: JsonNode?, schema: ObjectNode): R
57+
abstract fun mapUnknown(record: JsonNode?, schema: ObjectNode): R
5658
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonschema/JsonSchemaIdentityMapper.kt

+5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.s3.jsonschema
66

77
import com.fasterxml.jackson.databind.node.ObjectNode
88
import io.airbyte.commons.jackson.MoreMappers
9+
import io.airbyte.commons.json.Jsons
910

1011
open class JsonSchemaIdentityMapper : JsonSchemaMapper() {
1112
private fun makeType(
@@ -146,4 +147,8 @@ open class JsonSchemaIdentityMapper : JsonSchemaMapper() {
146147

147148
return newUnionSchema
148149
}
150+
151+
override fun mapUnknown(schema: ObjectNode): ObjectNode {
152+
return Jsons.emptyObject() as ObjectNode
153+
}
149154
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonschema/JsonSchemaMapper.kt

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ abstract class JsonSchemaMapper {
2929
AirbyteJsonSchemaType.NUMBER -> mapNumber(schema)
3030
AirbyteJsonSchemaType.COMBINED -> mapCombined(schema)
3131
AirbyteJsonSchemaType.UNION -> mapUnion(schema)
32+
AirbyteJsonSchemaType.UNKNOWN -> mapUnknown(schema)
3233
}
3334
}
3435

@@ -50,4 +51,5 @@ abstract class JsonSchemaMapper {
5051
abstract fun mapNumber(schema: ObjectNode): ObjectNode
5152
abstract fun mapCombined(schema: ObjectNode): ObjectNode
5253
abstract fun mapUnion(schema: ObjectNode): ObjectNode
54+
abstract fun mapUnknown(schema: ObjectNode): ObjectNode
5355
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/JsonSchemaParquetPreprocessor.kt

+9
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ class JsonSchemaParquetPreprocessor : JsonSchemaIdentityMapper() {
3636
AirbyteJsonSchemaType.UNION,
3737
AirbyteJsonSchemaType.COMBINED ->
3838
throw IllegalStateException("Nested unions are not supported")
39+
// Parquet has a native JSON type, which we would ideally use here.
40+
// Unfortunately, we're currently building parquet schemas via
41+
// Avro schemas, and Avro doesn't have a native JSON type.
42+
// So for now, we assume that the JsonSchemaAvroPreprocessor
43+
// was invoked before this preprocessor.
44+
AirbyteJsonSchemaType.UNKNOWN ->
45+
throw IllegalStateException(
46+
"JSON fields should be converted to string upstream of this processor"
47+
)
3948
}
4049
}
4150
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonRecordTransformerTest.kt

+47
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
99
import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityMapper
1010
import io.airbyte.cdk.integrations.destination.s3.parquet.JsonRecordParquetPreprocessor
1111
import io.airbyte.commons.jackson.MoreMappers
12+
import io.airbyte.commons.json.Jsons
1213
import org.junit.jupiter.api.Assertions
1314
import org.junit.jupiter.api.Test
1415

@@ -162,4 +163,50 @@ class JsonRecordTransformerTest {
162163
Assertions.assertEquals(jsonExpectedOut[index], transformedRecord)
163164
}
164165
}
166+
167+
@Test
168+
fun testStringifyJsonValues() {
169+
val inputSchema =
170+
Jsons.deserialize(
171+
"""
172+
{
173+
"type": "object",
174+
"properties": {
175+
"foo": {},
176+
"bar": {
177+
"type": "array",
178+
"items": {}
179+
}
180+
}
181+
}
182+
""".trimIndent()
183+
) as ObjectNode
184+
val inputRecord =
185+
Jsons.deserialize(
186+
"""
187+
{
188+
"foo": {"a": 42},
189+
"bar": [1, null, {}]
190+
}
191+
""".trimIndent()
192+
)
193+
194+
val avroMappedRecord =
195+
JsonRecordAvroPreprocessor().mapRecordWithSchema(inputRecord, inputSchema)
196+
197+
Assertions.assertEquals(
198+
Jsons.deserialize(
199+
// The "foo" field is completely serialized
200+
// and the "bar" field has each entry serialized,
201+
// except for the null entry, which is unchanged.
202+
"""
203+
{
204+
"foo": "{\"a\":42}",
205+
"bar": ["1", null, "{}"]
206+
}
207+
""".trimIndent()
208+
),
209+
avroMappedRecord
210+
)
211+
}
165212
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaTransformerTest.kt

+40
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonSchemaIdentityM
99
import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonSchemaUnionMerger
1010
import io.airbyte.cdk.integrations.destination.s3.parquet.JsonSchemaParquetPreprocessor
1111
import io.airbyte.commons.jackson.MoreMappers
12+
import io.airbyte.commons.json.Jsons
13+
import kotlin.test.assertEquals
1214
import org.junit.jupiter.api.Assertions
1315
import org.junit.jupiter.api.Test
1416

@@ -139,4 +141,42 @@ class JsonSchemaTransformerTest {
139141
Assertions.assertEquals(nullType, properties["redundant_null"])
140142
Assertions.assertEquals(stringType, properties["combined_null_string"])
141143
}
144+
145+
@Test
146+
fun testJsonType() {
147+
val inputSchema =
148+
Jsons.deserialize(
149+
"""
150+
{
151+
"type": "object",
152+
"properties": {
153+
"foo": {},
154+
"bar": {
155+
"type": "array",
156+
"items": {}
157+
}
158+
}
159+
}
160+
""".trimIndent()
161+
) as ObjectNode
162+
val mapped = JsonSchemaAvroPreprocessor().mapSchema(inputSchema)
163+
164+
assertEquals(
165+
Jsons.deserialize(
166+
"""
167+
{
168+
"type": "object",
169+
"properties": {
170+
"foo": {"type": "string"},
171+
"bar": {
172+
"type": "array",
173+
"items": {"type": "string"}
174+
}
175+
}
176+
}
177+
""".trimIndent()
178+
),
179+
mapped
180+
)
181+
}
142182
}

airbyte-integrations/connectors/destination-s3/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.44.19'
7+
cdkVersionRequired = '0.44.20'
88
features = ['db-destinations', 's3-destinations']
99
useLocalCdk = false
1010
}

airbyte-integrations/connectors/destination-s3/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: file
33
connectorType: destination
44
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
5-
dockerImageTag: 1.0.3
5+
dockerImageTag: 1.0.4
66
dockerRepository: airbyte/destination-s3
77
githubIssueLabel: destination-s3
88
icon: s3.svg

docs/integrations/destinations/s3.md

+1
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou
536536

537537
| Version | Date | Pull Request | Subject |
538538
| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------- |
539+
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
539540
| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
540541
| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |
541542
| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. |

0 commit comments

Comments
 (0)