Redshift Destination: Apply buffering strategy #12601
Changes from all commits
This file was deleted (RedshiftCopyS3Destination, per the review thread below).
@@ -19,7 +19,7 @@
  * {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to
  * an S3 bucket, and COPY-ing the data into Redshift. This is more efficient, and recommended for
  * production workloads, but does require users to set up an S3 bucket and pass in additional
- * credentials. See {@link RedshiftCopyS3Destination} for more detail. This class inspects the given
+ * credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspects the given
  * arguments to determine which strategy to use.
  */
 public class RedshiftDestination extends SwitchingDestination<RedshiftDestination.DestinationType> {

@@ -42,7 +42,7 @@ public static DestinationType getTypeFromConfig(final JsonNode config) {
   public static Map<DestinationType, Destination> getTypeToDestination() {
     return Map.of(
         DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER),
-        DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftCopyS3Destination(RedshiftDataTmpTableMode.SUPER));
+        DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftStagingS3Destination(RedshiftDataTmpTableMode.SUPER));
   }

   public static DestinationType determineUploadMode(final JsonNode config) {

Review comment on this change: After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.

Author reply: deleted RedshiftCopyS3Destination
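The body of determineUploadMode is not shown in this hunk. For context, the switch typically keys on whether S3 staging details are present in the connector config. A minimal sketch of such a check follows; the field names (s3_bucket_name, access_key_id, secret_access_key) are illustrative assumptions, not taken from this PR:

// Hypothetical sketch only; config field names are assumptions for illustration.
public static DestinationType determineUploadMode(final JsonNode config) {
  // Prefer the S3 COPY strategy when staging credentials are supplied;
  // otherwise fall back to direct INSERT statements.
  final boolean hasS3Staging = config.hasNonNull("s3_bucket_name")
      && config.hasNonNull("access_key_id")
      && config.hasNonNull("secret_access_key");
  return hasS3Staging
      ? DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE
      : DestinationType.INSERT_WITH_SUPER_TMP_TYPE;
}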
@@ -0,0 +1,109 @@
/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.redshift;

import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcDatabase;
import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination {

  private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);
  private final RedshiftDataTmpTableMode redshiftDataTmpTableMode;

  public RedshiftStagingS3Destination(RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
    super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode));
    this.redshiftDataTmpTableMode = redshiftDataTmpTableMode;
  }

  @Override
  public AirbyteConnectionStatus check(final JsonNode config) {
    final S3DestinationConfig s3Config = getS3DestinationConfig(config);
    S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");

    final NamingConventionTransformer nameTransformer = getNamingResolver();
    final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
        new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode);
    try (final JdbcDatabase database = getDatabase(config)) {
      final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
      AirbyteSentry.executeWithTracing("CreateAndDropTable",
          () -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations));
      return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
    } catch (final Exception e) {
      LOGGER.error("Exception while checking connection: ", e);
      return new AirbyteConnectionStatus()
          .withStatus(AirbyteConnectionStatus.Status.FAILED)
          .withMessage("Could not connect with provided configuration. \n" + e.getMessage());
    }
  }

  @Override
  protected JdbcDatabase getDatabase(final JsonNode config) {
    return getJdbcDatabase(config);
  }

  @Override
  protected NamingConventionTransformer getNamingResolver() {
    return new RedshiftSQLNameTransformer();
  }

  @Override
  protected Map<String, String> getDefaultConnectionProperties(JsonNode config) {
    return SSL_JDBC_PARAMETERS;
  }

  // this is a no-op since we override getDatabase.
  @Override
  public JsonNode toJdbcConfig(JsonNode config) {
    return Jsons.emptyObject();
  }

  @Override
  public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                            final ConfiguredAirbyteCatalog catalog,
                                            final Consumer<AirbyteMessage> outputRecordCollector) {
    final S3DestinationConfig s3Config = getS3DestinationConfig(config);
    return new StagingConsumerFactory().create(
        outputRecordCollector,
        getDatabase(config),
        new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode),
        getNamingResolver(),
        CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
        config,
        catalog,
        isPurgeStagingData(config));
  }

  private boolean isPurgeStagingData(final JsonNode config) {
    return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
  }

}
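The line that carries this PR's title is the CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)) argument to StagingConsumerFactory: instead of writing records row by row, the consumer serializes them into gzipped CSV file buffers on local disk and ships each full buffer to the S3 staging area before a Redshift COPY. A simplified, hypothetical illustration of that file-buffering idea follows; it is not the actual Airbyte interface, whose real implementation lives in CsvSerializedBuffer and FileBuffer:

// Hypothetical, simplified sketch of the file-buffering idea applied here.
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

final class GzipCsvFileBuffer implements AutoCloseable {

  private final File file;
  private final OutputStream out;

  GzipCsvFileBuffer() throws IOException {
    // Records accumulate in a gzipped temp file on disk instead of in memory,
    // keeping the connector's memory footprint flat for large syncs.
    this.file = File.createTempFile("staging", ".csv.gz");
    this.out = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
  }

  void accept(final String csvRow) throws IOException {
    out.write((csvRow + "\n").getBytes(StandardCharsets.UTF_8));
  }

  // When the buffer is full, the caller uploads the finished file to the S3
  // staging area, issues a Redshift COPY, and then discards the buffer.
  File flush() throws IOException {
    out.close();
    return file;
  }

  @Override
  public void close() throws IOException {
    out.close();
    file.delete();
  }
}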
Review comment: Not necessary, but maybe move this part

writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet())

into a separate method with a name that describes what we do here.

Reply: The description of this interface is located in the interface itself, please see. @suhomud
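If that suggestion were applied, the extracted helper might look like the following. The method name is hypothetical; WriteConfig and getOutputSchemaName come from the code under review, and the imports are assumed:

// Sketch of the reviewer's suggestion: name the stream pipeline after its intent.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// ...inside the class that builds the staging consumer:
private static Set<String> collectOutputSchemaNames(final List<WriteConfig> writeConfigs) {
  // The distinct set of output schemas across all configured streams.
  return writeConfigs.stream()
      .map(WriteConfig::getOutputSchemaName)
      .collect(Collectors.toSet());
}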