🎉 Destination Redshift (copy): accept bucket path for staging data #8607
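Summary (from the commit history and diff below): this PR splits the existing S3 stream copier into a deprecated LegacyS3StreamCopier, rebuilds S3StreamCopier on top of the S3 destination's CSV writer, and cuts the Redshift copy destination over to it, so that staging data, including the COPY manifest, lands under the configured bucket path.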
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged

Commits (52, all by edgao):
- 4f8eb4f rename to Legacy
- 0c4159d add legacy test
- a62ead7 create S3StreamCopier
- daa9352 fix param deletion
- 48d25d6 wip
- babcc77 wip
- 098ce39 make tests work. mocks are awful.
- b949984 WIP replace old code; nothing works yet
- 533880e add getObjectKey; add S3CsvWriterTest
- 36bdea8 write to multiple files correctly
- b03896a correct deleteStagingFiles test
- dc8a2c4 completed things
- 94029e4 fix test
- 3019ef0 unit capitalization
- 50a3a29 formatting
- 7dfd452 wip
- 15b9f27 remove mistaken dep
- c54baf3 use UUID suffix
- 5c367df various improvements
- 2b27a4e optional header; unit test file contents
- 2bcf9a5 fix field name
- 24c6a19 remove comment
- d860fd6 RECORD CLASS RECORD CLASS
- ff400c7 warning
- 0fe979b text block
- 685f366 add more csv options
- d327bcc update comment
- 0f7b2bd assert copy operation
- 866db40 add test
- 1b4f41f cutover to non-legacy stream copier
- 89f53fe update param name
- 5cd8b56 minor comments about sheet generator + Flattening
- 7a1de37 timezones :(
- d4fcff7 add dup code comment
- 9d3c6c1 delete redundant tests
- e975bb8 manifest also exists within bucketPath
- 0ef601e add comment
- 734141d better comment
- 8cd2e51 rename getObjectKey + add javadoc
- 842e1d9 explain default behavior
- 485fe5d remove from abstract classes
- 1bdfab4 Merge branch 'master' into edgao/s3_based_stream_copier
- ed07183 reformat
- 5d0427a add implementation for getObjectPath
- fed6c36 prepare for publish
- f58bb90 follow doc conventions
- 87b8543 follow doc conventions
- 028c3b6 rename to getOutputPath
- 192d6a3 add comment
- 257dcb7 Merge branch 'master' into edgao/s3_based_stream_copier
- 372098d Merge branch 'master' into edgao/s3_based_stream_copier
- 7625bee regenerate seed specs
235 changes: 235 additions & 0 deletions
.../src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/LegacyS3StreamCopier.java
```java
/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.jdbc.copy.s3;

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @deprecated See {@link S3StreamCopier}
 */
@Deprecated
public abstract class LegacyS3StreamCopier implements StreamCopier {

  private static final Logger LOGGER = LoggerFactory.getLogger(LegacyS3StreamCopier.class);

  private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
  private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
  // It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
  // The BATCH_SIZE is defined in CopyConsumerFactory.
  // The average size of such a file will be about 1 GB.
  // This makes the files easier to work with and speeds up writing large amounts of data.
  // In addition, for a large number of records, the COPY request will not be dropped due to
  // QUERY_TIMEOUT when the records from the file are copied to the staging table.
  public static final int MAX_PARTS_PER_FILE = 1000;

  protected final AmazonS3 s3Client;
  protected final S3DestinationConfig s3Config;
  protected final String tmpTableName;
  private final DestinationSyncMode destSyncMode;
  protected final String schemaName;
  protected final String streamName;
  protected final JdbcDatabase db;
  private final ExtendedNameTransformer nameTransformer;
  private final SqlOperations sqlOperations;
  protected final Set<String> s3StagingFiles = new HashSet<>();
  private final Map<String, StreamTransferManager> multipartUploadManagers = new HashMap<>();
  private final Map<String, MultiPartOutputStream> outputStreams = new HashMap<>();
  private final Map<String, CSVPrinter> csvPrinters = new HashMap<>();
  protected final String stagingFolder;
  private final StagingFilenameGenerator filenameGenerator;

  public LegacyS3StreamCopier(final String stagingFolder,
                              final DestinationSyncMode destSyncMode,
                              final String schema,
                              final String streamName,
                              final AmazonS3 client,
                              final JdbcDatabase db,
                              final S3DestinationConfig s3Config,
                              final ExtendedNameTransformer nameTransformer,
                              final SqlOperations sqlOperations) {
    this.destSyncMode = destSyncMode;
    this.schemaName = schema;
    this.streamName = streamName;
    this.stagingFolder = stagingFolder;
    this.db = db;
    this.nameTransformer = nameTransformer;
    this.sqlOperations = sqlOperations;
    this.tmpTableName = nameTransformer.getTmpTableName(streamName);
    this.s3Client = client;
    this.s3Config = s3Config;
    this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
  }

  private String prepareS3StagingFile() {
    return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
  }

  @Override
  public String prepareStagingFile() {
    final var name = prepareS3StagingFile();
    if (!s3StagingFiles.contains(name)) {
      s3StagingFiles.add(name);
      LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
      // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
      // support streaming multipart uploads; the alternative is to write the entire output to disk
      // before loading it into S3, which is not feasible with large tables.
      // Data is chunked into parts. A part is sent off to a queue to be uploaded once it has
      // reached its configured part size.
      // Memory consumption is (numUploadThreads + queueCapacity) * partSize = (10 + 10) * 10 = 200 MB
      // at current configurations.
      final var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
          .numUploadThreads(DEFAULT_UPLOAD_THREADS)
          .queueCapacity(DEFAULT_QUEUE_CAPACITY)
          .partSize(s3Config.getPartSize());
      multipartUploadManagers.put(name, manager);
      final var outputStream = manager.getMultiPartOutputStreams().get(0);
      // We only need one output stream as we only have one input stream. This is reasonably
      // performant. See the above comment.
      outputStreams.put(name, outputStream);
      final var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
      try {
        csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
      } catch (final IOException e) {
        throw new RuntimeException(e);
      }
    }
    return name;
  }

  @Override
  public void write(final UUID id, final AirbyteRecordMessage recordMessage, final String s3FileName) throws Exception {
    if (csvPrinters.containsKey(s3FileName)) {
      csvPrinters.get(s3FileName).printRecord(id,
          Jsons.serialize(recordMessage.getData()),
          Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
    }
  }

  @Override
  public void closeStagingUploader(final boolean hasFailed) throws Exception {
    if (hasFailed) {
      for (final var multipartUploadManager : multipartUploadManagers.values()) {
        multipartUploadManager.abort();
      }
    }
    closeAndWaitForUpload();
  }

  @Override
  public void createDestinationSchema() throws Exception {
    LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName);
    sqlOperations.createSchemaIfNotExists(db, schemaName);
  }

  @Override
  public void createTemporaryTable() throws Exception {
    LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName);
    sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName);
  }

  @Override
  public void copyStagingFileToTemporaryTable() throws Exception {
    LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName);
    s3StagingFiles.forEach(s3StagingFile -> Exceptions.toRuntime(() -> {
      copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config);
    }));
    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
  }

  @Override
  public String createDestinationTable() throws Exception {
    final var destTableName = nameTransformer.getRawTableName(streamName);
    LOGGER.info("Preparing table {} in destination.", destTableName);
    sqlOperations.createTableIfNotExists(db, schemaName, destTableName);
    LOGGER.info("Table {} in destination prepared.", tmpTableName);

    return destTableName;
  }

  @Override
  public String generateMergeStatement(final String destTableName) {
    LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName);
    final var queries = new StringBuilder();
    if (destSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
      queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName));
      LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName);
    }
    queries.append(sqlOperations.copyTableQuery(db, schemaName, tmpTableName, destTableName));
    return queries.toString();
  }

  @Override
  public void removeFileAndDropTmpTable() throws Exception {
    s3StagingFiles.forEach(s3StagingFile -> {
      LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile);
      if (s3Client.doesObjectExist(s3Config.getBucketName(), s3StagingFile)) {
        s3Client.deleteObject(s3Config.getBucketName(), s3StagingFile);
      }
      LOGGER.info("S3 staging file {} cleaned.", s3StagingFile);
    });

    LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
    sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
    LOGGER.info("{} tmp table in destination cleaned.", tmpTableName);
  }

  protected static String getFullS3Path(final String s3BucketName, final String s3StagingFile) {
    return String.join("/", "s3:/", s3BucketName, s3StagingFile);
  }

  /**
   * Closes the printers/outputstreams and waits for any buffered uploads to complete.
   */
  private void closeAndWaitForUpload() throws IOException {
    LOGGER.info("Uploading remaining data for {} stream.", streamName);
    for (final var csvPrinter : csvPrinters.values()) {
      csvPrinter.close();
    }
    for (final var outputStream : outputStreams.values()) {
      outputStream.close();
    }
    for (final var multipartUploadManager : multipartUploadManagers.values()) {
      multipartUploadManager.complete();
    }
    LOGGER.info("All data for {} stream uploaded.", streamName);
  }

  public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
                                              String s3FileLocation,
                                              String schema,
                                              String tableName,
                                              S3DestinationConfig s3Config)
      throws SQLException;

}
```
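The class above leaves the warehouse-specific load step abstract: getFullS3Path joins the accumulated staging-object names into an s3://bucket/stagingFolder/schema/file URI, and each concrete copier issues its engine's COPY against that location via copyS3CsvFileIntoTable. In the copy consumer the lifecycle runs roughly prepareStagingFile → write → closeStagingUploader → copyStagingFileToTemporaryTable → generateMergeStatement → removeFileAndDropTmpTable. As a minimal sketch only (not the actual Redshift connector code from this PR; the SQL shape and the S3DestinationConfig getter names are assumptions), a subclass might implement the COPY step like this:

```java
package io.airbyte.integrations.destination.jdbc.copy.s3;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.SQLException;

// Hypothetical example, not part of this PR: one way a concrete copier
// could implement the abstract COPY step for a Redshift-like warehouse.
public class ExampleRedshiftStreamCopier extends LegacyS3StreamCopier {

  public ExampleRedshiftStreamCopier(final String stagingFolder,
                                     final DestinationSyncMode destSyncMode,
                                     final String schema,
                                     final String streamName,
                                     final AmazonS3 client,
                                     final JdbcDatabase db,
                                     final S3DestinationConfig s3Config,
                                     final ExtendedNameTransformer nameTransformer,
                                     final SqlOperations sqlOperations) {
    super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations);
  }

  @Override
  public void copyS3CsvFileIntoTable(final JdbcDatabase database,
                                     final String s3FileLocation,
                                     final String schema,
                                     final String tableName,
                                     final S3DestinationConfig s3Config)
      throws SQLException {
    // Redshift reads the staged CSV directly from S3. The getAccessKeyId /
    // getSecretAccessKey / getBucketRegion getters are assumed names here.
    final String copyQuery = String.format(
        "COPY %s.%s FROM '%s' "
            + "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' "
            + "CSV REGION '%s' TIMEFORMAT 'auto';",
        schema, tableName, s3FileLocation,
        s3Config.getAccessKeyId(), s3Config.getSecretAccessKey(), s3Config.getBucketRegion());
    database.execute(copyQuery);
  }

}
```

The real connector stages multiple files and, per the commit list above ("manifest also exists within bucketPath"), now places the COPY manifest under the configured bucket path as well.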
61 changes: 61 additions & 0 deletions
...in/java/io/airbyte/integrations/destination/jdbc/copy/s3/LegacyS3StreamCopierFactory.java
Review note on this class: similarly to LegacyS3StreamCopier; this is identical to master's S3StreamCopierFactory, modulo the classname.

```java
/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.jdbc.copy.s3;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

/**
 * See {@link S3StreamCopierFactory} instead.
 */
@Deprecated
public abstract class LegacyS3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {

  /**
   * Used by the copy consumer.
   */
  @Override
  public StreamCopier create(final String configuredSchema,
                             final S3DestinationConfig s3Config,
                             final String stagingFolder,
                             final ConfiguredAirbyteStream configuredStream,
                             final ExtendedNameTransformer nameTransformer,
                             final JdbcDatabase db,
                             final SqlOperations sqlOperations) {
    try {
      final AirbyteStream stream = configuredStream.getStream();
      final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
      final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
      final AmazonS3 s3Client = s3Config.getS3Client();

      return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
    } catch (final Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * For specific copier suppliers to implement.
   */
  public abstract StreamCopier create(String stagingFolder,
                                      DestinationSyncMode syncMode,
                                      String schema,
                                      String streamName,
                                      AmazonS3 s3Client,
                                      JdbcDatabase db,
                                      S3DestinationConfig s3Config,
                                      ExtendedNameTransformer nameTransformer,
                                      SqlOperations sqlOperations)
      throws Exception;

}
```
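Pairing with the copier sketch above, a concrete factory only needs to forward the resolved arguments (again a hypothetical example, not code from this PR):

```java
package io.airbyte.integrations.destination.jdbc.copy.s3;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;

// Hypothetical example pairing with ExampleRedshiftStreamCopier above.
public class ExampleRedshiftStreamCopierFactory extends LegacyS3StreamCopierFactory {

  @Override
  public StreamCopier create(final String stagingFolder,
                             final DestinationSyncMode syncMode,
                             final String schema,
                             final String streamName,
                             final AmazonS3 s3Client,
                             final JdbcDatabase db,
                             final S3DestinationConfig s3Config,
                             final ExtendedNameTransformer nameTransformer,
                             final SqlOperations sqlOperations) {
    // Simply forward the already-resolved arguments to the copier.
    return new ExampleRedshiftStreamCopier(
        stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
  }

}
```

The split keeps the generic create(configuredSchema, ...) overload, which resolves the schema and sync mode from the configured stream, in the shared base class, so per-warehouse factories stay one method long.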