
🐛 Destination-gcs / destination-bigquery (GCS) - updated check() method to verify that the user has both storage.objects.create and storage.multipartUploads.create roles #9121


Merged 10 commits on Jan 10, 2022
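In short, the GCS destination's check() (also used by BigQuery's GCS staging mode) now exercises both upload paths before reporting success: a plain putObject for small files, and a TransferManager-driven multipart upload, which is what requires the storage.multipartUploads.* permissions. A minimal sketch of that flow, assuming the AWS S3 interoperability client the connector already uses (the class and helper names below are illustrative, not the connector's exact code):

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

// Illustrative sketch of the two permission probes, not the connector's exact implementation.
public class GcsPermissionCheckSketch {

  private static final String TEST_KEY = "airbyte-connection-check";

  /** Needs storage.objects.create and storage.objects.delete. */
  static void checkSingleUpload(final AmazonS3 s3, final String bucket) {
    s3.putObject(bucket, TEST_KEY, "dummy content");
    s3.deleteObject(bucket, TEST_KEY);
  }

  /** Additionally needs storage.multipartUploads.create and storage.multipartUploads.abort. */
  static void checkMultipartUpload(final AmazonS3 s3, final String bucket) throws Exception {
    final TransferManager tm = TransferManagerBuilder.standard()
        .withS3Client(s3)
        .withMultipartUploadThreshold(1024L) // force multipart even for a tiny test file
        .build();
    try {
      final Upload upload = tm.upload(bucket, TEST_KEY, fileLargerThanThreshold());
      upload.waitForCompletion();
      s3.deleteObject(bucket, TEST_KEY);
    } finally {
      tm.shutdownNow(true); // also releases the wrapped client, so run this probe last
    }
  }

  private static File fileLargerThanThreshold() throws IOException {
    final File tmpFile = File.createTempFile("airbyte-check", ".tmp");
    try (final FileWriter writer = new FileWriter(tmpFile)) {
      for (int i = 0; i < 1000; i++) {
        writer.write("dummy text well above the 1 KB threshold ");
      }
    }
    return tmpFile;
  }

}
```

If either probe throws, the connector reports a FAILED connection status and logs the full list of expected roles (see EXPECTED_ROLES in the GcsDestination diff below).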
5 changes: 1 addition & 4 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
@@ -240,10 +240,7 @@ class Config:
)
spec: Optional[ConnectorSpecification] = None
connectionStatus: Optional[AirbyteConnectionStatus] = None
catalog: Optional[AirbyteCatalog] = Field(
None,
description="log message: any kind of logging you want the platform to know about.",
)
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the catalog")
record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record")
state: Optional[AirbyteStateMessage] = Field(
None,
@@ -2,7 +2,7 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.6.1",
"dockerImageTag": "0.6.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
@@ -2,7 +2,7 @@
"destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
"name": "Google Cloud Storage (GCS)",
"dockerRepository": "airbyte/destination-gcs",
"dockerImageTag": "0.1.17",
"dockerImageTag": "0.1.19",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs",
"icon": "googlecloudstorage.svg"
}
@@ -13,7 +13,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.6.1
dockerImageTag: 0.6.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: BigQuery (denormalized typed struct)
@@ -60,7 +60,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.1.18
dockerImageTag: 0.1.19
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
- name: Google PubSub
@@ -176,7 +176,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-bigquery:0.6.1"
- dockerImage: "airbyte/destination-bigquery:0.6.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
@@ -261,7 +261,7 @@
type: "string"
description: "When running custom transformations or Basic normalization,\
\ running queries on interactive mode can hit BQ limits, choosing batch\
\ will solve those limitss."
\ will solve those limits."
title: "Transformation Query Run Type"
default: "interactive"
enum:
@@ -311,10 +311,12 @@
title: "Block Size (MB) for GCS multipart upload"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
\ to upload a bigger files and improve the speed, but consumes more\
\ memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
minimum: 5
maximum: 525
examples:
- 5
keep_files_in_gcs-bucket:
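The "Block Size (MB) for GCS multipart upload" option above is expressed in megabytes and bounded to min 5 MB / max 525 MB (default 5 MB); each buffered part holds roughly that much data in memory while writing. A rough sketch of how such a megabyte setting could be mapped onto the S3-compatible transfer client (the helper below is an illustrative assumption, not the connector's actual wiring):

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

// Hypothetical helper: clamp the configured value to the documented bounds
// and pass it to the transfer client as a per-part size in bytes.
final class GcsPartSizeSketch {

  private static final long MB = 1024L * 1024L;
  private static final long MIN_PART_SIZE_MB = 5;
  private static final long MAX_PART_SIZE_MB = 525;

  static TransferManager buildTransferManager(final AmazonS3 s3Client, final long configuredPartSizeMb) {
    final long partSizeMb = Math.max(MIN_PART_SIZE_MB, Math.min(MAX_PART_SIZE_MB, configuredPartSizeMb));
    return TransferManagerBuilder.standard()
        .withS3Client(s3Client)
        .withMinimumUploadPartSize(partSizeMb * MB) // larger parts: faster uploads, more memory used
        .build();
  }

}
```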
@@ -1141,7 +1143,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.1.18"
- dockerImage: "airbyte/destination-gcs:0.1.19"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -226,7 +226,7 @@ class Config:
log: Optional[AirbyteLogMessage] = Field(None, description="log message: any kind of logging you want the platform to know about.")
spec: Optional[ConnectorSpecification] = None
connectionStatus: Optional[AirbyteConnectionStatus] = None
catalog: Optional[AirbyteCatalog] = Field(None, description="log message: any kind of logging you want the platform to know about.")
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the catalog")
record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record")
state: Optional[AirbyteStateMessage] = Field(
None, description="schema message: the state. Must be the last message produced. The platform uses this information"
@@ -930,7 +930,7 @@ workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

private StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
return new DefaultCheckConnectionWorker(
workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot);
@@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.1
LABEL io.airbyte.version=0.6.2
LABEL io.airbyte.name=airbyte/destination-bigquery
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/destination-gcs
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/destination-gcs/README.md
@@ -14,7 +14,8 @@ As a community contributor, you can follow these steps to run integration tests.

## Airbyte Employee

- Access the `destination gcs creds` secrets on Last Pass, and put it in `sample_secrets/config.json`.
- Access the `SECRET_DESTINATION-GCS__CREDS` secrets on SecretManager, and put it in `sample_secrets/config.json`.
- Access the `SECRET_DESTINATION-GCS_NO_MULTIPART_ROLE_CREDS` secrets on SecretManager, and put it in `sample_secrets/insufficient_roles_config.json`.
- Rename the directory from `sample_secrets` to `secrets`.

### GCP Service Account for Testing
@@ -40,4 +40,5 @@ dependencies {

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-gcs')
integrationTestJavaImplementation project(':airbyte-workers')
}
@@ -0,0 +1,10 @@
{
"gcs_bucket_name": "<bucket-name>",
"gcs_bucket_path": "integration-test",
"gcs_bucket_region": "<region>",
"credential": {
"credential_type": "HMAC_KEY",
"hmac_key_access_id": "<access-id-for-user-with-no-multipart-role>",
"hmac_key_secret": "<secret-for-user-with-no-multipart-role>"
}
}
@@ -5,6 +5,9 @@
package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
@@ -16,13 +19,22 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsDestination.class);
public static final String EXPECTED_ROLES = "storage.multipartUploads.abort, storage.multipartUploads.create, "
+ "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list";

public static final String CHECK_ACTIONS_TMP_FILE_NAME = "test";
public static final String DUMMY_TEXT = "This is just a dummy text to write to test file";

public static void main(final String[] args) throws Exception {
new IntegrationRunner(new GcsDestination()).run(args);
@@ -31,26 +43,80 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
final GcsDestinationConfig destinationConfig = GcsDestinationConfig
.getGcsDestinationConfig(config);
final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);
s3Client.putObject(destinationConfig.getBucketName(), "test", "check-content");
s3Client.deleteObject(destinationConfig.getBucketName(), "test");

// Test single Upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig);

// Test Multipart Upload permissions
testMultipartUpload(s3Client, destinationConfig);

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage());
LOGGER.error("Please make sure your account has all of these roles: " + EXPECTED_ROLES);

return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
.getMessage());
}
}

private void testSingleUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig) {
LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
s3Client.putObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, DUMMY_TEXT);
s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
LOGGER.info("Finished checking for normal upload mode");
}

private void testMultipartUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig)
throws Exception {

LOGGER.info("Started testing if all required credentials assigned to user for Multipart upload");
final TransferManager tm = TransferManagerBuilder.standard()
.withS3Client(s3Client)
// Sets the size threshold, in bytes, for when to use multipart uploads. Uploads over this size will
// automatically use a multipart upload strategy, while uploads smaller than this threshold will use
// a single connection to upload the whole object. We set a small threshold here so the test
// upload exercises the multipart path. See the javadoc for more details.
.withMultipartUploadThreshold(1024L) // 1 KB threshold so the test file triggers a multipart upload
.build();

try {
// TransferManager processes all transfers asynchronously,
// so this call returns immediately.
final Upload upload = tm.upload(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, getTmpFileToUpload());
upload.waitForCompletion();
s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
} finally {
tm.shutdownNow(true);
}
LOGGER.info("Finished verification for multipart upload mode");
}

private File getTmpFileToUpload() throws IOException {
final File tmpFile = File.createTempFile(CHECK_ACTIONS_TMP_FILE_NAME, ".tmp");
try (final FileWriter writer = new FileWriter(tmpFile)) {
// The written text must be bigger than the threshold so the client uses a multipart upload strategy;
// anything smaller is uploaded over a single connection even when multipart upload is enabled.
// See the {@link TransferManagerBuilder#withMultipartUploadThreshold} javadoc for more information.

writer.write(StringUtils.repeat(DUMMY_TEXT, 1000));
}
return tmpFile;
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final GcsWriterFactory formatterFactory = new ProductionWriterFactory();
return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog,
formatterFactory, outputRecordCollector);
}

}
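For a quick local smoke test of the new behaviour, check() can be driven directly against the secrets files described in the README change above; a sketch, assuming both files exist locally (the wrapper class name is purely illustrative):

```java
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.nio.file.Path;

// Assumes it lives next to GcsDestination (io.airbyte.integrations.destination.gcs) on the classpath.
public class GcsCheckSmokeTest {

  public static void main(final String[] args) {
    // Service account behind this HMAC key has all of EXPECTED_ROLES.
    final JsonNode goodConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
    // Service account behind this HMAC key lacks storage.multipartUploads.create.
    final JsonNode badConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/insufficient_roles_config.json")));

    final GcsDestination destination = new GcsDestination();
    final AirbyteConnectionStatus ok = destination.check(goodConfig);     // expected: SUCCEEDED
    final AirbyteConnectionStatus failed = destination.check(badConfig);  // expected: FAILED
    System.out.println(ok.getStatus() + " / " + failed.getStatus());
  }

}
```

The acceptance test below does essentially the same thing for the insufficient-roles config and asserts a FAILED status.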
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.gcs;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -13,6 +15,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
@@ -25,6 +28,7 @@
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,7 +48,8 @@ public abstract class GcsDestinationAcceptanceTest extends DestinationAcceptance
protected static final Logger LOGGER = LoggerFactory.getLogger(GcsDestinationAcceptanceTest.class);
protected static final ObjectMapper MAPPER = MoreMappers.initMapper();

protected final String secretFilePath = "secrets/config.json";
protected static final String SECRET_FILE_PATH = "secrets/config.json";
protected static final String SECRET_FILE_PATH_INSUFFICIENT_ROLES = "secrets/insufficient_roles_config.json";
protected final S3Format outputFormat;
protected JsonNode configJson;
protected GcsDestinationConfig config;
@@ -55,7 +60,7 @@ protected GcsDestinationAcceptanceTest(final S3Format outputFormat) {
}

protected JsonNode getBaseConfigJson() {
return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath)));
return Jsons.deserialize(IOs.readFile(Path.of(SECRET_FILE_PATH)));
}

@Override
@@ -147,4 +152,27 @@ protected void tearDown(final TestDestinationEnv testEnv) {
}
}

/**
* Verify that check connection returns a failed response when the user has no multipart upload
* roles. Assumes the config at {@link #SECRET_FILE_PATH_INSUFFICIENT_ROLES} points to a service
* account that has the storage.objects.create permission but not storage.multipartUploads.create.
*/
@Test
public void testCheckConnectionInsufficientRoles() throws Exception {
final JsonNode baseConfigJson = Jsons.deserialize(IOs.readFile(Path.of(
SECRET_FILE_PATH_INSUFFICIENT_ROLES)));

// Set a random GCS bucket path for each integration test
final JsonNode configJson = Jsons.clone(baseConfigJson);
final String testBucketPath = String.format(
"%s_test_%s",
outputFormat.name().toLowerCase(Locale.ROOT),
RandomStringUtils.randomAlphanumeric(5));
((ObjectNode) configJson)
.put("gcs_bucket_path", testBucketPath)
.set("format", getFormatConfig());

assertEquals(Status.FAILED, runCheck(configJson).getStatus());
}

}
@@ -127,7 +127,8 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
final ResultSet tableInfo = connection.createStatement()
.executeQuery(String.format("SHOW TABLES LIKE '%s' IN SCHEMA %s;", tableName, schema));
assertTrue(tableInfo.next());
// check that we're creating permanent tables. DBT defaults to transient tables, which have `TRANSIENT` as the value for the `kind` column.
// check that we're creating permanent tables. DBT defaults to transient tables, which have
// `TRANSIENT` as the value for the `kind` column.
assertEquals("TABLE", tableInfo.getString("kind"));
return connection.createStatement()
.executeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT));
5 changes: 3 additions & 2 deletions docs/integrations/destinations/bigquery.md
@@ -153,6 +153,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.2 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed the check method for GCS mode to verify that all required roles are assigned to the user |
| 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.6.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/issues/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.5.1 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
@@ -170,8 +171,8 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/pull/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.1.11 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
| 0.1.10 | 2021-11-09 | [\#7804](https://github.com/airbytehq/airbyte/pull/7804) | handle null values in fields described by a $ref definition |
1 change: 1 addition & 0 deletions docs/integrations/destinations/gcs.md
@@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.19 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed the check method to verify that all required roles are assigned to the user |
| 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description |
| 0.1.17 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.1.16 | 2021-12-20 | [\#8974](https://github.com/airbytehq/airbyte/pull/8974) | Release a new version to ensure there is no excessive logging. |