36
36
import io .airbyte .integrations .base .destination .typing_deduping .AlterTableReport ;
37
37
import io .airbyte .integrations .base .destination .typing_deduping .ColumnId ;
38
38
import io .airbyte .integrations .base .destination .typing_deduping .DestinationHandler ;
39
- import io .airbyte .integrations .base .destination .typing_deduping .DestinationInitialState ;
40
- import io .airbyte .integrations .base .destination .typing_deduping .InitialRawTableState ;
39
+ import io .airbyte .integrations .base .destination .typing_deduping .DestinationInitialStatus ;
40
+ import io .airbyte .integrations .base .destination .typing_deduping .InitialRawTableStatus ;
41
41
import io .airbyte .integrations .base .destination .typing_deduping .Sql ;
42
42
import io .airbyte .integrations .base .destination .typing_deduping .StreamConfig ;
43
43
import io .airbyte .integrations .base .destination .typing_deduping .StreamId ;
44
44
import io .airbyte .integrations .base .destination .typing_deduping .TableNotMigratedException ;
45
45
import io .airbyte .integrations .base .destination .typing_deduping .migrators .MinimumDestinationState ;
46
+ import io .airbyte .integrations .base .destination .typing_deduping .migrators .MinimumDestinationState .Impl ;
46
47
import java .math .BigInteger ;
47
48
import java .util .ArrayList ;
48
49
import java .util .Collection ;
@@ -82,11 +83,11 @@ public boolean isFinalTableEmpty(final StreamId id) {
82
83
return BigInteger .ZERO .equals (bq .getTable (TableId .of (id .finalNamespace (), id .finalName ())).getNumRows ());
83
84
}
84
85
85
- public InitialRawTableState getInitialRawTableState (final StreamId id ) throws Exception {
86
+ public InitialRawTableStatus getInitialRawTableState (final StreamId id ) throws Exception {
86
87
final Table rawTable = bq .getTable (TableId .of (id .rawNamespace (), id .rawName ()));
87
88
if (rawTable == null ) {
88
89
// Table doesn't exist. There are no unprocessed records, and no timestamp.
89
- return new InitialRawTableState (false , false , Optional .empty ());
90
+ return new InitialRawTableStatus (false , false , Optional .empty ());
90
91
}
91
92
92
93
final FieldValue unloadedRecordTimestamp = bq .query (QueryJobConfiguration .newBuilder (new StringSubstitutor (Map .of (
@@ -102,7 +103,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
102
103
// If it's not null, then we can return immediately - we've found some unprocessed records and their
103
104
// timestamp.
104
105
if (!unloadedRecordTimestamp .isNull ()) {
105
- return new InitialRawTableState (true , true , Optional .of (unloadedRecordTimestamp .getTimestampInstant ()));
106
+ return new InitialRawTableStatus (true , true , Optional .of (unloadedRecordTimestamp .getTimestampInstant ()));
106
107
}
107
108
108
109
final FieldValue loadedRecordTimestamp = bq .query (QueryJobConfiguration .newBuilder (new StringSubstitutor (Map .of (
@@ -116,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
116
117
// So we just need to get the timestamp of the most recent record.
117
118
if (loadedRecordTimestamp .isNull ()) {
118
119
// Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
119
- return new InitialRawTableState (true , false , Optional .empty ());
120
+ return new InitialRawTableStatus (true , false , Optional .empty ());
120
121
} else {
121
122
// The raw table already has some records. T+D can skip all records with timestamp <= this value.
122
- return new InitialRawTableState (true , false , Optional .of (loadedRecordTimestamp .getTimestampInstant ()));
123
+ return new InitialRawTableStatus (true , false , Optional .of (loadedRecordTimestamp .getTimestampInstant ()));
123
124
}
124
125
}
125
126
@@ -191,18 +192,18 @@ public void execute(final Sql sql) throws InterruptedException {
191
192
}
192
193
193
194
@ Override
194
- public List <DestinationInitialState < MinimumDestinationState . Impl >> gatherInitialState (List <StreamConfig > streamConfigs ) throws Exception {
195
- final List <DestinationInitialState <MinimumDestinationState .Impl >> initialStates = new ArrayList <>();
195
+ public List <DestinationInitialStatus < Impl >> gatherInitialState (List <StreamConfig > streamConfigs ) throws Exception {
196
+ final List <DestinationInitialStatus <MinimumDestinationState .Impl >> initialStates = new ArrayList <>();
196
197
for (final StreamConfig streamConfig : streamConfigs ) {
197
198
final StreamId id = streamConfig .id ();
198
199
final Optional <TableDefinition > finalTable = findExistingTable (id );
199
- final InitialRawTableState rawTableState = getInitialRawTableState (id );
200
- initialStates .add (new DestinationInitialState <>(
200
+ final InitialRawTableStatus rawTableState = getInitialRawTableState (id );
201
+ initialStates .add (new DestinationInitialStatus <>(
201
202
streamConfig ,
202
203
finalTable .isPresent (),
203
204
rawTableState ,
204
205
finalTable .isPresent () && !existingSchemaMatchesStreamConfig (streamConfig , finalTable .get ()),
205
- ! finalTable .isPresent () || isFinalTableEmpty (id ),
206
+ finalTable .isEmpty () || isFinalTableEmpty (id ),
206
207
// Return a default state blob since we don't actually track state.
207
208
new MinimumDestinationState .Impl (false )));
208
209
}
0 commit comments