
Commit 9d7ab35

tuliren, shmf, MaxwellJK, and sherifnada authored and committed
🎉 New Destination: Google Cloud Storage (#4784)

* Adding Google Cloud Storage as destination
* Removed a few comments and amended the version
* Added documentation in docs/integrations/destinations/gcs.md
* Amended gcs.md with the right pull id
* Implemented all the fixes requested by tuliren as per #4329
* Renaming all the files
* Branch aligned to S3 0.1.7 (with Avro and Jsonl). Removed a redundant file by making S3 a dependency for GCS
* Removed some additional duplicates between GCS and S3
* Revert changes in the root files
* Revert jdbc files
* Fix package names
* Refactor gcs config
* Format code
* Fix gcs connection
* Format code
* Add acceptance tests
* Fix parquet acceptance test
* Add ci credentials
* Register the connector and update documentation
* Fix typo
* Format code
* Add unit test
* Add comments
* Update readme

Co-authored-by: Sherif A. Nada <[email protected]>
Co-authored-by: Marco Fontana <[email protected]>
Co-authored-by: [email protected] <[email protected]>
Co-authored-by: Marco Fontana <[email protected]>
Co-authored-by: Sherif A. Nada <[email protected]>
1 parent e2d4b4f · commit 9d7ab35

40 files changed: +2787 −5 lines

.github/workflows/publish-command.yml

Lines changed: 1 addition & 0 deletions
@@ -139,6 +139,7 @@ jobs:
           ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
           PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
           DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
+          DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }}
       - run: |
           docker login -u airbytebot -p ${DOCKER_PASSWORD}
           ./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }}

.github/workflows/test-command.yml

Lines changed: 1 addition & 0 deletions
@@ -137,6 +137,7 @@ jobs:
           ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
           PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
           DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
+          DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }}
       - run: |
           ./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
         name: test ${{ github.event.inputs.connector }}
airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca8f6566-e555-4b40-943a-545bf123117a.json

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
{
  "destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
  "name": "Google Cloud Storage (GCS)",
  "dockerRepository": "airbyte/destination-gcs",
  "dockerImageTag": "0.1.0",
  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs"
}

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 5 additions & 0 deletions
@@ -24,6 +24,11 @@
     dockerRepository: airbyte/destination-bigquery-denormalized
     dockerImageTag: 0.1.0
     documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
+- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
+  name: Google Cloud Storage (GCS)
+  dockerRepository: airbyte/destination-gcs
+  dockerImageTag: 0.1.0
+  documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
 - destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692
   name: Google PubSub
   dockerRepository: airbyte/destination-pubsub

airbyte-integrations/builds.md

Lines changed: 4 additions & 2 deletions
@@ -129,14 +129,16 @@
 # Destinations

 BigQuery [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-bigquery)

+Google Cloud Storage (GCS) [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-gcs)
+
+Google PubSub [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-pubsub)
+
 Local CSV [![destination-csv](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-csv%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-csv)

 Local JSON [![destination-local-json](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-local-json%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-local-json)

 Postgres [![destination-postgres](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-postgres%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-postgres)

-Google PubSub [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-pubsub)
-
 Redshift [![destination-redshift](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-redshift%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-redshift)

 S3 [![destination-s3](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-s3)
airbyte-integrations/connectors/destination-gcs/.dockerignore

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
*
!Dockerfile
!build
airbyte-integrations/connectors/destination-gcs/Dockerfile

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-gcs

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-gcs
airbyte-integrations/connectors/destination-gcs/README.md

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
# Destination Google Cloud Storage (GCS)

To test the GCS destination, you need a Google Cloud Platform account.

## Community Contributor

As a community contributor, you can follow these steps to run integration tests.

- Create a GCS bucket for testing.
- Generate an [HMAC key](https://cloud.google.com/storage/docs/authentication/hmackeys) for the bucket with read and write permissions. Note that currently only the HMAC key credential is supported; more credential types will be added in the future.
- Paste the bucket and key information into the config files under [`./sample_secrets`](./sample_secrets).
- Rename the directory from `sample_secrets` to `secrets`.
- Feel free to modify the config files with different settings in the acceptance test file (e.g. `GcsCsvDestinationAcceptanceTest.java`, method `getFormatConfig`), as long as they follow the schema defined in [spec.json](src/main/resources/spec.json).

## Airbyte Employee

- Access the `destination gcs creds` secret on LastPass and put it in `sample_secrets/config.json`.
- Rename the directory from `sample_secrets` to `secrets`.

## Add New Output Format

- Add a new enum in `S3Format`.
- Modify `spec.json` to specify the configuration of this new format.
- Update `S3FormatConfigs` to be able to construct a config for this new format.
- Create a new package under `io.airbyte.integrations.destination.gcs`.
- Implement a new `GcsWriter`. The implementation can extend `BaseGcsWriter`.
- Write an acceptance test for the new output format. The test can extend `GcsDestinationAcceptanceTest`. (A minimal writer sketch follows this list.)
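To make the last two steps concrete, here is a minimal, hypothetical sketch of a new writer. `GcsExampleJsonlWriter`, its package, the object-key layout, and `getBucketPath()` are illustrative assumptions; only the `initialize`/`write`/`close` methods are grounded in how `GcsConsumer` (shown later in this commit) drives an `S3Writer`, and `putObject` in how `GcsDestination#check` uses the client.

package io.airbyte.integrations.destination.gcs.jsonl;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.UUID;

// Hypothetical writer, for illustration only; a real one would extend BaseGcsWriter
// and stream parts to the bucket instead of buffering everything in memory.
public class GcsExampleJsonlWriter implements S3Writer {

  private final GcsDestinationConfig config;
  private final AmazonS3 s3Client;
  private final String objectKey;
  private final StringBuilder buffer = new StringBuilder();

  public GcsExampleJsonlWriter(GcsDestinationConfig config, AmazonS3 s3Client, String streamName) {
    this.config = config;
    this.s3Client = s3Client;
    // Assumed key layout and getBucketPath() accessor; the real connector derives keys in BaseGcsWriter.
    this.objectKey = config.getBucketPath() + "/" + streamName + ".jsonl";
  }

  @Override
  public void initialize() {
    // Nothing to prepare for an in-memory buffer.
  }

  @Override
  public void write(UUID id, AirbyteRecordMessage recordMessage) {
    // One JSON document per line (JSONL).
    buffer.append(Jsons.serialize(recordMessage.getData())).append('\n');
  }

  @Override
  public void close(boolean hasFailed) {
    // All-or-nothing: upload one object on success, drop the buffer on failure.
    if (!hasFailed) {
      s3Client.putObject(config.getBucketName(), objectKey, buffer.toString());
    }
  }

}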
airbyte-integrations/connectors/destination-gcs/build.gradle

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
plugins {
    id 'application'
    id 'airbyte-docker'
    id 'airbyte-integration-test-java'
}

application {
    mainClass = 'io.airbyte.integrations.destination.gcs.GcsDestination'
}

dependencies {
    implementation project(':airbyte-config:models')
    implementation project(':airbyte-protocol:models')
    implementation project(':airbyte-integrations:bases:base-java')
    implementation project(':airbyte-integrations:connectors:destination-jdbc')
    implementation project(':airbyte-integrations:connectors:destination-s3')
    implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

    implementation platform('com.amazonaws:aws-java-sdk-bom:1.12.14')
    implementation 'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.1'

    // csv
    implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'
    implementation 'org.apache.commons:commons-csv:1.4'
    implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

    // parquet
    implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0'
    implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
    implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
    implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
    implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'

    testImplementation 'org.apache.commons:commons-lang3:3.11'

    integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
    integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-gcs')
}
airbyte-integrations/connectors/destination-gcs/sample_secrets/config.json

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
{
  "gcs_bucket_name": "<bucket-name>",
  "gcs_bucket_path": "integration-test",
  "gcs_bucket_region": "<region>",
  "credential": {
    "credential_type": "HMAC_KEY",
    "hmac_key_access_id": "<access-id>",
    "hmac_key_secret": "<secret>"
  }
}
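For orientation, a sketch of how a config like the one above could be loaded and parsed. `ConfigLoadExample` is hypothetical; `getGcsDestinationConfig` and `getBucketName` appear in `GcsDestination` below, while `Jsons.deserialize` is assumed from Airbyte's commons library.

package io.airbyte.integrations.destination.gcs;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.nio.file.Files;
import java.nio.file.Path;

public class ConfigLoadExample {

  public static void main(String[] args) throws Exception {
    // Read secrets/config.json (the renamed sample above) into a JsonNode ...
    JsonNode config = Jsons.deserialize(Files.readString(Path.of("secrets/config.json")));
    // ... and hand it to the connector's own factory, as GcsDestination#check does below.
    GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
    System.out.println("Loaded config for bucket: " + destinationConfig.getBucketName());
  }

}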
airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java

Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@
/*
 * MIT License
 *
 * Copyright (c) 2020 Airbyte
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer {

  private final GcsDestinationConfig gcsDestinationConfig;
  private final ConfiguredAirbyteCatalog configuredCatalog;
  private final GcsWriterFactory writerFactory;
  private final Consumer<AirbyteMessage> outputRecordCollector;
  private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters;

  private AirbyteMessage lastStateMessage = null;

  public GcsConsumer(GcsDestinationConfig gcsDestinationConfig,
                     ConfiguredAirbyteCatalog configuredCatalog,
                     GcsWriterFactory writerFactory,
                     Consumer<AirbyteMessage> outputRecordCollector) {
    this.gcsDestinationConfig = gcsDestinationConfig;
    this.configuredCatalog = configuredCatalog;
    this.writerFactory = writerFactory;
    this.outputRecordCollector = outputRecordCollector;
    this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size());
  }

  @Override
  protected void startTracked() throws Exception {
    AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);

    Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

    for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
      S3Writer writer = writerFactory
          .create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp);
      writer.initialize();

      AirbyteStream stream = configuredStream.getStream();
      AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair
          .fromAirbyteSteam(stream);
      streamNameAndNamespaceToWriters.put(streamNamePair, writer);
    }
  }

  @Override
  protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception {
    if (airbyteMessage.getType() == Type.STATE) {
      this.lastStateMessage = airbyteMessage;
      return;
    } else if (airbyteMessage.getType() != Type.RECORD) {
      return;
    }

    AirbyteRecordMessage recordMessage = airbyteMessage.getRecord();
    AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair
        .fromRecordMessage(recordMessage);

    if (!streamNameAndNamespaceToWriters.containsKey(pair)) {
      throw new IllegalArgumentException(
          String.format(
              "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
              Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage)));
    }

    UUID id = UUID.randomUUID();
    streamNameAndNamespaceToWriters.get(pair).write(id, recordMessage);
  }

  @Override
  protected void close(boolean hasFailed) throws Exception {
    for (S3Writer handler : streamNameAndNamespaceToWriters.values()) {
      handler.close(hasFailed);
    }
    // Gcs stream uploader is all or nothing if a failure happens in the destination.
    if (!hasFailed) {
      outputRecordCollector.accept(lastStateMessage);
    }
  }

}
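A sketch of the consumer's lifecycle, assuming `FailureTrackingAirbyteMessageConsumer` exposes the usual `start`/`accept`/`close` methods that delegate to the tracked variants above. In production `IntegrationRunner` owns this loop, so the driver below is purely illustrative.

package io.airbyte.integrations.destination.gcs;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.List;

public class GcsConsumerLifecycleExample {

  // Hypothetical driver; in production, IntegrationRunner feeds the consumer.
  static void drive(GcsDestinationConfig config,
                    ConfiguredAirbyteCatalog catalog,
                    List<AirbyteMessage> messages) throws Exception {
    GcsConsumer consumer = new GcsConsumer(
        config, catalog, new ProductionWriterFactory(),
        emitted -> System.out.println(Jsons.serialize(emitted)));

    consumer.start();             // startTracked(): opens one writer per configured stream
    for (AirbyteMessage message : messages) {
      consumer.accept(message);   // acceptTracked(): RECORD -> its stream's writer; STATE -> remembered
    }
    consumer.close();             // close(hasFailed): closes writers, emits last STATE only on success
  }

}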
airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java

Lines changed: 76 additions & 0 deletions

@@ -0,0 +1,76 @@
/*
 * MIT License
 *
 * Copyright (c) 2020 Airbyte
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsDestination extends BaseConnector implements Destination {

  private static final Logger LOGGER = LoggerFactory.getLogger(GcsDestination.class);

  public static void main(String[] args) throws Exception {
    new IntegrationRunner(new GcsDestination()).run(args);
  }

  @Override
  public AirbyteConnectionStatus check(JsonNode config) {
    try {
      GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
      AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);
      s3Client.putObject(destinationConfig.getBucketName(), "test", "check-content");
      s3Client.deleteObject(destinationConfig.getBucketName(), "test");
      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (Exception e) {
      LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage());
      return new AirbyteConnectionStatus()
          .withStatus(AirbyteConnectionStatus.Status.FAILED)
          .withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
              .getMessage());
    }
  }

  @Override
  public AirbyteMessageConsumer getConsumer(JsonNode config,
                                            ConfiguredAirbyteCatalog configuredCatalog,
                                            Consumer<AirbyteMessage> outputRecordCollector) {
    GcsWriterFactory formatterFactory = new ProductionWriterFactory();
    return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
  }

}
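To exercise the `check` path end to end, one could go through the same `IntegrationRunner` that `main` uses. A sketch, assuming the runner accepts the standard Airbyte `check --config` arguments; the secrets path is a placeholder.

package io.airbyte.integrations.destination.gcs;

import io.airbyte.integrations.base.IntegrationRunner;

public class GcsCheckExample {

  public static void main(String[] args) throws Exception {
    // Assumed invocation style: IntegrationRunner parses the standard Airbyte
    // commands (spec / check / write); "secrets/config.json" is a placeholder path.
    new IntegrationRunner(new GcsDestination())
        .run(new String[] {"check", "--config", "secrets/config.json"});
  }

}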
