 package io.airbyte.integrations.destination.bigquery.typing_deduping;
 
+import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase;
+import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase;
+import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey;
+import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.QUOTE;
+import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.clusteringColumns;
+import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType;
+import static java.util.stream.Collectors.toMap;
+
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.FieldValue;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobConfiguration;
 import com.google.cloud.bigquery.JobStatistics;
 import com.google.cloud.bigquery.JobStatus;
 import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TimePartitioning;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Streams;
 import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
+import io.airbyte.cdk.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
+import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
 import io.airbyte.integrations.base.destination.typing_deduping.Sql;
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
+import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // TODO this stuff almost definitely exists somewhere else in our codebase.
-public class BigQueryDestinationHandler implements DestinationHandler<TableDefinition> {
+public class BigQueryDestinationHandler implements DestinationHandler<MinimumDestinationState.Impl> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class);
 
@@ -47,32 +74,24 @@ public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocatio
     this.datasetLocation = datasetLocation;
   }
 
-  @Override
   public Optional<TableDefinition> findExistingTable(final StreamId id) {
     final Table table = bq.getTable(id.finalNamespace(), id.finalName());
     return Optional.ofNullable(table).map(Table::getDefinition);
   }
 
-  @Override
-  public LinkedHashMap<String, TableDefinition> findExistingFinalTables(List<StreamId> streamIds) throws Exception {
-    return null;
-  }
-
-  @Override
   public boolean isFinalTableEmpty(final StreamId id) {
     return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
   }
 
-  @Override
-  public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
+  public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
     final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName()));
     if (rawTable == null) {
       // Table doesn't exist. There are no unprocessed records, and no timestamp.
-      return new InitialRawTableState(false, Optional.empty());
+      return new InitialRawTableStatus(false, false, Optional.empty());
     }
 
     final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE))).replace(
         // bigquery timestamps have microsecond precision
         """
         SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -84,11 +103,11 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
     // If it's not null, then we can return immediately - we've found some unprocessed records and their
     // timestamp.
     if (!unloadedRecordTimestamp.isNull()) {
-      return new InitialRawTableState(true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
+      return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
     }
 
     final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE))).replace(
         """
         SELECT MAX(_airbyte_extracted_at)
         FROM ${raw_table}
@@ -98,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
     // So we just need to get the timestamp of the most recent record.
     if (loadedRecordTimestamp.isNull()) {
       // Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
-      return new InitialRawTableState(false, Optional.empty());
+      return new InitialRawTableStatus(true, false, Optional.empty());
     } else {
       // The raw table already has some records. T+D can skip all records with timestamp <= this value.
-      return new InitialRawTableState(false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
+      return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
     }
   }
 
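Reviewer note on the rename in the hunk above: the old two-argument `InitialRawTableState` could not distinguish "raw table missing" from "raw table exists but fully processed", while the three-argument `InitialRawTableStatus` adds an explicit existence flag. A minimal sketch of the assumed record shape; the component names here are inferred from the four constructor calls in this hunk, not taken from the CDK definition:

```java
import java.time.Instant;
import java.util.Optional;

// Assumed shape of InitialRawTableStatus; component names are hypothetical,
// inferred from the call sites in getInitialRawTableState above.
record InitialRawTableStatus(boolean rawTableExists,
                             boolean hasUnprocessedRecords,
                             Optional<Instant> maxProcessedTimestamp) {}

// The four states returned above:
//   raw table missing:     new InitialRawTableStatus(false, false, Optional.empty())
//   unprocessed records:   new InitialRawTableStatus(true,  true,  Optional.of(minUnprocessedTs))
//   raw table empty:       new InitialRawTableStatus(true,  false, Optional.empty())
//   all records processed: new InitialRawTableStatus(true,  false, Optional.of(maxLoadedTs))
```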
@@ -172,4 +191,133 @@ public void execute(final Sql sql) throws InterruptedException {
     }
   }
 
+  @Override
+  public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
+    final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
+    for (final StreamConfig streamConfig : streamConfigs) {
+      final StreamId id = streamConfig.id();
+      final Optional<TableDefinition> finalTable = findExistingTable(id);
+      final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
+      initialStates.add(new DestinationInitialStatus<>(
+          streamConfig,
+          finalTable.isPresent(),
+          rawTableState,
+          finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
+          finalTable.isEmpty() || isFinalTableEmpty(id),
+          // Return a default state blob since we don't actually track state.
+          new MinimumDestinationState.Impl(false)));
+    }
+    return initialStates;
+  }
+
+  @Override
+  public void commitDestinationStates(Map<StreamId, MinimumDestinationState.Impl> destinationStates) throws Exception {
+    // Intentionally do nothing. Bigquery doesn't actually support destination states.
+  }
+
+  private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream,
+                                                    final TableDefinition existingTable)
+      throws TableNotMigratedException {
+    final var alterTableReport = buildAlterTableReport(stream, existingTable);
+    boolean tableClusteringMatches = false;
+    boolean tablePartitioningMatches = false;
+    if (existingTable instanceof final StandardTableDefinition standardExistingTable) {
+      tableClusteringMatches = clusteringMatches(stream, standardExistingTable);
+      tablePartitioningMatches = partitioningMatches(standardExistingTable);
+    }
+    LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}",
+        alterTableReport.columnsToAdd(),
+        alterTableReport.columnsToRemove(),
+        alterTableReport.columnsToChangeType(),
+        tableClusteringMatches,
+        tablePartitioningMatches);
+
+    return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches;
+  }
+
+  public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) {
+    final Set<String> pks = getPks(stream);
+
+    final Map<String, StandardSQLTypeName> streamSchema = stream.columns().entrySet().stream()
+        .collect(toMap(
+            entry -> entry.getKey().name(),
+            entry -> toDialectType(entry.getValue())));
+
+    final Map<String, StandardSQLTypeName> existingSchema = existingTable.getSchema().getFields().stream()
+        .collect(toMap(
+            field -> field.getName(),
+            field -> field.getType().getStandardType()));
+
+    // Columns in the StreamConfig that don't exist in the TableDefinition
+    final Set<String> columnsToAdd = streamSchema.keySet().stream()
+        .filter(name -> !containsIgnoreCase(existingSchema.keySet(), name))
+        .collect(Collectors.toSet());
+
+    // Columns in the current schema that are no longer in the StreamConfig
+    final Set<String> columnsToRemove = existingSchema.keySet().stream()
+        .filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase(
+            JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name))
+        .collect(Collectors.toSet());
+
+    // Columns that are typed differently than the StreamConfig
+    final Set<String> columnsToChangeType = Stream.concat(
+        streamSchema.keySet().stream()
+            // If it's not in the existing schema, it should already be in the columnsToAdd Set
+            .filter(name -> {
+              // Big Query Columns are case-insensitive, first find the correctly cased key if it exists
+              return matchingKey(existingSchema.keySet(), name)
+                  // if it does exist, only include it in this set if the type (the value in each respective map)
+                  // is different between the stream and existing schemas
+                  .map(key -> !existingSchema.get(key).equals(streamSchema.get(name)))
+                  // if there is no matching key, then don't include it because it is probably already in columnsToAdd
+                  .orElse(false);
+            }),
+
+        // OR columns that used to have a non-null constraint and shouldn't
+        // (https://github.com/airbytehq/airbyte/pull/31082)
+        existingTable.getSchema().getFields().stream()
+            .filter(field -> pks.contains(field.getName()))
+            .filter(field -> field.getMode() == Field.Mode.REQUIRED)
+            .map(Field::getName))
+        .collect(Collectors.toSet());
+
+    final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet());
+
+    return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format);
+  }
+
+  @VisibleForTesting
+  public static boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) {
+    return existingTable.getClustering() != null
+        && containsAllIgnoreCase(
+            new HashSet<>(existingTable.getClustering().getFields()),
+            clusteringColumns(stream));
+  }
+
+  @VisibleForTesting
+  public static boolean partitioningMatches(final StandardTableDefinition existingTable) {
+    return existingTable.getTimePartitioning() != null
+        && existingTable.getTimePartitioning()
+            .getField()
+            .equalsIgnoreCase("_airbyte_extracted_at")
+        && TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType());
+  }
+
+  /**
+   * Checks the schema to determine whether the table contains all expected final table airbyte
+   * columns
+   *
+   * @param columnNames the column names of the schema to check
+   * @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present
+   */
+  @VisibleForTesting
+  public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection<String> columnNames) {
+    return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
+        .allMatch(column -> containsIgnoreCase(columnNames, column));
+  }
+
+  private static Set<String> getPks(final StreamConfig stream) {
+    return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
+  }
+
 }
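Two hedged sketches for reviewers. First, how the new batched `gatherInitialState` entry point would be driven by a caller, replacing the removed per-stream `findExistingFinalTables` lookup; the `isSchemaMismatch()` accessor is an assumption mirroring the constructor argument above, not confirmed against the CDK:

```java
// Hypothetical caller; `bq`, `datasetLocation`, and `streamConfigs` stand in
// for values the real caller already holds.
final BigQueryDestinationHandler handler = new BigQueryDestinationHandler(bq, datasetLocation);
final List<DestinationInitialStatus<MinimumDestinationState.Impl>> statuses =
    handler.gatherInitialState(streamConfigs);
for (final DestinationInitialStatus<MinimumDestinationState.Impl> status : statuses) {
  // Accessor name assumed to mirror the constructor argument in gatherInitialState.
  if (status.isSchemaMismatch()) {
    // the caller would rebuild or soft-reset the final table before running T+D
  }
}
```

Second, because `partitioningMatches` and `clusteringMatches` take a `StandardTableDefinition` directly, they can be exercised with the BigQuery client's builders alone, with no live connection; the column name below is the one this diff checks for:

```java
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TimePartitioning;
import java.util.List;

// Build a definition that satisfies both checks: day-partitioned on
// _airbyte_extracted_at and clustered on that same column.
final StandardTableDefinition definition = StandardTableDefinition.newBuilder()
    .setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
        .setField("_airbyte_extracted_at")
        .build())
    .setClustering(Clustering.newBuilder()
        .setFields(List.of("_airbyte_extracted_at"))
        .build())
    .build();

// True: the definition is DAY-partitioned on _airbyte_extracted_at.
assert BigQueryDestinationHandler.partitioningMatches(definition);
```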