
Commit ca96b04

Destination Redshift: Adapting to Kotlin CDK (#36589)
1 parent 53bb59f commit ca96b04
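
This commit bumps the connector to CDK 0.28.19, where the CDK classes it touches have been converted to Kotlin. Most of the diff below is the mechanical fallout of that conversion: Java call sites switch from record-style accessors (`column.name()`, `streamConfig.cursor()`) to the JavaBean getters Kotlin generates (`column.getName()`, `streamConfig.getCursor()`), inherited fields are read through accessors like `getJdbcDatabase()`, and one Kotlin call site adds `!!` assertions for fields that are now nullable. A minimal interop sketch (the `ColumnId` declaration here is a hypothetical stand-in, not the CDK's actual code):

```kotlin
// Hypothetical stand-in for the CDK's ColumnId after the Kotlin conversion.
// A Kotlin `val` compiles to a private field plus a generated JVM getter.
data class ColumnId(val name: String, val originalName: String)

fun main() {
    val column = ColumnId(name = "users_final", originalName = "Users")
    println(column.name) // Kotlin reads the property directly
    // Java callers see the generated accessors instead:
    //   column.getName();         // replaces the record-style column.name()
    //   column.getOriginalName(); // replaces column.originalName()
}
```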

File tree

12 files changed: +42 -35


airbyte-integrations/connectors/destination-redshift/build.gradle (+1 -1)

```diff
@@ -4,7 +4,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.25.0'
+    cdkVersionRequired = '0.28.19'
    features = ['db-destinations', 's3-destinations', 'typing-deduping']
    useLocalCdk = false
 }
```

airbyte-integrations/connectors/destination-redshift/metadata.yaml (+1 -1)

```diff
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 2.3.2
+  dockerImageTag: 2.4.0
   dockerRepository: airbyte/destination-redshift
   documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
   githubIssueLabel: destination-redshift
```

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java (+4 -2)

```diff
@@ -95,7 +95,8 @@ private boolean isEphemeralKeysAndPurgingStagingData(final JsonNode config, fina
   public AirbyteConnectionStatus check(final JsonNode config) {
     final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
     final EncryptionConfig encryptionConfig =
-        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY))
+            : new NoEncryption();
     if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
       return new AirbyteConnectionStatus()
           .withStatus(Status.FAILED)
@@ -220,7 +221,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
                                                 final Consumer<AirbyteMessage> outputRecordCollector)
       throws Exception {
     final EncryptionConfig encryptionConfig =
-        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY))
+            : new NoEncryption();
     final JsonNode s3Options = findS3Options(config);
     final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
     final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);
```
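
Both hunks in this file only re-wrap the long `EncryptionConfig` ternary onto two lines; behavior is unchanged, which is why the file counts +4 -2.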

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java (+4)

```diff
@@ -28,6 +28,8 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {
 
@@ -38,6 +40,8 @@ public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implem
   private final ObjectMapper objectMapper;
   private final byte[] keyEncryptingKey;
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftS3StagingSqlOperations.class);
+
   public RedshiftS3StagingSqlOperations(final NamingConventionTransformer nameTransformer,
                                         final AmazonS3 s3Client,
                                         final S3DestinationConfig s3Config,
```
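
The only addition here is a class-level SLF4J logger; the hunks show just the declaration and imports, so its call sites are elsewhere in the file (presumably replacing a logger previously available from the CDK, though the diff does not confirm that). The standard SLF4J idiom, sketched in Kotlin with a hypothetical class name:

```kotlin
import org.slf4j.Logger
import org.slf4j.LoggerFactory

// Hypothetical example class; the real declaration is in RedshiftS3StagingSqlOperations.
class StagingUploader {
    companion object {
        private val LOGGER: Logger = LoggerFactory.getLogger(StagingUploader::class.java)
    }

    fun upload(objectKey: String) {
        // Parameterized messages defer string formatting until the level is enabled.
        LOGGER.info("Uploading staging file {}", objectKey)
    }
}
```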

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java (+1 -1)

```diff
@@ -20,7 +20,7 @@
 import com.google.common.collect.Iterables;
 import io.airbyte.cdk.db.jdbc.JdbcDatabase;
 import io.airbyte.cdk.integrations.base.JavaBaseConstants;
-import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
 import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
 import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
 import io.airbyte.commons.json.Jsons;
```
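
`PartialAirbyteMessage` simply moved packages in the new CDK, from `destination.async.partial_messages` to `destination.async.model`; only the import changes.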

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java (+1 -1)

```diff
@@ -46,7 +46,7 @@ public void execute(final Sql sql) throws Exception {
         // see https://github.com/airbytehq/airbyte/issues/33900
         modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n");
         modifiedStatements.addAll(transaction);
-        jdbcDatabase.executeWithinTransaction(modifiedStatements);
+        getJdbcDatabase().executeWithinTransaction(modifiedStatements);
       } catch (final SQLException e) {
         log.error("Sql {}-{} failed", queryId, transactionId, e);
         throw e;
```
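
`jdbcDatabase` is evidently now a property of the Kotlin superclass rather than a directly visible Java field, so it is read through the generated `getJdbcDatabase()` accessor (the same interop pattern sketched above).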

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftRawTableAirbyteMetaMigration.kt (+1 -1)

```diff
@@ -64,7 +64,7 @@ class RedshiftRawTableAirbyteMetaMigration(
             "Executing RawTableAirbyteMetaMigration for ${stream.id.originalNamespace}.${stream.id.originalName} for real"
         )
         destinationHandler.execute(
-            getRawTableMetaColumnAddDdl(stream.id.rawNamespace, stream.id.rawName)
+            getRawTableMetaColumnAddDdl(stream.id.rawNamespace!!, stream.id.rawName!!)
         )
 
         // Update the state. We didn't modify the table in a relevant way, so don't invalidate the
```
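
In the new CDK, `rawNamespace` and `rawName` are evidently declared as nullable (`String?`), so this Kotlin call site must assert non-null with `!!` before passing them to non-null parameters. A minimal sketch of the pattern (both declarations below are hypothetical stand-ins):

```kotlin
// Hypothetical stand-ins for the CDK's StreamId and the migration's DDL helper.
class StreamId(val rawNamespace: String?, val rawName: String?)

fun getRawTableMetaColumnAddDdl(namespace: String, name: String): String =
    "ALTER TABLE \"$namespace\".\"$name\" ..." // real DDL body elided

fun main() {
    val id = StreamId("airbyte_internal", "users_raw")
    // `!!` narrows String? to String, throwing a NullPointerException if null.
    println(getRawTableMetaColumnAddDdl(id.rawNamespace!!, id.rawName!!))
}
```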

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java (+8 -8)

```diff
@@ -133,9 +133,9 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
         .entrySet()
         .stream()
         .map(column -> castedField(
-            field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())),
+            field(quotedName(COLUMN_NAME_DATA, column.getKey().getOriginalName())),
             column.getValue(),
-            column.getKey().name(),
+            column.getKey().getName(),
             useExpensiveSaferCasting))
         .collect(Collectors.toList());
   }
@@ -170,16 +170,16 @@ Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
   }
 
   Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
-    final Field<?> field = field(quotedName(COLUMN_NAME_DATA, column.originalName()));
+    final Field<?> field = field(quotedName(COLUMN_NAME_DATA, column.getOriginalName()));
     // Just checks if data is not null but casted data is null. This also accounts for conditional
     // casting result of array and struct.
     // TODO: Timestamp format issues can result in null values when cast, add regex check if destination
     // supports regex functions.
     return field(CASE_STATEMENT_SQL_TEMPLATE,
-        field.isNotNull().and(castedField(field, type, column.name(), true).isNull()),
+        field.isNotNull().and(castedField(field, type, column.getName(), true).isNull()),
         function("ARRAY", getSuperType(),
             function("JSON_PARSE", getSuperType(), val(
-                "{\"field\": \"" + column.name() + "\", "
+                "{\"field\": \"" + column.getName() + "\", "
                     + "\"change\": \"" + Change.NULLED.value() + "\", "
                     + "\"reason\": \"" + Reason.DESTINATION_TYPECAST_ERROR + "\"}"))),
         field("ARRAY()"));
@@ -219,12 +219,12 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
     // literally identical to postgres's getRowNumber implementation, changes here probably should
     // be reflected there
     final List<Field<?>> primaryKeyFields =
-        primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
+        primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.getName()))).collect(Collectors.toList())
            : new ArrayList<>();
     final List<Field<?>> orderedFields = new ArrayList<>();
     // We can still use Jooq's field to get the quoted name with raw sql templating.
     // jooq's .desc returns SortField<?> instead of Field<?> and NULLS LAST doesn't work with it
-    cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.name())))));
+    cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.getName())))));
     orderedFields.add(field("{0} desc", quotedName(COLUMN_NAME_AB_EXTRACTED_AT)));
     return rowNumber()
         .over()
@@ -235,7 +235,7 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
   @Override
   protected Condition cdcDeletedAtNotNullCondition() {
     return field(name(COLUMN_NAME_AB_LOADED_AT)).isNotNull()
-        .and(function("JSON_TYPEOF", SQLDataType.VARCHAR, field(quotedName(COLUMN_NAME_DATA, cdcDeletedAtColumn.name())))
+        .and(function("JSON_TYPEOF", SQLDataType.VARCHAR, field(quotedName(COLUMN_NAME_DATA, getCdcDeletedAtColumn().getName())))
            .ne("null"));
   }
```
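
All eight changed lines in this file are the same accessor rename (`name()` → `getName()`, `originalName()` → `getOriginalName()`), plus reading `cdcDeletedAtColumn` through `getCdcDeletedAtColumn()`; the generated SQL is unchanged.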

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java (+3 -3)

```diff
@@ -91,10 +91,10 @@ public Pair<JsonNode, AirbyteRecordMessageMeta> transform(final StreamDescriptor
     final String namespace =
         (streamDescriptor.getNamespace() != null && !streamDescriptor.getNamespace().isEmpty()) ? streamDescriptor.getNamespace() : defaultNamespace;
     final StreamConfig streamConfig = parsedCatalog.getStream(namespace, streamDescriptor.getName());
-    final Optional<String> cursorField = streamConfig.cursor().map(ColumnId::originalName);
+    final Optional<String> cursorField = streamConfig.getCursor().map(ColumnId::getOriginalName);
     // convert List<ColumnId> to Set<ColumnId> for faster lookup
-    final Set<String> primaryKeys = streamConfig.primaryKey().stream().map(ColumnId::originalName).collect(Collectors.toSet());
-    final DestinationSyncMode syncMode = streamConfig.destinationSyncMode();
+    final Set<String> primaryKeys = streamConfig.getPrimaryKey().stream().map(ColumnId::getOriginalName).collect(Collectors.toSet());
+    final DestinationSyncMode syncMode = streamConfig.getDestinationSyncMode();
     final TransformationInfo transformationInfo = transformNodes(jsonNode, DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K);
     final int originalBytes = transformationInfo.originalBytes;
     final int transformedBytes = transformationInfo.originalBytes - transformationInfo.removedBytes;
```

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java (+13 -13)

```diff
@@ -67,9 +67,9 @@ public void testRawTableMetaMigration_append() throws Exception {
         .withSyncMode(SyncMode.FULL_REFRESH)
         .withDestinationSyncMode(DestinationSyncMode.APPEND)
         .withStream(new AirbyteStream()
-            .withNamespace(streamNamespace)
-            .withName(streamName)
-            .withJsonSchema(SCHEMA))));
+            .withNamespace(getStreamNamespace())
+            .withName(getStreamName())
+            .withJsonSchema(getSchema()))));
 
     // First sync without _airbyte_meta
     final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages_before_meta.jsonl");
@@ -92,9 +92,9 @@ public void testRawTableMetaMigration_incrementalDedupe() throws Exception {
         .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
         .withPrimaryKey(List.of(List.of("id1"), List.of("id2")))
         .withStream(new AirbyteStream()
-            .withNamespace(streamNamespace)
-            .withName(streamName)
-            .withJsonSchema(SCHEMA))));
+            .withNamespace(getStreamNamespace())
+            .withName(getStreamName())
+            .withJsonSchema(getSchema()))));
 
     // First sync without _airbyte_meta
     final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages_before_meta.jsonl");
@@ -145,16 +145,16 @@ public void testRawTableLoadWithSuperVarcharLimitation() throws Exception {
         .withSyncMode(SyncMode.FULL_REFRESH)
         .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
         .withStream(new AirbyteStream()
-            .withNamespace(streamNamespace)
-            .withName(streamName)
-            .withJsonSchema(SCHEMA))));
+            .withNamespace(getStreamNamespace())
+            .withName(getStreamName())
+            .withJsonSchema(getSchema()))));
     final AirbyteMessage message1 = Jsons.deserialize(record1, AirbyteMessage.class);
-    message1.getRecord().setNamespace(streamNamespace);
-    message1.getRecord().setStream(streamName);
+    message1.getRecord().setNamespace(getStreamNamespace());
+    message1.getRecord().setStream(getStreamName());
     ((ObjectNode) message1.getRecord().getData()).put("name", largeString1);
     final AirbyteMessage message2 = Jsons.deserialize(record2, AirbyteMessage.class);
-    message2.getRecord().setNamespace(streamNamespace);
-    message2.getRecord().setStream(streamName);
+    message2.getRecord().setNamespace(getStreamNamespace());
+    message2.getRecord().setStream(getStreamName());
     ((ObjectNode) message2.getRecord().getData()).put("name", largeString2);
 
     // message1 should be preserved which is just on limit, message2 should be nulled.
```
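
The test changes follow the same pattern: `streamNamespace`, `streamName`, and the schema are presumably now properties of the Kotlin test base class, so the Java tests read them via `getStreamNamespace()`, `getStreamName()`, and `getSchema()`.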

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java (+4 -4)

```diff
@@ -152,7 +152,7 @@ protected DSLContext getDslContext() {
 
   @Override
   protected DestinationHandler<RedshiftState> getDestinationHandler() {
-    return new RedshiftDestinationHandler(databaseName, database, namespace);
+    return new RedshiftDestinationHandler(databaseName, database, getNamespace());
   }
 
   @Override
@@ -178,9 +178,9 @@ protected Field<?> toJsonValue(final String valueAsString) {
   @Override
   @Test
   public void testCreateTableIncremental() throws Exception {
-    final Sql sql = generator.createTable(incrementalDedupStream, "", false);
-    destinationHandler.execute(sql);
-    List<DestinationInitialStatus<RedshiftState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
+    final Sql sql = getGenerator().createTable(getIncrementalDedupStream(), "", false);
+    getDestinationHandler().execute(sql);
+    List<DestinationInitialStatus<RedshiftState>> initialStatuses = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream()));
     assertEquals(1, initialStatuses.size());
     final DestinationInitialStatus<RedshiftState> initialStatus = initialStatuses.getFirst();
     assertTrue(initialStatus.isFinalTablePresent());
```

docs/integrations/destinations/redshift.md (+1)

```diff
@@ -236,6 +236,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
 
 | Version | Date       | Pull Request                                                | Subject |
 |:--------|:-----------|:------------------------------------------------------------|:--------|
+| 2.4.0   | 2024-03-21 | [\#36589](https://github.com/airbytehq/airbyte/pull/36589) | Adapt to Kotlin cdk 0.28.19 |
 | 2.3.2   | 2024-03-21 | [\#36374](https://github.com/airbytehq/airbyte/pull/36374) | Supress Jooq DataAccessException error message in logs |
 | 2.3.1   | 2024-03-18 | [\#36255](https://github.com/airbytehq/airbyte/pull/36255) | Mark as Certified-GA |
 | 2.3.0   | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203) | CDK 0.25.0; Record nulling for VARCHAR > 64K & record > 16MB (super limit) |
```
