@@ -17,15 +17,6 @@ import io.airbyte.commons.json.Jsons
17
17
import io.airbyte.integrations.base.destination.typing_deduping.*
18
18
import io.airbyte.integrations.base.destination.typing_deduping.Struct
19
19
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
20
- import java.sql.*
21
- import java.time.Instant
22
- import java.time.OffsetDateTime
23
- import java.time.temporal.ChronoUnit
24
- import java.util.*
25
- import java.util.concurrent.CompletableFuture
26
- import java.util.concurrent.CompletionStage
27
- import java.util.function.Predicate
28
- import java.util.stream.Collectors.toMap
29
20
import lombok.extern.slf4j.Slf4j
30
21
import org.jooq.Condition
31
22
import org.jooq.DSLContext
@@ -37,6 +28,17 @@ import org.jooq.impl.DSL.quotedName
37
28
import org.jooq.impl.SQLDataType
38
29
import org.slf4j.Logger
39
30
import org.slf4j.LoggerFactory
31
+ import java.sql.*
32
+ import java.time.Instant
33
+ import java.time.OffsetDateTime
34
+ import java.time.temporal.ChronoUnit
35
+ import java.util.*
36
+ import java.util.concurrent.CompletableFuture
37
+ import java.util.concurrent.CompletionStage
38
+ import java.util.function.Predicate
39
+ import java.util.stream.Collectors.toMap
40
+ import java.util.HashMap
41
+
40
42
41
43
@Slf4j
42
44
abstract class JdbcDestinationHandler <DestinationState >(
@@ -254,12 +256,19 @@ abstract class JdbcDestinationHandler<DestinationState>(
254
256
// This is to handle any destinations that upcase the column names.
255
257
// For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
256
258
val record = recordJson as ObjectNode
257
- record.fieldNames().forEachRemaining { fieldName: String ->
258
- record.set<JsonNode >(
259
- fieldName.lowercase(Locale .getDefault()),
260
- record[fieldName]
261
- )
259
+ val newFields: HashMap <String , JsonNode > = HashMap ()
260
+
261
+ val it = record.fieldNames()
262
+ while (it.hasNext()) {
263
+ val fieldName = it.next()
264
+ // We can't directly call record.set here, because that will raise a
265
+ // ConcurrentModificationException on the fieldnames iterator.
266
+ // Instead, build up a map of new fields and set them all at once.
267
+ newFields.put(fieldName.lowercase(Locale .getDefault()), record[fieldName])
262
268
}
269
+
270
+
271
+ record.setAll<JsonNode >(newFields)
263
272
}
264
273
.collect(
265
274
toMap(
0 commit comments