Skip to content

Commit 9cd72c3

Browse files
enable spotbugs for s3-destinations submodule (#36706)
1 parent 9cdbf00 commit 9cd72c3

File tree

6 files changed

+33
-37
lines changed

6 files changed

+33
-37
lines changed

airbyte-cdk/java/airbyte-cdk/s3-destinations/build.gradle

-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ compileKotlin {
3232
}
3333
}
3434

35-
spotbugsTest.enabled = false
36-
spotbugsTestFixtures.enabled = false
37-
3835
dependencies {
3936
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies')
4037
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core')

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaTypeTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ class JsonSchemaTypeTest {
1616
@ParameterizedTest
1717
@ArgumentsSource(JsonSchemaTypeProvider::class)
1818
fun testFromJsonSchemaType(
19-
type: String?,
19+
type: String,
2020
airbyteType: String?,
2121
expectedJsonSchemaType: JsonSchemaType?
2222
) {
23-
Assertions.assertEquals(expectedJsonSchemaType, fromJsonSchemaType(type!!, airbyteType))
23+
Assertions.assertEquals(expectedJsonSchemaType, fromJsonSchemaType(type, airbyteType))
2424
}
2525

2626
class JsonSchemaTypeProvider : ArgumentsProvider {

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ abstract class S3AvroParquetDestinationAcceptanceTest protected constructor(s3Fo
115115

116116
@Throws(Exception::class)
117117
protected abstract fun retrieveDataTypesFromPersistedFiles(
118-
streamName: String?,
119-
namespace: String?
120-
): Map<String?, Set<Schema.Type?>?>
118+
streamName: String,
119+
namespace: String
120+
): Map<String, Set<Schema.Type>>
121121

122122
protected fun getTypes(record: GenericData.Record): Map<String, Set<Schema.Type>> {
123123
val fieldList =

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseAvroDestinationAcceptanceTest.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() :
4343
namespace: String,
4444
streamSchema: JsonNode
4545
): List<JsonNode> {
46-
val nameUpdater =
47-
AvroRecordHelper.getFieldNameUpdater(streamName!!, namespace, streamSchema)
46+
val nameUpdater = AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema)
4847

4948
val objectSummaries = getAllSyncedObjects(streamName, namespace)
5049
val jsonRecords: MutableList<JsonNode> = LinkedList()
@@ -75,11 +74,11 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() :
7574

7675
@Throws(Exception::class)
7776
override fun retrieveDataTypesFromPersistedFiles(
78-
streamName: String?,
79-
namespace: String?
80-
): Map<String?, Set<Schema.Type?>?> {
77+
streamName: String,
78+
namespace: String
79+
): Map<String, Set<Schema.Type>> {
8180
val objectSummaries = getAllSyncedObjects(streamName, namespace)
82-
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()
81+
val resultDataTypes: MutableMap<String, Set<Schema.Type>> = HashMap()
8382

8483
for (objectSummary in objectSummaries) {
8584
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseParquetDestinationAcceptanceTest.kt

+10-11
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,13 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
3333
namespace: String,
3434
streamSchema: JsonNode
3535
): List<JsonNode> {
36-
val nameUpdater =
37-
AvroRecordHelper.getFieldNameUpdater(streamName!!, namespace, streamSchema)
36+
val nameUpdater = AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema)
3837

3938
val objectSummaries = getAllSyncedObjects(streamName, namespace)
4039
val jsonRecords: MutableList<JsonNode> = LinkedList()
4140

42-
for (objectSummary in objectSummaries!!) {
43-
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
41+
for (objectSummary in objectSummaries) {
42+
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
4443
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
4544
val path = Path(uri)
4645
val hadoopConfig = S3ParquetWriter.getHadoopConfig(s3DestinationConfig)
@@ -68,14 +67,14 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
6867

6968
@Throws(Exception::class)
7069
override fun retrieveDataTypesFromPersistedFiles(
71-
streamName: String?,
72-
namespace: String?
73-
): Map<String?, Set<Schema.Type?>?> {
70+
streamName: String,
71+
namespace: String
72+
): Map<String, Set<Schema.Type>> {
7473
val objectSummaries = getAllSyncedObjects(streamName, namespace)
75-
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()
74+
val resultDataTypes: MutableMap<String, Set<Schema.Type>> = HashMap()
7675

77-
for (objectSummary in objectSummaries!!) {
78-
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
76+
for (objectSummary in objectSummaries) {
77+
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
7978
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
8079
val path = Path(uri)
8180
val hadoopConfig = S3ParquetWriter.getHadoopConfig(s3DestinationConfig)
@@ -87,7 +86,7 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
8786
var record: GenericData.Record?
8887
while ((parquetReader.read().also { record = it }) != null) {
8988
val actualDataTypes = getTypes(record!!)
90-
resultDataTypes.putAll(actualDataTypes!!)
89+
resultDataTypes.putAll(actualDataTypes)
9190
}
9291
}
9392
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt

+13-12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.util.stream.Collectors
2323
import org.apache.commons.lang3.RandomStringUtils
2424
import org.joda.time.DateTime
2525
import org.joda.time.DateTimeZone
26+
import org.mockito.Mockito.mock
2627
import org.slf4j.Logger
2728
import org.slf4j.LoggerFactory
2829

@@ -40,9 +41,9 @@ abstract class S3DestinationAcceptanceTest
4041
protected constructor(protected val outputFormat: S3Format) : DestinationAcceptanceTest() {
4142
protected val secretFilePath: String = "secrets/config.json"
4243
protected var configJson: JsonNode? = null
43-
protected lateinit var s3DestinationConfig: S3DestinationConfig
44+
protected var s3DestinationConfig: S3DestinationConfig = mock()
4445
protected var s3Client: AmazonS3? = null
45-
protected lateinit var s3nameTransformer: NamingConventionTransformer
46+
protected var s3nameTransformer: NamingConventionTransformer = mock()
4647
protected var s3StorageOperations: S3StorageOperations? = null
4748

4849
protected val baseConfigJson: JsonNode
@@ -68,23 +69,23 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta
6869

6970
/** Helper method to retrieve all synced objects inside the configured bucket path. */
7071
protected fun getAllSyncedObjects(
71-
streamName: String?,
72-
namespace: String?
72+
streamName: String,
73+
namespace: String
7374
): List<S3ObjectSummary> {
74-
val namespaceStr = s3nameTransformer!!.getNamespace(namespace!!)
75-
val streamNameStr = s3nameTransformer!!.getIdentifier(streamName!!)
75+
val namespaceStr = s3nameTransformer.getNamespace(namespace)
76+
val streamNameStr = s3nameTransformer.getIdentifier(streamName)
7677
val outputPrefix =
7778
s3StorageOperations!!.getBucketObjectPath(
7879
namespaceStr,
7980
streamNameStr,
8081
DateTime.now(DateTimeZone.UTC),
81-
s3DestinationConfig!!.pathFormat!!
82+
s3DestinationConfig.pathFormat!!
8283
)
8384
// the child folder contains a non-deterministic epoch timestamp, so use the parent folder
8485
val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
8586
val objectSummaries =
8687
s3Client!!
87-
.listObjects(s3DestinationConfig!!.bucketName, parentFolder)
88+
.listObjects(s3DestinationConfig.bucketName, parentFolder)
8889
.objectSummaries
8990
.stream()
9091
.filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") }
@@ -141,7 +142,7 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta
141142
val keysToDelete: MutableList<DeleteObjectsRequest.KeyVersion> = LinkedList()
142143
val objects =
143144
s3Client!!
144-
.listObjects(s3DestinationConfig!!.bucketName, s3DestinationConfig!!.bucketPath)
145+
.listObjects(s3DestinationConfig.bucketName, s3DestinationConfig.bucketPath)
145146
.objectSummaries
146147
for (`object` in objects) {
147148
keysToDelete.add(DeleteObjectsRequest.KeyVersion(`object`.key))
@@ -150,12 +151,12 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta
150151
if (keysToDelete.size > 0) {
151152
LOGGER.info(
152153
"Tearing down test bucket path: {}/{}",
153-
s3DestinationConfig!!.bucketName,
154-
s3DestinationConfig!!.bucketPath
154+
s3DestinationConfig.bucketName,
155+
s3DestinationConfig.bucketPath
155156
)
156157
val result =
157158
s3Client!!.deleteObjects(
158-
DeleteObjectsRequest(s3DestinationConfig!!.bucketName).withKeys(keysToDelete)
159+
DeleteObjectsRequest(s3DestinationConfig.bucketName).withKeys(keysToDelete)
159160
)
160161
LOGGER.info("Deleted {} file(s).", result.deletedObjects.size)
161162
}

0 commit comments

Comments
 (0)