@@ -195,7 +195,7 @@ List<Field<?>> buildFinalTableFields(final LinkedHashMap<ColumnId, AirbyteType>
195
195
final List <Field <?>> fields =
196
196
metaColumns .entrySet ().stream ().map (metaColumn -> field (quotedName (metaColumn .getKey ()), metaColumn .getValue ())).collect (toList ());
197
197
final List <Field <?>> dataFields =
198
- columns .entrySet ().stream ().map (column -> field (quotedName (column .getKey ().name ()), toDialectType (column .getValue ()))).collect (
198
+ columns .entrySet ().stream ().map (column -> field (quotedName (column .getKey ().getName ()), toDialectType (column .getValue ()))).collect (
199
199
toList ());
200
200
dataFields .addAll (fields );
201
201
return dataFields ;
@@ -258,16 +258,16 @@ public Sql createSchema(final String schema) {
258
258
@ Override
259
259
public Sql createTable (final StreamConfig stream , final String suffix , final boolean force ) {
260
260
// TODO: Use Naming transformer to sanitize these strings with redshift restrictions.
261
- final String finalTableIdentifier = stream .id ().finalName () + suffix .toLowerCase ();
261
+ final String finalTableIdentifier = stream .getId ().getFinalName () + suffix .toLowerCase ();
262
262
if (!force ) {
263
263
return transactionally (Stream .concat (
264
- Stream .of (createTableSql (stream .id ().finalNamespace (), finalTableIdentifier , stream .columns ())),
264
+ Stream .of (createTableSql (stream .getId ().getFinalNamespace (), finalTableIdentifier , stream .getColumns ())),
265
265
createIndexSql (stream , suffix ).stream ()).toList ());
266
266
}
267
267
return transactionally (Stream .concat (
268
268
Stream .of (
269
- dropTableIfExists (quotedName (stream .id ().finalNamespace (), finalTableIdentifier )).getSQL (ParamType .INLINED ),
270
- createTableSql (stream .id ().finalNamespace (), finalTableIdentifier , stream .columns ())),
269
+ dropTableIfExists (quotedName (stream .getId ().getFinalNamespace (), finalTableIdentifier )).getSQL (ParamType .INLINED ),
270
+ createTableSql (stream .getId ().getFinalNamespace (), finalTableIdentifier , stream .getColumns ())),
271
271
createIndexSql (stream , suffix ).stream ()).toList ());
272
272
}
273
273
@@ -285,18 +285,18 @@ public Sql updateTable(final StreamConfig streamConfig,
285
285
@ Override
286
286
public Sql overwriteFinalTable (final StreamId stream , final String finalSuffix ) {
287
287
return transactionally (
288
- dropTableIfExists (name (stream .finalNamespace (), stream .finalName ())).getSQL (ParamType .INLINED ),
289
- alterTable (name (stream .finalNamespace (), stream .finalName () + finalSuffix ))
290
- .renameTo (name (stream .finalName ()))
288
+ dropTableIfExists (name (stream .getFinalNamespace (), stream .getFinalName ())).getSQL (ParamType .INLINED ),
289
+ alterTable (name (stream .getFinalNamespace (), stream .getFinalName () + finalSuffix ))
290
+ .renameTo (name (stream .getFinalName ()))
291
291
.getSQL ());
292
292
}
293
293
294
294
@ Override
295
295
public Sql migrateFromV1toV2 (final StreamId streamId , final String namespace , final String tableName ) {
296
- final Name rawTableName = name (streamId .rawNamespace (), streamId .rawName ());
296
+ final Name rawTableName = name (streamId .getRawNamespace (), streamId .getRawName ());
297
297
final DSLContext dsl = getDslContext ();
298
298
return transactionally (
299
- dsl .createSchemaIfNotExists (streamId .rawNamespace ()).getSQL (),
299
+ dsl .createSchemaIfNotExists (streamId .getRawNamespace ()).getSQL (),
300
300
dsl .dropTableIfExists (rawTableName ).getSQL (),
301
301
DSL .createTable (rawTableName )
302
302
.column (COLUMN_NAME_AB_RAW_ID , SQLDataType .VARCHAR (36 ).nullable (false ))
@@ -315,7 +315,7 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi
315
315
316
316
@ Override
317
317
public Sql clearLoadedAt (final StreamId streamId ) {
318
- return Sql .of (update (table (name (streamId .rawNamespace (), streamId .rawName ())))
318
+ return Sql .of (update (table (name (streamId .getRawNamespace (), streamId .getRawName ())))
319
319
.set (field (COLUMN_NAME_AB_LOADED_AT ), inline ((String ) null ))
320
320
.getSQL ());
321
321
}
@@ -350,28 +350,28 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
350
350
final String finalSuffix ,
351
351
final Optional <Instant > minRawTimestamp ,
352
352
final boolean useExpensiveSaferCasting ) {
353
- final String finalSchema = streamConfig .id ().finalNamespace ();
354
- final String finalTable = streamConfig .id ().finalName () + (finalSuffix != null ? finalSuffix .toLowerCase () : "" );
355
- final String rawSchema = streamConfig .id ().rawNamespace ();
356
- final String rawTable = streamConfig .id ().rawName ();
353
+ final String finalSchema = streamConfig .getId ().getFinalNamespace ();
354
+ final String finalTable = streamConfig .getId ().getFinalName () + (finalSuffix != null ? finalSuffix .toLowerCase () : "" );
355
+ final String rawSchema = streamConfig .getId ().getRawNamespace ();
356
+ final String rawTable = streamConfig .getId ().getRawName ();
357
357
358
358
// Poor person's guarantee of ordering of fields by using same source of ordered list of columns to
359
359
// generate fields.
360
360
final CommonTableExpression <Record > rawTableRowsWithCast = name (TYPING_CTE_ALIAS ).as (
361
- selectFromRawTable (rawSchema , rawTable , streamConfig .columns (),
361
+ selectFromRawTable (rawSchema , rawTable , streamConfig .getColumns (),
362
362
getFinalTableMetaColumns (false ),
363
- rawTableCondition (streamConfig .destinationSyncMode (),
364
- streamConfig .columns ().containsKey (cdcDeletedAtColumn ),
363
+ rawTableCondition (streamConfig .getDestinationSyncMode (),
364
+ streamConfig .getColumns ().containsKey (cdcDeletedAtColumn ),
365
365
minRawTimestamp ),
366
366
useExpensiveSaferCasting ));
367
- final List <Field <?>> finalTableFields = buildFinalTableFields (streamConfig .columns (), getFinalTableMetaColumns (true ));
368
- final Field <Integer > rowNumber = getRowNumber (streamConfig .primaryKey (), streamConfig .cursor ());
367
+ final List <Field <?>> finalTableFields = buildFinalTableFields (streamConfig .getColumns (), getFinalTableMetaColumns (true ));
368
+ final Field <Integer > rowNumber = getRowNumber (streamConfig .getPrimaryKey (), streamConfig .getCursor ());
369
369
final CommonTableExpression <Record > filteredRows = name (NUMBERED_ROWS_CTE_ALIAS ).as (
370
370
select (asterisk (), rowNumber ).from (rawTableRowsWithCast ));
371
371
372
372
// Used for append-dedupe mode.
373
373
final String insertStmtWithDedupe =
374
- insertIntoFinalTable (finalSchema , finalTable , streamConfig .columns (), getFinalTableMetaColumns (true ))
374
+ insertIntoFinalTable (finalSchema , finalTable , streamConfig .getColumns (), getFinalTableMetaColumns (true ))
375
375
.select (with (rawTableRowsWithCast )
376
376
.with (filteredRows )
377
377
.select (finalTableFields )
@@ -383,17 +383,17 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
383
383
384
384
// Used for append and overwrite modes.
385
385
final String insertStmt =
386
- insertIntoFinalTable (finalSchema , finalTable , streamConfig .columns (), getFinalTableMetaColumns (true ))
386
+ insertIntoFinalTable (finalSchema , finalTable , streamConfig .getColumns (), getFinalTableMetaColumns (true ))
387
387
.select (with (rawTableRowsWithCast )
388
388
.select (finalTableFields )
389
389
.from (rawTableRowsWithCast ))
390
390
.getSQL (ParamType .INLINED );
391
- final String deleteStmt = deleteFromFinalTable (finalSchema , finalTable , streamConfig .primaryKey (), streamConfig .cursor ());
391
+ final String deleteStmt = deleteFromFinalTable (finalSchema , finalTable , streamConfig .getPrimaryKey (), streamConfig .getCursor ());
392
392
final String deleteCdcDeletesStmt =
393
- streamConfig .columns ().containsKey (cdcDeletedAtColumn ) ? deleteFromFinalTableCdcDeletes (finalSchema , finalTable ) : "" ;
393
+ streamConfig .getColumns ().containsKey (cdcDeletedAtColumn ) ? deleteFromFinalTableCdcDeletes (finalSchema , finalTable ) : "" ;
394
394
final String checkpointStmt = checkpointRawTable (rawSchema , rawTable , minRawTimestamp );
395
395
396
- if (streamConfig .destinationSyncMode () != DestinationSyncMode .APPEND_DEDUP ) {
396
+ if (streamConfig .getDestinationSyncMode () != DestinationSyncMode .APPEND_DEDUP ) {
397
397
return transactionally (
398
398
insertStmt ,
399
399
checkpointStmt );
@@ -470,7 +470,7 @@ private String deleteFromFinalTable(final String schemaName,
470
470
private String deleteFromFinalTableCdcDeletes (final String schema , final String tableName ) {
471
471
final DSLContext dsl = getDslContext ();
472
472
return dsl .deleteFrom (table (quotedName (schema , tableName )))
473
- .where (field (quotedName (cdcDeletedAtColumn .name ())).isNotNull ())
473
+ .where (field (quotedName (cdcDeletedAtColumn .getName ())).isNotNull ())
474
474
.getSQL (ParamType .INLINED );
475
475
}
476
476
0 commit comments