Commit 978142e

fix kotlin warnings in destination CDK submodules (#37481)
cleaning up kotlin warnings
1 parent 9413578 commit 978142e

48 files changed: +178 -238 lines


airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/function/DestinationFlushFunction.kt

+2 -2

@@ -31,13 +31,13 @@ interface DestinationFlushFunction {
     /**
      * Flush a batch of data to the destination.
      *
-     * @param decs the Airbyte stream the data stream belongs to
+     * @param streamDescriptor the Airbyte stream the data stream belongs to
      * @param stream a bounded [AirbyteMessage] stream ideally of [.getOptimalBatchSizeBytes] size
      * @throws Exception
      */
     @Throws(Exception::class)
     fun flush(
-        decs: StreamDescriptor,
+        streamDescriptor: StreamDescriptor,
         stream: Stream<PartialAirbyteMessage>,
     )
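
The rename from decs to streamDescriptor matters beyond style: overrides of this interface method must repeat the same parameter name, or kotlinc raises a parameter-name-changed-on-override warning that turns fatal once warnings are treated as errors (see JdbcInsertFlushFunction below). A minimal sketch with hypothetical names:

interface Flusher {
    fun flush(streamDescriptor: String)
}

class JdbcFlusher : Flusher {
    // Renaming this parameter (e.g. to decs) would still compile, but
    // kotlinc warns because named-argument call sites resolve against
    // the declared name.
    override fun flush(streamDescriptor: String) {
        println("flushing $streamDescriptor")
    }
}

fun main() {
    // Named arguments work only while declaration and override agree.
    JdbcFlusher().flush(streamDescriptor = "users")
}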

airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle

-11

@@ -1,14 +1,3 @@
-java {
-    // TODO: rewrite code to avoid javac wornings in the first place
-    compileJava {
-        options.compilerArgs += "-Xlint:-deprecation,-removal,-this-escape"
-    }
-    compileTestFixturesJava {
-        options.compilerArgs += "-Xlint:-try,-this-escape"
-    }
-}
-
-compileKotlin.compilerOptions.allWarningsAsErrors = false
 compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
 compileTestKotlin.compilerOptions.allWarningsAsErrors = false

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt

+4 -4

@@ -129,10 +129,10 @@ object JdbcBufferedConsumerFactory {
             val finalSchema = Optional.ofNullable(abStream.namespace).orElse(defaultSchemaName)
             val rawName = concatenateRawTableName(finalSchema, streamName)
             tableName = namingResolver.convertStreamName(rawName)
-            tmpTableName = namingResolver.getTmpTableName(rawName)
+            tmpTableName = @Suppress("deprecation") namingResolver.getTmpTableName(rawName)
         } else {
-            tableName = namingResolver.getRawTableName(streamName)
-            tmpTableName = namingResolver.getTmpTableName(streamName)
+            tableName = @Suppress("deprecation") namingResolver.getRawTableName(streamName)
+            tmpTableName = @Suppress("deprecation") namingResolver.getTmpTableName(streamName)
         }
         val syncMode = stream.destinationSyncMode

@@ -269,7 +269,7 @@ object JdbcBufferedConsumerFactory {
     /** Tear down functionality */
     private fun onCloseFunction(typerDeduper: TyperDeduper): OnCloseFunction {
         return OnCloseFunction {
-            hasFailed: Boolean,
+            _: Boolean,
             streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary> ->
             try {
                 typerDeduper.typeAndDedupe(streamSyncSummaries)
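
Kotlin allows @Suppress on a single expression, which is what these hunks rely on: only the one deprecated call is exempted rather than the whole function or file. A minimal sketch, with a hypothetical deprecated helper standing in for the naming resolver:

@Deprecated("use convertStreamName instead")
fun getTmpTableName(streamName: String): String = "_airbyte_tmp_$streamName"

fun main() {
    // The annotation scopes the suppression to this call expression only;
    // any other use of the deprecated function would still warn.
    val tmpTableName = @Suppress("deprecation") getTmpTableName("users")
    println(tmpTableName)
}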

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.kt

+2 -2

@@ -15,9 +15,9 @@ class JdbcInsertFlushFunction(
     override val optimalBatchSizeBytes: Long
 ) : DestinationFlushFunction {
     @Throws(Exception::class)
-    override fun flush(desc: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
+    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
         recordWriter.accept(
-            AirbyteStreamNameNamespacePair(desc.name, desc.namespace),
+            AirbyteStreamNameNamespacePair(streamDescriptor.name, streamDescriptor.namespace),
             stream.toList()
         )
     }

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+8 -6

@@ -23,7 +23,7 @@ import org.apache.commons.csv.CSVPrinter
 abstract class JdbcSqlOperations : SqlOperations {
     protected val schemaSet: MutableSet<String?> = HashSet()

-    protected constructor() {}
+    protected constructor()

     @Throws(Exception::class)
     override fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?) {

@@ -45,7 +45,9 @@ abstract class JdbcSqlOperations : SqlOperations {
      * @param e the exception to check.
      * @return A ConfigErrorException with a message with actionable feedback to the user.
      */
-    protected fun checkForKnownConfigExceptions(e: Exception?): Optional<ConfigErrorException> {
+    protected open fun checkForKnownConfigExceptions(
+        e: Exception?
+    ): Optional<ConfigErrorException> {
         return Optional.empty()
     }

@@ -166,15 +168,15 @@ abstract class JdbcSqlOperations : SqlOperations {
     override fun insertTableQuery(
         database: JdbcDatabase?,
         schemaName: String?,
-        srcTableName: String?,
-        dstTableName: String?
+        sourceTableName: String?,
+        destinationTableName: String?
     ): String? {
         return String.format(
             "INSERT INTO %s.%s SELECT * FROM %s.%s;\n",
             schemaName,
-            dstTableName,
+            destinationTableName,
             schemaName,
-            srcTableName
+            sourceTableName
         )
     }
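
The new open modifier on checkForKnownConfigExceptions is what permits connector subclasses to override it: Kotlin members are final unless explicitly opened. A minimal sketch with hypothetical names:

abstract class SqlOps {
    // Without open, the override below would not compile.
    protected open fun checkForKnownErrors(e: Exception?): String? = null

    fun describe(e: Exception?): String = checkForKnownErrors(e) ?: "unknown error"
}

class PostgresSqlOps : SqlOps() {
    override fun checkForKnownErrors(e: Exception?): String? =
        e?.message?.takeIf { "password authentication failed" in it }
}

fun main() {
    val ops: SqlOps = PostgresSqlOps()
    println(ops.describe(Exception("password authentication failed for user")))
}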

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.kt

+1 -4

@@ -14,7 +14,6 @@ import java.sql.SQLException
 import java.sql.Timestamp
 import java.time.Instant
 import java.util.*
-import java.util.function.Consumer
 import java.util.function.Supplier

 object SqlOperationsUtils {

@@ -111,9 +110,7 @@ object SqlOperationsUtils {
         // default
         for (partition in Iterables.partition(records, 10000)) {
             val sql = StringBuilder(insertQueryComponent)
-            partition.forEach(
-                Consumer { r: PartialAirbyteMessage? -> sql.append(recordQueryComponent) }
-            )
+            partition.forEach { _ -> sql.append(recordQueryComponent) }
             val s = sql.toString()
             val s1 = s.substring(0, s.length - 2) + (if (sem) ";" else "")
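
The rewritten forEach fixes two warnings at once: Kotlin's own forEach takes a lambda directly, so the java.util.function.Consumer wrapper (and its now-unused import) was redundant, and the never-read parameter r becomes the underscore placeholder. A minimal sketch:

fun main() {
    val records = listOf("r1", "r2", "r3")
    val sql = StringBuilder("INSERT INTO tmp_table VALUES ")

    // One placeholder group is appended per record; the element itself
    // is ignored, which _ makes explicit without drawing a warning.
    records.forEach { _ -> sql.append("(?), ") }

    // Trim the trailing ", ", as the surrounding CDK code does.
    println(sql.toString().dropLast(2) + ";")
}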

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt

+25 -21

@@ -47,22 +47,26 @@ object CopyConsumerFactory {

         val pairToIgnoredRecordCount: MutableMap<AirbyteStreamNameNamespacePair, Long> = HashMap()
         return BufferedStreamConsumer(
-            outputRecordCollector,
-            onStartFunction(pairToIgnoredRecordCount),
-            InMemoryRecordBufferingStrategy(
-                recordWriterFunction(pairToCopier, sqlOperations, pairToIgnoredRecordCount),
-                removeStagingFilePrinter(pairToCopier),
-                GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES.toLong()
-            ),
-            onCloseFunction(
-                pairToCopier,
-                database,
-                sqlOperations,
-                pairToIgnoredRecordCount,
-                dataSource
-            ),
-            catalog
-        ) { data: JsonNode? -> sqlOperations.isValidData(data) }
+            outputRecordCollector = outputRecordCollector,
+            onStart = onStartFunction(pairToIgnoredRecordCount),
+            bufferingStrategy =
+                InMemoryRecordBufferingStrategy(
+                    recordWriterFunction(pairToCopier, sqlOperations, pairToIgnoredRecordCount),
+                    removeStagingFilePrinter(pairToCopier),
+                    GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES.toLong()
+                ),
+            onClose =
+                onCloseFunction(
+                    pairToCopier,
+                    database,
+                    sqlOperations,
+                    pairToIgnoredRecordCount,
+                    dataSource
+                ),
+            catalog = catalog,
+            isValidRecord = { data: JsonNode? -> sqlOperations.isValidData(data) },
+            defaultNamespace = null,
+        )
     }

     private fun <T> createWriteConfigs(

@@ -171,15 +175,15 @@ object CopyConsumerFactory {
         sqlOperations: SqlOperations,
         dataSource: DataSource
     ) {
-        var hasFailed = hasFailed
+        var failed = hasFailed
         var firstException: Exception? = null
         val streamCopiers: List<StreamCopier?> = ArrayList(pairToCopier.values)
         try {
             val queries: MutableList<String> = ArrayList()
             for (copier in streamCopiers) {
                 try {
-                    copier!!.closeStagingUploader(hasFailed)
-                    if (!hasFailed) {
+                    copier!!.closeStagingUploader(failed)
+                    if (!failed) {
                         copier.createDestinationSchema()
                         copier.createTemporaryTable()
                         copier.copyStagingFileToTemporaryTable()

@@ -191,13 +195,13 @@ object CopyConsumerFactory {
                 val message =
                     String.format("Failed to finalize copy to temp table due to: %s", e)
                 LOGGER.error(message)
-                hasFailed = true
+                failed = true
                 if (firstException == null) {
                     firstException = e
                 }
             }
         }
-        if (!hasFailed) {
+        if (!failed) {
             sqlOperations.executeTransaction(db, queries)
         }
     } finally {
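
Two warning classes disappear above: the BufferedStreamConsumer call switches to named arguments so every value is visibly bound to a parameter, and the local variable that shadowed the hasFailed parameter is renamed to failed, clearing Kotlin's name-shadowing warning. A minimal sketch with hypothetical types:

class BufferedConsumer(
    private val onStart: () -> Unit,
    private val onClose: (Boolean) -> Unit,
    private val defaultNamespace: String?,
) {
    fun run(hasFailed: Boolean) {
        // var hasFailed = hasFailed would shadow the parameter and draw a
        // NAME_SHADOWING warning; a distinct local name sidesteps it.
        var failed = hasFailed
        if (defaultNamespace == null) {
            failed = true // illustrative reassignment
        }
        onStart()
        onClose(failed)
    }
}

fun main() {
    // Named arguments keep long constructor calls self-documenting.
    BufferedConsumer(
        onStart = { println("starting") },
        onClose = { failed -> println("closed, failed=$failed") },
        defaultNamespace = null,
    ).run(hasFailed = false)
}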

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/SwitchingDestination.kt

+3 -3

@@ -50,7 +50,7 @@ open class SwitchingDestination<T : Enum<T>>(
     @Throws(Exception::class)
     override fun check(config: JsonNode): AirbyteConnectionStatus? {
         val destinationType = configToType.apply(config)
-        LOGGER.info("Using destination type: " + destinationType!!.name)
+        LOGGER.info("Using destination type: " + destinationType.name)
         return typeToDestination[destinationType]!!.check(config)
     }

@@ -61,7 +61,7 @@ open class SwitchingDestination<T : Enum<T>>(
         outputRecordCollector: Consumer<AirbyteMessage>
     ): AirbyteMessageConsumer? {
         val destinationType = configToType.apply(config)
-        LOGGER.info("Using destination type: " + destinationType!!.name)
+        LOGGER.info("Using destination type: " + destinationType.name)
         return typeToDestination[destinationType]!!.getConsumer(
             config,
             catalog,

@@ -76,7 +76,7 @@ open class SwitchingDestination<T : Enum<T>>(
         outputRecordCollector: Consumer<AirbyteMessage>
     ): SerializedAirbyteMessageConsumer? {
         val destinationType = configToType.apply(config)
-        LOGGER.info("Using destination type: " + destinationType!!.name)
+        LOGGER.info("Using destination type: " + destinationType.name)
         return typeToDestination[destinationType]!!.getSerializedMessageConsumer(
             config,
             catalog,
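
The dropped !! is Kotlin's unnecessary-non-null-assertion fix: Enum.name is declared non-null, so asserting it buys nothing and draws a warning. A sketch with a hypothetical enum:

enum class DestinationType { SNOWFLAKE, REDSHIFT }

fun main() {
    val destinationType = DestinationType.SNOWFLAKE
    // destinationType!!.name would warn: the receiver is already non-null.
    println("Using destination type: " + destinationType.name)
}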

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

+3 -3

@@ -59,7 +59,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
         return !jdbcDatabase.queryBoolean(
             dslContext
                 .select(
-                    DSL.field(
+                    field(
                         DSL.exists(
                             DSL.selectOne().from(DSL.name(id.finalNamespace, id.finalName)).limit(1)
                         )

@@ -100,7 +100,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
             CheckedFunction { conn: Connection ->
                 conn.prepareStatement(
                     dslContext
-                        .select(DSL.field("MIN(_airbyte_extracted_at)").`as`("min_timestamp"))
+                        .select(field("MIN(_airbyte_extracted_at)").`as`("min_timestamp"))
                        .from(DSL.name(id.rawNamespace, id.rawName))
                        .where(DSL.condition("_airbyte_loaded_at IS NULL"))
                        .sql

@@ -129,7 +129,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
             CheckedFunction { conn: Connection ->
                 conn.prepareStatement(
                     dslContext
-                        .select(DSL.field("MAX(_airbyte_extracted_at)").`as`("min_timestamp"))
+                        .select(field("MAX(_airbyte_extracted_at)").`as`("min_timestamp"))
                        .from(DSL.name(id.rawNamespace, id.rawName))
                        .sql
                )
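
Replacing DSL.field(...) with field(...) presumably pairs with a member import of jOOQ's static factory, shortening call sites without changing behavior. A sketch assuming org.jooq is on the classpath:

import org.jooq.impl.DSL
import org.jooq.impl.DSL.field

fun main() {
    // Both calls resolve to the same static DSL.field(String); the member
    // import only removes the qualifier at the call site.
    val qualified = DSL.field("MAX(_airbyte_extracted_at)")
    val imported = field("MAX(_airbyte_extracted_at)")
    println(qualified.toString() == imported.toString())
}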

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt

+1 -1

@@ -155,7 +155,7 @@ object GeneralStagingFunctions {
         typerDeduper: TyperDeduper
     ): OnCloseFunction {
         return OnCloseFunction {
-            hasFailed: Boolean,
+            _: Boolean,
             streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary> ->
             // After moving data from staging area to the target table (airybte_raw) clean up the
             // staging
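
As in JdbcBufferedConsumerFactory above, the unused hasFailed lambda parameter becomes _, Kotlin's placeholder for values that are deliberately ignored. A sketch using a hypothetical stand-in for the CDK's OnCloseFunction SAM interface:

fun interface OnCloseFunction {
    fun accept(hasFailed: Boolean, streamSyncSummaries: Map<String, Long>)
}

fun onCloseFunction(): OnCloseFunction = OnCloseFunction { _, streamSyncSummaries ->
    // The success flag is not needed here; binding it to _ avoids an
    // unused-parameter warning while keeping the SAM signature intact.
    println("synced streams: ${streamSyncSummaries.keys}")
}

fun main() {
    onCloseFunction().accept(false, mapOf("users" to 42L))
}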

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialFlush.kt

+6 -6

@@ -79,11 +79,9 @@ object SerialFlush {
         return FlushBufferFunction {
             pair: AirbyteStreamNameNamespacePair,
             writer: SerializableBuffer ->
-            log.info(
-                "Flushing buffer for stream {} ({}) to staging",
-                pair.name,
-                FileUtils.byteCountToDisplaySize(writer.byteCount)
-            )
+            log.info {
+                "Flushing buffer for stream ${pair.name} (${FileUtils.byteCountToDisplaySize(writer.byteCount)}) to staging"
+            }
             require(pairToWriteConfig.containsKey(pair)) {
                 String.format(
                     "Message contained record from a stream that was not in the catalog. \ncatalog: %s",

@@ -128,7 +126,9 @@ object SerialFlush {
                 )
             }
         } catch (e: Exception) {
-            log.error("Failed to flush and commit buffer data into destination's raw table", e)
+            log.error(e) {
+                "Failed to flush and commit buffer data into destination's raw table"
+            }
             throw RuntimeException(
                 "Failed to upload buffer to stage and commit to destination",
                 e
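
The logging rewrite swaps SLF4J-style {} placeholders for kotlin-logging's lambda form: the message is interpolated inside a lambda that only runs if the level is enabled, and the throwable moves in front of the message. A sketch assuming the io.github.oshai kotlin-logging 5.x API, which the call shapes above suggest:

import io.github.oshai.kotlinlogging.KotlinLogging

private val log = KotlinLogging.logger {}

fun main() {
    val streamName = "users"
    val size = "1 MB"

    // Built lazily: the string is only constructed when INFO is enabled.
    log.info { "Flushing buffer for stream $streamName ($size) to staging" }

    try {
        error("simulated upload failure")
    } catch (e: Exception) {
        // Throwable first, lazy message second.
        log.error(e) { "Failed to flush and commit buffer data" }
    }
}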

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt

+3 -2

@@ -155,9 +155,10 @@ open class SerialStagingConsumerFactory {
         } else {
             outputSchema =
                 getOutputSchema(abStream, config["schema"].asText(), namingResolver)
-            tableName = namingResolver.getRawTableName(streamName)
+            tableName = @Suppress("deprecation") namingResolver.getRawTableName(streamName)
         }
-        val tmpTableName = namingResolver.getTmpTableName(streamName)
+        val tmpTableName =
+            @Suppress("deprecation") namingResolver.getTmpTableName(streamName)
         val syncMode = stream.destinationSyncMode

         val writeConfig =

airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle

-16

@@ -1,19 +1,3 @@
-java {
-    // TODO: rewrite code to avoid javac wornings in the first place
-    compileJava {
-        options.compilerArgs += "-Xlint:-deprecation"
-    }
-    compileTestFixturesJava {
-        options.compilerArgs += "-Xlint:-deprecation"
-    }
-}
-
-compileKotlin {
-    compilerOptions {
-        allWarningsAsErrors = false
-    }
-}
-
 compileTestFixturesKotlin {
     compilerOptions {
         allWarningsAsErrors = false

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt

+3 -5

@@ -70,24 +70,22 @@ abstract class BaseGcsDestination : BaseConnector(), Destination {

     override fun getConsumer(
         config: JsonNode,
-        configuredCatalog: ConfiguredAirbyteCatalog,
+        catalog: ConfiguredAirbyteCatalog,
         outputRecordCollector: Consumer<AirbyteMessage>
     ): AirbyteMessageConsumer? {
-        val gcsConfig: GcsDestinationConfig =
-            GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
+        val gcsConfig: GcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config)
         return S3ConsumerFactory()
             .create(
                 outputRecordCollector,
                 GcsStorageOperations(nameTransformer, gcsConfig.getS3Client(), gcsConfig),
-                nameTransformer,
                 getCreateFunction(
                     gcsConfig,
                     Function<String, BufferStorage> { fileExtension: String ->
                         FileBuffer(fileExtension)
                     }
                 ),
                 gcsConfig,
-                configuredCatalog
+                catalog
             )
     }
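
Dropping the explicit .Companion qualifier is purely cosmetic: members of a companion object are reachable through the class name alone. A sketch with hypothetical names:

class GcsConfig private constructor(val bucket: String) {
    companion object {
        fun fromBucket(bucket: String): GcsConfig = GcsConfig(bucket)
    }
}

fun main() {
    // Both lines call the same companion function.
    val viaCompanion = GcsConfig.Companion.fromBucket("airbyte-staging")
    val direct = GcsConfig.fromBucket("airbyte-staging")
    println(viaCompanion.bucket == direct.bucket)
}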

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt

+3 -3

@@ -32,12 +32,12 @@ class GcsDestinationConfig(
 ) :
     S3DestinationConfig(
         GCS_ENDPOINT,
-        bucketName!!,
-        bucketPath!!,
+        bucketName,
+        bucketPath,
         bucketRegion,
         S3DestinationConstants.DEFAULT_PATH_FORMAT,
         gcsCredentialConfig.s3CredentialConfig.orElseThrow(),
-        formatConfig!!,
+        formatConfig,
         null,
         null,
         false,

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsStorageOperations.kt

+2 -2

@@ -15,10 +15,10 @@ class GcsStorageOperations(
     nameTransformer: NamingConventionTransformer,
     s3Client: AmazonS3,
     s3Config: S3DestinationConfig
-) : S3StorageOperations(nameTransformer!!, s3Client!!, s3Config!!) {
+) : S3StorageOperations(nameTransformer, s3Client, s3Config) {
     /** GCS only supports the legacy AmazonS3#doesBucketExist method. */
     override fun doesBucketExist(bucket: String?): Boolean {
-        return s3Client.doesBucketExist(bucket)
+        @Suppress("deprecation") return s3Client.doesBucketExist(bucket)
     }

     /**

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt

+1

@@ -68,6 +68,7 @@ class GcsCsvWriter(
         this.csvPrinter =
             CSVPrinter(
                 PrintWriter(outputStream, true, StandardCharsets.UTF_8),
+                @Suppress("deprecation")
                 CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
                     .withHeader(*csvSheetGenerator.getHeaderRow().toTypedArray<String>())
             )

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.kt

+1

@@ -65,6 +65,7 @@ class GcsParquetWriter(
         val formatConfig = config.formatConfig as UploadParquetFormatConfig
         val hadoopConfig = getHadoopConfig(config)
         this.parquetWriter =
+            @Suppress("deprecation")
             AvroParquetWriter.builder<GenericData.Record>(
                 HadoopOutputFile.fromPath(path, hadoopConfig)
             )
