Skip to content

Commit 8078674

Browse files
fix compiler errors
1 parent 5381ce0 commit 8078674

File tree

22 files changed

+52
-45
lines changed

22 files changed

+52
-45
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,5 @@ interface StreamCopier {
6565
fun prepareStagingFile(): String?
6666

6767
/** @return current staging file name */
68-
val currentFile: String
68+
val currentFile: String?
6969
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle

+12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@ java {
88
}
99
}
1010

11+
compileKotlin {
12+
compilerOptions {
13+
allWarningsAsErrors = false
14+
}
15+
}
16+
17+
compileTestFixturesKotlin {
18+
compilerOptions {
19+
allWarningsAsErrors = false
20+
}
21+
}
22+
1123
dependencies {
1224
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
1325
implementation project(':airbyte-cdk:java:airbyte-cdk:core')

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.airbyte.cdk.integrations.base.Destination
1212
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage
1313
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
1414
import io.airbyte.cdk.integrations.destination.record_buffer.BufferStorage
15+
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
1516
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testMultipartUpload
1617
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testSingleUpload
1718
import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory
@@ -64,7 +65,7 @@ abstract class BaseGcsDestination : BaseConnector(), Destination {
6465
outputRecordCollector,
6566
GcsStorageOperations(nameTransformer, gcsConfig.s3Client, gcsConfig),
6667
nameTransformer,
67-
getCreateFunction(gcsConfig, Function<String, BufferStorage> { fileExtension: String? -> FileBuffer(fileExtension) }),
68+
getCreateFunction(gcsConfig, Function<String, BufferStorage> { fileExtension: String -> FileBuffer(fileExtension) }),
6869
gcsConfig,
6970
configuredCatalog)
7071
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,23 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
2626
class GcsDestinationConfig(bucketName: String?,
2727
bucketPath: String?,
2828
bucketRegion: String?,
29-
val gcsCredentialConfig: GcsCredentialConfig?,
29+
val gcsCredentialConfig: GcsCredentialConfig,
3030
formatConfig: S3FormatConfig?) : S3DestinationConfig(GCS_ENDPOINT,
3131
bucketName!!,
3232
bucketPath!!,
3333
bucketRegion,
3434
S3DestinationConstants.DEFAULT_PATH_FORMAT,
35-
gcsCredentialConfig.getS3CredentialConfig().orElseThrow(),
35+
gcsCredentialConfig.s3CredentialConfig.orElseThrow(),
3636
formatConfig!!,
3737
null,
3838
null,
3939
false,
4040
S3StorageOperations.DEFAULT_UPLOAD_THREADS) {
4141
override fun createS3Client(): AmazonS3 {
42-
when (gcsCredentialConfig!!.credentialType) {
42+
when (gcsCredentialConfig.credentialType) {
4343
GcsCredentialType.HMAC_KEY -> {
44-
val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig?
45-
val awsCreds = BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret())
44+
val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig
45+
val awsCreds = BasicAWSCredentials(hmacKeyCredential.hmacKeyAccessId, hmacKeyCredential.hmacKeySecret)
4646

4747
return AmazonS3ClientBuilder.standard()
4848
.withEndpointConfiguration(

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
1414
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
1515
import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
1616
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
17+
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
1718
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
1819
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
1920
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
@@ -57,7 +58,7 @@ class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,
5758

5859
this.avroRecordFactory = AvroRecordFactory(schema, converter)
5960
this.uploadManager = create(config.bucketName, outputPath, s3Client)
60-
.setPartSize(DEFAULT_PART_SIZE_MB.toLong())
61+
.setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong())
6162
.get()
6263
// We only need one output stream as we only have one input stream. This is reasonably performant.
6364
this.outputStream = uploadManager.multiPartOutputStreams[0]

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig
88
import java.util.*
99

1010
interface GcsCredentialConfig : BlobStorageCredentialConfig<GcsCredentialType?> {
11-
val s3CredentialConfig: Optional<S3CredentialConfig?>
11+
val s3CredentialConfig: Optional<S3CredentialConfig>
1212
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ class GcsHmacKeyCredentialConfig : GcsCredentialConfig {
2525
override val credentialType: GcsCredentialType
2626
get() = GcsCredentialType.HMAC_KEY
2727

28-
override val s3CredentialConfig: Optional<S3CredentialConfig?>
28+
override val s3CredentialConfig: Optional<S3CredentialConfig>
2929
get() = Optional.of(S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret))
3030
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
1313
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator
1414
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create
1515
import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig
16+
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
1617
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
1718
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
1819
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
@@ -51,7 +52,7 @@ class GcsCsvWriter(config: GcsDestinationConfig,
5152
outputPath)
5253

5354
this.uploadManager = create(config.bucketName, outputPath, s3Client)
54-
.setPartSize(DEFAULT_PART_SIZE_MB.toLong())
55+
.setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong())
5556
.get()
5657
// We only need one output stream as we only have one input stream. This is reasonably performant.
5758
this.outputStream = uploadManager.multiPartOutputStreams[0]

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt

-5
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,6 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
164164
get() =// TODO need to update this method when updating whole class for using GcsWriter
165165
null
166166

167-
@VisibleForTesting
168-
fun getGcsStagingFiles(): Set<String> {
169-
return gcsStagingFiles
170-
}
171-
172167
@Throws(SQLException::class)
173168
abstract fun copyGcsCsvFileIntoTable(database: JdbcDatabase?,
174169
gcsFileLocation: String?,

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ abstract class GcsStreamCopierFactory : StreamCopierFactory<GcsConfig?> {
2222
/**
2323
* Used by the copy consumer.
2424
*/
25-
override fun create(configuredSchema: String?,
25+
fun create(configuredSchema: String?,
2626
gcsConfig: GcsConfig,
2727
stagingFolder: String?,
2828
configuredStream: ConfiguredAirbyteStream?,

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.google.common.collect.Lists
88
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
99
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
1010
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig
11+
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
1112
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
1213
import io.airbyte.commons.json.Jsons
1314
import org.apache.avro.file.DataFileConstants
@@ -108,7 +109,7 @@ internal class GcsAvroFormatConfigTest {
108109
.get()
109110

110111
val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
111-
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
112+
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
112113
}
113114

114115
@Test
@@ -126,6 +127,6 @@ internal class GcsAvroFormatConfigTest {
126127
.get()
127128

128129
val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
129-
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
130+
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
130131
}
131132
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
88
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
99
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
1010
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.Companion.fromValue
11+
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
1112
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
1213
import io.airbyte.commons.json.Jsons
1314
import org.apache.commons.lang3.reflect.FieldUtils
@@ -45,7 +46,7 @@ class GcsCsvFormatConfigTest {
4546
.get()
4647

4748
val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
48-
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
49+
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
4950
}
5051

5152
@Test
@@ -63,6 +64,6 @@ class GcsCsvFormatConfigTest {
6364
.get()
6465

6566
val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
66-
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
67+
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
6768
}
6869
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs.jsonl
66
import com.amazonaws.services.s3.internal.Constants
77
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
88
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
9+
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
910
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
1011
import io.airbyte.commons.json.Jsons
1112
import org.apache.commons.lang3.reflect.FieldUtils
@@ -33,7 +34,7 @@ class GcsJsonlFormatConfigTest {
3334
.get()
3435

3536
val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
36-
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
37+
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
3738
}
3839

3940
@Test
@@ -51,6 +52,6 @@ class GcsJsonlFormatConfigTest {
5152
.get()
5253

5354
val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
54-
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
55+
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
5556
}
5657
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs
66
import com.fasterxml.jackson.databind.JsonNode
77
import io.airbyte.cdk.integrations.destination.s3.S3Format
88
import io.airbyte.cdk.integrations.destination.s3.avro.JsonSchemaType
9-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
109
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1110
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider
1211
import io.airbyte.commons.json.Jsons
@@ -102,8 +101,7 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : Gcs
102101

103102
@Throws(IOException::class)
104103
private fun readMessagesFromFile(messagesFilename: String): List<AirbyteMessage> {
105-
return MoreResources.readResource(messagesFilename).lines()
106-
.map<AirbyteMessage>(Function { record: String? -> Jsons.deserialize(record, AirbyteMessage::class.java) }).collect<List<AirbyteMessage>, Any>(Collectors.toList<AirbyteMessage>())
104+
return MoreResources.readResource(messagesFilename).lines().map { Jsons.deserialize(it, AirbyteMessage::class.java) }
107105
}
108106

109107
@Throws(Exception::class)

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
99
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
1010
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
1111
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson
12-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
1312
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1413
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
1514
import io.airbyte.commons.json.Jsons
@@ -46,7 +45,7 @@ abstract class GcsBaseAvroDestinationAcceptanceTest : GcsAvroParquetDestinationA
4645
DataFileReader<GenericData.Record>(
4746
SeekableByteArrayInput(`object`.objectContent.readAllBytes()),
4847
GenericDatumReader<GenericData.Record>()).use { dataFileReader ->
49-
val jsonReader: ObjectReader = GcsDestinationAcceptanceTest.Companion.MAPPER.reader()
48+
val jsonReader: ObjectReader = MAPPER.reader()
5049
while (dataFileReader.hasNext()) {
5150
val record = dataFileReader.next()
5251
val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record)

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode
99
import io.airbyte.cdk.integrations.base.JavaBaseConstants
1010
import io.airbyte.cdk.integrations.destination.s3.S3Format
1111
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
12-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
1312
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1413
import io.airbyte.commons.json.Jsons
1514
import org.apache.commons.csv.CSVFormat
@@ -82,7 +81,7 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes
8281
}
8382

8483
private fun getJsonNode(input: Map<String, String>, fieldTypes: Map<String, String>): JsonNode {
85-
val json: ObjectNode = GcsDestinationAcceptanceTest.Companion.MAPPER.createObjectNode()
84+
val json: ObjectNode = MAPPER.createObjectNode()
8685

8786
if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) {
8887
return Jsons.deserialize(input[JavaBaseConstants.COLUMN_NAME_DATA])

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs
66
import com.amazonaws.services.s3.model.S3Object
77
import com.fasterxml.jackson.databind.JsonNode
88
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
9-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
109
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1110
import io.airbyte.commons.json.Jsons
1211
import java.io.IOException

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import com.amazonaws.services.s3.model.S3Object
77
import com.fasterxml.jackson.databind.JsonNode
88
import io.airbyte.cdk.integrations.base.JavaBaseConstants
99
import io.airbyte.cdk.integrations.destination.s3.S3Format
10-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
1110
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1211
import io.airbyte.commons.json.Jsons
1312
import java.io.BufferedReader

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.gcs
55

66
import com.amazonaws.services.s3.model.S3Object
77
import com.fasterxml.jackson.databind.JsonNode
8-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
98
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
109
import io.airbyte.commons.json.Jsons
1110
import java.io.BufferedReader

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
1111
import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetWriter.Companion.getHadoopConfig
1212
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
1313
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson
14-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
1514
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1615
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
1716
import io.airbyte.commons.json.Jsons

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import com.google.common.collect.ImmutableMap
1313
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
1414
import io.airbyte.cdk.integrations.destination.s3.S3Format
1515
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
16-
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
1716
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest
1817
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
1918
import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator
@@ -46,9 +45,9 @@ import java.util.stream.Collectors
4645
*/
4746
abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) : DestinationAcceptanceTest() {
4847
protected var configJson: JsonNode? = null
49-
protected var config: GcsDestinationConfig? = null
50-
protected var s3Client: AmazonS3? = null
51-
protected var nameTransformer: NamingConventionTransformer? = null
48+
protected lateinit var config: GcsDestinationConfig
49+
protected lateinit var s3Client: AmazonS3
50+
protected lateinit var nameTransformer: NamingConventionTransformer
5251
protected var s3StorageOperations: S3StorageOperations? = null
5352

5453
protected val baseConfigJson: JsonNode
@@ -62,7 +61,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format
6261
return configJson!!
6362
}
6463

65-
override fun getDefaultSchema(config: JsonNode): String {
64+
override fun getDefaultSchema(config: JsonNode): String? {
6665
if (config.has("gcs_bucket_path")) {
6766
return config["gcs_bucket_path"].asText()
6867
}
@@ -245,6 +244,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format
245244

246245
companion object {
247246
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsDestinationAcceptanceTest::class.java)
247+
@JvmStatic
248248
protected val MAPPER: ObjectMapper = MoreMappers.initMapper()
249249

250250
protected const val SECRET_FILE_PATH: String = "secrets/config.json"

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ open class S3DestinationConfig {
4444
private val lock = Any()
4545
var s3Client: AmazonS3
4646
get() {
47-
synchronized(lock) {
48-
if (s3Client == null) {
49-
return resetS3Client()
47+
if (s3Client == null) {
48+
synchronized(lock) {
49+
if (s3Client == null) {
50+
s3Client = resetS3Client()
51+
}
5052
}
51-
return s3Client
5253
}
54+
return s3Client
5355
}
5456
private set
5557

@@ -84,7 +86,7 @@ open class S3DestinationConfig {
8486
pathFormat: String,
8587
credentialConfig: S3CredentialConfig,
8688
formatConfig: S3FormatConfig,
87-
s3Client: AmazonS3,
89+
s3Client: AmazonS3?,
8890
fileNamePattern: String?,
8991
checkIntegrity: Boolean,
9092
uploadThreadsCount: Int) {
@@ -95,7 +97,7 @@ open class S3DestinationConfig {
9597
this.pathFormat = pathFormat
9698
this.s3CredentialConfig = credentialConfig
9799
this.formatConfig = formatConfig
98-
this.s3Client = s3Client
100+
this.s3Client = s3Client ?: resetS3Client()
99101
this.fileNamePattern = fileNamePattern
100102
this.isCheckIntegrity = checkIntegrity
101103
this.uploadThreadsCount = uploadThreadsCount

0 commit comments

Comments
 (0)