Skip to content

Commit 9221114

Browse files
authored
Destination Redshift: Add generation_id and sync_id (#40201)
1 parent 3a3e058 commit 9221114

File tree

45 files changed

+398
-163
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+398
-163
lines changed

airbyte-integrations/connectors/destination-redshift/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.35.16'
7+
cdkVersionRequired = '0.38.3'
88
features = ['db-destinations', 's3-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}

airbyte-integrations/connectors/destination-redshift/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
8-
dockerImageTag: 3.1.1
8+
dockerImageTag: 3.2.0
99
dockerRepository: airbyte/destination-redshift
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
1111
githubIssueLabel: destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM
3535
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure
3636
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination.Companion.DISABLE_TYPE_DEDUPE
3737
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination.Companion.RAW_SCHEMA_OVERRIDE
38-
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
3938
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption
4039
import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig
4140
import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig.Companion.fromJson
@@ -61,6 +60,7 @@ import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinatio
6160
import io.airbyte.integrations.destination.redshift.operation.RedshiftStagingStorageOperation
6261
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDV2Migration
6362
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler
63+
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftGenerationIdMigration
6464
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration
6565
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator
6666
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState
@@ -69,6 +69,7 @@ import io.airbyte.integrations.destination.redshift.util.RedshiftUtil
6969
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
7070
import io.airbyte.protocol.models.v0.AirbyteMessage
7171
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
72+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
7273
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
7374
import io.airbyte.protocol.models.v0.ConnectorSpecification
7475
import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -81,7 +82,6 @@ import java.util.concurrent.Executors
8182
import java.util.function.Consumer
8283
import javax.sql.DataSource
8384
import org.apache.commons.lang3.NotImplementedException
84-
import org.apache.commons.lang3.StringUtils
8585
import org.jetbrains.annotations.VisibleForTesting
8686
import org.slf4j.Logger
8787
import org.slf4j.LoggerFactory
@@ -187,7 +187,11 @@ class RedshiftDestination : BaseConnector(), Destination {
187187
isSchemaMismatch = true,
188188
isFinalTableEmpty = true,
189189
destinationState =
190-
RedshiftState(needsSoftReset = false, isAirbyteMetaPresentInRaw = true),
190+
RedshiftState(
191+
needsSoftReset = false,
192+
isAirbyteMetaPresentInRaw = true,
193+
isGenerationIdPresent = true,
194+
),
191195
),
192196
FileUploadFormat.CSV,
193197
destinationColumns,
@@ -211,7 +215,7 @@ class RedshiftDestination : BaseConnector(), Destination {
211215
)
212216
streamOperation.finalizeTable(
213217
streamConfig,
214-
StreamSyncSummary(recordsWritten = Optional.of(1)),
218+
StreamSyncSummary(recordsWritten = 1, AirbyteStreamStatus.COMPLETE),
215219
)
216220

217221
// And now that we have a table, simulate the next sync startup.
@@ -315,14 +319,6 @@ class RedshiftDestination : BaseConnector(), Destination {
315319
return RedshiftSqlGenerator(namingResolver, config)
316320
}
317321

318-
private fun getDestinationHandler(
319-
databaseName: String,
320-
database: JdbcDatabase,
321-
rawTableSchema: String
322-
): JdbcDestinationHandler<RedshiftState> {
323-
return RedshiftDestinationHandler(databaseName, database, rawTableSchema)
324-
}
325-
326322
private fun getMigrations(
327323
database: JdbcDatabase,
328324
databaseName: String,
@@ -336,6 +332,7 @@ class RedshiftDestination : BaseConnector(), Destination {
336332
sqlGenerator,
337333
),
338334
RedshiftRawTableAirbyteMetaMigration(database, databaseName),
335+
RedshiftGenerationIdMigration(database, databaseName)
339336
)
340337
}
341338

@@ -372,13 +369,7 @@ class RedshiftDestination : BaseConnector(), Destination {
372369
else NoEncryption()
373370
val s3Options = RedshiftUtil.findS3Options(config)
374371
val s3Config: S3DestinationConfig = S3DestinationConfig.getS3DestinationConfig(s3Options)
375-
376372
val defaultNamespace = config["schema"].asText()
377-
for (stream in catalog.streams) {
378-
if (StringUtils.isEmpty(stream.stream.namespace)) {
379-
stream.stream.namespace = defaultNamespace
380-
}
381-
}
382373

383374
val sqlGenerator = RedshiftSqlGenerator(namingResolver, config)
384375
val parsedCatalog: ParsedCatalog
@@ -388,10 +379,10 @@ class RedshiftDestination : BaseConnector(), Destination {
388379
val rawNamespace: String
389380
if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
390381
rawNamespace = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
391-
catalogParser = CatalogParser(sqlGenerator, rawNamespace)
382+
catalogParser = CatalogParser(sqlGenerator, defaultNamespace, rawNamespace)
392383
} else {
393384
rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
394-
catalogParser = CatalogParser(sqlGenerator, rawNamespace)
385+
catalogParser = CatalogParser(sqlGenerator, defaultNamespace, rawNamespace)
395386
}
396387
val redshiftDestinationHandler =
397388
RedshiftDestinationHandler(databaseName, database, rawNamespace)
@@ -436,8 +427,7 @@ class RedshiftDestination : BaseConnector(), Destination {
436427
},
437428
onFlush = DefaultFlush(OPTIMAL_FLUSH_BATCH_SIZE, syncOperation),
438429
catalog,
439-
BufferManager(bufferMemoryLimit),
440-
Optional.ofNullable(defaultNamespace),
430+
BufferManager(defaultNamespace, bufferMemoryLimit),
441431
FlushFailure(),
442432
Executors.newFixedThreadPool(5),
443433
AirbyteMessageDeserializer(getDataTransformer(parsedCatalog, defaultNamespace)),
@@ -463,7 +453,7 @@ class RedshiftDestination : BaseConnector(), Destination {
463453
"com.amazon.redshift.ssl.NonValidatingFactory"
464454
)
465455

466-
private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITH_META
456+
private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION
467457

468458
private const val OPTIMAL_FLUSH_BATCH_SIZE: Long = 50 * 1024 * 1024
469459
private val bufferMemoryLimit: Long = (Runtime.getRuntime().maxMemory() * 0.5).toLong()

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class RedshiftStagingStorageOperation(
5151
s3StorageOperations.createBucketIfNotExists()
5252
}
5353

54-
override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
54+
override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
55+
val streamId = streamConfig.id
5556
val objectPath: String = getStagingPath(streamId)
5657
log.info {
5758
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
@@ -201,7 +202,8 @@ class RedshiftStagingStorageOperation(
201202
${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT} TIMESTAMPTZ DEFAULT GETDATE(),
202203
${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} TIMESTAMPTZ,
203204
${JavaBaseConstants.COLUMN_NAME_DATA} SUPER NOT NULL,
204-
${JavaBaseConstants.COLUMN_NAME_AB_META} SUPER NULL
205+
${JavaBaseConstants.COLUMN_NAME_AB_META} SUPER NULL,
206+
${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID} BIGINT NULL
205207
)
206208
""".trimIndent()
207209
}

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDV2Migration.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class RedshiftDV2Migration(
3434
RedshiftState(
3535
needsSoftReset = false,
3636
isAirbyteMetaPresentInRaw = false,
37+
isGenerationIdPresent = false
3738
),
3839
true,
3940
)

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,9 @@ class RedshiftDestinationHandler(
9090
for (transaction in transactions) {
9191
val transactionId = UUID.randomUUID()
9292
if (logStatements) {
93-
log.info(
94-
"Executing sql {}-{}: {}",
95-
queryId,
96-
transactionId,
97-
java.lang.String.join("\n", transaction)
98-
)
93+
log.info {
94+
"Executing sql $queryId-$transactionId: ${transaction.joinToString("\n")}"
95+
}
9996
}
10097
val startTime = System.currentTimeMillis()
10198

@@ -113,7 +110,7 @@ class RedshiftDestinationHandler(
113110
logStatements = logStatements
114111
)
115112
} catch (e: SQLException) {
116-
log.error("Sql {}-{} failed", queryId, transactionId, e)
113+
log.error(e) { "Sql $queryId-$transactionId failed" }
117114
// This is a big hammer for something that should be much more targetted, only when
118115
// executing the
119116
// DROP TABLE command.
@@ -129,12 +126,9 @@ class RedshiftDestinationHandler(
129126
throw e
130127
}
131128

132-
log.info(
133-
"Sql {}-{} completed in {} ms",
134-
queryId,
135-
transactionId,
136-
System.currentTimeMillis() - startTime
137-
)
129+
log.info {
130+
"Sql $queryId-$transactionId completed in ${System.currentTimeMillis() - startTime} ms"
131+
}
138132
}
139133
}
140134

@@ -156,7 +150,8 @@ class RedshiftDestinationHandler(
156150
return RedshiftState(
157151
json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(),
158152
json.hasNonNull("isAirbyteMetaPresentInRaw") &&
159-
json["isAirbyteMetaPresentInRaw"].asBoolean()
153+
json["isAirbyteMetaPresentInRaw"].asBoolean(),
154+
json.hasNonNull("isGenerationIdPresent") && json["isGenerationIdPresent"].asBoolean(),
160155
)
161156
}
162157

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.redshift.typing_deduping
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import io.airbyte.cdk.db.jdbc.JdbcDatabase
9+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
10+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
11+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
12+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
13+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
14+
import io.github.oshai.kotlinlogging.KotlinLogging
15+
16+
private val logger = KotlinLogging.logger {}
17+
18+
class RedshiftGenerationIdMigration(
19+
private val database: JdbcDatabase,
20+
private val databaseName: String,
21+
) : Migration<RedshiftState> {
22+
override fun migrateIfNecessary(
23+
destinationHandler: DestinationHandler<RedshiftState>,
24+
stream: StreamConfig,
25+
state: DestinationInitialStatus<RedshiftState>
26+
): Migration.MigrationResult<RedshiftState> {
27+
if (state.destinationState.isGenerationIdPresent) {
28+
logger.info {
29+
"Skipping generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because our state says it's already done"
30+
}
31+
return Migration.MigrationResult(state.destinationState, invalidateInitialState = false)
32+
}
33+
34+
if (!state.initialRawTableStatus.rawTableExists) {
35+
// The raw table doesn't exist. No migration necessary. Update the state.
36+
logger.info {
37+
"Skipping generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist"
38+
}
39+
return Migration.MigrationResult(
40+
state.destinationState.copy(isGenerationIdPresent = true),
41+
invalidateInitialState = false
42+
)
43+
}
44+
45+
// Add generation_id to the raw table if necessary
46+
val rawTableDefinitionQueryResult: List<JsonNode> =
47+
database.queryJsons(
48+
"""
49+
SHOW COLUMNS
50+
FROM TABLE "$databaseName"."${stream.id.rawNamespace}"."${stream.id.rawName}"
51+
LIKE '${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}'
52+
""".trimIndent()
53+
)
54+
if (rawTableDefinitionQueryResult.isNotEmpty()) {
55+
logger.info {
56+
"${stream.id.originalNamespace}.${stream.id.originalName}: Skipping generation_id migration for raw table because it already has the generation_id column"
57+
}
58+
} else {
59+
logger.info {
60+
"Migrating generation_id for table ${stream.id.rawNamespace}.${stream.id.rawName}"
61+
}
62+
// Quote for raw table columns
63+
val alterRawTableSql =
64+
"""
65+
ALTER TABLE "${stream.id.rawNamespace}"."${stream.id.rawName}"
66+
ADD COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" BIGINT;
67+
""".trimIndent()
68+
database.execute(alterRawTableSql)
69+
}
70+
71+
// Add generation_id to the final table if necessary
72+
// As a slight optimization, only do this if we previously detected that the final table
73+
// schema is wrong
74+
if (state.isFinalTablePresent && state.isSchemaMismatch) {
75+
val finalTableColumnQueryResult: List<JsonNode> =
76+
database.queryJsons(
77+
"""
78+
SHOW COLUMNS
79+
FROM TABLE "$databaseName"."${stream.id.finalNamespace}"."${stream.id.finalName}"
80+
LIKE '${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}'
81+
""".trimIndent()
82+
)
83+
if (finalTableColumnQueryResult.isNotEmpty()) {
84+
logger.info {
85+
"${stream.id.originalNamespace}.${stream.id.originalName}: Skipping generation_id migration for final table because it already has the generation_id column"
86+
}
87+
} else {
88+
logger.info {
89+
"Migrating generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName}"
90+
}
91+
database.execute(
92+
"""
93+
ALTER TABLE "${stream.id.finalNamespace}"."${stream.id.finalName}"
94+
ADD COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" BIGINT NULL;
95+
""".trimIndent()
96+
)
97+
}
98+
} else {
99+
logger.info {
100+
"${stream.id.originalNamespace}.${stream.id.originalName}: Skipping generation_id migration for final table. Final table exists: ${state.isFinalTablePresent}; final table schema is incorrect: ${state.isSchemaMismatch}"
101+
}
102+
}
103+
104+
return Migration.MigrationResult(
105+
state.destinationState.copy(isGenerationIdPresent = true),
106+
invalidateInitialState = true
107+
)
108+
}
109+
}

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
1717
import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants
1818
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange
1919
import java.sql.Timestamp
20-
import java.util.*
20+
import java.util.Optional
2121
import java.util.stream.Collectors
2222
import org.jooq.Condition
2323
import org.jooq.DataType
@@ -146,7 +146,7 @@ open class RedshiftSqlGenerator(
146146
* @param arrays
147147
* @return
148148
*/
149-
fun arrayConcatStmt(arrays: List<Field<*>?>): Field<*>? {
149+
private fun arrayConcatStmt(arrays: List<Field<*>?>): Field<*>? {
150150
if (arrays.isEmpty()) {
151151
return DSL.field("ARRAY()") // Return an empty string if the list is empty
152152
}
@@ -165,7 +165,7 @@ open class RedshiftSqlGenerator(
165165
return result
166166
}
167167

168-
fun toCastingErrorCaseStmt(column: ColumnId, type: AirbyteType): Field<*> {
168+
private fun toCastingErrorCaseStmt(column: ColumnId, type: AirbyteType): Field<*> {
169169
val field: Field<*> =
170170
DSL.field(DSL.quotedName(JavaBaseConstants.COLUMN_NAME_DATA, column.originalName))
171171
// Just checks if data is not null but casted data is null. This also accounts for
@@ -260,7 +260,14 @@ open class RedshiftSqlGenerator(
260260
"OBJECT",
261261
superType,
262262
DSL.`val`(AIRBYTE_META_COLUMN_CHANGES_KEY),
263-
airbyteMetaChangesArray
263+
airbyteMetaChangesArray,
264+
DSL.`val`(JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY),
265+
DSL.field(
266+
DSL.quotedName(
267+
JavaBaseConstants.COLUMN_NAME_AB_META,
268+
JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY
269+
)
270+
),
264271
)
265272
.`as`(JavaBaseConstants.COLUMN_NAME_AB_META)
266273
}

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftState.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ package io.airbyte.integrations.destination.redshift.typing_deduping
66

77
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
88

9-
data class RedshiftState(val needsSoftReset: Boolean, val isAirbyteMetaPresentInRaw: Boolean) :
10-
MinimumDestinationState {
9+
data class RedshiftState(
10+
val needsSoftReset: Boolean,
11+
val isAirbyteMetaPresentInRaw: Boolean,
12+
val isGenerationIdPresent: Boolean,
13+
) : MinimumDestinationState {
1114
override fun needsSoftReset(): Boolean {
1215
return needsSoftReset
1316
}

0 commit comments

Comments
 (0)