Skip to content

Commit b0a42bf

Browse files
authored
destination-s3-glue: Add TableType and fix race condition (#22220)
- The TableType attribute is not populated in the Glue catalog, which has started to cause errors when querying the table from, e.g., Trino. The observed error message is `Cannot invoke "String.equals(Object)" because "tableType" is null`.
- There is a race condition when initializing the destination connector: a failure occurs if multiple connections attempt to initialize at the same time, because the test table that gets created has a static name. This adds a random suffix to the table name to avoid that race condition.
- The Airbyte sync ID and emitted_at fields are useful for deduplicating data. This adds those fields to the table definition, since they are already included in the records as written.
1 parent 857bcac commit b0a42bf

File tree

7 files changed

+19
-10
lines changed

7 files changed

+19
-10
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@
334334
- name: S3 Glue
335335
destinationDefinitionId: 471e5cab-8ed1-49f3-ba11-79c687784737
336336
dockerRepository: airbyte/destination-s3-glue
337-
dockerImageTag: 0.1.1
337+
dockerImageTag: 0.1.2
338338
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-glue
339339
icon: s3-glue.svg
340340
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5869,7 +5869,7 @@
58695869
supported_destination_sync_modes:
58705870
- "overwrite"
58715871
- "append"
5872-
- dockerImage: "airbyte/destination-s3-glue:0.1.1"
5872+
- dockerImage: "airbyte/destination-s3-glue:0.1.2"
58735873
spec:
58745874
documentationUrl: "https://docs.airbyte.com/integrations/destinations/s3"
58755875
connectionSpecification:

airbyte-integrations/connectors/destination-s3-glue/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ ENV APPLICATION destination-s3-glue
1414

1515
COPY --from=build /airbyte /airbyte
1616

17-
LABEL io.airbyte.version=0.1.1
17+
LABEL io.airbyte.version=0.1.2
1818
LABEL io.airbyte.name=airbyte/destination-s3-glue

airbyte-integrations/connectors/destination-s3-glue/src/main/java/io/airbyte/integrations/destination/s3_glue/GlueOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void upsertTable(String databaseName,
5757
.withTableInput(
5858
new TableInput()
5959
.withName(tableName)
60-
// .withTableType("GOVERNED")
60+
.withTableType("EXTERNAL_TABLE")
6161
.withStorageDescriptor(
6262
new StorageDescriptor()
6363
.withLocation(location)
@@ -80,7 +80,7 @@ public void upsertTable(String databaseName,
8080
.withTableInput(
8181
new TableInput()
8282
.withName(tableName)
83-
// .withTableType("GOVERNED")
83+
.withTableType("EXTERNAL_TABLE")
8484
.withStorageDescriptor(
8585
new StorageDescriptor()
8686
.withLocation(location)

airbyte-integrations/connectors/destination-s3-glue/src/main/java/io/airbyte/integrations/destination/s3_glue/S3GlueConsumerFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
package io.airbyte.integrations.destination.s3_glue;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.fasterxml.jackson.databind.node.ObjectNode;
89
import com.google.common.base.Preconditions;
910
import io.airbyte.commons.functional.CheckedBiConsumer;
1011
import io.airbyte.commons.functional.CheckedBiFunction;
1112
import io.airbyte.commons.json.Jsons;
1213
import io.airbyte.integrations.base.AirbyteMessageConsumer;
14+
import io.airbyte.integrations.base.JavaBaseConstants;
1315
import io.airbyte.integrations.destination.NamingConventionTransformer;
1416
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
1517
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
@@ -85,6 +87,8 @@ private static Function<ConfiguredAirbyteStream, S3GlueWriteConfig> toWriteConfi
8587
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
8688
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
8789
final JsonNode jsonSchema = abStream.getJsonSchema();
90+
((ObjectNode)jsonSchema.get("properties")).putPOJO(JavaBaseConstants.COLUMN_NAME_AB_ID, Map.of("type", "string"));
91+
((ObjectNode)jsonSchema.get("properties")).putPOJO(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, Map.of("type", "string"));
8892
final String location = "s3://" + s3Config.getBucketName() + "/" +
8993
fullOutputPath.substring(0, fullOutputPath.lastIndexOf("/") + 1);
9094
final S3GlueWriteConfig writeConfig =

airbyte-integrations/connectors/destination-s3-glue/src/main/java/io/airbyte/integrations/destination/s3_glue/S3GlueDestination.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.airbyte.protocol.models.v0.AirbyteMessage;
2121
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
2222
import java.util.function.Consumer;
23+
24+
import org.apache.commons.lang3.RandomStringUtils;
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
2527

@@ -43,7 +45,9 @@ public AirbyteConnectionStatus check(JsonNode config) {
4345
}
4446
final GlueDestinationConfig glueConfig = GlueDestinationConfig.getInstance(config);
4547
MetastoreOperations metastoreOperations = null;
46-
String tableName = "test_table";
48+
// If multiple syncs are started at the same time, a static test table name causes a resource collision and a failure to sync.
49+
String tableSuffix = RandomStringUtils.randomAlphabetic(9);
50+
String tableName = "test_table_" + tableSuffix;
4751
try {
4852
metastoreOperations = new GlueOperations(glueConfig.getAWSGlueInstance());
4953
metastoreOperations.upsertTable(glueConfig.getDatabase(), tableName, "s3://", Jsons.emptyObject(), glueConfig.getSerializationLibrary());

docs/integrations/destinations/s3-glue.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ Output files can be compressed. The default option is GZIP compression. If compr
243243

244244
## CHANGELOG
245245

246-
| Version | Date | Pull Request | Subject |
247-
| :------ | :--------- | :------------------------------------------------------- | :------------- |
248-
| 0.1.1 | 2022-12-13 | [19907](https://github.com/airbytehq/airbyte/pull/19907) | Fix parsing empty object in schema |
249-
| 0.1.0 | 2022-11-17 | [18695](https://github.com/airbytehq/airbyte/pull/18695) | Initial Commit |
246+
| Version | Date | Pull Request | Subject |
247+
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------|
248+
| 0.1.2 | 2023-02-01 | [22220](https://github.com/airbytehq/airbyte/pull/22220) | Fix race condition in connection check, populate TableType in table metadata, add Airbyte sync fields to table definition |
249+
| 0.1.1 | 2022-12-13 | [19907](https://github.com/airbytehq/airbyte/pull/19907) | Fix parsing empty object in schema |
250+
| 0.1.0 | 2022-11-17 | [18695](https://github.com/airbytehq/airbyte/pull/18695) | Initial Commit |

0 commit comments

Comments
 (0)