
🎉 Destination Redshift (copy): accept bucket path for staging data #8607


Merged · 52 commits · Dec 17, 2021
Changes from 33 commits

Commits (52)
4f8eb4f
rename to Legacy
edgao Dec 7, 2021
0c4159d
add legacy test
edgao Dec 7, 2021
a62ead7
create S3StreamCopier
edgao Dec 7, 2021
daa9352
fix param deletion
edgao Dec 7, 2021
48d25d6
wip
edgao Dec 8, 2021
babcc77
wip
edgao Dec 8, 2021
098ce39
make tests work. mocks are awful.
edgao Dec 9, 2021
b949984
WIP replace old code; nothing works yet
edgao Dec 9, 2021
533880e
add getObjectKey; add S3CsvWriterTest
edgao Dec 9, 2021
36bdea8
write to multiple files correctly
edgao Dec 9, 2021
b03896a
correct deleteStagingFiles test
edgao Dec 9, 2021
dc8a2c4
completed things
edgao Dec 9, 2021
94029e4
fix test
edgao Dec 9, 2021
3019ef0
unit capitalization
edgao Dec 11, 2021
50a3a29
formatting
edgao Dec 11, 2021
7dfd452
wip
edgao Dec 11, 2021
15b9f27
remove mistaken dep
edgao Dec 13, 2021
c54baf3
use UUID suffix
edgao Dec 13, 2021
5c367df
various improvements
edgao Dec 13, 2021
2b27a4e
optional header; unit test file contents
edgao Dec 14, 2021
2bcf9a5
fix field name
edgao Dec 14, 2021
24c6a19
remove comment
edgao Dec 14, 2021
d860fd6
RECORD CLASS RECORD CLASS
edgao Dec 14, 2021
ff400c7
warning
edgao Dec 14, 2021
0fe979b
text block
edgao Dec 14, 2021
685f366
add more csv options
edgao Dec 14, 2021
d327bcc
update comment
edgao Dec 14, 2021
0f7b2bd
assert copy operation
edgao Dec 14, 2021
866db40
add test
edgao Dec 14, 2021
1b4f41f
cutover to non-legacy stream copier
edgao Dec 14, 2021
89f53fe
update param name
edgao Dec 14, 2021
5cd8b56
minor comments about sheet generator + Flattening
edgao Dec 14, 2021
7a1de37
timezones :(
edgao Dec 15, 2021
d4fcff7
add dup code comment
edgao Dec 15, 2021
9d3c6c1
delete redundant tests
edgao Dec 15, 2021
e975bb8
manifest also exists within bucketPath
edgao Dec 15, 2021
0ef601e
add comment
edgao Dec 15, 2021
734141d
better comment
edgao Dec 15, 2021
8cd2e51
rename getObjectKey + add javadoc
edgao Dec 15, 2021
842e1d9
explain default behavior
edgao Dec 15, 2021
485fe5d
remove from abstract classes
edgao Dec 15, 2021
1bdfab4
Merge branch 'master' into edgao/s3_based_stream_copier
edgao Dec 15, 2021
ed07183
reformat
edgao Dec 16, 2021
5d0427a
add implementation for getObjectPath
edgao Dec 16, 2021
fed6c36
prepare for publish
edgao Dec 16, 2021
f58bb90
follow doc conventions
edgao Dec 16, 2021
87b8543
follow doc conventions
edgao Dec 16, 2021
028c3b6
rename to getOutputPath
edgao Dec 16, 2021
192d6a3
add comment
edgao Dec 16, 2021
257dcb7
Merge branch 'master' into edgao/s3_based_stream_copier
edgao Dec 16, 2021
372098d
Merge branch 'master' into edgao/s3_based_stream_copier
edgao Dec 17, 2021
7625bee
regenerate seed specs
edgao Dec 17, 2021
Files changed
@@ -10,6 +10,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
@@ -24,7 +25,7 @@

/**
* This implementation is similar to
* {@link LegacyS3StreamCopier}. The difference is that
* this implementation creates Parquet staging files, instead of CSV ones.
* <p>
* </p>
@@ -38,11 +38,11 @@ public class GcsAvroWriter extends BaseGcsWriter implements S3Writer {
private final DataFileWriter<GenericData.Record> dataFileWriter;

public GcsAvroWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp,
final Schema schema,
final JsonAvroConverter converter)
throws IOException {
super(config, s3Client, configuredStream);

@@ -85,4 +85,9 @@ protected void closeWhenFail() throws IOException {
uploadManager.abort();
}

@Override
public String getObjectKey() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}

Review thread on getObjectKey():

Contributor: should this be implemented as part of this PR? or separate?

Contributor Author: I'll take a look at this; not sure how much effort it will be. If it's reasonably straightforward I'll implement this on all the writers.

Contributor (sherifnada, Dec 15, 2021): it can also just be a separate PR, no reason to make this PR super big
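If it were implemented on a writer, one plausible shape is sketched below. This is illustrative only: the fields outputPrefix and outputFilename are assumptions based on BaseGcsWriter, not the PR's final code.

// Hypothetical sketch: join the configured prefix with the file name
// this writer uploads to, yielding the full object key in the bucket.
@Override
public String getObjectKey() {
  return String.join("/", outputPrefix, outputFilename);
}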
@@ -38,9 +38,9 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer {
private final String gcsCsvFileLocation; // this is used in destination-bigquery (GCS upload type)

public GcsCsvWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp)
throws IOException {
super(config, s3Client, configuredStream);

@@ -90,4 +90,9 @@ public CSVPrinter getCsvPrinter() {
return csvPrinter;
}

@Override
public String getObjectKey() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}
@@ -37,9 +37,9 @@ public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer {
private final PrintWriter printWriter;

public GcsJsonlWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp) {
super(config, s3Client, configuredStream);

final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.JSONL);
@@ -78,4 +78,9 @@ protected void closeWhenFail() {
uploadManager.abort();
}

@Override
public String getObjectKey() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}
@@ -43,11 +43,11 @@ public class GcsParquetWriter extends BaseGcsWriter implements S3Writer {
private final AvroRecordFactory avroRecordFactory;

public GcsParquetWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp,
final Schema schema,
final JsonAvroConverter converter)
throws URISyntaxException, IOException {
super(config, s3Client, configuredStream);

@@ -109,4 +109,9 @@ public void close(final boolean hasFailed) throws IOException {
}
}

@Override
public String getObjectKey() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}
@@ -44,8 +44,8 @@ public abstract class BaseGcsWriter implements S3Writer {
protected final String outputPrefix;

protected BaseGcsWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream) {
this.config = config;
this.s3Client = s3Client;
this.stream = configuredStream.getStream();
@@ -90,9 +90,8 @@ public void initialize() {
}

/**
* {@link AmazonS3#doesBucketExistV2} should be used to check the bucket existence. However, this method does not work for GCS. So we use {@link
* AmazonS3#headBucket} instead, which will throw an exception if the bucket does not exist, or there is no permission to access it.
*/
public boolean gcsBucketExist(final AmazonS3 s3Client, final String bucket) {
try {
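The diff truncates the method body above; for context, here is a sketch of the check the javadoc describes (an assumed shape, not necessarily the repository's exact code):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.HeadBucketRequest;

final class GcsBucketCheck {

  // headBucket throws when the bucket is missing or unreadable, which also
  // works against GCS's S3-compatible API, where doesBucketExistV2 does not.
  static boolean gcsBucketExist(final AmazonS3 s3Client, final String bucket) {
    try {
      s3Client.headBucket(new HeadBucketRequest(bucket));
      return true;
    } catch (final Exception e) {
      return false;
    }
  }
}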
@@ -141,4 +140,9 @@ public static String getOutputFilename(final Timestamp timestamp, final S3Format
format.getFileExtension());
}

@Override
public String getObjectKey() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}
@@ -19,6 +19,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'

testImplementation "org.testcontainers:postgresql:1.15.3"
testImplementation "org.mockito:mockito-inline:4.1.0"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3"
@@ -29,7 +29,7 @@ public class CopyConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);

private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 MiB

public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
@@ -8,8 +8,7 @@
import java.util.UUID;

/**
* StreamCopier is responsible for writing to a staging persistence and providing methods to remove the staged data.
*/
public interface StreamCopier {

@@ -19,8 +18,8 @@ public interface StreamCopier {
void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception;

/**
* Closes the writer for the stream to the staging persistence. This method should block until all buffered data has been written to the
* persistence.
*/
void closeStagingUploader(boolean hasFailed) throws Exception;

@@ -30,8 +29,7 @@ public interface StreamCopier {
void createTemporaryTable() throws Exception;

/**
* Copies the staging file to the temporary table. This method should block until the copy/upload has completed.
*/
void copyStagingFileToTemporaryTable() throws Exception;

@@ -53,15 +51,14 @@ public interface StreamCopier {
String generateMergeStatement(String destTableName) throws Exception;

/**
* Cleans up the copier by removing the staging file and dropping the temporary table after completion or failure.
*/
void removeFileAndDropTmpTable() throws Exception;

/**
* Creates the staging file and all the necessary items to write data to this file.
*
* @return the name of the staging file
* @return A string that uniquely identifies the file, e.g. the filename, or a unique suffix that is appended to a shared filename prefix
*/
String prepareStagingFile();
