Skip to content

Commit 47f304b

Browse files
destination-redshift: add option for drop cascade (#38189)
1 parent fe8b9b7 commit 47f304b

File tree

12 files changed

+85
-10
lines changed

12 files changed

+85
-10
lines changed

airbyte-integrations/connectors/destination-redshift/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
8-
dockerImageTag: 2.6.1
8+
dockerImageTag: 2.6.2
99
dockerRepository: airbyte/destination-redshift
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
1111
githubIssueLabel: destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static JsonNode getJdbcConfig(final JsonNode redshiftConfig) {
129129

130130
@Override
131131
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
132-
return new RedshiftSqlGenerator(super.getNamingResolver());
132+
return new RedshiftSqlGenerator(super.getNamingResolver(), config);
133133
}
134134

135135
@Override

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
179179

180180
@Override
181181
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
182-
return new RedshiftSqlGenerator(getNamingResolver());
182+
return new RedshiftSqlGenerator(getNamingResolver(), config);
183183
}
184184

185185
@Override
@@ -229,7 +229,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
229229
stream.getStream().setNamespace(defaultNamespace);
230230
}
231231
}
232-
final RedshiftSqlGenerator sqlGenerator = new RedshiftSqlGenerator(getNamingResolver());
232+
233+
final RedshiftSqlGenerator sqlGenerator = new RedshiftSqlGenerator(getNamingResolver(), config);
233234
final ParsedCatalog parsedCatalog;
234235
final TyperDeduper typerDeduper;
235236
final JdbcDatabase database = getDatabase(getDataSource(config));

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/constants/RedshiftDestinationConstants.java

+2
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,6 @@ private RedshiftDestinationConstants() {}
1818

1919
public static final DataType<String> SUPER_TYPE = new DefaultDataType<>(null, String.class, "super");
2020

21+
public static final String DROP_CASCADE_OPTION = "drop_cascade";
22+
2123
}

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
99
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
10+
import io.airbyte.commons.exceptions.ConfigErrorException;
1011
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
1112
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
1213
import io.airbyte.integrations.base.destination.typing_deduping.Array;
@@ -49,6 +50,12 @@ public void execute(final Sql sql) throws Exception {
4950
getJdbcDatabase().executeWithinTransaction(modifiedStatements);
5051
} catch (final SQLException e) {
5152
log.error("Sql {}-{} failed", queryId, transactionId, e);
53+
// This is a big hammer for something that should be much more targetted, only when executing the
54+
// DROP TABLE command.
55+
if (e.getMessage().contains("ERROR: cannot drop table") && e.getMessage().contains("because other objects depend on it")) {
56+
throw new ConfigErrorException(
57+
"Failed to drop table without the CASCADE option. Consider changing the drop_cascade configuration parameter", e);
58+
}
5259
throw e;
5360
}
5461

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.jooq.impl.DSL.rowNumber;
1717
import static org.jooq.impl.DSL.val;
1818

19+
import com.fasterxml.jackson.databind.JsonNode;
1920
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
2021
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2122
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
@@ -49,8 +50,20 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
4950

5051
private static final String AIRBYTE_META_COLUMN_CHANGES_KEY = "changes";
5152

52-
public RedshiftSqlGenerator(final NamingConventionTransformer namingTransformer) {
53-
super(namingTransformer);
53+
private final boolean dropCascade;
54+
55+
private static boolean isDropCascade(JsonNode config) {
56+
final JsonNode dropCascadeNode = config.get(RedshiftDestinationConstants.DROP_CASCADE_OPTION);
57+
return dropCascadeNode != null && dropCascadeNode.asBoolean();
58+
}
59+
60+
public RedshiftSqlGenerator(final NamingConventionTransformer namingTransformer, JsonNode config) {
61+
this(namingTransformer, isDropCascade(config));
62+
}
63+
64+
public RedshiftSqlGenerator(final NamingConventionTransformer namingTransformer, boolean dropCascade) {
65+
super(namingTransformer, dropCascade);
66+
this.dropCascade = dropCascade;
5467
}
5568

5669
/**

airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json

+7
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,13 @@
253253
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
254254
"order": 11,
255255
"group": "connection"
256+
},
257+
"drop_cascade": {
258+
"type": "boolean",
259+
"default": false,
260+
"description": "Drop tables with CASCADE. WARNING! This will delete all data in all dependent objects (views, etc.). Use with caution. This option is intended for usecases which can easily rebuild the dependent objects.",
261+
"title": "Drop tables with CASCADE. (WARNING! Risk of unrecoverable data loss)",
262+
"order": 12
256263
}
257264
},
258265
"groups": [

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ protected JdbcCompatibleSourceOperations<?> getSourceOperations() {
5151

5252
@Override
5353
protected SqlGenerator getSqlGenerator() {
54-
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
54+
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false) {
5555

5656
// Override only for tests to print formatted SQL. The actual implementation should use unformatted
5757
// to save bytes.

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66

77
import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime;
88
import static io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations.escapeStringLiteral;
9+
import static org.jooq.impl.DSL.createView;
10+
import static org.jooq.impl.DSL.quotedName;
11+
import static org.jooq.impl.DSL.select;
12+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
913
import static org.junit.jupiter.api.Assertions.assertEquals;
1014
import static org.junit.jupiter.api.Assertions.assertFalse;
15+
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
1116
import static org.junit.jupiter.api.Assertions.assertTrue;
1217

1318
import com.fasterxml.jackson.databind.JsonNode;
@@ -19,6 +24,7 @@
1924
import io.airbyte.cdk.db.jdbc.JdbcUtils;
2025
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2126
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
27+
import io.airbyte.commons.exceptions.ConfigErrorException;
2228
import io.airbyte.commons.json.Jsons;
2329
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
2430
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
@@ -39,6 +45,7 @@
3945
import org.jooq.DataType;
4046
import org.jooq.Field;
4147
import org.jooq.SQLDialect;
48+
import org.jooq.conf.ParamType;
4249
import org.jooq.conf.Settings;
4350
import org.jooq.impl.DSL;
4451
import org.jooq.impl.DefaultDataType;
@@ -143,7 +150,7 @@ public static void teardownRedshift() throws Exception {
143150

144151
@Override
145152
protected JdbcSqlGenerator getSqlGenerator() {
146-
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
153+
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false) {
147154

148155
// Override only for tests to print formatted SQL. The actual implementation should use unformatted
149156
// to save bytes.
@@ -193,4 +200,41 @@ public void testCreateTableIncremental() throws Exception {
193200
// TODO assert on table clustering, etc.
194201
}
195202

203+
/**
204+
* Verify that we correctly DROP...CASCADE the final table when cascadeDrop is enabled.
205+
*/
206+
@Test
207+
public void testCascadeDropEnabled() throws Exception {
208+
// Explicitly create a sqlgenerator with cascadeDrop=true
209+
final RedshiftSqlGenerator generator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), true);
210+
// Create a table, then create a view referencing it
211+
getDestinationHandler().execute(generator.createTable(getIncrementalAppendStream(), "", false));
212+
database.execute(createView(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), "example_view"))
213+
.as(select().from(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), getIncrementalAppendStream().getId().getFinalName())))
214+
.getSQL(ParamType.INLINED));
215+
// Create a "soft reset" table
216+
getDestinationHandler().execute(generator.createTable(getIncrementalDedupStream(), "_soft_reset", false));
217+
218+
// Overwriting the first table with the second table should succeed.
219+
assertDoesNotThrow(() -> getDestinationHandler().execute(generator.overwriteFinalTable(getIncrementalDedupStream().getId(), "_soft_reset")));
220+
}
221+
222+
@Test
223+
public void testCascadeDropDisabled() throws Exception {
224+
// Explicitly create a sqlgenerator with cascadeDrop=false
225+
final RedshiftSqlGenerator generator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false);
226+
// Create a table, then create a view referencing it
227+
getDestinationHandler().execute(generator.createTable(getIncrementalAppendStream(), "", false));
228+
database.execute(createView(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), "example_view"))
229+
.as(select().from(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), getIncrementalAppendStream().getId().getFinalName())))
230+
.getSQL(ParamType.INLINED));
231+
// Create a "soft reset" table
232+
getDestinationHandler().execute(generator.createTable(getIncrementalDedupStream(), "_soft_reset", false));
233+
234+
// Overwriting the first table with the second table should fal with a configurationError.
235+
Throwable t = assertThrowsExactly(ConfigErrorException.class,
236+
() -> getDestinationHandler().execute(generator.overwriteFinalTable(getIncrementalDedupStream().getId(), "_soft_reset")));
237+
assertTrue(t.getMessage().equals("Failed to drop table without the CASCADE option. Consider changing the drop_cascade configuration parameter"));
238+
}
239+
196240
}

airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class RedshiftSqlGeneratorTest {
3434

3535
private static final Random RANDOM = new Random();
3636

37-
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
37+
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false) {
3838

3939
// Override only for tests to print formatted SQL. The actual implementation should use unformatted
4040
// to save bytes.

airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
public class RedshiftSuperLimitationTransformerTest {
4343

4444
private RedshiftSuperLimitationTransformer transformer;
45-
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer());
45+
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false);
4646

4747
@BeforeEach
4848
public void setup() {

docs/integrations/destinations/redshift.md

+1
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
242242

243243
| Version | Date | Pull Request | Subject |
244244
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
245+
| 2.6.2 | 2024-05-14 | [38189](https://github.com/airbytehq/airbyte/pull/38189) | adding an option to DROP CASCADE on resets |
245246
| 2.6.1 | 2024-05-13 | [\#38126](https://github.com/airbytehq/airbyte/pull/38126) | Adapt to signature changes in `StreamConfig` |
246247
| 2.6.0 | 2024-05-08 | [\#37713](https://github.com/airbytehq/airbyte/pull/37713) | Remove option for incremental typing and deduping |
247248
| 2.5.0 | 2024-05-06 | [\#34613](https://github.com/airbytehq/airbyte/pull/34613) | Upgrade Redshift driver to work with Cluster patch 181; Adapt to CDK 0.33.0; Minor signature changes |

0 commit comments

Comments
 (0)