Commit 28916dc

🎉 Destination GCS: fix connection check (#10299)
* Use stream transfer manager in check command
* Refactor code
* Move the check method to s3 destination
* Bump versions
* Revert s3 version bump
* Bump gcs version in seed

1 parent 39049cb · commit 28916dc
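At a glance: the previous check uploaded a temporary file from local disk through the AWS SDK's TransferManager, while the fixed check streams data through a StreamTransferManager, so it exercises the same multipart code path used during real syncs and needs only the permissions listed in EXPECTED_ROLES. The sketch below shows a hypothetical way to drive the fixed check end to end; the config field names follow the connector spec but should be treated as assumptions here.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.gcs.GcsDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;

public final class GcsCheckSketch {

  public static void main(final String[] args) throws Exception {
    // Hypothetical config; field names follow the GCS destination spec (assumption).
    final JsonNode config = new ObjectMapper().readTree(
        """
        {
          "gcs_bucket_name": "my-bucket",
          "gcs_bucket_path": "airbyte",
          "gcs_bucket_region": "us",
          "credential": {
            "credential_type": "HMAC_KEY",
            "hmac_key_access_id": "GOOG1EXAMPLE",
            "hmac_key_secret": "example-secret"
          },
          "format": { "format_type": "CSV" }
        }
        """);

    // check() now runs both probes: a small putObject and a streamed multipart upload.
    final AirbyteConnectionStatus status = new GcsDestination().check(config);
    System.out.println(status.getStatus()); // SUCCEEDED when both upload modes are permitted
  }

}
```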

File tree: 6 files changed, +60 −65 lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml (+1 −1)

```diff
@@ -60,7 +60,7 @@
 - name: Google Cloud Storage (GCS)
   destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
   dockerRepository: airbyte/destination-gcs
-  dockerImageTag: 0.1.20
+  dockerImageTag: 0.1.21
   documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
   icon: googlecloudstorage.svg
 - name: Google Firestore
```

airbyte-config/init/src/main/resources/seed/destination_specs.yaml (+1 −1)

```diff
@@ -1162,7 +1162,7 @@
     - "overwrite"
     - "append"
   supportsNamespaces: true
-- dockerImage: "airbyte/destination-gcs:0.1.20"
+- dockerImage: "airbyte/destination-gcs:0.1.21"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
     connectionSpecification:
```

airbyte-integrations/connectors/destination-gcs/Dockerfile (+1 −1)

```diff
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.1.20
+LABEL io.airbyte.version=0.1.21
 LABEL io.airbyte.name=airbyte/destination-gcs
```

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java (+6 −61)

```diff
@@ -5,25 +5,19 @@
 package io.airbyte.integrations.destination.gcs;

 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
-import com.amazonaws.services.s3.transfer.Upload;
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.BaseConnector;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.Destination;
 import io.airbyte.integrations.base.IntegrationRunner;
 import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
 import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory;
+import io.airbyte.integrations.destination.s3.S3Destination;
 import io.airbyte.protocol.models.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.util.function.Consumer;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -33,25 +27,21 @@ public class GcsDestination extends BaseConnector implements Destination {
   public static final String EXPECTED_ROLES = "storage.multipartUploads.abort, storage.multipartUploads.create, "
       + "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list";

-  public static final String CHECK_ACTIONS_TMP_FILE_NAME = "test";
-  public static final String DUMMY_TEXT = "This is just a dummy text to write to test file";
-
   public static void main(final String[] args) throws Exception {
     new IntegrationRunner(new GcsDestination()).run(args);
   }

   @Override
   public AirbyteConnectionStatus check(final JsonNode config) {
     try {
-      final GcsDestinationConfig destinationConfig = GcsDestinationConfig
-          .getGcsDestinationConfig(config);
+      final GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
       final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);

-      // Test single Upload (for small files) permissions
-      testSingleUpload(s3Client, destinationConfig);
+      // Test single upload (for small files) permissions
+      S3Destination.testSingleUpload(s3Client, destinationConfig.getBucketName());

-      // Test Multipart Upload permissions
-      testMultipartUpload(s3Client, destinationConfig);
+      // Test multipart upload with stream transfer manager
+      S3Destination.testMultipartUpload(s3Client, destinationConfig.getBucketName());

       return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
     } catch (final Exception e) {
@@ -65,51 +55,6 @@ public AirbyteConnectionStatus check(final JsonNode config) {
     }
   }

-  private void testSingleUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig) {
-    LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
-    s3Client.putObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, DUMMY_TEXT);
-    s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
-    LOGGER.info("Finished checking for normal upload mode");
-  }
-
-  private void testMultipartUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig)
-      throws Exception {
-
-    LOGGER.info("Started testing if all required credentials assigned to user for Multipart upload");
-    final TransferManager tm = TransferManagerBuilder.standard()
-        .withS3Client(s3Client)
-        // Sets the size threshold, in bytes, for when to use multipart uploads. Uploads over this size will
-        // automatically use a multipart upload strategy, while uploads smaller than this threshold will use
-        // a single connection to upload the whole object. So we need to set it as small for testing
-        // connection. See javadoc for more details.
-        .withMultipartUploadThreshold(1024L) // set 1KB as part size
-        .build();
-
-    try {
-      // TransferManager processes all transfers asynchronously,
-      // so this call returns immediately.
-      final Upload upload = tm.upload(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, getTmpFileToUpload());
-      upload.waitForCompletion();
-      s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
-    } finally {
-      tm.shutdownNow(true);
-    }
-    LOGGER.info("Finished verification for multipart upload mode");
-  }
-
-  private File getTmpFileToUpload() throws IOException {
-    final File tmpFile = File.createTempFile(CHECK_ACTIONS_TMP_FILE_NAME, ".tmp");
-    try (final FileWriter writer = new FileWriter(tmpFile)) {
-      // Text should be bigger than Threshold's size to make client use a multipart upload strategy,
-      // smaller than threshold will use a single connection to upload the whole object even if multipart
-      // upload option is ON. See {@link TransferManagerBuilder#withMultipartUploadThreshold}
-      // javadoc for more information.
-
-      writer.write(StringUtils.repeat(DUMMY_TEXT, 1000));
-    }
-    return tmpFile;
-  }
-
   @Override
   public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                             final ConfiguredAirbyteCatalog configuredCatalog,
```
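For context, the delegation to S3Destination works because GcsS3Helper (unchanged in this commit, so not shown) returns an ordinary AmazonS3 client pointed at GCS's S3-compatible XML API. A minimal sketch of such a client, assuming HMAC interoperability credentials; the class and method names below are illustrative, not necessarily GcsS3Helper's actual ones:

```java
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public final class GcsS3ClientSketch {

  public static AmazonS3 buildGcsS3Client(final String hmacKeyId, final String hmacKeySecret) {
    return AmazonS3ClientBuilder.standard()
        // GCS's S3-interoperability endpoint; a region value is required by the
        // builder but is not meaningful to GCS.
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration("https://storage.googleapis.com", "us-east-1"))
        .withCredentials(
            new AWSStaticCredentialsProvider(new BasicAWSCredentials(hmacKeyId, hmacKeySecret)))
        .build();
  }

}
```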

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java (+50 −1)

```diff
@@ -4,21 +4,29 @@

 package io.airbyte.integrations.destination.s3;

+import alex.mojaki.s3upload.MultiPartOutputStream;
+import alex.mojaki.s3upload.StreamTransferManager;
 import com.amazonaws.services.s3.AmazonS3;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import io.airbyte.integrations.BaseConnector;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.Destination;
 import io.airbyte.integrations.base.IntegrationRunner;
+import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
 import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory;
 import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
 import io.airbyte.protocol.models.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import java.util.function.Consumer;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -33,7 +41,15 @@ public static void main(final String[] args) throws Exception {
   @Override
   public AirbyteConnectionStatus check(final JsonNode config) {
     try {
-      attemptS3WriteAndDelete(S3DestinationConfig.getS3DestinationConfig(config), config.get("s3_bucket_path").asText());
+      final S3DestinationConfig destinationConfig = S3DestinationConfig.getS3DestinationConfig(config);
+      final AmazonS3 s3Client = destinationConfig.getS3Client();
+
+      // Test single upload (for small files) permissions
+      testSingleUpload(s3Client, destinationConfig.getBucketName());
+
+      // Test multipart upload with stream transfer manager
+      testMultipartUpload(s3Client, destinationConfig.getBucketName());
+
       return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
     } catch (final Exception e) {
       LOGGER.error("Exception attempting to access the S3 bucket: ", e);
@@ -44,6 +60,39 @@ public AirbyteConnectionStatus check(final JsonNode config) {
     }
   }

+  public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName) {
+    LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
+    final String testFile = "test_" + System.currentTimeMillis();
+    s3Client.putObject(bucketName, testFile, "this is a test file");
+    s3Client.deleteObject(bucketName, testFile);
+    LOGGER.info("Finished checking for normal upload mode");
+  }
+
+  public static void testMultipartUpload(final AmazonS3 s3Client, final String bucketName) throws IOException {
+    LOGGER.info("Started testing if all required credentials assigned to user for multipart upload");
+
+    final String testFile = "test_" + System.currentTimeMillis();
+    final StreamTransferManager manager = S3StreamTransferManagerHelper.getDefault(
+        bucketName,
+        testFile,
+        s3Client,
+        (long) S3StreamTransferManagerHelper.DEFAULT_PART_SIZE_MB);
+
+    try (final MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
+        final CSVPrinter csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8), CSVFormat.DEFAULT)) {
+      final String oneMegaByteString = "a".repeat(500_000);
+      // write a file larger than the 5 MB, which is the default part size, to make sure it is a multipart upload
+      for (int i = 0; i < 7; ++i) {
+        csvPrinter.printRecord(System.currentTimeMillis(), oneMegaByteString);
+      }
+    }
+
+    manager.complete();
+    s3Client.deleteObject(bucketName, testFile);
+
+    LOGGER.info("Finished verification for multipart upload mode");
+  }
+
   @Override
   public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                             final ConfiguredAirbyteCatalog configuredCatalog,
```
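S3StreamTransferManagerHelper is also outside this diff. Going by the public API of the alex.mojaki s3-stream-upload library, a plausible sketch of what getDefault provides is shown below; the tuning values are assumptions, apart from 5 MB being S3's documented minimum multipart part size:

```java
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;

public final class StreamTransferManagerHelperSketch {

  // 5 MB is the smallest part size S3 accepts for multipart uploads, so it is a
  // natural default for a cheap connection check (assumed to match the helper).
  public static final int DEFAULT_PART_SIZE_MB = 5;

  public static StreamTransferManager getDefault(final String bucketName,
                                                 final String objectKey,
                                                 final AmazonS3 s3Client,
                                                 final long partSizeMb) {
    return new StreamTransferManager(bucketName, objectKey, s3Client)
        .numStreams(1)         // the check writes through a single output stream
        .numUploadThreads(1)   // parts are uploaded sequentially
        .queueCapacity(1)      // buffer at most one part in memory
        .partSize(partSizeMb); // part size in MB, per the library's API
  }

}
```

Because the test data is written straight into a MultiPartOutputStream, the new check never touches local disk, which is what allowed the temp-file plumbing in GcsDestination to be deleted above.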

docs/integrations/destinations/gcs.md (+1 −0)

```diff
@@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.21 | 2022-02-12 | [\#10299](https://github.com/airbytehq/airbyte/pull/10299) | Fix connection check to require only the necessary permissions. |
 | 0.1.20 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. |
 | 0.1.19 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
 | 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description |
```
