
🎉 Destination GCS: fix connection check #10299

Merged · 6 commits · Feb 13, 2022

@@ -60,7 +60,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.1.20
dockerImageTag: 0.1.21
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
- name: Google Firestore

@@ -1162,7 +1162,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.1.20"
- dockerImage: "airbyte/destination-gcs:0.1.21"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.version=0.1.21
LABEL io.airbyte.name=airbyte/destination-gcs

@@ -5,25 +5,19 @@
package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -33,25 +27,21 @@ public class GcsDestination extends BaseConnector implements Destination {
public static final String EXPECTED_ROLES = "storage.multipartUploads.abort, storage.multipartUploads.create, "
+ "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list";

public static final String CHECK_ACTIONS_TMP_FILE_NAME = "test";
public static final String DUMMY_TEXT = "This is just a dummy text to write to test file";

public static void main(final String[] args) throws Exception {
new IntegrationRunner(new GcsDestination()).run(args);
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final GcsDestinationConfig destinationConfig = GcsDestinationConfig
.getGcsDestinationConfig(config);
final GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);

// Test single Upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig);
// Test single upload (for small files) permissions
S3Destination.testSingleUpload(s3Client, destinationConfig.getBucketName());

// Test Multipart Upload permissions
testMultipartUpload(s3Client, destinationConfig);
// Test multipart upload with stream transfer manager
S3Destination.testMultipartUpload(s3Client, destinationConfig.getBucketName());

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {

@@ -65,51 +55,6 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

private void testSingleUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig) {
LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
s3Client.putObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, DUMMY_TEXT);
s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
LOGGER.info("Finished checking for normal upload mode");
}

private void testMultipartUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig)
throws Exception {

LOGGER.info("Started testing if all required credentials assigned to user for Multipart upload");
final TransferManager tm = TransferManagerBuilder.standard()
.withS3Client(s3Client)
// Sets the size threshold, in bytes, for when to use multipart uploads. Uploads over this size will
// automatically use a multipart upload strategy, while uploads smaller than this threshold will use
// a single connection to upload the whole object. So we need to set it as small for testing
// connection. See javadoc for more details.
.withMultipartUploadThreshold(1024L) // set 1KB as part size
.build();

try {
// TransferManager processes all transfers asynchronously,
// so this call returns immediately.
final Upload upload = tm.upload(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, getTmpFileToUpload());
upload.waitForCompletion();
s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
} finally {
tm.shutdownNow(true);
}
LOGGER.info("Finished verification for multipart upload mode");
}

private File getTmpFileToUpload() throws IOException {
final File tmpFile = File.createTempFile(CHECK_ACTIONS_TMP_FILE_NAME, ".tmp");
try (final FileWriter writer = new FileWriter(tmpFile)) {
// Text should be bigger than Threshold's size to make client use a multipart upload strategy,
// smaller than threshold will use a single connection to upload the whole object even if multipart
// upload option is ON. See {@link TransferManagerBuilder#withMultipartUploadThreshold}
// javadoc for more information.

writer.write(StringUtils.repeat(DUMMY_TEXT, 1000));
}
return tmpFile;
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
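
That is the whole GcsDestination change: the private upload tests are gone, and check() now delegates to the shared probes on S3Destination (the next file in this diff). As a quick way to exercise the new check end to end, here is a hypothetical harness — not part of this PR; the config keys mirror the GCS connector spec and every value is a placeholder:

```java
// Hypothetical harness, not part of this PR. Config keys follow the GCS
// destination spec; all values below are placeholders.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.gcs.GcsDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;

public class GcsCheckExample {

  public static void main(final String[] args) throws Exception {
    final JsonNode config = new ObjectMapper().readTree("""
        {
          "gcs_bucket_name": "my-bucket",
          "gcs_bucket_path": "airbyte-test",
          "gcs_bucket_region": "us-central1",
          "credential": {
            "credential_type": "HMAC_KEY",
            "hmac_key_access_id": "GOOG1EXAMPLE",
            "hmac_key_secret": "EXAMPLE-SECRET"
          },
          "format": { "format_type": "CSV" }
        }
        """);

    // check() now runs only the single-upload and multipart-upload probes, so
    // SUCCEEDED means the HMAC key has the permissions listed in EXPECTED_ROLES.
    final AirbyteConnectionStatus status = new GcsDestination().check(config);
    System.out.println(status.getStatus());
  }
}
```
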
@@ -4,21 +4,29 @@

package io.airbyte.integrations.destination.s3;

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory;
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -33,7 +41,15 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
attemptS3WriteAndDelete(S3DestinationConfig.getS3DestinationConfig(config), config.get("s3_bucket_path").asText());

Contributor: should we just replace all usages of this method, or is it possible that the staging warehouse destinations do actually need those extra permissions? E.g.

Contributor (author): I think it would depend on the use case, i.e. how the warehouse uses the S3 or other staging bucket. Since there have been no complaints from the community about permission issues for those destinations, we have not looked into them yet. (A sketch of how a staging destination could reuse the new probes follows this hunk.)

final S3DestinationConfig destinationConfig = S3DestinationConfig.getS3DestinationConfig(config);
final AmazonS3 s3Client = destinationConfig.getS3Client();

// Test single upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig.getBucketName());

// Test multipart upload with stream transfer manager
testMultipartUpload(s3Client, destinationConfig.getBucketName());

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception attempting to access the S3 bucket: ", e);
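
To make the question above concrete, this is roughly what reuse would look like — a sketch only, assuming a hypothetical StagingBucketChecks helper; the two probes are the static methods added in the next hunk:

```java
// Sketch only: StagingBucketChecks is hypothetical and not part of this PR.
// It shows how another destination's check() could reuse the narrowed probes.
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;

public final class StagingBucketChecks {

  private StagingBucketChecks() {}

  public static AirbyteConnectionStatus checkStagingBucket(final JsonNode config) {
    try {
      final S3DestinationConfig destinationConfig = S3DestinationConfig.getS3DestinationConfig(config);
      final AmazonS3 s3Client = destinationConfig.getS3Client();

      // Same two probes as S3Destination.check(): a small putObject/deleteObject
      // round trip and a multipart upload. A missing permission throws here.
      S3Destination.testSingleUpload(s3Client, destinationConfig.getBucketName());
      S3Destination.testMultipartUpload(s3Client, destinationConfig.getBucketName());

      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (final Exception e) {
      return new AirbyteConnectionStatus()
          .withStatus(Status.FAILED)
          .withMessage("Could not connect with provided configuration: " + e.getMessage());
    }
  }
}
```
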
@@ -44,6 +60,39 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName) {
LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
final String testFile = "test_" + System.currentTimeMillis();
s3Client.putObject(bucketName, testFile, "this is a test file");
s3Client.deleteObject(bucketName, testFile);
LOGGER.info("Finished checking for normal upload mode");
}

public static void testMultipartUpload(final AmazonS3 s3Client, final String bucketName) throws IOException {
LOGGER.info("Started testing if all required credentials assigned to user for multipart upload");

final String testFile = "test_" + System.currentTimeMillis();
final StreamTransferManager manager = S3StreamTransferManagerHelper.getDefault(
bucketName,
testFile,
s3Client,
(long) S3StreamTransferManagerHelper.DEFAULT_PART_SIZE_MB);

try (final MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
final CSVPrinter csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8), CSVFormat.DEFAULT)) {
final String oneMegaByteString = "a".repeat(500_000);
// write a file larger than the 5 MB, which is the default part size, to make sure it is a multipart upload
for (int i = 0; i < 7; ++i) {
csvPrinter.printRecord(System.currentTimeMillis(), oneMegaByteString);
}
}

manager.complete();
s3Client.deleteObject(bucketName, testFile);

LOGGER.info("Finished verification for multipart upload mode");
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
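
Finally, a minimal way to smoke-test the two new probes by hand against a bucket — again a hypothetical harness; region, bucket name, and credentials are placeholders:

```java
// Hypothetical harness, not part of this PR. Runs the two permission probes
// directly; reaching the final print means the key has the narrow permission
// set the connector actually needs.
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.airbyte.integrations.destination.s3.S3Destination;

public class PermissionProbeHarness {

  public static void main(final String[] args) throws Exception {
    final AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
        .withRegion("us-east-1") // placeholder
        .withCredentials(new AWSStaticCredentialsProvider(
            new BasicAWSCredentials("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")))
        .build();

    S3Destination.testSingleUpload(s3Client, "my-test-bucket");
    S3Destination.testMultipartUpload(s3Client, "my-test-bucket");
    System.out.println("Both probes passed.");
  }
}
```
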
1 change: 1 addition & 0 deletions docs/integrations/destinations/gcs.md
@@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.21 | 2022-02-12 | [\#10299](https://github.com/airbytehq/airbyte/pull/10299) | Fix connection check to require only the necessary permissions. |
| 0.1.20 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. |
| 0.1.19 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
| 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description |