Skip to content

Commit 6b902f5

Browse files
authored
Destination Redshift: pull in cdk update for refreshes bugfix (#42506)
1 parent b20365a commit 6b902f5

File tree

12 files changed

+80
-20
lines changed

12 files changed

+80
-20
lines changed

airbyte-integrations/connectors/destination-redshift/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.41.4'
7+
cdkVersionRequired = '0.44.14'
88
features = ['db-destinations', 's3-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}

airbyte-integrations/connectors/destination-redshift/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
8-
dockerImageTag: 3.4.1
8+
dockerImageTag: 3.4.2
99
dockerRepository: airbyte/destination-redshift
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
1111
githubIssueLabel: destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import io.airbyte.integrations.base.destination.operation.DefaultFlush
5151
import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
5252
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
5353
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
54+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
5455
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
5556
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
5657
import io.airbyte.integrations.base.destination.typing_deduping.Sql
@@ -72,7 +73,6 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
7273
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
7374
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
7475
import io.airbyte.protocol.models.v0.ConnectorSpecification
75-
import io.airbyte.protocol.models.v0.DestinationSyncMode
7676
import java.sql.SQLException
7777
import java.time.Duration
7878
import java.util.Objects
@@ -145,7 +145,7 @@ class RedshiftDestination : BaseConnector(), Destination {
145145
val streamConfig =
146146
StreamConfig(
147147
id = streamId,
148-
destinationSyncMode = DestinationSyncMode.APPEND,
148+
postImportAction = ImportType.APPEND,
149149
primaryKey = listOf(),
150150
cursor = Optional.empty(),
151151
columns = linkedMapOf(),
@@ -199,6 +199,8 @@ class RedshiftDestination : BaseConnector(), Destination {
199199
isAirbyteMetaPresentInRaw = true,
200200
isGenerationIdPresent = true,
201201
),
202+
finalTableGenerationId = 1,
203+
finalTempTableGenerationId = 1,
202204
),
203205
FileUploadFormat.CSV,
204206
destinationColumns,

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class RedshiftStagingStorageOperation(
100100
return null
101101
}
102102

103-
return generation.first()[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID].asLong()
103+
return generation.first()[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID]?.asLong() ?: 0
104104
}
105105

106106
override fun writeToStage(
@@ -114,7 +114,12 @@ class RedshiftStagingStorageOperation(
114114
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
115115
}
116116
val filename =
117-
s3StorageOperations.uploadRecordsToBucket(data, streamId.rawNamespace, objectPath)
117+
s3StorageOperations.uploadRecordsToBucket(
118+
data,
119+
streamId.rawNamespace,
120+
objectPath,
121+
streamConfig.generationId
122+
)
118123

119124
log.info {
120125
"Starting copy to target table from stage: ${streamId.rawName}$suffix in destination from stage: $objectPath/$filename."

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ import org.jooq.SQLDialect
2323
private val log = KotlinLogging.logger {}
2424

2525
class RedshiftDestinationHandler(
26-
databaseName: String?,
26+
databaseName: String,
2727
jdbcDatabase: JdbcDatabase,
2828
rawNamespace: String
2929
) :
3030
JdbcDestinationHandler<RedshiftState>(
3131
databaseName,
3232
jdbcDatabase,
3333
rawNamespace,
34-
SQLDialect.DEFAULT
34+
SQLDialect.DEFAULT,
35+
generationHandler = RedshiftGenerationHandler(databaseName)
3536
) {
3637
override fun createNamespaces(schemas: Set<String>) {
3738
// SHOW SCHEMAS will fail with a "schema ... does not exist" error
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.redshift.typing_deduping
6+
7+
import io.airbyte.cdk.db.jdbc.JdbcDatabase
8+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
9+
import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler
10+
11+
/**
 * Looks up the Airbyte generation ID recorded in a Redshift table.
 *
 * @param databaseName the Redshift database (JDBC catalog) that owns the tables being inspected
 */
class RedshiftGenerationHandler(private val databaseName: String) : JdbcGenerationHandler {
    /**
     * Reads the `_airbyte_generation_id` value from the first row of `"$namespace"."$name"`.
     *
     * @return `null` when the table does not exist or has no generation-id column;
     *   `0` when the table exists with the column but is empty (or the value is NULL);
     *   otherwise the generation id stored in the table.
     */
    override fun getGenerationIdInTable(
        database: JdbcDatabase,
        namespace: String,
        name: String
    ): Long? {
        // Probe JDBC metadata for a _airbyte_generation_id column on the target
        // table; ResultSet.next() is true iff at least one such column exists.
        val hasGenerationColumn =
            database.executeMetadataQuery { metadata ->
                metadata
                    .getColumns(
                        databaseName,
                        namespace,
                        name,
                        JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
                    )
                    .next()
            }
        if (!hasGenerationColumn) {
            // Table is missing entirely, or predates generation-id support.
            return null
        }

        // Column exists: any single row carries the table's generation id.
        // NOTE(review): namespace/name are interpolated as quoted identifiers —
        // presumably produced by Airbyte's name transformer, not end-user input.
        val rows =
            database.queryJsons(
                """
                SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}
                FROM "$namespace"."$name"
                LIMIT 1
                """.trimIndent()
            )
        // An empty table (or a NULL generation id) is reported as generation 0.
        return rows
            .firstOrNull()
            ?.get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
            ?.asLong()
            ?: 0
    }
}

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ import io.airbyte.commons.json.Jsons.emptyObject
1313
import io.airbyte.commons.json.Jsons.jsonNode
1414
import io.airbyte.commons.json.Jsons.serialize
1515
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
16+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
1617
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
1718
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
1819
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
19-
import io.airbyte.protocol.models.v0.DestinationSyncMode
2020
import io.airbyte.protocol.models.v0.StreamDescriptor
2121
import io.github.oshai.kotlinlogging.KotlinLogging
2222
import java.nio.charset.StandardCharsets
@@ -77,7 +77,7 @@ class RedshiftSuperLimitationTransformer(
7777
// convert List<ColumnId> to Set<ColumnId> for faster lookup
7878
val primaryKeys =
7979
streamConfig.primaryKey.stream().map(ColumnId::originalName).collect(Collectors.toSet())
80-
val syncMode = streamConfig.destinationSyncMode
80+
val syncMode = streamConfig.postImportAction
8181
val transformationInfo =
8282
transformNodes(jsonNode, DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K)
8383
val originalBytes = transformationInfo.originalBytes
@@ -97,7 +97,7 @@ class RedshiftSuperLimitationTransformer(
9797
transformedBytes
9898
)
9999
val minimalNode = constructMinimalJsonWithPks(jsonNode, primaryKeys, cursorField)
100-
if (minimalNode.isEmpty && syncMode == DestinationSyncMode.APPEND_DEDUP) {
100+
if (minimalNode.isEmpty && syncMode == ImportType.DEDUPE) {
101101
// Fail the sync if PKs are missing in DEDUPE, no point sending an empty record to
102102
// destination.
103103
throw RuntimeException(

airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingStorageOperationTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException
1818
import io.airbyte.commons.json.Jsons
1919
import io.airbyte.commons.string.Strings
2020
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
21+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
2122
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2223
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
2324
import io.airbyte.integrations.destination.redshift.operation.RedshiftStagingStorageOperation
@@ -26,7 +27,6 @@ import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlG
2627
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil
2728
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
2829
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
29-
import io.airbyte.protocol.models.v0.DestinationSyncMode
3030
import java.nio.file.Files
3131
import java.nio.file.Path
3232
import java.util.Optional
@@ -54,7 +54,7 @@ class RedshiftS3StagingStorageOperationTest {
5454
private val streamConfig =
5555
StreamConfig(
5656
streamId,
57-
DestinationSyncMode.APPEND,
57+
ImportType.APPEND,
5858
emptyList(),
5959
Optional.empty(),
6060
LinkedHashMap(),

airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
// And append the records from the second sync
88
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_generation_id": 43, "_airbyte_meta": {"sync_id": 42, "changes": []}}
99
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_generation_id": 43, "_airbyte_meta": {"sync_id": 42, "changes": []}}
10-
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z", "_airbyte_generation_id": 43, "_airbyte_meta": {"sync_id": 42, "changes": []}}}
10+
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_generation_id": 43, "_airbyte_meta": {"sync_id": 42, "changes": []}}

airbyte-integrations/connectors/destination-redshift/src/test/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolT
88
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
99
import io.airbyte.integrations.base.destination.typing_deduping.Array
1010
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
11+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
1112
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
1213
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1314
import io.airbyte.integrations.base.destination.typing_deduping.Struct
1415
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer
15-
import io.airbyte.protocol.models.v0.DestinationSyncMode
1616
import java.io.IOException
1717
import java.time.Instant
1818
import java.util.Arrays
@@ -73,7 +73,7 @@ class RedshiftSqlGeneratorTest {
7373
incrementalDedupStream =
7474
StreamConfig(
7575
streamId!!,
76-
DestinationSyncMode.APPEND_DEDUP,
76+
ImportType.DEDUPE,
7777
primaryKey,
7878
Optional.of(cursor),
7979
columns,
@@ -84,7 +84,7 @@ class RedshiftSqlGeneratorTest {
8484
incrementalAppendStream =
8585
StreamConfig(
8686
streamId!!,
87-
DestinationSyncMode.APPEND,
87+
ImportType.APPEND,
8888
primaryKey,
8989
Optional.of(cursor),
9090
columns,
@@ -164,7 +164,7 @@ class RedshiftSqlGeneratorTest {
164164
redshiftSqlGenerator.updateTable(
165165
StreamConfig(
166166
streamId!!,
167-
DestinationSyncMode.APPEND_DEDUP,
167+
ImportType.DEDUPE,
168168
primaryKey,
169169
Optional.of(cursor),
170170
columns,

airbyte-integrations/connectors/destination-redshift/src/test/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformerTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ import io.airbyte.commons.resources.MoreResources.readResource
1111
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
1212
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
1313
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
14+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
1415
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
1516
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
1617
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1718
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer
1819
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
1920
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
20-
import io.airbyte.protocol.models.v0.DestinationSyncMode
2121
import io.airbyte.protocol.models.v0.StreamDescriptor
2222
import java.io.IOException
2323
import java.nio.charset.StandardCharsets
@@ -53,7 +53,7 @@ class RedshiftSuperLimitationTransformerTest {
5353
val streamConfig =
5454
StreamConfig(
5555
streamId,
56-
DestinationSyncMode.APPEND_DEDUP,
56+
ImportType.DEDUPE,
5757
primaryKey,
5858
Optional.empty(),
5959
columns,

docs/integrations/destinations/redshift.md

+1
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
222222

223223
| Version | Date | Pull Request | Subject |
224224
| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
225+
| 3.4.2 | 2024-08-15 | [42506](https://github.com/airbytehq/airbyte/pull/42506) | Fix bug in refreshes logic (already mitigated in platform, just fixing protocol compliance) |
225226
| 3.4.1 | 2024-08-13 | [xxx](https://github.com/airbytehq/airbyte/pull/xxx) | Simplify Redshift Options |
226227
| 3.4.0 | 2024-07-23 | [42445](https://github.com/airbytehq/airbyte/pull/42445) | Respect the `drop cascade` option on raw tables |
227228
| 3.3.1 | 2024-07-15 | [41968](https://github.com/airbytehq/airbyte/pull/41968) | Don't hang forever on empty stream list; shorten error message on INCOMPLETE stream status |

0 commit comments

Comments
 (0)