@@ -4,19 +4,16 @@
 
 package io.airbyte.integrations.destination.iceberg;
 
-import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID;
-import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
-import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT;
 import static org.apache.logging.log4j.util.Strings.isNotBlank;
 
 import io.airbyte.commons.json.Jsons;
-import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
 import io.airbyte.integrations.base.CommitOnStateAirbyteMessageConsumer;
 import io.airbyte.integrations.destination.iceberg.config.WriteConfig;
 import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteMessage.Type;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.DestinationSyncMode;
@@ -31,41 +28,30 @@
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.sql.types.StringType$;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.TimestampType$;
 
 /**
  * @author Leibniz on 2022/10/26.
  */
 @Slf4j
 public class IcebergConsumer extends CommitOnStateAirbyteMessageConsumer {
 
-  private final SparkSession spark;
+  private final IcebergOperations operations;
   private final ConfiguredAirbyteCatalog catalog;
   private final IcebergCatalogConfig catalogConfig;
 
   private Map<AirbyteStreamNameNamespacePair, WriteConfig> writeConfigs;
 
-  private final StructType normalizationSchema;
-
-  public IcebergConsumer(SparkSession spark,
-                         Consumer<AirbyteMessage> outputRecordCollector,
-                         ConfiguredAirbyteCatalog catalog,
-                         IcebergCatalogConfig catalogConfig) {
+  public IcebergConsumer(IcebergOperations operations,
+                         Consumer<AirbyteMessage> outputRecordCollector,
+                         ConfiguredAirbyteCatalog catalog,
+                         IcebergCatalogConfig catalogConfig) {
     super(outputRecordCollector);
-    this.spark = spark;
+    this.operations = operations;
     this.catalog = catalog;
    this.catalogConfig = catalogConfig;
-    this.normalizationSchema = new StructType().add(COLUMN_NAME_AB_ID, StringType$.MODULE$)
-        .add(COLUMN_NAME_EMITTED_AT, TimestampType$.MODULE$)
-        .add(COLUMN_NAME_DATA, StringType$.MODULE$);
   }
 
   /**
@@ -77,31 +63,37 @@ protected void startTracked() throws Exception {
     Map<AirbyteStreamNameNamespacePair, WriteConfig> configs = new HashMap<>();
     Set<String> namespaceSet = new HashSet<>();
     for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
+      final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
+      if (syncMode == null) {
+        throw new IllegalStateException("Undefined destination sync mode");
+      }
+      final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
+
       final String streamName = stream.getStream().getName().toLowerCase();
       String namespace = (isNotBlank(stream.getStream().getNamespace()) ? stream.getStream().getNamespace()
-        : catalogConfig.defaultOutputDatabase()).toLowerCase();
+          : catalogConfig.defaultOutputDatabase()).toLowerCase();
       if (!namespaceSet.contains(namespace)) {
         namespaceSet.add(namespace);
         try {
-          spark.sql("CREATE DATABASE IF NOT EXISTS " + namespace);
+          operations.createDatabase(namespace);
         } catch (Exception e) {
           log.warn("Create non-existed database failed: {}", e.getMessage(), e);
         }
       }
-      final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
-      if (syncMode == null) {
-        throw new IllegalStateException("Undefined destination sync mode");
-      }
-      final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
-      AirbyteStreamNameNamespacePair nameNamespacePair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream());
+      AirbyteStreamNameNamespacePair nameNamespacePair = AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream());
       Integer flushBatchSize = catalogConfig.getFormatConfig().getFlushBatchSize();
       WriteConfig writeConfig = new WriteConfig(namespace, streamName, isAppendMode, flushBatchSize);
       configs.put(nameNamespacePair, writeConfig);
+
+      // drop any temp table left over from a previous run
       try {
-        spark.sql("DROP TABLE IF EXISTS " + writeConfig.getFullTempTableName());
+        operations.dropTable(writeConfig.getNamespace(), writeConfig.getTempTableName(), true);
       } catch (Exception e) {
         log.warn("Drop existed temp table failed: {}", e.getMessage(), e);
       }
+
+      // create temp table; don't catch exceptions, let it throw and fail fast
+      operations.createAirbyteRawTable(writeConfig.getFullTempTableName());
     }
     this.writeConfigs = configs;
   }
@@ -143,11 +135,8 @@ private void appendToTempTable(WriteConfig writeConfig) {
     // saveAsTable even if rows is empty, to ensure the table is created;
     // otherwise the table would be missing and close() would throw an exception
     log.info("=> Flushing {} rows into {}", rows.size(), tableName);
-    spark.createDataFrame(rows, normalizationSchema).write()
-        // append data to temp table
-        .mode(SaveMode.Append)
-        // TODO compression config
-        .option("write-format", catalogConfig.getFormatConfig().getFormat().getFormatName()).saveAsTable(tableName);
+    String formatName = catalogConfig.getFormatConfig().getFormat().getFormatName();
+    operations.appendRowsToTable(tableName, rows, formatName);
   }
 
   /**
@@ -168,15 +157,13 @@ protected void close(boolean hasFailed) throws Exception {
         String tempTableName = writeConfig.getFullTempTableName();
         String finalTableName = writeConfig.getFullTableName();
         log.info("=> Migration({}) data from {} to {}",
-                 writeConfig.isAppendMode() ? "append" : "overwrite",
-                 tempTableName,
-                 finalTableName);
-        spark.sql("SELECT * FROM %s".formatted(tempTableName))
-            .write()
-            .mode(writeConfig.isAppendMode() ? SaveMode.Append : SaveMode.Overwrite)
-            .saveAsTable(finalTableName);
+            writeConfig.isAppendMode() ? "append" : "overwrite",
+            tempTableName,
+            finalTableName);
+        SaveMode saveMode = writeConfig.isAppendMode() ? SaveMode.Append : SaveMode.Overwrite;
+        operations.copyFullTable(tempTableName, finalTableName, saveMode);
         if (catalogConfig.getFormatConfig().isAutoCompact()) {
-          tryCompactTable(icebergCatalog, writeConfig);
+          tryCompactTable(writeConfig);
         }
       }
       log.info("==> Copy temp tables finished...");
@@ -186,38 +173,28 @@ protected void close(boolean hasFailed) throws Exception {
     } finally {
       log.info("Removing temp tables...");
       for (Entry<AirbyteStreamNameNamespacePair, WriteConfig> entry : writeConfigs.entrySet()) {
-        tryDropTempTable(icebergCatalog, entry.getValue());
+        tryDropTempTable(entry.getValue());
       }
       log.info("Closing Spark Session...");
-      this.spark.close();
+      operations.close();
       log.info("Finishing destination process...completed");
     }
   }
 
-  private void tryDropTempTable(Catalog icebergCatalog, WriteConfig writeConfig) {
+  private void tryDropTempTable(WriteConfig writeConfig) {
     try {
-      log.info("Trying to drop temp table: {}", writeConfig.getFullTempTableName());
-      TableIdentifier tempTableIdentifier = TableIdentifier.of(writeConfig.getNamespace(),
-          writeConfig.getTempTableName());
-      boolean dropSuccess = icebergCatalog.dropTable(tempTableIdentifier, true);
-      log.info("Drop temp table: {}", writeConfig.getFullTempTableName());
+      operations.dropTable(writeConfig.getNamespace(), writeConfig.getTempTableName(), true);
     } catch (Exception e) {
       String errMsg = e.getMessage();
       log.error("Drop temp table caught exception:{}", errMsg, e);
     }
   }
 
-  private void tryCompactTable(Catalog icebergCatalog, WriteConfig writeConfig) {
+  private void tryCompactTable(WriteConfig writeConfig) {
     log.info("=> Auto-Compact is enabled, try compact Iceberg data files");
-    int compactTargetFileSizeBytes =
-        catalogConfig.getFormatConfig().getCompactTargetFileSizeInMb() * 1024 * 1024;
+    int compactTargetFileSizeBytes = catalogConfig.getFormatConfig().getCompactTargetFileSizeInMb() * 1024 * 1024;
     try {
-      TableIdentifier tableIdentifier = TableIdentifier.of(writeConfig.getNamespace(),
-          writeConfig.getTableName());
-      SparkActions.get()
-          .rewriteDataFiles(icebergCatalog.loadTable(tableIdentifier))
-          .option("target-file-size-bytes", String.valueOf(compactTargetFileSizeBytes))
-          .execute();
+      operations.compactTable(writeConfig.getNamespace(), writeConfig.getTableName(), compactTargetFileSizeBytes);
     } catch (Exception e) {
       log.warn("Compact Iceberg data files failed: {}", e.getMessage(), e);
     }
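
The extracted `IcebergOperations` facade itself is not part of this diff. Below is a minimal sketch of what it might look like, reconstructed purely from the call sites above: the method names come from the diff, but the constructor, the schema handling, and the body of `createAirbyteRawTable` are assumptions, not the actual class from this PR.

```java
// Hypothetical reconstruction for illustration only; signatures inferred from
// IcebergConsumer's call sites and may differ from the real class in the PR.
import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID;
import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT;

import java.util.Collections;
import java.util.List;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType$;

public class IcebergOperations implements AutoCloseable {

  // the raw-table schema the consumer used to build inline (moved here by the refactor)
  private final StructType normalizationSchema = new StructType()
      .add(COLUMN_NAME_AB_ID, StringType$.MODULE$)
      .add(COLUMN_NAME_EMITTED_AT, TimestampType$.MODULE$)
      .add(COLUMN_NAME_DATA, StringType$.MODULE$);

  private final SparkSession spark;
  private final Catalog icebergCatalog;

  public IcebergOperations(SparkSession spark, Catalog icebergCatalog) {
    this.spark = spark;
    this.icebergCatalog = icebergCatalog;
  }

  public void createDatabase(String namespace) {
    spark.sql("CREATE DATABASE IF NOT EXISTS " + namespace);
  }

  public boolean dropTable(String namespace, String tableName, boolean purge) {
    // drop through the Iceberg catalog; purge=true also deletes data files
    return icebergCatalog.dropTable(TableIdentifier.of(namespace, tableName), purge);
  }

  public void createAirbyteRawTable(String fullTableName) {
    // one plausible implementation: persist an empty DataFrame with the raw schema
    spark.createDataFrame(Collections.emptyList(), normalizationSchema)
        .write()
        .saveAsTable(fullTableName);
  }

  public void appendRowsToTable(String tableName, List<Row> rows, String formatName) {
    spark.createDataFrame(rows, normalizationSchema)
        .write()
        .mode(SaveMode.Append)
        .option("write-format", formatName)
        .saveAsTable(tableName);
  }

  public void copyFullTable(String sourceTableName, String targetTableName, SaveMode saveMode) {
    spark.sql("SELECT * FROM %s".formatted(sourceTableName))
        .write()
        .mode(saveMode)
        .saveAsTable(targetTableName);
  }

  public void compactTable(String namespace, String tableName, int targetFileSizeBytes) {
    // Iceberg's Spark action for rewriting small data files into larger ones
    SparkActions.get()
        .rewriteDataFiles(icebergCatalog.loadTable(TableIdentifier.of(namespace, tableName)))
        .option("target-file-size-bytes", String.valueOf(targetFileSizeBytes))
        .execute();
  }

  @Override
  public void close() {
    spark.close();
  }
}
```

Pulling the Spark and Iceberg plumbing behind one seam like this is what lets the consumer drop its direct `SparkSession`, `TableIdentifier`, and `SparkActions` dependencies, so the consumer can be unit-tested against a mock of `IcebergOperations`.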