
🐛 Snowflake destination: fix S3 staging COPY writing records from different tables into the same raw table #5924

Merged · 14 commits · Sep 13, 2021
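
For orientation, and not part of the PR page itself: the stream copiers stage each stream as an S3 object and then COPY it into its raw table by key prefix. Before this change the staging key was stagingFolder/schema/streamName, so a key such as .../stream_name was also a prefix of .../stream_name_next, which is presumably how records from one stream could be copied into another stream's raw table; the new test streams stream_name and stream_name_next in the diff exercise exactly that case. The sketch below only illustrates the renaming, reusing Strings.addRandomSuffix and the path layout from the changed code; the class name and example values are invented for the sketch.

// Illustrative sketch, not part of the PR: shows how the random string prepended to the
// file name keeps one stream's staging key from being a prefix of another's.
import io.airbyte.commons.string.Strings;

public class StagingKeySketch {

  public static void main(String[] args) {
    String stagingFolder = "staging-folder";
    String schemaName = "public";

    // Old naming: keys built from the stream name alone can share a prefix.
    String oldKeyA = String.join("/", stagingFolder, schemaName, "stream_name");
    String oldKeyB = String.join("/", stagingFolder, schemaName, "stream_name_next");

    // New naming: a short random string is prepended to the file name
    // (as in the Redshift/Snowflake copiers below), so prefixes no longer overlap.
    String newKeyA = String.join("/", stagingFolder, schemaName,
        Strings.addRandomSuffix("", "", 3) + "_" + "stream_name");      // e.g. .../abc_stream_name
    String newKeyB = String.join("/", stagingFolder, schemaName,
        Strings.addRandomSuffix("", "", 3) + "_" + "stream_name_next"); // e.g. .../xyz_stream_name_next

    System.out.println(oldKeyA + " -> " + newKeyA);
    System.out.println(oldKeyB + " -> " + newKeyB);
  }
}

With unique per-stream prefixes, a prefix-based COPY for one raw table can no longer pick up another stream's staging file.
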
@@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.13",
"dockerImageTag": "0.3.14",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
@@ -2,7 +2,7 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.3.13",
"dockerImageTag": "0.3.14",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"icon": "redshift.svg"
}
@@ -42,7 +42,7 @@
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
-dockerImageTag: 0.3.13
+dockerImageTag: 0.3.14
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
@@ -52,7 +52,7 @@
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
-dockerImageTag: 0.3.13
+dockerImageTag: 0.3.14
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
@@ -96,6 +96,35 @@
}
}
}
},
{
"name": "stream_name",
"json_schema": {
"properties": {
"some_id": {
"type": "integer"
},
"some_field": {
"type": "string"
},
"some_next_field": {
"type": "string"
}
}
}
},
{
"name": "stream_name_next",
"json_schema": {
"properties": {
"some_id": {
"type": "integer"
},
"next_field_name": {
"type": "string"
}
}
}
}
]
}
@@ -7,4 +7,11 @@
{"type": "RECORD", "record": {"stream": "reserved_keywords", "emitted_at": 1602637589000, "data": { "order" : "ascending" }}}
{"type": "RECORD", "record": {"stream": "groups", "emitted_at": 1602637589000, "data": { "authorization" : "into" }}}
{"type": "RECORD", "record": {"stream": "ProperCase", "emitted_at": 1602637589000, "data": { "ProperCase" : true }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589200, "data": { "some_id" : 101, "some_field" : "some_field_1", "some_next_field" : "some_next_field_1" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589250, "data": { "some_id" : 102, "some_field" : "some_field_2" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589300, "data": { "some_id" : 103, "some_next_field" : "some_next_field_3" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589350, "data": { "some_id" : 104 }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589400, "data": { "some_id" : 201, "next_field_name" : "next_field_name_1" }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : 202, "next_field_name" : "next_field_name_2" }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : 203 }}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.3.3
+LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.name=airbyte/destination-jdbc
@@ -92,7 +92,7 @@ public S3StreamCopier(String stagingFolder,
this.s3Client = client;
this.s3Config = s3Config;

-this.s3StagingFile = String.join("/", stagingFolder, schemaName, streamName);
+this.s3StagingFile = prepareS3StagingFile(stagingFolder, streamName);
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
// The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
// have support for streaming multipart uploads;
@@ -118,6 +118,47 @@ public S3StreamCopier(String stagingFolder,
}
}

public S3StreamCopier(String stagingFolder,
DestinationSyncMode destSyncMode,
String schema,
String streamName,
String s3FileName,
AmazonS3 client,
JdbcDatabase db,
S3Config s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
this.db = db;
this.nameTransformer = nameTransformer;
this.sqlOperations = sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.s3Client = client;
this.s3Config = s3Config;

this.s3StagingFile = prepareS3StagingFile(stagingFolder, s3FileName);
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
this.multipartUploadManager =
new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.partSize(s3Config.getPartSize());
this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0);

var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String prepareS3StagingFile(String stagingFolder, String s3FileName) {
return String.join("/", stagingFolder, schemaName, s3FileName);
}

@Override
public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception {
csvPrinter.printRecord(id, jsonDataString, emittedAt);
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.3.13
+LABEL io.airbyte.version=0.3.14
LABEL io.airbyte.name=airbyte/destination-redshift
@@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.redshift;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -44,7 +45,8 @@ public RedshiftStreamCopier(String stagingFolder,
S3Config s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations) {
-super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations);
+super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config,
+    nameTransformer, sqlOperations);
}

@Override
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.3.13
+LABEL io.airbyte.version=0.3.14
LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -25,16 +25,21 @@
package io.airbyte.integrations.destination.snowflake;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeS3StreamCopier extends S3StreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class);

public SnowflakeS3StreamCopier(String stagingFolder,
DestinationSyncMode destSyncMode,
String schema,
@@ -44,7 +49,8 @@ public SnowflakeS3StreamCopier(String stagingFolder,
S3Config s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations) {
-super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations);
+super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config,
+    nameTransformer, sqlOperations);
}

@Override
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -107,6 +107,7 @@ See [docs](https://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from different tables into the same raw table |
| 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
| 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
| 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
@@ -189,6 +189,7 @@ Finally, you need to add read/write permissions to your bucket with that email.

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3.14 | 2021-09-08 | [#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from different tables into the same raw table |
| 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours |
| 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |