Commit c9bd5e9

VitaliiMaltsev and vmaltsev authored

Destination Snowflake: Execute COPY in parallel (#10212)

* fix for jdk 17
* add parallel chunk copy S3
* add parallel chunk copy GCS
* fixed checkstyle
* refactoring
* add unit tests
* updated CHANGELOG
* fixed S3 bucket path generation
* refactoring
* refactoring
* fixed compilation error after merge
* add multithreading into S3 and GCS stream copiers
* fixed checkstyle
* fixed checkstyle
* update parallel copy with CompletableFuture
* refactoring
* add javadoc
* bump version
* update destination_specs.yaml

Co-authored-by: vmaltsev <[email protected]>

1 parent 3db3e88 commit c9bd5e9

12 files changed: +251 -44 lines changed
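At its core, the change replaces per-file COPY statements with chunked, concurrent ones: staging file names are partitioned into chunks of at most MAX_FILES_PER_COPY, and each chunk is loaded by a single COPY INTO executed on a small thread pool. A minimal, self-contained sketch of that pattern follows; the file names, chunk size, and the printed stand-in for the JDBC COPY are illustrative, not taken from this diff.

import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelCopySketch {

  public static void main(String[] args) {
    // Pretend these are names of CSV chunks already uploaded to the staging bucket.
    List<String> stagingFiles = List.of("part_0.csv", "part_1.csv", "part_2.csv", "part_3.csv");

    // The real code uses MAX_FILES_PER_COPY = 1000; 2 keeps the demo visible.
    List<List<String>> chunks = Lists.partition(stagingFiles, 2);

    ExecutorService pool = Executors.newFixedThreadPool(5); // same pool size as the commit
    try {
      CompletableFuture<?>[] futures = chunks.stream()
          .map(chunk -> CompletableFuture.runAsync(() -> copyIntoStage(chunk), pool))
          .toArray(CompletableFuture[]::new);
      CompletableFuture.allOf(futures).join(); // block until every chunk is copied
    } finally {
      pool.shutdown();
    }
  }

  // Stand-in for the connector's copyIntoStage, which issues a COPY INTO over JDBC.
  private static void copyIntoStage(List<String> files) {
    System.out.println("COPY chunk: " + files);
  }
}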

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml (+1 -1)

@@ -185,7 +185,7 @@
 - name: Snowflake
   destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.12
+  dockerImageTag: 0.4.13
   documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
   icon: snowflake.svg
 - name: MariaDB ColumnStore

airbyte-config/init/src/main/resources/seed/destination_specs.yaml (+1 -1)

@@ -3825,7 +3825,7 @@
   supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-snowflake:0.4.12"
+- dockerImage: "airbyte/destination-snowflake:0.4.13"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
     connectionSpecification:

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java (+19 -9)

@@ -10,6 +10,7 @@
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
+import com.google.common.annotations.VisibleForTesting;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;

@@ -50,21 +51,20 @@ public abstract class GcsStreamCopier implements StreamCopier {
   // QUERY_TIMEOUT when
   // the records from the file are copied to the staging table.
   public static final int MAX_PARTS_PER_FILE = 1000;
-
+  protected final GcsConfig gcsConfig;
+  protected final String tmpTableName;
+  protected final String schemaName;
+  protected final String streamName;
+  protected final JdbcDatabase db;
+  protected final Set<String> gcsStagingFiles = new HashSet<>();
+  protected final String stagingFolder;
+  protected StagingFilenameGenerator filenameGenerator;
   private final Storage storageClient;
-  private final GcsConfig gcsConfig;
-  private final String tmpTableName;
   private final DestinationSyncMode destSyncMode;
-  private final String schemaName;
-  private final String streamName;
-  private final JdbcDatabase db;
   private final ExtendedNameTransformer nameTransformer;
   private final SqlOperations sqlOperations;
-  private final Set<String> gcsStagingFiles = new HashSet<>();
   private final HashMap<String, WriteChannel> channels = new HashMap<>();
   private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
-  private final String stagingFolder;
-  protected StagingFilenameGenerator filenameGenerator;

   public GcsStreamCopier(final String stagingFolder,
                          final DestinationSyncMode destSyncMode,

@@ -234,6 +234,16 @@ public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOExcep
         .getService();
   }

+  @VisibleForTesting
+  public String getTmpTableName() {
+    return tmpTableName;
+  }
+
+  @VisibleForTesting
+  public Set<String> getGcsStagingFiles() {
+    return gcsStagingFiles;
+  }
+
   public abstract void copyGcsCsvFileIntoTable(JdbcDatabase database,
                                                String gcsFileLocation,
                                                String schema,

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java (+4 -4)

@@ -41,16 +41,16 @@ public abstract class S3StreamCopier implements StreamCopier {
   protected final AmazonS3 s3Client;
   protected final S3DestinationConfig s3Config;
   protected final String tmpTableName;
-  private final DestinationSyncMode destSyncMode;
   protected final String schemaName;
   protected final String streamName;
   protected final JdbcDatabase db;
+  protected final ConfiguredAirbyteStream configuredAirbyteStream;
+  protected final String stagingFolder;
+  protected final Map<String, DestinationFileWriter> stagingWritersByFile = new HashMap<>();
+  private final DestinationSyncMode destSyncMode;
   private final ExtendedNameTransformer nameTransformer;
   private final SqlOperations sqlOperations;
-  private final ConfiguredAirbyteStream configuredAirbyteStream;
   private final Timestamp uploadTime;
-  protected final String stagingFolder;
-  protected final Map<String, DestinationFileWriter> stagingWritersByFile = new HashMap<>();
   protected final Set<String> activeStagingWriterFileNames = new HashSet<>();
   protected final Set<String> stagingFileNames = new LinkedHashSet<>();
   private final boolean purgeStagingData;

airbyte-integrations/connectors/destination-snowflake/Dockerfile (+2 -2)

@@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

 RUN tar xf ${APPLICATION}.tar --strip-components=1

-ENV APPLICATION_VERSION 0.4.12
+ENV APPLICATION_VERSION 0.4.13
 ENV ENABLE_SENTRY true

-LABEL io.airbyte.version=0.4.12
+LABEL io.airbyte.version=0.4.13
 LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopier.java (+41 -7)

@@ -4,7 +4,11 @@

 package io.airbyte.integrations.destination.snowflake;

+import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StreamCopier.MAX_FILES_PER_COPY;
+
 import com.google.cloud.storage.Storage;
+import com.google.common.collect.Lists;
+import io.airbyte.commons.lang.Exceptions;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;

@@ -13,8 +17,14 @@
 import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeGcsStreamCopier extends GcsStreamCopier implements SnowflakeParallelCopyStreamCopier {

-public class SnowflakeGcsStreamCopier extends GcsStreamCopier {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeGcsStreamCopier.class);

   public SnowflakeGcsStreamCopier(final String stagingFolder,
                                   final DestinationSyncMode destSyncMode,

@@ -30,20 +40,44 @@ public SnowflakeGcsStreamCopier(final String stagingFolder,
     this.filenameGenerator = stagingFilenameGenerator;
   }

+  @Override
+  public void copyStagingFileToTemporaryTable() throws Exception {
+    List<List<String>> partitions = Lists.partition(new ArrayList<>(gcsStagingFiles), MAX_FILES_PER_COPY);
+    LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName,
+        schemaName, partitions.size());
+
+    copyFilesInParallel(partitions);
+    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
+  }
+
+  @Override
+  public void copyIntoStage(List<String> files) {
+    final var copyQuery = String.format(
+        "COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration "
+            + " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
+            + "files = (" + generateFilesList(files) + " );",
+        schemaName,
+        tmpTableName,
+        generateBucketPath());
+
+    Exceptions.toRuntime(() -> db.execute(copyQuery));
+  }
+
+  @Override
+  public String generateBucketPath() {
+    return "gcs://" + gcsConfig.getBucketName() + "/" + stagingFolder + "/" + schemaName + "/";
+  }
+
   @Override
   public void copyGcsCsvFileIntoTable(final JdbcDatabase database,
                                       final String gcsFileLocation,
                                       final String schema,
                                       final String tableName,
                                       final GcsConfig gcsConfig)
       throws SQLException {
-    final var copyQuery = String.format(
-        "COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');",
-        schema,
-        tableName,
-        gcsFileLocation);
+    throw new RuntimeException("Snowflake GCS Stream Copier should not copy individual files without use of a parallel copy");

-    database.execute(copyQuery);
   }

 }
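For reference, the query string assembled by the new copyIntoStage above has the following shape once formatted. This is a hedged sketch: the schema, table, bucket path, and file names below are invented placeholders; only the statement's structure comes from the diff.

// Illustrative only: renders the COPY INTO statement the same way
// SnowflakeGcsStreamCopier.copyIntoStage does. All identifiers are placeholders.
public class GcsCopyQuerySketch {

  public static void main(String[] args) {
    final String filesList = "'part_0.csv','part_1.csv'"; // generateFilesList(files)
    final String copyQuery = String.format(
        "COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration "
            + " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
            + "files = (" + filesList + " );",
        "PUBLIC",                                  // schemaName (placeholder)
        "_airbyte_tmp_users",                      // tmpTableName (placeholder)
        "gcs://my-bucket/staging-folder/PUBLIC/"); // generateBucketPath() (placeholder)
    System.out.println(copyQuery);
  }
}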
airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeParallelCopyStreamCopier.java (new file, +56 -0)

@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.snowflake;
+
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+interface SnowflakeParallelCopyStreamCopier {
+
+  /**
+   * Generates a list of staging files. See
+   * https://docs.snowflake.com/en/user-guide/data-load-considerations-load.html#lists-of-files
+   */
+  default String generateFilesList(List<String> files) {
+    StringJoiner joiner = new StringJoiner(",");
+    files.forEach(filename -> joiner.add("'" + filename.substring(filename.lastIndexOf("/") + 1) + "'"));
+    return joiner.toString();
+  }
+
+  /**
+   * Executes async copying of staging files. This method should block until the copy/upload has
+   * completed.
+   */
+  default void copyFilesInParallel(List<List<String>> partitions) {
+    ExecutorService executorService = Executors.newFixedThreadPool(5);
+    List<CompletableFuture<Void>> futures = partitions.stream()
+        .map(partition -> CompletableFuture.runAsync(() -> copyIntoStage(partition), executorService))
+        .collect(Collectors.toList());
+
+    try {
+      // wait until all chunk copies have completed
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to copy files from stage to tmp table", e);
+    } finally {
+      executorService.shutdown();
+    }
+  }
+
+  /**
+   * Copies staging files to the temporary table using a COPY INTO statement.
+   */
+  void copyIntoStage(List<String> files);
+
+  /**
+   * Generates the full bucket/container path to the staging files.
+   */
+  String generateBucketPath();
+
+}
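Note that generateFilesList keeps only the part of each staged path after the last '/', because the FROM clause of the COPY already carries the bucket prefix, so the files = (...) list is relative to it. A quick sketch of that default method in isolation; the paths are made up:

import java.util.List;
import java.util.StringJoiner;

public class FilesListSketch {

  // Same logic as the interface's default generateFilesList.
  static String generateFilesList(List<String> files) {
    StringJoiner joiner = new StringJoiner(",");
    files.forEach(f -> joiner.add("'" + f.substring(f.lastIndexOf("/") + 1) + "'"));
    return joiner.toString();
  }

  public static void main(String[] args) {
    System.out.println(generateFilesList(
        List.of("my_schema/my_stream/part_0.csv", "my_schema/my_stream/part_1.csv")));
    // prints: 'part_0.csv','part_1.csv'
  }
}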

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java (+45 -11)

@@ -6,23 +6,33 @@

 import com.amazonaws.services.s3.AmazonS3;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import io.airbyte.commons.lang.Exceptions;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

-public class SnowflakeS3StreamCopier extends S3StreamCopier {
+public class SnowflakeS3StreamCopier extends S3StreamCopier implements SnowflakeParallelCopyStreamCopier {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class);

   // From https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
   // "Split your load data files so that the files are about equal size, between 1 MB and 1 GB after
   // compression"
   public static final int MAX_PARTS_PER_FILE = 4;
+  public static final int MAX_FILES_PER_COPY = 1000;

   public SnowflakeS3StreamCopier(final String stagingFolder,
                                  final String schema,

@@ -54,6 +64,7 @@ public SnowflakeS3StreamCopier(final String stagingFolder,
                                  final SqlOperations sqlOperations,
                                  final Timestamp uploadTime,
                                  final ConfiguredAirbyteStream configuredAirbyteStream) {
+
     super(stagingFolder,
         schema,
         client,

@@ -66,24 +77,47 @@ public SnowflakeS3StreamCopier(final String stagingFolder,
         MAX_PARTS_PER_FILE);
   }

+  @Override
+  public void copyStagingFileToTemporaryTable() throws Exception {
+    List<List<String>> partitions = Lists.partition(new ArrayList<>(stagingWritersByFile.keySet()), MAX_FILES_PER_COPY);
+    LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName,
+        schemaName, partitions.size());
+
+    copyFilesInParallel(partitions);
+    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
+  }
+
+  @Override
+  public void copyIntoStage(List<String> files) {
+    final var copyQuery = String.format(
+        "COPY INTO %s.%s FROM '%s' "
+            + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
+            + "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
+            + "files = (" + generateFilesList(files) + " );",
+        schemaName,
+        tmpTableName,
+        generateBucketPath(),
+        s3Config.getAccessKeyId(),
+        s3Config.getSecretAccessKey());
+
+    Exceptions.toRuntime(() -> db.execute(copyQuery));
+  }
+
+  @Override
+  public String generateBucketPath() {
+    return "s3://" + s3Config.getBucketName() + "/"
+        + S3OutputPathHelper.getOutputPrefix(s3Config.getBucketPath(), configuredAirbyteStream.getStream()) + "/";
+  }
+
   @Override
   public void copyS3CsvFileIntoTable(final JdbcDatabase database,
                                      final String s3FileLocation,
                                      final String schema,
                                      final String tableName,
                                      final S3DestinationConfig s3Config)
       throws SQLException {
-    final var copyQuery = String.format(
-        "COPY INTO %s.%s FROM '%s' "
-            + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
-            + "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');",
-        schema,
-        tableName,
-        s3FileLocation,
-        s3Config.getAccessKeyId(),
-        s3Config.getSecretAccessKey());
+    throw new RuntimeException("Snowflake Stream Copier should not copy individual files without use of a parallel copy");

-    database.execute(copyQuery);
   }

 }
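The S3 variant mirrors the GCS one but authenticates with inline CREDENTIALS rather than a named storage integration, and derives its bucket path via S3OutputPathHelper from the stream's output prefix instead of the staging folder. A hedged sketch of the rendered statement, with placeholder identifiers and redacted keys:

// Illustrative only: the shape of the statement SnowflakeS3StreamCopier.copyIntoStage
// executes. Schema, table, bucket path, file names, and keys are placeholders.
public class S3CopyQuerySketch {

  public static void main(String[] args) {
    final String copyQuery = String.format(
        "COPY INTO %s.%s FROM '%s' "
            + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
            + "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
            + "files = ('part_0.csv','part_1.csv');",
        "PUBLIC",                          // schemaName (placeholder)
        "_airbyte_tmp_users",              // tmpTableName (placeholder)
        "s3://my-bucket/my-prefix/users/", // generateBucketPath() (placeholder)
        "<aws_key_id>",                    // redacted
        "<aws_secret_key>");               // redacted
    System.out.println(copyQuery);
  }
}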
