Skip to content

Commit 71f51e0

Browse files
🐛 Snowflake destination: fix S3 COPY staging writing records from a different table into the same raw table (#5924)
* snowflake s3 destination COPY is writing records from different table in the same raw table fix * updated snowflake s3 file name * updated snowflake documentation * updated snowflake documentation * updated snowflake documentation * updated code style * updated code style * updated redshift destination
1 parent 6ec45b3 commit 71f51e0

File tree

13 files changed

+97
-10
lines changed

13 files changed

+97
-10
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
33
"name": "Snowflake",
44
"dockerRepository": "airbyte/destination-snowflake",
5-
"dockerImageTag": "0.3.13",
5+
"dockerImageTag": "0.3.14",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
33
"name": "Redshift",
44
"dockerRepository": "airbyte/destination-redshift",
5-
"dockerImageTag": "0.3.13",
5+
"dockerImageTag": "0.3.14",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
77
"icon": "redshift.svg"
88
}

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
4343
name: Snowflake
4444
dockerRepository: airbyte/destination-snowflake
45-
dockerImageTag: 0.3.13
45+
dockerImageTag: 0.3.14
4646
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
4747
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
4848
name: S3
@@ -52,7 +52,7 @@
5252
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
5353
name: Redshift
5454
dockerRepository: airbyte/destination-redshift
55-
dockerImageTag: 0.3.13
55+
dockerImageTag: 0.3.14
5656
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
5757
icon: redshift.svg
5858
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e

airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json

+29
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,35 @@
9696
}
9797
}
9898
}
99+
},
100+
{
101+
"name": "stream_name",
102+
"json_schema": {
103+
"properties": {
104+
"some_id": {
105+
"type": "integer"
106+
},
107+
"some_field": {
108+
"type": "string"
109+
},
110+
"some_next_field": {
111+
"type": "string"
112+
}
113+
}
114+
}
115+
},
116+
{
117+
"name": "stream_name_next",
118+
"json_schema": {
119+
"properties": {
120+
"some_id": {
121+
"type": "integer"
122+
},
123+
"next_field_name": {
124+
"type": "string"
125+
}
126+
}
127+
}
99128
}
100129
]
101130
}

airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt

+7
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,11 @@
77
{"type": "RECORD", "record": {"stream": "reserved_keywords", "emitted_at": 1602637589000, "data": { "order" : "ascending" }}}
88
{"type": "RECORD", "record": {"stream": "groups", "emitted_at": 1602637589000, "data": { "authorization" : "into" }}}
99
{"type": "RECORD", "record": {"stream": "ProperCase", "emitted_at": 1602637589000, "data": { "ProperCase" : true }}}
10+
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589200, "data": { "some_id" : 101, "some_field" : "some_field_1", "some_next_field" : "some_next_field_1" }}}
11+
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589250, "data": { "some_id" : 102, "some_field" : "some_field_2" }}}
12+
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589300, "data": { "some_id" : 103, "some_next_field" : "some_next_field_3" }}}
13+
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589350, "data": { "some_id" : 104 }}}
14+
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589400, "data": { "some_id" : 201, "next_field_name" : "next_field_name_1" }}}
15+
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : 202, "next_field_name" : "next_field_name_2" }}}
16+
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : 203 }}}
1017
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}

airbyte-integrations/connectors/destination-jdbc/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.3
11+
LABEL io.airbyte.version=0.3.4
1212
LABEL io.airbyte.name=airbyte/destination-jdbc

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java

+42-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public S3StreamCopier(String stagingFolder,
9292
this.s3Client = client;
9393
this.s3Config = s3Config;
9494

95-
this.s3StagingFile = String.join("/", stagingFolder, schemaName, streamName);
95+
this.s3StagingFile = prepareS3StagingFile(stagingFolder, streamName);
9696
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
9797
// The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
9898
// have support for streaming multipart uploads;
@@ -118,6 +118,47 @@ public S3StreamCopier(String stagingFolder,
118118
}
119119
}
120120

121+
public S3StreamCopier(String stagingFolder,
122+
DestinationSyncMode destSyncMode,
123+
String schema,
124+
String streamName,
125+
String s3FileName,
126+
AmazonS3 client,
127+
JdbcDatabase db,
128+
S3Config s3Config,
129+
ExtendedNameTransformer nameTransformer,
130+
SqlOperations sqlOperations) {
131+
this.destSyncMode = destSyncMode;
132+
this.schemaName = schema;
133+
this.streamName = streamName;
134+
this.db = db;
135+
this.nameTransformer = nameTransformer;
136+
this.sqlOperations = sqlOperations;
137+
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
138+
this.s3Client = client;
139+
this.s3Config = s3Config;
140+
141+
this.s3StagingFile = prepareS3StagingFile(stagingFolder, s3FileName);
142+
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
143+
this.multipartUploadManager =
144+
new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client)
145+
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
146+
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
147+
.partSize(s3Config.getPartSize());
148+
this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0);
149+
150+
var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
151+
try {
152+
this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
153+
} catch (IOException e) {
154+
throw new RuntimeException(e);
155+
}
156+
}
157+
158+
private String prepareS3StagingFile(String stagingFolder, String s3FileName) {
159+
return String.join("/", stagingFolder, schemaName, s3FileName);
160+
}
161+
121162
@Override
122163
public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception {
123164
csvPrinter.printRecord(id, jsonDataString, emittedAt);

airbyte-integrations/connectors/destination-redshift/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.13
11+
LABEL io.airbyte.version=0.3.14
1212
LABEL io.airbyte.name=airbyte/destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package io.airbyte.integrations.destination.redshift;
2626

2727
import com.amazonaws.services.s3.AmazonS3;
28+
import io.airbyte.commons.string.Strings;
2829
import io.airbyte.db.jdbc.JdbcDatabase;
2930
import io.airbyte.integrations.destination.ExtendedNameTransformer;
3031
import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -44,7 +45,8 @@ public RedshiftStreamCopier(String stagingFolder,
4445
S3Config s3Config,
4546
ExtendedNameTransformer nameTransformer,
4647
SqlOperations sqlOperations) {
47-
super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations);
48+
super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config,
49+
nameTransformer, sqlOperations);
4850
}
4951

5052
@Override

airbyte-integrations/connectors/destination-snowflake/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.13
11+
LABEL io.airbyte.version=0.3.14
1212
LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
package io.airbyte.integrations.destination.snowflake;
2626

2727
import com.amazonaws.services.s3.AmazonS3;
28+
import io.airbyte.commons.string.Strings;
2829
import io.airbyte.db.jdbc.JdbcDatabase;
2930
import io.airbyte.integrations.destination.ExtendedNameTransformer;
3031
import io.airbyte.integrations.destination.jdbc.SqlOperations;
3132
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
3233
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
3334
import io.airbyte.protocol.models.DestinationSyncMode;
3435
import java.sql.SQLException;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3538

3639
public class SnowflakeS3StreamCopier extends S3StreamCopier {
3740

41+
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class);
42+
3843
public SnowflakeS3StreamCopier(String stagingFolder,
3944
DestinationSyncMode destSyncMode,
4045
String schema,
@@ -44,7 +49,8 @@ public SnowflakeS3StreamCopier(String stagingFolder,
4449
S3Config s3Config,
4550
ExtendedNameTransformer nameTransformer,
4651
SqlOperations sqlOperations) {
47-
super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations);
52+
super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config,
53+
nameTransformer, sqlOperations);
4854
}
4955

5056
@Override

docs/integrations/destinations/redshift.md

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ See [docs](https://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html
107107

108108
| Version | Date | Pull Request | Subject |
109109
| :------ | :-------- | :----- | :------ |
110+
| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from a different table into the same raw table |
110111
| 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
111112
| 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
112113
| 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |

docs/integrations/destinations/snowflake.md

+1
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ Finally, you need to add read/write permissions to your bucket with that email.
189189

190190
| Version | Date | Pull Request | Subject |
191191
| :------ | :-------- | :----- | :------ |
192+
| 0.3.14 | 2021-09-08 | [#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from a different table into the same raw table |
192193
| 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours |
193194
| 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
194195
| 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |

0 commit comments

Comments
 (0)