Skip to content

Commit 990bd5b

Browse files
fix compiler errors
1 parent 4695a81 commit 990bd5b

File tree

41 files changed

+355
-471
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+355
-471
lines changed

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -93,23 +93,23 @@ protected DSLContext getDslContext() {
9393
}
9494

9595
private Optional<TableDefinition> findExistingTable(final StreamId id) throws Exception {
96-
return findExistingTable(jdbcDatabase, databaseName, id.finalNamespace(), id.finalName());
96+
return findExistingTable(jdbcDatabase, databaseName, id.getFinalNamespace(), id.getFinalName());
9797
}
9898

9999
private boolean isFinalTableEmpty(final StreamId id) throws Exception {
100100
return !jdbcDatabase.queryBoolean(
101101
getDslContext().select(
102102
field(exists(
103103
selectOne()
104-
.from(name(id.finalNamespace(), id.finalName()))
104+
.from(name(id.getFinalNamespace(), id.getFinalName()))
105105
.limit(1))))
106106
.getSQL(ParamType.INLINED));
107107
}
108108

109109
private InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
110110
boolean tableExists = jdbcDatabase.executeMetadataQuery(dbmetadata -> {
111-
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.rawNamespace(), id.rawName());
112-
try (final ResultSet table = dbmetadata.getTables(databaseName, id.rawNamespace(), id.rawName(), null)) {
111+
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.getRawNamespace(), id.getRawName());
112+
try (final ResultSet table = dbmetadata.getTables(databaseName, id.getRawNamespace(), id.getRawName(), null)) {
113113
return table.next();
114114
} catch (SQLException e) {
115115
LOGGER.error("Failed to retrieve table info from metadata", e);
@@ -128,7 +128,7 @@ private InitialRawTableStatus getInitialRawTableState(final StreamId id) throws
128128
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
129129
conn -> conn.prepareStatement(
130130
getDslContext().select(field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
131-
.from(name(id.rawNamespace(), id.rawName()))
131+
.from(name(id.getRawNamespace(), id.getRawName()))
132132
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
133133
.getSQL()),
134134
record -> record.getTimestamp("min_timestamp"))) {
@@ -147,7 +147,7 @@ record -> record.getTimestamp("min_timestamp"))) {
147147
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
148148
conn -> conn.prepareStatement(
149149
getDslContext().select(field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
150-
.from(name(id.rawNamespace(), id.rawName()))
150+
.from(name(id.getRawNamespace(), id.getRawName()))
151151
.getSQL()),
152152
record -> record.getTimestamp("min_timestamp"))) {
153153
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at all).
@@ -158,7 +158,7 @@ record -> record.getTimestamp("min_timestamp"))) {
158158

159159
@Override
160160
public void execute(final Sql sql) throws Exception {
161-
final List<List<String>> transactions = sql.transactions();
161+
final List<List<String>> transactions = sql.transactions;
162162
final UUID queryId = UUID.randomUUID();
163163
for (final List<String> transaction : transactions) {
164164
final UUID transactionId = UUID.randomUUID();
@@ -236,20 +236,20 @@ private CompletionStage<DestinationInitialStatus<DestinationState>> retrieveStat
236236
final StreamConfig streamConfig) {
237237
return destinationStatesFuture.thenApply(destinationStates -> {
238238
try {
239-
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.id());
239+
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.getId());
240240
final boolean isSchemaMismatch;
241241
final boolean isFinalTableEmpty;
242242
if (finalTableDefinition.isPresent()) {
243243
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, finalTableDefinition.get());
244-
isFinalTableEmpty = isFinalTableEmpty(streamConfig.id());
244+
isFinalTableEmpty = isFinalTableEmpty(streamConfig.getId());
245245
} else {
246246
// If the final table doesn't exist, then by definition it doesn't have a schema mismatch and has no
247247
// records.
248248
isSchemaMismatch = false;
249249
isFinalTableEmpty = true;
250250
}
251-
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id());
252-
DestinationState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject()));
251+
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.getId());
252+
DestinationState destinationState = destinationStates.getOrDefault(streamConfig.getId().asPair(), toDestinationState(Jsons.emptyObject()));
253253
return new DestinationInitialStatus<>(streamConfig, finalTableDefinition.isPresent(), initialRawTableState,
254254
isSchemaMismatch, isFinalTableEmpty, destinationState);
255255
} catch (Exception e) {
@@ -318,9 +318,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
318318
// Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset
319319
return false;
320320
}
321-
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
321+
final LinkedHashMap<String, String> intendedColumns = stream.getColumns().entrySet().stream()
322322
.collect(LinkedHashMap::new,
323-
(map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())),
323+
(map, column) -> map.put(column.getKey().getName(), toJdbcTypeName(column.getValue())),
324324
LinkedHashMap::putAll);
325325

326326
// Filter out Meta columns since they don't exist in stream config.
@@ -335,16 +335,16 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
335335
}
336336

337337
@Override
338-
public void commitDestinationStates(final Map<StreamId, DestinationState> destinationStates) throws Exception {
338+
public void commitDestinationStates(final Map<StreamId, ? extends DestinationState> destinationStates) throws Exception {
339339
if (destinationStates.isEmpty()) {
340340
return;
341341
}
342342

343343
// Delete all state records where the stream name+namespace match one of our states
344344
String deleteStates = getDslContext().deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)))
345345
.where(destinationStates.keySet().stream()
346-
.map(streamId -> field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)).eq(streamId.originalName())
347-
.and(field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)).eq(streamId.originalNamespace())))
346+
.map(streamId -> field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)).eq(streamId.getOriginalName())
347+
.and(field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)).eq(streamId.getOriginalNamespace())))
348348
.reduce(
349349
DSL.falseCondition(),
350350
Condition::or))
@@ -362,10 +362,10 @@ public void commitDestinationStates(final Map<StreamId, DestinationState> destin
362362
// and assume the destination can cast it appropriately.
363363
// Destination-specific timestamp syntax is weird and annoying.
364364
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT), String.class));
365-
for (Map.Entry<StreamId, DestinationState> destinationState : destinationStates.entrySet()) {
365+
for (Map.Entry<StreamId, ? extends DestinationState> destinationState : destinationStates.entrySet()) {
366366
final StreamId streamId = destinationState.getKey();
367367
final String stateJson = Jsons.serialize(destinationState.getValue());
368-
insertStatesStep = insertStatesStep.values(streamId.originalName(), streamId.originalNamespace(), stateJson, OffsetDateTime.now().toString());
368+
insertStatesStep = insertStatesStep.values(streamId.getOriginalName(), streamId.getOriginalNamespace(), stateJson, OffsetDateTime.now().toString());
369369
}
370370
String insertStates = insertStatesStep.getSQL(ParamType.INLINED);
371371

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java

+26-26
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ List<Field<?>> buildFinalTableFields(final LinkedHashMap<ColumnId, AirbyteType>
195195
final List<Field<?>> fields =
196196
metaColumns.entrySet().stream().map(metaColumn -> field(quotedName(metaColumn.getKey()), metaColumn.getValue())).collect(toList());
197197
final List<Field<?>> dataFields =
198-
columns.entrySet().stream().map(column -> field(quotedName(column.getKey().name()), toDialectType(column.getValue()))).collect(
198+
columns.entrySet().stream().map(column -> field(quotedName(column.getKey().getName()), toDialectType(column.getValue()))).collect(
199199
toList());
200200
dataFields.addAll(fields);
201201
return dataFields;
@@ -258,16 +258,16 @@ public Sql createSchema(final String schema) {
258258
@Override
259259
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
260260
// TODO: Use Naming transformer to sanitize these strings with redshift restrictions.
261-
final String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
261+
final String finalTableIdentifier = stream.getId().getFinalName() + suffix.toLowerCase();
262262
if (!force) {
263263
return transactionally(Stream.concat(
264-
Stream.of(createTableSql(stream.id().finalNamespace(), finalTableIdentifier, stream.columns())),
264+
Stream.of(createTableSql(stream.getId().getFinalNamespace(), finalTableIdentifier, stream.getColumns())),
265265
createIndexSql(stream, suffix).stream()).toList());
266266
}
267267
return transactionally(Stream.concat(
268268
Stream.of(
269-
dropTableIfExists(quotedName(stream.id().finalNamespace(), finalTableIdentifier)).getSQL(ParamType.INLINED),
270-
createTableSql(stream.id().finalNamespace(), finalTableIdentifier, stream.columns())),
269+
dropTableIfExists(quotedName(stream.getId().getFinalNamespace(), finalTableIdentifier)).getSQL(ParamType.INLINED),
270+
createTableSql(stream.getId().getFinalNamespace(), finalTableIdentifier, stream.getColumns())),
271271
createIndexSql(stream, suffix).stream()).toList());
272272
}
273273

@@ -285,18 +285,18 @@ public Sql updateTable(final StreamConfig streamConfig,
285285
@Override
286286
public Sql overwriteFinalTable(final StreamId stream, final String finalSuffix) {
287287
return transactionally(
288-
dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED),
289-
alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix))
290-
.renameTo(name(stream.finalName()))
288+
dropTableIfExists(name(stream.getFinalNamespace(), stream.getFinalName())).getSQL(ParamType.INLINED),
289+
alterTable(name(stream.getFinalNamespace(), stream.getFinalName() + finalSuffix))
290+
.renameTo(name(stream.getFinalName()))
291291
.getSQL());
292292
}
293293

294294
@Override
295295
public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
296-
final Name rawTableName = name(streamId.rawNamespace(), streamId.rawName());
296+
final Name rawTableName = name(streamId.getRawNamespace(), streamId.getRawName());
297297
final DSLContext dsl = getDslContext();
298298
return transactionally(
299-
dsl.createSchemaIfNotExists(streamId.rawNamespace()).getSQL(),
299+
dsl.createSchemaIfNotExists(streamId.getRawNamespace()).getSQL(),
300300
dsl.dropTableIfExists(rawTableName).getSQL(),
301301
DSL.createTable(rawTableName)
302302
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
@@ -315,7 +315,7 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi
315315

316316
@Override
317317
public Sql clearLoadedAt(final StreamId streamId) {
318-
return Sql.of(update(table(name(streamId.rawNamespace(), streamId.rawName())))
318+
return Sql.of(update(table(name(streamId.getRawNamespace(), streamId.getRawName())))
319319
.set(field(COLUMN_NAME_AB_LOADED_AT), inline((String) null))
320320
.getSQL());
321321
}
@@ -350,28 +350,28 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
350350
final String finalSuffix,
351351
final Optional<Instant> minRawTimestamp,
352352
final boolean useExpensiveSaferCasting) {
353-
final String finalSchema = streamConfig.id().finalNamespace();
354-
final String finalTable = streamConfig.id().finalName() + (finalSuffix != null ? finalSuffix.toLowerCase() : "");
355-
final String rawSchema = streamConfig.id().rawNamespace();
356-
final String rawTable = streamConfig.id().rawName();
353+
final String finalSchema = streamConfig.getId().getFinalNamespace();
354+
final String finalTable = streamConfig.getId().getFinalName() + (finalSuffix != null ? finalSuffix.toLowerCase() : "");
355+
final String rawSchema = streamConfig.getId().getRawNamespace();
356+
final String rawTable = streamConfig.getId().getRawName();
357357

358358
// Poor person's guarantee of ordering of fields by using same source of ordered list of columns to
359359
// generate fields.
360360
final CommonTableExpression<Record> rawTableRowsWithCast = name(TYPING_CTE_ALIAS).as(
361-
selectFromRawTable(rawSchema, rawTable, streamConfig.columns(),
361+
selectFromRawTable(rawSchema, rawTable, streamConfig.getColumns(),
362362
getFinalTableMetaColumns(false),
363-
rawTableCondition(streamConfig.destinationSyncMode(),
364-
streamConfig.columns().containsKey(cdcDeletedAtColumn),
363+
rawTableCondition(streamConfig.getDestinationSyncMode(),
364+
streamConfig.getColumns().containsKey(cdcDeletedAtColumn),
365365
minRawTimestamp),
366366
useExpensiveSaferCasting));
367-
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.columns(), getFinalTableMetaColumns(true));
368-
final Field<Integer> rowNumber = getRowNumber(streamConfig.primaryKey(), streamConfig.cursor());
367+
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.getColumns(), getFinalTableMetaColumns(true));
368+
final Field<Integer> rowNumber = getRowNumber(streamConfig.getPrimaryKey(), streamConfig.getCursor());
369369
final CommonTableExpression<Record> filteredRows = name(NUMBERED_ROWS_CTE_ALIAS).as(
370370
select(asterisk(), rowNumber).from(rawTableRowsWithCast));
371371

372372
// Used for append-dedupe mode.
373373
final String insertStmtWithDedupe =
374-
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
374+
insertIntoFinalTable(finalSchema, finalTable, streamConfig.getColumns(), getFinalTableMetaColumns(true))
375375
.select(with(rawTableRowsWithCast)
376376
.with(filteredRows)
377377
.select(finalTableFields)
@@ -383,17 +383,17 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
383383

384384
// Used for append and overwrite modes.
385385
final String insertStmt =
386-
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
386+
insertIntoFinalTable(finalSchema, finalTable, streamConfig.getColumns(), getFinalTableMetaColumns(true))
387387
.select(with(rawTableRowsWithCast)
388388
.select(finalTableFields)
389389
.from(rawTableRowsWithCast))
390390
.getSQL(ParamType.INLINED);
391-
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.primaryKey(), streamConfig.cursor());
391+
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.getPrimaryKey(), streamConfig.getCursor());
392392
final String deleteCdcDeletesStmt =
393-
streamConfig.columns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
393+
streamConfig.getColumns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
394394
final String checkpointStmt = checkpointRawTable(rawSchema, rawTable, minRawTimestamp);
395395

396-
if (streamConfig.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) {
396+
if (streamConfig.getDestinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) {
397397
return transactionally(
398398
insertStmt,
399399
checkpointStmt);
@@ -470,7 +470,7 @@ private String deleteFromFinalTable(final String schemaName,
470470
private String deleteFromFinalTableCdcDeletes(final String schema, final String tableName) {
471471
final DSLContext dsl = getDslContext();
472472
return dsl.deleteFrom(table(quotedName(schema, tableName)))
473-
.where(field(quotedName(cdcDeletedAtColumn.name())).isNotNull())
473+
.where(field(quotedName(cdcDeletedAtColumn.getName())).isNotNull())
474474
.getSQL(ParamType.INLINED);
475475
}
476476

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransf
3535

3636
@SneakyThrows
3737
@Override
38-
protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
38+
public boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
3939
final String retrievedSchema = database.executeMetadataQuery(dbMetadata -> {
40-
try (ResultSet columns = dbMetadata.getSchemas(databaseName, streamConfig.id().rawNamespace())) {
40+
try (ResultSet columns = dbMetadata.getSchemas(databaseName, streamConfig.getId().getRawNamespace())) {
4141
String schema = "";
4242
while (columns.next()) {
4343
// Catalog can be null, so don't do anything with it.
@@ -54,22 +54,22 @@ protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamCon
5454
}
5555

5656
@Override
57-
protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
57+
public boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
5858
return existingTable.columns().keySet().containsAll(columns);
5959
}
6060

6161
@SneakyThrows
6262
@Override
63-
protected Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) throws Exception {
63+
public Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) throws Exception {
6464
return JdbcDestinationHandler.findExistingTable(database, databaseName, namespace, tableName);
6565
}
6666

6767
@Override
68-
protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
68+
public NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
6969
@SuppressWarnings("deprecation")
70-
final String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.id().originalName());
70+
final String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.getId().getOriginalName());
7171
return new NamespacedTableName(
72-
this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()),
72+
this.namingConventionTransformer.getIdentifier(streamConfig.getId().getOriginalNamespace()),
7373
tableName);
7474
}
7575

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(fina
111111
final String outputSchema;
112112
final String tableName;
113113
if (useDestinationsV2Columns) {
114-
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).id();
115-
outputSchema = streamId.rawNamespace();
116-
tableName = streamId.rawName();
114+
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).getId();
115+
outputSchema = streamId.getRawNamespace();
116+
tableName = streamId.getRawName();
117117
} else {
118118
outputSchema = getOutputSchema(abStream, config.get("schema").asText(), namingResolver);
119119
tableName = namingResolver.getRawTableName(streamName);

0 commit comments

Comments
 (0)