-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Destination Snowflake Execute COPY in parallel #10212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
93d53ac
ede8d38
27be3fe
ca75033
72ab46f
aec3384
7efd5aa
6a773f9
fa18537
b0ba37b
fa31a0d
67e0bd6
ec0d1bd
23598ec
59d63e6
e2cb62c
632583a
2bc76ce
9baf2e1
ed4f1ba
64f823f
6cea9ab
a2cb023
475a34c
f75c1c1
3078535
cc4e646
2b8d8c1
9349a29
a90eaba
90fed75
6ccb1e4
fa5df62
1e41129
9e3a776
4769a3d
a20a7dd
16322ee
76cee29
7126276
fb9b413
f9afb81
94ae8ba
5ffdc8c
d05eb8a
30e58a7
9743ea5
5ec8cc2
a651001
f5aca25
31394df
1a183ac
facc90b
ec75edf
48d24fc
ef3bb56
68c8401
9d66d1d
8c8af18
cb67a38
1520489
fb3d9b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.snowflake; | ||
|
||
import java.util.List; | ||
import java.util.StringJoiner; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.stream.Collectors; | ||
|
||
/**
 * Shared behaviour for Snowflake stream copiers that load staged files into the temporary table in
 * parallel. Implementations (e.g. the S3 and GCS copiers) supply the destination-specific
 * {@code COPY INTO} statement and the bucket/container path of the staged files.
 */
interface SnowflakeParallelCopyStreamCopier {

  /**
   * Generates a comma-separated list of single-quoted staging file names with their directory
   * prefix stripped, as expected by Snowflake's {@code files = (...)} COPY option. See more
   * https://docs.snowflake.com/en/user-guide/data-load-considerations-load.html#lists-of-files
   *
   * @param files full paths of the staged files
   * @return e.g. {@code 'a.csv','b.csv'}; empty string for an empty list
   */
  default String generateFilesList(List<String> files) {
    StringJoiner joiner = new StringJoiner(",");
    files.forEach(filename -> joiner.add("'" + filename.substring(filename.lastIndexOf("/") + 1) + "'"));
    return joiner.toString();
  }

  /**
   * Executes async copying of staging files. This method blocks until every partition has been
   * copied (or one of the copies has failed).
   *
   * @param partitions chunks of staged file names; one {@code COPY INTO} statement per chunk
   * @throws RuntimeException if any copy fails; the underlying failure is attached as the cause
   */
  default void copyFilesInParallel(List<List<String>> partitions) {
    // 5 concurrent COPY statements; each partition is loaded by its own task.
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    List<CompletableFuture<Void>> futures = partitions.stream()
        .map(partition -> CompletableFuture.runAsync(() -> copyIntoStage(partition), executorService))
        .collect(Collectors.toList());

    try {
      // wait until all futures ready
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } catch (Exception e) {
      // Attach the failure as the cause so the original stack trace is preserved.
      throw new RuntimeException("Failed to copy files from stage to tmp table", e);
    } finally {
      executorService.shutdown();
    }
  }

  /**
   * Copies the given staging files to the temporary table using a {@code COPY INTO} statement.
   */
  void copyIntoStage(List<String> files);

  /**
   * Generates the full bucket/container path to the staging files.
   */
  String generateBucketPath();

}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,23 +6,33 @@ | |
|
||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.Lists; | ||
import io.airbyte.commons.lang.Exceptions; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.integrations.destination.ExtendedNameTransformer; | ||
import io.airbyte.integrations.destination.jdbc.SqlOperations; | ||
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig; | ||
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; | ||
import io.airbyte.integrations.destination.s3.S3DestinationConfig; | ||
import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteStream; | ||
import java.sql.SQLException; | ||
import java.sql.Timestamp; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class SnowflakeS3StreamCopier extends S3StreamCopier { | ||
public class SnowflakeS3StreamCopier extends S3StreamCopier implements SnowflakeParallelCopyStreamCopier { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class); | ||
|
||
// From https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html | ||
// "Split your load data files so that the files are about equal size, between 1 MB and 1 GB after | ||
// compression" | ||
public static final int MAX_PARTS_PER_FILE = 4; | ||
public static final int MAX_FILES_PER_COPY = 1000; | ||
|
||
public SnowflakeS3StreamCopier(final String stagingFolder, | ||
final String schema, | ||
|
@@ -54,6 +64,7 @@ public SnowflakeS3StreamCopier(final String stagingFolder, | |
final SqlOperations sqlOperations, | ||
final Timestamp uploadTime, | ||
final ConfiguredAirbyteStream configuredAirbyteStream) { | ||
|
||
super(stagingFolder, | ||
schema, | ||
client, | ||
|
@@ -66,24 +77,47 @@ public SnowflakeS3StreamCopier(final String stagingFolder, | |
MAX_PARTS_PER_FILE); | ||
} | ||
|
||
@Override | ||
public void copyStagingFileToTemporaryTable() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this completely identical to the methods in the GCS copier? if yes, let's extract them to a shared implementation rather than copy-pasting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @edgao extracted shared implementation into SnowflakeParallelCopyStreamCopier interface |
||
List<List<String>> partitions = Lists.partition(new ArrayList<>(stagingWritersByFile.keySet()), MAX_FILES_PER_COPY); | ||
LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName, | ||
schemaName, partitions.size()); | ||
|
||
copyFilesInParallel(partitions); | ||
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); | ||
} | ||
|
||
@Override | ||
public void copyIntoStage(List<String> files) { | ||
final var copyQuery = String.format( | ||
"COPY INTO %s.%s FROM '%s' " | ||
+ "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') " | ||
+ "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') " | ||
+ "files = (" + generateFilesList(files) + " );", | ||
schemaName, | ||
tmpTableName, | ||
generateBucketPath(), | ||
s3Config.getAccessKeyId(), | ||
s3Config.getSecretAccessKey()); | ||
|
||
Exceptions.toRuntime(() -> db.execute(copyQuery)); | ||
} | ||
|
||
@Override | ||
public String generateBucketPath() { | ||
return "s3://" + s3Config.getBucketName() + "/" | ||
+ S3OutputPathHelper.getOutputPrefix(s3Config.getBucketPath(), configuredAirbyteStream.getStream()) + "/"; | ||
} | ||
|
||
@Override | ||
public void copyS3CsvFileIntoTable(final JdbcDatabase database, | ||
final String s3FileLocation, | ||
final String schema, | ||
final String tableName, | ||
final S3DestinationConfig s3Config) | ||
throws SQLException { | ||
final var copyQuery = String.format( | ||
"COPY INTO %s.%s FROM '%s' " | ||
+ "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') " | ||
+ "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');", | ||
schema, | ||
tableName, | ||
s3FileLocation, | ||
s3Config.getAccessKeyId(), | ||
s3Config.getSecretAccessKey()); | ||
throw new RuntimeException("Snowflake Stream Copier should not copy individual files without use of a parallel copy"); | ||
|
||
database.execute(copyQuery); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add javadoc comments to the methods in this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done