4
4
package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
5
5
6
6
import com.fasterxml.jackson.databind.JsonNode
7
+ import com.fasterxml.jackson.databind.node.ObjectNode
7
8
import io.airbyte.cdk.db.jdbc.JdbcDatabase
8
9
import io.airbyte.cdk.integrations.base.JavaBaseConstants
9
10
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
@@ -23,16 +24,16 @@ import java.time.temporal.ChronoUnit
23
24
import java.util.*
24
25
import java.util.concurrent.CompletableFuture
25
26
import java.util.concurrent.CompletionStage
26
- import java.util.function.Function
27
27
import java.util.function.Predicate
28
- import java.util.stream.Collectors
29
- import kotlin.collections.LinkedHashMap
28
+ import java.util.stream.Collectors.toMap
30
29
import lombok.extern.slf4j.Slf4j
31
30
import org.jooq.Condition
32
31
import org.jooq.DSLContext
33
32
import org.jooq.SQLDialect
34
33
import org.jooq.conf.ParamType
35
34
import org.jooq.impl.DSL
35
+ import org.jooq.impl.DSL.field
36
+ import org.jooq.impl.DSL.quotedName
36
37
import org.jooq.impl.SQLDataType
37
38
import org.slf4j.Logger
38
39
import org.slf4j.LoggerFactory
@@ -207,60 +208,73 @@ abstract class JdbcDestinationHandler<DestinationState>(
207
208
@get:Throws(SQLException ::class )
208
209
protected val allDestinationStates: Map <AirbyteStreamNameNamespacePair , DestinationState >
209
210
get() {
211
+
210
212
// Guarantee the table exists.
211
213
jdbcDatabase.execute(
212
214
dslContext
213
215
.createTableIfNotExists(
214
- DSL .quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME )
215
- )
216
- .column(
217
- DSL .quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME ),
218
- SQLDataType .VARCHAR
216
+ quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME )
219
217
)
218
+ .column(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME ), SQLDataType .VARCHAR )
220
219
.column(
221
- DSL . quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE ),
220
+ quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE ),
222
221
SQLDataType .VARCHAR
223
222
) // Just use a string type, even if the destination has a json type.
224
223
// We're never going to query this column in a fancy way - all our processing
225
224
// can happen
226
225
// client-side.
227
226
.column(
228
- DSL . quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE ),
227
+ quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE ),
229
228
SQLDataType .VARCHAR
230
229
) // Add an updated_at field. We don't actually need it yet, but it can't hurt!
231
230
.column(
232
- DSL . quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT ),
231
+ quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT ),
233
232
SQLDataType .TIMESTAMPWITHTIMEZONE
234
233
)
235
234
.getSQL(ParamType .INLINED )
236
235
)
236
+
237
237
// Fetch all records from it. We _could_ filter down to just our streams... but meh.
238
238
// This is small
239
239
// data.
240
240
return jdbcDatabase
241
241
.queryJsons(
242
242
dslContext
243
243
.select(
244
- DSL . field(DSL . quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME )),
245
- DSL . field(DSL . quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE )),
246
- DSL . field(DSL . quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE ))
244
+ field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME )),
245
+ field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE )),
246
+ field(quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE ))
247
247
)
248
- .from(DSL . quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME ))
249
- .sql
248
+ .from(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME ))
249
+ .getSQL()
250
250
)
251
251
.stream()
252
+ .peek { recordJson: JsonNode ->
253
+ // Forcibly downcase all key names.
254
+ // This is to handle any destinations that upcase the column names.
255
+ // For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
256
+ val record = recordJson as ObjectNode
257
+ record.fieldNames().forEachRemaining { fieldName: String ->
258
+ record.set<JsonNode >(
259
+ fieldName.lowercase(Locale .getDefault()),
260
+ record[fieldName]
261
+ )
262
+ }
263
+ }
252
264
.collect(
253
- Collectors .toMap(
254
- Function { record: JsonNode ->
255
- val nameNode = record[DESTINATION_STATE_TABLE_COLUMN_NAME ]
256
- val namespaceNode = record[DESTINATION_STATE_TABLE_COLUMN_NAMESPACE ]
265
+ toMap(
266
+ { record ->
267
+ val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME )
268
+ val namespaceNode: JsonNode =
269
+ record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE )
257
270
AirbyteStreamNameNamespacePair (
258
- nameNode? .asText(),
259
- namespaceNode? .asText()
271
+ if ( nameNode != null ) nameNode .asText() else null ,
272
+ if ( namespaceNode != null ) namespaceNode .asText() else null
260
273
)
261
274
},
262
- Function { record: JsonNode ->
263
- val stateNode = record[DESTINATION_STATE_TABLE_COLUMN_STATE ]
275
+ { record ->
276
+ val stateNode: JsonNode =
277
+ record.get(DESTINATION_STATE_TABLE_COLUMN_STATE )
264
278
val state =
265
279
if (stateNode != null ) Jsons .deserialize(stateNode.asText())
266
280
else Jsons .emptyObject()
0 commit comments