🎉 New Destination: Google Cloud Storage #4784
Merged
Changes from 17 commits
26 commits
cd77654 Adding Google Cloud Storage as destination (shmf)
8b556cc Removed few comments and amended the version (MaxwellJK)
e952014 Added documentation in docs/integrations/destinations/gcs.md (MaxwellJK)
8c25393 Amended gcs.md with the right pull id (MaxwellJK)
c6005d6 Implemented all the fixes requested by tuliren as per https://github.… (MaxwellJK)
22d1740 Renaming all the files (MaxwellJK)
b889e52 Branch alligned to S3 0.1.7 (with Avro and Jsonl). Removed redundant … (MaxwellJK)
d44f6fe Merge branch 'master' into destination-gcs (MaxwellJK)
687689c Removed some additional duplicates between GCS and S3 (MaxwellJK)
dfa0fc3 Merge branch 'destination-gcs' of https://github.com/MaxwellJK/airbyt… (MaxwellJK)
94241a3 Revert changes in the root files (tuliren)
ba22206 Revert jdbc files (tuliren)
9ed20bb Fix package names (tuliren)
26a2a06 Refactor gcs config (tuliren)
1bdc707 Format code (tuliren)
cb94d98 Fix gcs connection (tuliren)
67c56bc Format code (tuliren)
107e0a5 Add acceptance tests (tuliren)
5a7e3e9 Fix parquet acceptance test (tuliren)
0e3ac06 Add ci credentials (tuliren)
f0324c3 Register the connector and update documentations (tuliren)
0cff8ad Fix typo (tuliren)
b922b08 Format code (tuliren)
2519bce Add unit test (tuliren)
8f6c096 Add comments (tuliren)
cc27e5d Update readme (tuliren)
3 changes: 3 additions & 0 deletions
3
airbyte-integrations/connectors/destination-gcs/.dockerignore
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
* | ||
!Dockerfile | ||
!build |
11 changes: 11 additions & 0 deletions
11
airbyte-integrations/connectors/destination-gcs/Dockerfile
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
FROM airbyte/integration-base-java:dev | ||
|
||
WORKDIR /airbyte | ||
ENV APPLICATION destination-gcs | ||
|
||
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar | ||
|
||
RUN tar xf ${APPLICATION}.tar --strip-components=1 | ||
|
||
LABEL io.airbyte.version=0.1.0 | ||
LABEL io.airbyte.name=airbyte/destination-gcs |
39 changes: 39 additions & 0 deletions
39
airbyte-integrations/connectors/destination-gcs/build.gradle
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
plugins { | ||
id 'application' | ||
id 'airbyte-docker' | ||
id 'airbyte-integration-test-java' | ||
} | ||
|
||
application { | ||
mainClass = 'io.airbyte.integrations.destination.gcs.GcsDestination' | ||
} | ||
|
||
dependencies { | ||
implementation project(':airbyte-config:models') | ||
implementation project(':airbyte-protocol:models') | ||
implementation project(':airbyte-integrations:bases:base-java') | ||
implementation project(':airbyte-integrations:connectors:destination-jdbc') | ||
implementation project(':airbyte-integrations:connectors:destination-s3') | ||
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) | ||
|
||
implementation platform('com.amazonaws:aws-java-sdk-bom:1.12.14') | ||
implementation 'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.1' | ||
// implementation 'com.amazonaws:aws-java-sdk-s3' | ||
|
||
// csv | ||
implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978' | ||
implementation 'org.apache.commons:commons-csv:1.4' | ||
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' | ||
|
||
// parquet | ||
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0' | ||
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0' | ||
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0' | ||
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' | ||
implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10' | ||
|
||
testImplementation 'org.apache.commons:commons-lang3:3.11' | ||
|
||
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') | ||
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-gcs') | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
# Destination Google Cloud Storage (GCS) | ||
|
||
In order to test the GCS destination, you need a Google Cloud Platform account. | ||
|
||
Generate HMAC keys for a service account and use them as the credentials for the connection. | ||
The service account also needs permission to create and delete buckets and objects in GCS. | ||
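
As background for the HMAC requirement above: a GCS HMAC key is an access ID paired with a secret, and the secret is used to compute keyed hashes over requests sent to the S3-compatible XML API. The sketch below shows only that primitive, using the JDK's `javax.crypto.Mac`. It is not the full request-signing process that real clients such as the AWS SDK implement, and `HmacSigningSketch` and `sign` are hypothetical names that do not appear in this pull request.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Illustrative only: hypothetical helper, not part of the connector.
final class HmacSigningSketch {

  // Computes HMAC-SHA256 over the message with the given secret
  // and returns the result as lowercase hex.
  static String sign(String secret, String message) {
    try {
      Mac mac = Mac.getInstance("HmacSHA256");
      mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
      byte[] digest = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(digest.length * 2);
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (GeneralSecurityException e) {
      // HmacSHA256 is a standard JDK algorithm, so this is not expected in practice.
      throw new IllegalStateException("HMAC-SHA256 unavailable", e);
    }
  }
}
```

The same secret and message always give the same signature, while a different secret gives a different one, which is how the server can verify that the caller holds the key.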
119 changes: 119 additions & 0 deletions
119
...rs/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.gcs; | ||
|
||
import com.amazonaws.services.s3.AmazonS3; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; | ||
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; | ||
import io.airbyte.integrations.destination.s3.writer.S3Writer; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.AirbyteMessage.Type; | ||
import io.airbyte.protocol.models.AirbyteRecordMessage; | ||
import io.airbyte.protocol.models.AirbyteStream; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteStream; | ||
import java.sql.Timestamp; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
import java.util.function.Consumer; | ||
|
||
public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer { | ||
|
||
private final GcsDestinationConfig gcsDestinationConfig; | ||
private final ConfiguredAirbyteCatalog configuredCatalog; | ||
private final GcsWriterFactory writerFactory; | ||
private final Consumer<AirbyteMessage> outputRecordCollector; | ||
private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters; | ||
|
||
private AirbyteMessage lastStateMessage = null; | ||
|
||
public GcsConsumer(GcsDestinationConfig gcsDestinationConfig, | ||
ConfiguredAirbyteCatalog configuredCatalog, | ||
GcsWriterFactory writerFactory, | ||
Consumer<AirbyteMessage> outputRecordCollector) { | ||
this.gcsDestinationConfig = gcsDestinationConfig; | ||
this.configuredCatalog = configuredCatalog; | ||
this.writerFactory = writerFactory; | ||
this.outputRecordCollector = outputRecordCollector; | ||
this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); | ||
} | ||
|
||
@Override | ||
protected void startTracked() throws Exception { | ||
AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); | ||
|
||
Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); | ||
|
||
for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { | ||
S3Writer writer = writerFactory | ||
.create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp); | ||
writer.initialize(); | ||
|
||
AirbyteStream stream = configuredStream.getStream(); | ||
AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair | ||
.fromAirbyteSteam(stream); | ||
streamNameAndNamespaceToWriters.put(streamNamePair, writer); | ||
} | ||
} | ||
|
||
@Override | ||
protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { | ||
if (airbyteMessage.getType() == Type.STATE) { | ||
this.lastStateMessage = airbyteMessage; | ||
return; | ||
} else if (airbyteMessage.getType() != Type.RECORD) { | ||
return; | ||
} | ||
|
||
AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); | ||
AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair | ||
.fromRecordMessage(recordMessage); | ||
|
||
if (!streamNameAndNamespaceToWriters.containsKey(pair)) { | ||
throw new IllegalArgumentException( | ||
String.format( | ||
"Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", | ||
Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); | ||
} | ||
|
||
UUID id = UUID.randomUUID(); | ||
streamNameAndNamespaceToWriters.get(pair).write(id, recordMessage); | ||
} | ||
|
||
@Override | ||
protected void close(boolean hasFailed) throws Exception { | ||
for (S3Writer handler : streamNameAndNamespaceToWriters.values()) { | ||
handler.close(hasFailed); | ||
} | ||
// Gcs stream uploader is all or nothing if a failure happens in the destination. | ||
if (!hasFailed) { | ||
outputRecordCollector.accept(lastStateMessage); | ||
} | ||
} | ||
|
||
} |
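
The control flow of `GcsConsumer` can be hard to follow through the Airbyte and S3 types, so here is a minimal, dependency-free sketch of the same pattern: one writer per stream, records routed by stream key, and the last state message held back until `close` and emitted only when nothing failed. Every type here (`ConsumerSketch`, `StreamWriter`, `BufferingWriter`) is a hypothetical stand-in, with plain strings in place of Airbyte messages. The null guard on the state is also an assumption, since the original passes `lastStateMessage` to the collector without checking it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for GcsConsumer; none of these are Airbyte classes.
final class ConsumerSketch {

  // Minimal writer abstraction: one instance per stream.
  interface StreamWriter {
    void write(String record);
    void close(boolean hasFailed);
  }

  // In-memory writer used only for illustration.
  static final class BufferingWriter implements StreamWriter {
    final List<String> records = new ArrayList<>();
    boolean committed = false;

    @Override
    public void write(String record) {
      records.add(record);
    }

    @Override
    public void close(boolean hasFailed) {
      // All or nothing: the buffered data counts only if the sync did not fail.
      committed = !hasFailed;
    }
  }

  private final Map<String, StreamWriter> writers = new HashMap<>();
  private final Consumer<String> stateCollector;
  private String lastState = null;

  ConsumerSketch(Map<String, ? extends StreamWriter> writers, Consumer<String> stateCollector) {
    this.writers.putAll(writers);
    this.stateCollector = stateCollector;
  }

  // State messages are remembered, not forwarded immediately.
  void acceptState(String state) {
    lastState = state;
  }

  // Records are routed to the writer registered for their stream.
  void acceptRecord(String stream, String record) {
    StreamWriter writer = writers.get(stream);
    if (writer == null) {
      throw new IllegalArgumentException("Record from a stream that is not in the catalog: " + stream);
    }
    writer.write(record);
  }

  void close(boolean hasFailed) {
    for (StreamWriter writer : writers.values()) {
      writer.close(hasFailed);
    }
    // Assumption: skip emission when no state message was ever received.
    if (!hasFailed && lastState != null) {
      stateCollector.accept(lastState);
    }
  }
}
```

Holding the state until `close` means a downstream reader never sees a checkpoint for data that was not fully uploaded.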
90 changes: 90 additions & 0 deletions
90
...destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.gcs; | ||
|
||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.cloud.storage.BlobId; | ||
import com.google.cloud.storage.BlobInfo; | ||
import io.airbyte.integrations.BaseConnector; | ||
import io.airbyte.integrations.base.AirbyteMessageConsumer; | ||
import io.airbyte.integrations.base.Destination; | ||
import io.airbyte.integrations.base.IntegrationRunner; | ||
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; | ||
import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory; | ||
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig; | ||
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier; | ||
import io.airbyte.protocol.models.AirbyteConnectionStatus; | ||
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import java.io.IOException; | ||
import java.util.function.Consumer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class GcsDestination extends BaseConnector implements Destination { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(GcsDestination.class); | ||
|
||
public static void main(String[] args) throws Exception { | ||
new IntegrationRunner(new GcsDestination()).run(args); | ||
} | ||
|
||
@Override | ||
public AirbyteConnectionStatus check(JsonNode config) { | ||
try { | ||
GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config); | ||
AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig); | ||
s3Client.putObject(destinationConfig.getBucketName(), "test", "check-content"); | ||
s3Client.deleteObject(destinationConfig.getBucketName(), "test"); | ||
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); | ||
} catch (Exception e) { | ||
LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage()); | ||
return new AirbyteConnectionStatus() | ||
.withStatus(AirbyteConnectionStatus.Status.FAILED) | ||
.withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e | ||
.getMessage()); | ||
} | ||
} | ||
|
||
@Override | ||
public AirbyteMessageConsumer getConsumer(JsonNode config, | ||
ConfiguredAirbyteCatalog configuredCatalog, | ||
Consumer<AirbyteMessage> outputRecordCollector) { | ||
GcsWriterFactory formatterFactory = new ProductionWriterFactory(); | ||
return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector); | ||
} | ||
|
||
private static void attemptWriteAndDeleteGcsObject(GcsConfig gcsConfig) throws IOException { | ||
|
||
var storage = GcsStreamCopier.getStorageClient(gcsConfig); | ||
var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/test-file"); | ||
var blobInfo = BlobInfo.newBuilder(blobId).build(); | ||
|
||
storage.create(blobInfo, "".getBytes()); | ||
storage.delete(blobId); | ||
} | ||
|
||
} |
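
The `check` method above verifies the connection by writing a small object and deleting it again, then turning any exception into a failed status. A minimal sketch of that probe pattern, using an in-memory store so it runs without cloud credentials, might look like the following. `ObjectStore`, `InMemoryStore`, and `CheckResult` are hypothetical types introduced for illustration. They are not the AWS SDK or Airbyte classes used in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a write-then-delete connection probe.
final class ConnectionCheckSketch {

  // Assumed minimal storage interface; stands in for the S3-compatible client.
  interface ObjectStore {
    void putObject(String bucket, String key, String content);
    void deleteObject(String bucket, String key);
  }

  // In-memory implementation used only to make the sketch runnable.
  static final class InMemoryStore implements ObjectStore {
    final Map<String, String> objects = new HashMap<>();

    @Override
    public void putObject(String bucket, String key, String content) {
      objects.put(bucket + "/" + key, content);
    }

    @Override
    public void deleteObject(String bucket, String key) {
      objects.remove(bucket + "/" + key);
    }
  }

  // Simplified stand-in for a connection status.
  static final class CheckResult {
    final boolean succeeded;
    final String message;

    CheckResult(boolean succeeded, String message) {
      this.succeeded = succeeded;
      this.message = message;
    }
  }

  // Writes a probe object, deletes it, and reports any failure as a message.
  static CheckResult check(ObjectStore store, String bucket) {
    String key = "test";
    try {
      store.putObject(bucket, key, "check-content");
      store.deleteObject(bucket, key);
      return new CheckResult(true, null);
    } catch (RuntimeException e) {
      return new CheckResult(false,
          "Could not connect to the bucket with the provided configuration. " + e.getMessage());
    }
  }
}
```

A probe like this exercises both write and delete permissions, which matches the permissions the README asks for. One design consideration is the probe key: a fixed key such as `test` could collide with a real object of the same name, so a unique key may be safer.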
82 changes: 82 additions & 0 deletions
82
...ation-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.gcs; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfig; | ||
import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfigs; | ||
import io.airbyte.integrations.destination.s3.S3FormatConfig; | ||
import io.airbyte.integrations.destination.s3.S3FormatConfigs; | ||
|
||
public class GcsDestinationConfig { | ||
|
||
private final String bucketName; | ||
private final String bucketPath; | ||
private final String bucketRegion; | ||
private final GcsCredentialConfig credentialConfig; | ||
private final S3FormatConfig formatConfig; | ||
|
||
public GcsDestinationConfig(String bucketName, | ||
String bucketPath, | ||
String bucketRegion, | ||
GcsCredentialConfig credentialConfig, | ||
S3FormatConfig formatConfig) { | ||
this.bucketName = bucketName; | ||
this.bucketPath = bucketPath; | ||
this.bucketRegion = bucketRegion; | ||
this.credentialConfig = credentialConfig; | ||
this.formatConfig = formatConfig; | ||
} | ||
|
||
public static GcsDestinationConfig getGcsDestinationConfig(JsonNode config) { | ||
return new GcsDestinationConfig( | ||
config.get("gcs_bucket_name").asText(), | ||
config.get("gcs_bucket_path").asText(), | ||
config.get("gcs_bucket_region").asText(), | ||
GcsCredentialConfigs.getCredentialConfig(config), | ||
S3FormatConfigs.getS3FormatConfig(config)); | ||
} | ||
|
||
public String getBucketName() { | ||
return bucketName; | ||
} | ||
|
||
public String getBucketPath() { | ||
return bucketPath; | ||
} | ||
|
||
public String getBucketRegion() { | ||
return bucketRegion; | ||
} | ||
|
||
public GcsCredentialConfig getCredentialConfig() { | ||
return credentialConfig; | ||
} | ||
|
||
public S3FormatConfig getFormatConfig() { | ||
return formatConfig; | ||
} | ||
|
||
} |
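
`getGcsDestinationConfig` reads each field with `config.get(...).asText()`. Jackson's `JsonNode.get` returns `null` for a missing field, so an absent key would surface as a `NullPointerException` rather than a descriptive error. As a sketch of one possible alternative, the example below validates required keys up front. It uses a plain `Map<String, String>` in place of `JsonNode` so that it stays dependency-free. `ConfigSketch` and `required` are hypothetical names, and only the three key names are taken from the diff.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for GcsDestinationConfig (bucket fields only).
final class ConfigSketch {

  final String bucketName;
  final String bucketPath;
  final String bucketRegion;

  private ConfigSketch(String bucketName, String bucketPath, String bucketRegion) {
    this.bucketName = bucketName;
    this.bucketPath = bucketPath;
    this.bucketRegion = bucketRegion;
  }

  // Builds the config, failing fast with a descriptive message on a missing field.
  static ConfigSketch fromMap(Map<String, String> config) {
    return new ConfigSketch(
        required(config, "gcs_bucket_name"),
        required(config, "gcs_bucket_path"),
        required(config, "gcs_bucket_region"));
  }

  // Returns the value for the key, or throws if it is absent or blank.
  private static String required(Map<String, String> config, String key) {
    String value = config.get(key);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing required config field: " + key);
    }
    return value;
  }
}
```

Whether this validation is needed in the connector depends on how strictly the spec is enforced before the config reaches this class, which this diff does not show.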