Destination Snowflake Execute COPY in parallel #10212


Merged
62 commits
93d53ac
fix for jdk 17
Dec 15, 2021
ede8d38
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
27be3fe
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
ca75033
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
72ab46f
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
aec3384
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
7efd5aa
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 20, 2021
6a773f9
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
fa18537
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
b0ba37b
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 22, 2021
fa31a0d
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 23, 2021
67e0bd6
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 24, 2021
ec0d1bd
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 28, 2021
23598ec
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 28, 2021
59d63e6
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 30, 2021
e2cb62c
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 10, 2022
632583a
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 10, 2022
2bc76ce
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 11, 2022
9baf2e1
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 12, 2022
ed4f1ba
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 13, 2022
64f823f
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 13, 2022
6cea9ab
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 14, 2022
a2cb023
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 14, 2022
475a34c
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 17, 2022
f75c1c1
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 18, 2022
3078535
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 19, 2022
cc4e646
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 20, 2022
2b8d8c1
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 21, 2022
9349a29
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 24, 2022
a90eaba
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 25, 2022
90fed75
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 31, 2022
6ccb1e4
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 1, 2022
fa5df62
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 2, 2022
1e41129
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 4, 2022
9e3a776
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 7, 2022
4769a3d
add parallel chunk copy S3
Feb 8, 2022
a20a7dd
add parallel chunk copy GCS
Feb 9, 2022
16322ee
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 9, 2022
76cee29
Merge branch 'master' into vmaltsev/9087-destination-snowflake-parral…
Feb 9, 2022
7126276
fixed checkstyle
Feb 9, 2022
fb9b413
refactoring
Feb 9, 2022
f9afb81
add unit tests
Feb 10, 2022
94ae8ba
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 10, 2022
5ffdc8c
Merge branch 'master' into vmaltsev/9087-destination-snowflake-parral…
Feb 10, 2022
d05eb8a
updated CHANGELOG
Feb 10, 2022
30e58a7
fixed S3 bucket path generation
Feb 10, 2022
9743ea5
refactoring
Feb 11, 2022
5ec8cc2
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 14, 2022
a651001
refactoring
Feb 15, 2022
f5aca25
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 16, 2022
31394df
Merge branch 'master' into vmaltsev/9087-destination-snowflake-parral…
Feb 16, 2022
1a183ac
fixed compilation error after merge
Feb 16, 2022
facc90b
add multitheading into S3 and GCS stream copiers
Feb 16, 2022
ec75edf
fixed checkstyle
Feb 16, 2022
48d24fc
fixed checkstyle
Feb 16, 2022
ef3bb56
update parallel copy with CompletableFuture
Feb 16, 2022
68c8401
refactoring
Feb 17, 2022
9d66d1d
add javadoc
Feb 17, 2022
8c8af18
bump version
Feb 17, 2022
cb67a38
Merge branch 'master' of github.com:airbytehq/airbyte
Feb 17, 2022
1520489
Merge branch 'master' into vmaltsev/9087-destination-snowflake-parral…
Feb 17, 2022
fb3d9b4
update destination_specs.yaml
Feb 17, 2022
@@ -185,7 +185,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.12
dockerImageTag: 0.4.13
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
@@ -3825,7 +3825,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.12"
- dockerImage: "airbyte/destination-snowflake:0.4.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
@@ -10,6 +10,7 @@
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
@@ -50,21 +51,20 @@ public abstract class GcsStreamCopier implements StreamCopier {
// QUERY_TIMEOUT when
// the records from the file are copied to the staging table.
public static final int MAX_PARTS_PER_FILE = 1000;

protected final GcsConfig gcsConfig;
protected final String tmpTableName;
protected final String schemaName;
protected final String streamName;
protected final JdbcDatabase db;
protected final Set<String> gcsStagingFiles = new HashSet<>();
protected final String stagingFolder;
protected StagingFilenameGenerator filenameGenerator;
private final Storage storageClient;
private final GcsConfig gcsConfig;
private final String tmpTableName;
private final DestinationSyncMode destSyncMode;
private final String schemaName;
private final String streamName;
private final JdbcDatabase db;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
private final Set<String> gcsStagingFiles = new HashSet<>();
private final HashMap<String, WriteChannel> channels = new HashMap<>();
private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
private final String stagingFolder;
protected StagingFilenameGenerator filenameGenerator;

public GcsStreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
@@ -234,6 +234,16 @@ public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOExcep
.getService();
}

@VisibleForTesting
public String getTmpTableName() {
return tmpTableName;
}

@VisibleForTesting
public Set<String> getGcsStagingFiles() {
return gcsStagingFiles;
}

public abstract void copyGcsCsvFileIntoTable(JdbcDatabase database,
String gcsFileLocation,
String schema,
@@ -41,16 +41,16 @@ public abstract class S3StreamCopier implements StreamCopier {
protected final AmazonS3 s3Client;
protected final S3DestinationConfig s3Config;
protected final String tmpTableName;
private final DestinationSyncMode destSyncMode;
protected final String schemaName;
protected final String streamName;
protected final JdbcDatabase db;
protected final ConfiguredAirbyteStream configuredAirbyteStream;
protected final String stagingFolder;
protected final Map<String, DestinationFileWriter> stagingWritersByFile = new HashMap<>();
private final DestinationSyncMode destSyncMode;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
private final ConfiguredAirbyteStream configuredAirbyteStream;
private final Timestamp uploadTime;
protected final String stagingFolder;
protected final Map<String, DestinationFileWriter> stagingWritersByFile = new HashMap<>();
protected final Set<String> activeStagingWriterFileNames = new HashSet<>();
protected final Set<String> stagingFileNames = new LinkedHashSet<>();
private final boolean purgeStagingData;
@@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV APPLICATION_VERSION 0.4.12
ENV APPLICATION_VERSION 0.4.13
ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.12
LABEL io.airbyte.version=0.4.13
LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -4,7 +4,11 @@

package io.airbyte.integrations.destination.snowflake;

import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StreamCopier.MAX_FILES_PER_COPY;

import com.google.cloud.storage.Storage;
import com.google.common.collect.Lists;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
@@ -13,8 +17,14 @@
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeGcsStreamCopier extends GcsStreamCopier implements SnowflakeParallelCopyStreamCopier {

public class SnowflakeGcsStreamCopier extends GcsStreamCopier {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeGcsStreamCopier.class);

public SnowflakeGcsStreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
@@ -30,20 +40,44 @@ public SnowflakeGcsStreamCopier(final String stagingFolder,
this.filenameGenerator = stagingFilenameGenerator;
}

@Override
public void copyStagingFileToTemporaryTable() throws Exception {
List<List<String>> partitions = Lists.partition(new ArrayList<>(gcsStagingFiles), MAX_FILES_PER_COPY);
LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName,
schemaName, partitions.size());

copyFilesInParallel(partitions);
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}

@Override
public void copyIntoStage(List<String> files) {

final var copyQuery = String.format(
"COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration "
+ " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
+ "files = (" + generateFilesList(files) + " );",
schemaName,
tmpTableName,
generateBucketPath());

Exceptions.toRuntime(() -> db.execute(copyQuery));
}

@Override
public String generateBucketPath() {
return "gcs://" + gcsConfig.getBucketName() + "/" + stagingFolder + "/" + schemaName + "/";
}

@Override
public void copyGcsCsvFileIntoTable(final JdbcDatabase database,
final String gcsFileLocation,
final String schema,
final String tableName,
final GcsConfig gcsConfig)
throws SQLException {
final var copyQuery = String.format(
"COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');",
schema,
tableName,
gcsFileLocation);
throw new RuntimeException("Snowflake GCS Stream Copier should not copy individual files without use of a parallel copy");

database.execute(copyQuery);
}

}
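For reference, the COPY statement that `copyIntoStage` assembles above can be exercised in isolation. The sketch below mirrors the diff's query-building logic with hypothetical schema, table, and bucket values; the class and method names are illustrative and not part of the connector:

```java
import java.util.List;
import java.util.StringJoiner;

public class GcsCopyQuerySketch {

    // Keep only the part of each path after the last '/', single-quoted,
    // comma-separated — the shape Snowflake expects in "files = (...)".
    static String filesList(List<String> files) {
        StringJoiner j = new StringJoiner(",");
        files.forEach(f -> j.add("'" + f.substring(f.lastIndexOf("/") + 1) + "'"));
        return j.toString();
    }

    // Assembles the same shape of COPY statement as copyIntoStage above.
    static String buildCopyQuery(String schema, String table, String bucketPath, List<String> files) {
        return String.format(
                "COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration "
                        + " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
                        + "files = (" + filesList(files) + " );",
                schema, table, bucketPath);
    }

    public static void main(String[] args) {
        String q = buildCopyQuery("PUBLIC", "_airbyte_tmp_users",
                "gcs://my-bucket/staging/PUBLIC/", List.of("staging/PUBLIC/0.csv"));
        System.out.println(q);
    }
}
```

Because the bucket path already points at the staging folder, only bare file names go into the `files = (...)` clause — Snowflake resolves them relative to the `FROM` location.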
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

interface SnowflakeParallelCopyStreamCopier {
Reviewer: can you add javadoc comments to the methods in this interface?

Author: done


/**
* Generates the list of staging file names for the COPY statement. See
* https://docs.snowflake.com/en/user-guide/data-load-considerations-load.html#lists-of-files
*/
default String generateFilesList(List<String> files) {
StringJoiner joiner = new StringJoiner(",");
files.forEach(filename -> joiner.add("'" + filename.substring(filename.lastIndexOf("/") + 1) + "'"));
return joiner.toString();
}

/**
* Executes asynchronous copying of staging files. This method blocks until the copy/upload has
* completed.
*/
default void copyFilesInParallel(List<List<String>> partitions) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<CompletableFuture<Void>> futures = partitions.stream()
.map(partition -> CompletableFuture.runAsync(() -> copyIntoStage(partition), executorService))
.collect(Collectors.toList());

try {
// wait until all futures complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} catch (Exception e) {
throw new RuntimeException("Failed to copy files from stage to tmp table", e);
} finally {
executorService.shutdown();
}
}

/**
* Copies staging files to the temporary table using a COPY INTO statement
*/
void copyIntoStage(List<String> files);

/**
* Generates full bucket/container path to staging files
*/
String generateBucketPath();

}
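The `generateFilesList` default method above reduces each staged path to its base name and quotes it for Snowflake's `files = (...)` clause. A minimal standalone sketch of that logic (the class name is hypothetical; the body mirrors the interface's default method):

```java
import java.util.List;
import java.util.StringJoiner;

public class FilesListSketch {

    // Mirrors SnowflakeParallelCopyStreamCopier.generateFilesList: take the
    // substring after the last '/', wrap it in single quotes, join with commas.
    static String generateFilesList(List<String> files) {
        StringJoiner joiner = new StringJoiner(",");
        files.forEach(f -> joiner.add("'" + f.substring(f.lastIndexOf("/") + 1) + "'"));
        return joiner.toString();
    }

    public static void main(String[] args) {
        String list = generateFilesList(List.of(
                "staging/schema/part_0.csv",
                "staging/schema/part_1.csv"));
        System.out.println(list); // 'part_0.csv','part_1.csv'
    }
}
```

Note that a path with no '/' falls through unchanged, since `lastIndexOf` returns -1 and the substring starts at index 0.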
@@ -6,23 +6,33 @@

import com.amazonaws.services.s3.AmazonS3;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeS3StreamCopier extends S3StreamCopier {
public class SnowflakeS3StreamCopier extends S3StreamCopier implements SnowflakeParallelCopyStreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class);

// From https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
// "Split your load data files so that the files are about equal size, between 1 MB and 1 GB after
// compression"
public static final int MAX_PARTS_PER_FILE = 4;
public static final int MAX_FILES_PER_COPY = 1000;

public SnowflakeS3StreamCopier(final String stagingFolder,
final String schema,
@@ -54,6 +64,7 @@ public SnowflakeS3StreamCopier(final String stagingFolder,
final SqlOperations sqlOperations,
final Timestamp uploadTime,
final ConfiguredAirbyteStream configuredAirbyteStream) {

super(stagingFolder,
schema,
client,
@@ -66,24 +77,47 @@ public SnowflakeS3StreamCopier(final String stagingFolder,
MAX_PARTS_PER_FILE);
}

@Override
public void copyStagingFileToTemporaryTable() throws Exception {
Reviewer: is this completely identical to the methods in the GCS copier? if yes, let's extract them to a shared implementation rather than copy-pasting

Author: @edgao extracted shared implementation into SnowflakeParallelCopyStreamCopier interface

List<List<String>> partitions = Lists.partition(new ArrayList<>(stagingWritersByFile.keySet()), MAX_FILES_PER_COPY);
LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName,
schemaName, partitions.size());

copyFilesInParallel(partitions);
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}

@Override
public void copyIntoStage(List<String> files) {
final var copyQuery = String.format(
"COPY INTO %s.%s FROM '%s' "
+ "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
+ "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') "
+ "files = (" + generateFilesList(files) + " );",
schemaName,
tmpTableName,
generateBucketPath(),
s3Config.getAccessKeyId(),
s3Config.getSecretAccessKey());

Exceptions.toRuntime(() -> db.execute(copyQuery));
}

@Override
public String generateBucketPath() {
return "s3://" + s3Config.getBucketName() + "/"
+ S3OutputPathHelper.getOutputPrefix(s3Config.getBucketPath(), configuredAirbyteStream.getStream()) + "/";
}

@Override
public void copyS3CsvFileIntoTable(final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config)
throws SQLException {
final var copyQuery = String.format(
"COPY INTO %s.%s FROM '%s' "
+ "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
+ "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');",
schema,
tableName,
s3FileLocation,
s3Config.getAccessKeyId(),
s3Config.getSecretAccessKey());
throw new RuntimeException("Snowflake Stream Copier should not copy individual files without use of a parallel copy");

database.execute(copyQuery);
}

}
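The overall flow — partition the staged file names into chunks of `MAX_FILES_PER_COPY`, then issue one COPY per chunk on a fixed-size pool — can be sketched end to end. This is a simplified stand-in: `Lists.partition` from Guava is replaced with a plain-Java equivalent, and `copyIntoStage` is stubbed to record its input instead of executing SQL, so all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelCopySketch {

    // Plain-Java stand-in for Guava's Lists.partition.
    static List<List<String>> partition(List<String> items, int size) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            parts.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return parts;
    }

    static final Set<String> copied = ConcurrentHashMap.newKeySet();

    // Stub for copyIntoStage: records which files the chunk covered
    // instead of running a COPY INTO against Snowflake.
    static void copyIntoStage(List<String> files) {
        copied.addAll(files);
    }

    // Mirrors the interface's copyFilesInParallel: one COPY per chunk,
    // at most 5 chunks in flight, block until all complete.
    static void copyFilesInParallel(List<List<String>> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<CompletableFuture<Void>> futures = partitions.stream()
                .map(p -> CompletableFuture.runAsync(() -> copyIntoStage(p), pool))
                .collect(Collectors.toList());
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (Exception e) {
            throw new RuntimeException("Failed to copy files from stage to tmp table", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            files.add("part_" + i + ".csv");
        }
        copyFilesInParallel(partition(files, 1000)); // 3 chunks: 1000, 1000, 500
        System.out.println(copied.size()); // 2500
    }
}
```

With 2500 staged files and a chunk size of 1000, only three COPY statements are issued instead of 2500 single-file copies, which is the point of the PR.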