Skip to content

Commit fad0737

Browse files
enable spotbugs for db-sources submodule (#36705)
1 parent 4d77401 commit fad0737

File tree

14 files changed

+41
-45
lines changed

14 files changed

+41
-45
lines changed

airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle

-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ compileKotlin.compilerOptions.allWarningsAsErrors = false
1515
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
1616
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
1717

18-
spotbugsTestFixtures.enabled = false
19-
spotbugsTest.enabled = false
20-
2118

2219
// Convert yaml to java: relationaldb.models
2320
jsonSchema2Pojo {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ interface CdcTargetPosition<T> {
4747
* @param sourceOffset source offset params from heartbeat change event
4848
* @return the heartbeat position in a heartbeat change event or null
4949
*/
50-
fun extractPositionFromHeartbeatOffset(sourceOffset: Map<String?, *>?): T
50+
fun extractPositionFromHeartbeatOffset(sourceOffset: Map<String?, *>): T
5151

5252
/**
5353
* This function checks if the event we are processing in the loop is already behind the offset

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ import org.mockito.kotlin.mock
1919
class DebeziumMessageProducerTest {
2020
private var producer: DebeziumMessageProducer<*>? = null
2121

22-
lateinit var cdcStateHandler: CdcStateHandler
23-
lateinit var targetPosition: CdcTargetPosition<Any>
24-
lateinit var eventConverter: DebeziumEventConverter
25-
lateinit var offsetManager: AirbyteFileOffsetBackingStore
26-
lateinit var schemaHistoryManager: AirbyteSchemaHistoryStorage
22+
var cdcStateHandler: CdcStateHandler = mock()
23+
var targetPosition: CdcTargetPosition<Any> = mock()
24+
var eventConverter: DebeziumEventConverter = mock()
25+
var offsetManager: AirbyteFileOffsetBackingStore = mock()
26+
var schemaHistoryManager: AirbyteSchemaHistoryStorage = mock()
2727

2828
@BeforeEach
2929
fun setUp() {

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class DebeziumRecordIteratorTest {
2626
}
2727

2828
override fun extractPositionFromHeartbeatOffset(
29-
sourceOffset: Map<String?, *>?
29+
sourceOffset: Map<String?, *>
3030
): Long {
3131
return sourceOffset!!["lsn"] as Long
3232
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,16 @@ internal class DefaultJdbcSourceAcceptanceTest :
5959

6060
fun getConfigWithConnectionProperties(
6161
psqlDb: PostgreSQLContainer<*>,
62-
dbName: String?,
63-
additionalParameters: String?
62+
dbName: String,
63+
additionalParameters: String
6464
): JsonNode {
6565
return Jsons.jsonNode(
6666
ImmutableMap.builder<Any, Any?>()
6767
.put(JdbcUtils.HOST_KEY, resolveHost(psqlDb))
6868
.put(JdbcUtils.PORT_KEY, resolvePort(psqlDb))
6969
.put(JdbcUtils.DATABASE_KEY, dbName)
7070
.put(JdbcUtils.SCHEMAS_KEY, List.of(SCHEMA_NAME))
71-
.put(JdbcUtils.USERNAME_KEY, psqlDb!!.username)
71+
.put(JdbcUtils.USERNAME_KEY, psqlDb.username)
7272
.put(JdbcUtils.PASSWORD_KEY, psqlDb.password)
7373
.put(JdbcUtils.CONNECTION_PROPERTIES_KEY, additionalParameters)
7474
.build()

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorForTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
99

1010
class SourceStateIteratorForTest<T>(
1111
messageIterator: Iterator<T>,
12-
stream: ConfiguredAirbyteStream,
12+
stream: ConfiguredAirbyteStream?,
1313
sourceStateMessageProducer: SourceStateMessageProducer<T>,
1414
stateEmitFrequency: StateEmitFrequency
1515
) :

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import org.mockito.kotlin.any
1515
import org.mockito.kotlin.eq
1616

1717
class SourceStateIteratorTest {
18-
lateinit var mockProducer: SourceStateMessageProducer<AirbyteMessage>
19-
lateinit var messageIterator: Iterator<AirbyteMessage>
20-
lateinit var stream: ConfiguredAirbyteStream
18+
var mockProducer: SourceStateMessageProducer<AirbyteMessage> = mock()
19+
var messageIterator: Iterator<AirbyteMessage> = mock()
20+
var stream: ConfiguredAirbyteStream = mock()
2121

2222
var sourceStateIterator: SourceStateIteratorForTest<*>? = null
2323

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.slf4j.Logger
2525
import org.slf4j.LoggerFactory
2626

2727
abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
28-
protected lateinit var testdb: T
28+
protected var testdb: T = createTestDatabase()
2929

3030
protected fun createTableSqlFmt(): String {
3131
return "CREATE TABLE %s.%s(%s);"

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.mockito.Mockito
3939
"The static variables are updated in subclasses for convenience, and cannot be final."
4040
)
4141
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
42-
protected lateinit var testdb: T
42+
protected var testdb: T = createTestDatabase()
4343

4444
protected fun streamName(): String {
4545
return TABLE_NAME
@@ -314,8 +314,8 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
314314
filteredCatalog.streams
315315
.stream()
316316
.filter { stream: AirbyteStream ->
317-
TEST_SCHEMAS.stream().anyMatch { schemaName: String? ->
318-
stream.namespace.startsWith(schemaName!!)
317+
TEST_SCHEMAS.stream().anyMatch { schemaName: String ->
318+
stream.namespace.startsWith(schemaName)
319319
}
320320
}
321321
.collect(Collectors.toList())

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,8 @@ abstract class AbstractSourceConnectorTest {
304304
protected fun runReadVerifyNumberOfReceivedMsgs(
305305
catalog: ConfiguredAirbyteCatalog,
306306
state: JsonNode?,
307-
mapOfExpectedRecordsCount: MutableMap<String?, Int>
308-
): Map<String?, Int> {
307+
mapOfExpectedRecordsCount: MutableMap<String, Int>
308+
): Map<String, Int> {
309309
val sourceConfig =
310310
WorkerSourceConfig()
311311
.withSourceConnectionConfiguration(config)

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() {
148148
val recordMessages =
149149
allMessages!!
150150
.stream()
151-
.filter { m: AirbyteMessage? -> m!!.type == AirbyteMessage.Type.RECORD }
151+
.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD }
152152
.toList()
153153
val expectedValues: MutableMap<String?, MutableList<String?>?> = HashMap()
154154
val missedValuesByStream: MutableMap<String?, ArrayList<MissedRecords>> = HashMap()

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() {
4747
get() = runExecutable(Command.GET_STATE)
4848

4949
@Throws(IOException::class)
50-
override fun assertFullRefreshMessages(allMessages: List<AirbyteMessage?>?) {
50+
override fun assertFullRefreshMessages(allMessages: List<AirbyteMessage>) {
5151
val regexTests =
5252
Streams.stream(
5353
runExecutable(Command.GET_REGEX_TESTS).withArray<JsonNode>("tests").elements()
@@ -57,7 +57,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() {
5757
val stringMessages =
5858
allMessages!!
5959
.stream()
60-
.map { `object`: AirbyteMessage? -> Jsons.serialize(`object`) }
60+
.map { `object`: AirbyteMessage -> Jsons.serialize(`object`) }
6161
.toList()
6262
LOGGER.info("Running " + regexTests.size + " regex tests...")
6363
regexTests.forEach(
@@ -155,7 +155,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() {
155155
private val LOGGER: Logger = LoggerFactory.getLogger(PythonSourceAcceptanceTest::class.java)
156156
private const val OUTPUT_FILENAME = "output.json"
157157

158-
lateinit var IMAGE_NAME: String
158+
var IMAGE_NAME: String = "dummy_image_name"
159159
var PYTHON_CONTAINER_NAME: String? = null
160160
}
161161
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt

+9-9
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,15 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
131131
@Test
132132
@Throws(Exception::class)
133133
fun testDiscover() {
134-
val discoverOutput = runDiscover()
134+
runDiscover()
135135
val discoveredCatalog = lastPersistedCatalog
136136
Assertions.assertNotNull(discoveredCatalog, "Expected discover to produce a catalog")
137137
verifyCatalog(discoveredCatalog)
138138
}
139139

140140
/** Override this method to check the actual catalog. */
141141
@Throws(Exception::class)
142-
protected fun verifyCatalog(catalog: AirbyteCatalog?) {
142+
protected open fun verifyCatalog(catalog: AirbyteCatalog?) {
143143
// do nothing by default
144144
}
145145

@@ -167,7 +167,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
167167

168168
/** Override this method to perform more specific assertion on the messages. */
169169
@Throws(Exception::class)
170-
protected open fun assertFullRefreshMessages(allMessages: List<AirbyteMessage?>?) {
170+
protected open fun assertFullRefreshMessages(allMessages: List<AirbyteMessage>) {
171171
// do nothing by default
172172
}
173173

@@ -248,8 +248,8 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
248248
val stateMessages =
249249
airbyteMessages
250250
.stream()
251-
.filter { m: AirbyteMessage? -> m!!.type == AirbyteMessage.Type.STATE }
252-
.map { obj: AirbyteMessage? -> obj!!.state }
251+
.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.STATE }
252+
.map { obj: AirbyteMessage -> obj.state }
253253
.collect(Collectors.toList())
254254
Assertions.assertFalse(
255255
recordMessages.isEmpty(),
@@ -446,12 +446,12 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
446446

447447
@JvmStatic
448448
protected fun filterRecords(
449-
messages: Collection<AirbyteMessage?>?
449+
messages: Collection<AirbyteMessage>
450450
): List<AirbyteRecordMessage> {
451-
return messages!!
451+
return messages
452452
.stream()
453-
.filter { m: AirbyteMessage? -> m!!.type == AirbyteMessage.Type.RECORD }
454-
.map { obj: AirbyteMessage? -> obj!!.record }
453+
.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD }
454+
.map { obj: AirbyteMessage -> obj.record }
455455
.collect(Collectors.toList())
456456
}
457457
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt

+8-9
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,20 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest
7979
validateNumberOfReceivedMsgs(checkStatusMap)
8080
}
8181

82-
protected fun validateNumberOfReceivedMsgs(checkStatusMap: Map<String?, Int?>?) {
82+
protected fun validateNumberOfReceivedMsgs(checkStatusMap: Map<String, Int>) {
8383
// Iterate through all streams map and check for streams where
8484
val failedStreamsMap =
85-
checkStatusMap!!
86-
.entries
85+
checkStatusMap.entries
8786
.stream()
88-
.filter { el: Map.Entry<String?, Int?> -> el.value != 0 }
87+
.filter { el: Map.Entry<String, Int> -> el.value != 0 }
8988
.collect(
9089
Collectors.toMap(
91-
Function { obj: Map.Entry<String?, Int?> -> obj.key },
92-
Function { obj: Map.Entry<String?, Int?> -> obj.value }
90+
Function { obj: Map.Entry<String, Int> -> obj.key },
91+
Function { obj: Map.Entry<String, Int> -> obj.value }
9392
)
9493
)
9594

96-
if (!failedStreamsMap.isEmpty()) {
95+
if (failedStreamsMap.isNotEmpty()) {
9796
Assertions.fail<Any>("Non all messages were delivered. $failedStreamsMap")
9897
}
9998
c.info("Finished all checks, no issues found for {} of streams", checkStatusMap.size)
@@ -102,8 +101,8 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest
102101
protected fun prepareMapWithExpectedRecords(
103102
streamNumber: Int,
104103
expectedRecordsNumberInEachStream: Int
105-
): MutableMap<String?, Int> {
106-
val resultMap: MutableMap<String?, Int> = HashMap() // streamName&expected records in stream
104+
): MutableMap<String, Int> {
105+
val resultMap: MutableMap<String, Int> = HashMap() // streamName&expected records in stream
107106

108107
for (currentStream in 0 until streamNumber) {
109108
val streamName = String.format(testStreamNameTemplate, currentStream)

0 commit comments

Comments
 (0)