5
5
package io .airbyte .integrations .destination .mssql ;
6
6
7
7
import com .fasterxml .jackson .databind .JsonNode ;
8
+ import com .fasterxml .jackson .databind .ObjectMapper ;
8
9
import com .google .common .collect .Lists ;
9
10
import io .airbyte .cdk .db .jdbc .JdbcDatabase ;
10
11
import io .airbyte .cdk .integrations .base .JavaBaseConstants ;
12
+ import io .airbyte .cdk .integrations .destination .async .model .PartialAirbyteMessage ;
11
13
import io .airbyte .cdk .integrations .destination .jdbc .SqlOperations ;
12
- import io .airbyte .cdk .integrations .destination .jdbc .SqlOperationsUtils ;
13
- import io .airbyte .protocol .models .v0 .AirbyteRecordMessage ;
14
14
import java .sql .SQLException ;
15
+ import java .sql .Timestamp ;
16
+ import java .time .Instant ;
15
17
import java .util .List ;
18
+ import java .util .Objects ;
19
+ import java .util .UUID ;
20
+ import org .slf4j .Logger ;
21
+ import org .slf4j .LoggerFactory ;
16
22
17
23
public class SqlServerOperations implements SqlOperations {
18
24
25
+ private static final Logger LOGGER = LoggerFactory .getLogger (SqlServerOperations .class );
26
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
27
+
19
28
@ Override
20
29
public void createSchemaIfNotExists (final JdbcDatabase database , final String schemaName ) throws Exception {
21
30
final String query = String .format ("IF NOT EXISTS ( SELECT * FROM sys.schemas WHERE name = '%s') EXEC('CREATE SCHEMA [%s]')" ,
@@ -37,10 +46,12 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
37
46
+ "CREATE TABLE %s.%s ( \n "
38
47
+ "%s VARCHAR(64) PRIMARY KEY,\n "
39
48
+ "%s NVARCHAR(MAX),\n " // Microsoft SQL Server specific: NVARCHAR can store Unicode meanwhile VARCHAR - not
40
- + "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n "
49
+ + "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET(),\n "
50
+ + "%s DATETIMEOFFSET(7),\n "
51
+ + "%s NVARCHAR(MAX),\n "
41
52
+ ");\n " ,
42
- schemaName , tableName , schemaName , tableName , JavaBaseConstants .COLUMN_NAME_AB_ID , JavaBaseConstants .COLUMN_NAME_DATA ,
43
- JavaBaseConstants .COLUMN_NAME_EMITTED_AT );
53
+ schemaName , tableName , schemaName , tableName , JavaBaseConstants .COLUMN_NAME_AB_RAW_ID , JavaBaseConstants .COLUMN_NAME_DATA ,
54
+ JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT , JavaBaseConstants . COLUMN_NAME_AB_LOADED_AT , JavaBaseConstants . COLUMN_NAME_AB_META );
44
55
}
45
56
46
57
@ Override
@@ -60,30 +71,60 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem
60
71
61
72
@ Override
62
73
public void insertRecords (final JdbcDatabase database ,
63
- final List <AirbyteRecordMessage > records ,
74
+ final List <PartialAirbyteMessage > records ,
64
75
final String schemaName ,
65
76
final String tempTableName )
66
77
throws SQLException {
67
78
// MSSQL has a limitation of 2100 parameters used in a query
68
79
// Airbyte inserts data with 3 columns (raw table) this limits to 700 records.
69
80
// Limited the variable to 500 records to
70
- final int MAX_BATCH_SIZE = 500 ;
81
+ final int MAX_BATCH_SIZE = 400 ;
71
82
final String insertQueryComponent = String .format (
72
- "INSERT INTO %s.%s (%s, %s, %s) VALUES\n " ,
83
+ "INSERT INTO %s.%s (%s, %s, %s, %s, %s ) VALUES\n " ,
73
84
schemaName ,
74
85
tempTableName ,
75
- JavaBaseConstants .COLUMN_NAME_AB_ID ,
86
+ JavaBaseConstants .COLUMN_NAME_AB_RAW_ID ,
76
87
JavaBaseConstants .COLUMN_NAME_DATA ,
77
- JavaBaseConstants .COLUMN_NAME_EMITTED_AT );
78
- final String recordQueryComponent = "(?, ?, ?), \n " ;
79
- final List < List < AirbyteRecordMessage >> batches = Lists . partition ( records , MAX_BATCH_SIZE );
80
- batches . forEach ( record -> {
81
- try {
82
- SqlOperationsUtils . insertRawRecordsInSingleQuery ( insertQueryComponent , recordQueryComponent , database , record );
83
- } catch ( final SQLException e ) {
84
- e . printStackTrace () ;
88
+ JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT ,
89
+ JavaBaseConstants . COLUMN_NAME_AB_LOADED_AT ,
90
+ JavaBaseConstants . COLUMN_NAME_AB_META );
91
+ final String recordQueryComponent = "(?, ?, ?, ?, ?), \n " ;
92
+ final List < List < PartialAirbyteMessage >> batches = Lists . partition ( records , MAX_BATCH_SIZE );
93
+ for ( List < PartialAirbyteMessage > batch : batches ) {
94
+ if ( batch . isEmpty () ) {
95
+ continue ;
85
96
}
86
- });
97
+ database .execute (connection -> {
98
+ final StringBuilder sqlStatement = new StringBuilder (insertQueryComponent );
99
+ for (PartialAirbyteMessage ignored : batch ) {
100
+ sqlStatement .append (recordQueryComponent );
101
+ }
102
+ final var sql = sqlStatement .substring (0 , sqlStatement .length () - 2 ) + ";" ;
103
+ try (final var statement = connection .prepareStatement (sql )) {
104
+ int i = 1 ;
105
+ for (PartialAirbyteMessage record : batch ) {
106
+ final var id = UUID .randomUUID ().toString ();
107
+ statement .setString (i ++, id );
108
+ statement .setString (i ++, record .getSerialized ());
109
+ statement .setTimestamp (i ++, Timestamp .from (Instant .ofEpochMilli (Objects .requireNonNull (record .getRecord ()).getEmittedAt ())));
110
+ statement .setTimestamp (i ++, null );
111
+ String metadata ;
112
+ if (record .getRecord ().getMeta () != null ) {
113
+ try {
114
+ metadata = OBJECT_MAPPER .writeValueAsString (record .getRecord ().getMeta ());
115
+ } catch (Exception e ) {
116
+ LOGGER .error ("Failed to serialize record metadata for record {}" , id , e );
117
+ metadata = null ;
118
+ }
119
+ } else {
120
+ metadata = null ;
121
+ }
122
+ statement .setString (i ++, metadata );
123
+ }
124
+ statement .execute ();
125
+ }
126
+ });
127
+ }
87
128
}
88
129
89
130
@ Override
0 commit comments