3
3
*/
4
4
package io.airbyte.cdk.integrations.destination.jdbc
5
5
6
- import com.google.common.annotations.VisibleForTesting
6
+ import com.fasterxml.jackson.databind.ObjectMapper
7
7
import com.google.common.collect.Iterables
8
8
import io.airbyte.cdk.db.jdbc.JdbcDatabase
9
9
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.isDestinationV2
10
10
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
11
- import io.airbyte.commons.functional.CheckedConsumer
12
11
import java.sql.Connection
13
12
import java.sql.SQLException
14
13
import java.sql.Timestamp
@@ -17,136 +16,90 @@ import java.util.*
17
16
import java.util.function.Consumer
18
17
import java.util.function.Supplier
19
18
20
- object SqlOperationsUtils {
21
- /* *
22
- * Inserts "raw" records in a single query. The purpose of helper to abstract away
23
- * database-specific SQL syntax from this query.
24
- *
25
- * @param insertQueryComponent the first line of the query e.g. INSERT INTO public.users (ab_id,
26
- * data, emitted_at)
27
- * @param recordQueryComponent query template for a full record e.g. (?, ?::jsonb ?)
28
- * @param jdbcDatabase jdbc database
29
- * @param records records to write
30
- * @throws SQLException exception
31
- */
32
- @JvmStatic
33
- @Throws(SQLException ::class )
34
- fun insertRawRecordsInSingleQuery (
35
- insertQueryComponent : String? ,
36
- recordQueryComponent : String? ,
37
- jdbcDatabase : JdbcDatabase ,
38
- records : List <PartialAirbyteMessage >
39
- ) {
40
- insertRawRecordsInSingleQuery(
41
- insertQueryComponent,
42
- recordQueryComponent,
43
- jdbcDatabase,
44
- records,
45
- { UUID .randomUUID() },
46
- true
47
- )
19
+ /* *
20
+ * Inserts "raw" records in a single query. The purpose of this helper is to abstract away database-specific
21
+ * SQL syntax from this query.
22
+ *
23
+ * @param insertQueryComponent the first line of the query e.g. INSERT INTO public.users (ab_id,
24
+ * data, emitted_at)
25
+ * @param recordQueryComponent query template for a full record e.g. (?, ?::jsonb, ?)
26
+ * @param jdbcDatabase jdbc database
27
+ * @param records records to write
28
+ * @throws SQLException exception
29
+ */
30
+ @Throws(SQLException ::class )
31
+ fun insertRawRecordsInSingleQuery (
32
+ insertQueryComponent : String? ,
33
+ recordQueryComponent : String? ,
34
+ jdbcDatabase : JdbcDatabase ,
35
+ records : List <PartialAirbyteMessage >,
36
+ uuidSupplier : Supplier <UUID > = Supplier { UUID .randomUUID() },
37
+ sem : Boolean = true,
38
+ partitionSize : Int = 10000,
39
+ nullSqlWideType : Int = java.sql.Types .VARCHAR
40
+ ) {
41
+ if (records.isEmpty()) {
42
+ return
48
43
}
44
+ val objectMapper = ObjectMapper ()
45
+ jdbcDatabase.execute { connection: Connection ->
49
46
50
- /* *
51
- * Inserts "raw" records in a single query. The purpose of helper to abstract away
52
- * database-specific SQL syntax from this query.
53
- *
54
- * This version does not add a semicolon at the end of the INSERT statement.
55
- *
56
- * @param insertQueryComponent the first line of the query e.g. INSERT INTO public.users (ab_id,
57
- * data, emitted_at)
58
- * @param recordQueryComponent query template for a full record e.g. (?, ?::jsonb ?)
59
- * @param jdbcDatabase jdbc database
60
- * @param records records to write
61
- * @throws SQLException exception
62
- */
63
- @Throws(SQLException ::class )
64
- fun insertRawRecordsInSingleQueryNoSem (
65
- insertQueryComponent : String? ,
66
- recordQueryComponent : String? ,
67
- jdbcDatabase : JdbcDatabase ,
68
- records : List <PartialAirbyteMessage >
69
- ) {
70
- insertRawRecordsInSingleQuery(
71
- insertQueryComponent,
72
- recordQueryComponent,
73
- jdbcDatabase,
74
- records,
75
- { UUID .randomUUID() },
76
- false
77
- )
78
- }
47
+ // Strategy: We want to use PreparedStatement because it handles binding values to
48
+ // the SQL query
49
+ // (e.g. handling formatting timestamps). A PreparedStatement statement is created
50
+ // by supplying the
51
+ // full SQL string at creation time. Then subsequently specifying which values are
52
+ // bound to the
53
+ // string. Thus there will be two loops below.
54
+ // 1) Loop over records to build the full string.
55
+ // 2) Loop over the records and bind the appropriate values to the string.
56
+ // We also partition the query to run on partitionSize records at a time (10k by default), since some DBs set a
57
+ // max limit on
58
+ // how many records can be inserted at once
59
+ // TODO(sherif) this should use a smarter, destination-aware partitioning scheme
60
+ // instead of 10k by
61
+ // default
62
+ for (partition in Iterables .partition(records, partitionSize)) {
63
+ val sql = StringBuilder (insertQueryComponent)
64
+ partition.forEach(
65
+ Consumer { r: PartialAirbyteMessage ? -> sql.append(recordQueryComponent) }
66
+ )
67
+ val s = sql.toString()
68
+ val s1 = s.substring(0 , s.length - 2 ) + (if (sem) " ;" else " " )
79
69
80
- @VisibleForTesting
81
- @Throws(SQLException ::class )
82
- fun insertRawRecordsInSingleQuery (
83
- insertQueryComponent : String? ,
84
- recordQueryComponent : String? ,
85
- jdbcDatabase : JdbcDatabase ,
86
- records : List <PartialAirbyteMessage >,
87
- uuidSupplier : Supplier <UUID >,
88
- sem : Boolean
89
- ) {
90
- if (records.isEmpty()) {
91
- return
92
- }
70
+ connection.prepareStatement(s1).use { statement ->
71
+ // second loop: bind values to the SQL string.
72
+ // 1-indexed
73
+ var i = 1
74
+ for (message in partition) {
75
+ // Airbyte Raw ID
76
+ statement.setString(i++ , uuidSupplier.get().toString())
93
77
94
- jdbcDatabase.execute(
95
- CheckedConsumer { connection : Connection ->
78
+ // Message Data
79
+ statement.setString(i ++ , message.serialized)
96
80
97
- // Strategy: We want to use PreparedStatement because it handles binding values to
98
- // the SQL query
99
- // (e.g. handling formatting timestamps). A PreparedStatement statement is created
100
- // by supplying the
101
- // full SQL string at creation time. Then subsequently specifying which values are
102
- // bound to the
103
- // string. Thus there will be two loops below.
104
- // 1) Loop over records to build the full string.
105
- // 2) Loop over the records and bind the appropriate values to the string.
106
- // We also partition the query to run on 10k records at a time, since some DBs set a
107
- // max limit on
108
- // how many records can be inserted at once
109
- // TODO(sherif) this should use a smarter, destination-aware partitioning scheme
110
- // instead of 10k by
111
- // default
112
- for (partition in Iterables .partition(records, 10000 )) {
113
- val sql = StringBuilder (insertQueryComponent)
114
- partition.forEach(
115
- Consumer { r: PartialAirbyteMessage ? -> sql.append(recordQueryComponent) }
81
+ // Extracted At
82
+ statement.setTimestamp(
83
+ i++ ,
84
+ Timestamp .from(Instant .ofEpochMilli(message.record!! .emittedAt))
116
85
)
117
- val s = sql.toString()
118
- val s1 = s.substring(0 , s.length - 2 ) + (if (sem) " ;" else " " )
119
-
120
- connection.prepareStatement(s1).use { statement ->
121
- // second loop: bind values to the SQL string.
122
- // 1-indexed
123
- var i = 1
124
- for (message in partition) {
125
- // Airbyte Raw ID
126
- statement.setString(i, uuidSupplier.get().toString())
127
- i++
128
86
129
- // Message Data
130
- statement.setString(i, message.serialized)
131
- i++
132
-
133
- // Extracted At
134
- statement.setTimestamp (
135
- i,
136
- Timestamp .from( Instant .ofEpochMilli( message.record!! .emittedAt) )
87
+ if (isDestinationV2) {
88
+ // Loaded At
89
+ statement.setTimestamp( i++ , null )
90
+ // Airbyte Meta
91
+ if (message.record !! .meta != null ) {
92
+ statement.setString (
93
+ i++ ,
94
+ objectMapper.writeValueAsString( message.record!! .meta )
137
95
)
138
- i++
139
-
140
- if (isDestinationV2) {
141
- // Loaded At
142
- statement.setTimestamp(i, null )
143
- i++
144
- }
96
+ } else {
97
+ statement.setNull(i++ , nullSqlWideType)
145
98
}
146
- statement.execute()
147
99
}
148
100
}
101
+ statement.execute()
149
102
}
150
- )
103
+ }
151
104
}
152
105
}
0 commit comments