
Commit 6f28efc

fix compiler errors
1 parent f3a4215 commit 6f28efc

37 files changed: +866 -517 lines
@@ -1 +1 @@
-version=0.28.9
+version=0.28.10

airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle

+12

@@ -8,6 +8,18 @@ java {
 }
 }
 
+compileKotlin {
+    compilerOptions {
+        allWarningsAsErrors = false
+    }
+}
+
+compileTestFixturesKotlin {
+    compilerOptions {
+        allWarningsAsErrors = false
+    }
+}
+
 dependencies {
     implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
     implementation project(':airbyte-cdk:java:airbyte-cdk:core')
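For reference, a rough Gradle Kotlin DSL equivalent of the Groovy block above — a sketch only, not part of this commit, assuming Kotlin Gradle plugin 1.8+ where the compilerOptions DSL is available:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

tasks.named<KotlinCompile>("compileKotlin") {
    compilerOptions {
        // The freshly converted sources still emit warnings; don't fail the build on them.
        allWarningsAsErrors.set(false)
    }
}

The same configuration would apply to compileTestFixturesKotlin.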

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.java

Whitespace-only changes.

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.java

Whitespace-only changes.

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.java

Whitespace-only changes.

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt

+44 -21

@@ -12,66 +12,89 @@ import io.airbyte.cdk.integrations.base.Destination
 import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage
 import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
 import io.airbyte.cdk.integrations.destination.record_buffer.BufferStorage
+import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
 import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testMultipartUpload
 import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testSingleUpload
 import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory
 import io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory.Companion.getCreateFunction
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
 import io.airbyte.protocol.models.v0.AirbyteMessage
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
 import java.util.function.Consumer
 import java.util.function.Function
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 abstract class BaseGcsDestination : BaseConnector(), Destination {
     private val nameTransformer: NamingConventionTransformer = GcsNameTransformer()
 
     override fun check(config: JsonNode): AirbyteConnectionStatus? {
         try {
-            val destinationConfig: GcsDestinationConfig = GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
-            val s3Client = destinationConfig.s3Client
+            val destinationConfig: GcsDestinationConfig =
+                GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
+            val s3Client = destinationConfig.getS3Client()
 
             // Test single upload (for small files) permissions
             testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
 
             // Test multipart upload with stream transfer manager
-            testMultipartUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
+            testMultipartUpload(
+                s3Client,
+                destinationConfig.bucketName,
+                destinationConfig.bucketPath
+            )
 
             return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
         } catch (e: AmazonS3Exception) {
             LOGGER.error("Exception attempting to access the Gcs bucket", e)
             val message = getErrorMessage(e.errorCode, 0, e.message, e)
             emitConfigErrorTrace(e, message)
             return AirbyteConnectionStatus()
-                    .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                    .withMessage(message)
+                .withStatus(AirbyteConnectionStatus.Status.FAILED)
+                .withMessage(message)
         } catch (e: Exception) {
-            LOGGER.error("Exception attempting to access the Gcs bucket: {}. Please make sure you account has all of these roles: " + EXPECTED_ROLES, e)
+            LOGGER.error(
+                "Exception attempting to access the Gcs bucket: {}. Please make sure you account has all of these roles: " +
+                    EXPECTED_ROLES,
+                e
+            )
             emitConfigErrorTrace(e, e.message)
             return AirbyteConnectionStatus()
-                    .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                    .withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
-                            .message)
+                .withStatus(AirbyteConnectionStatus.Status.FAILED)
+                .withMessage(
+                    "Could not connect to the Gcs bucket with the provided configuration. \n" +
+                        e.message
+                )
         }
     }
 
-    override fun getConsumer(config: JsonNode,
-                             configuredCatalog: ConfiguredAirbyteCatalog?,
-                             outputRecordCollector: Consumer<AirbyteMessage?>?): AirbyteMessageConsumer? {
-        val gcsConfig: GcsDestinationConfig = GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
-        return S3ConsumerFactory().create(
+    override fun getConsumer(
+        config: JsonNode,
+        configuredCatalog: ConfiguredAirbyteCatalog,
+        outputRecordCollector: Consumer<AirbyteMessage?>?
+    ): AirbyteMessageConsumer? {
+        val gcsConfig: GcsDestinationConfig =
+            GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
+        return S3ConsumerFactory()
+            .create(
                 outputRecordCollector,
-                GcsStorageOperations(nameTransformer, gcsConfig.s3Client, gcsConfig),
+                GcsStorageOperations(nameTransformer, gcsConfig.getS3Client(), gcsConfig),
                 nameTransformer,
-                getCreateFunction(gcsConfig, Function<String, BufferStorage> { fileExtension: String? -> FileBuffer(fileExtension) }),
+                getCreateFunction(
+                    gcsConfig,
+                    Function<String, BufferStorage> { fileExtension: String ->
+                        FileBuffer(fileExtension)
+                    }
+                ),
                 gcsConfig,
-                configuredCatalog)
+                configuredCatalog
+            )
     }
 
     companion object {
         private val LOGGER: Logger = LoggerFactory.getLogger(BaseGcsDestination::class.java)
-        const val EXPECTED_ROLES: String = ("storage.multipartUploads.abort, storage.multipartUploads.create, "
-                + "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list")
+        const val EXPECTED_ROLES: String =
+            ("storage.multipartUploads.abort, storage.multipartUploads.create, " +
+                "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list")
     }
 }
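The reworked check() keeps its original contract: run a single-part and a multipart test upload, and map any failure to a FAILED AirbyteConnectionStatus rather than letting the exception propagate. A minimal caller-side sketch of how that status might be consumed (everything outside the CDK types is hypothetical):

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus

// Sketch: treat only an explicit SUCCEEDED status as a valid configuration.
fun isConfigValid(destination: BaseGcsDestination, config: JsonNode): Boolean {
    val status = destination.check(config)
    return status?.status == AirbyteConnectionStatus.Status.SUCCEEDED
}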

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt

+34 -21

@@ -23,35 +23,47 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
  * Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config.
  * This may change in the future.
  */
-class GcsDestinationConfig(bucketName: String?,
-                           bucketPath: String?,
-                           bucketRegion: String?,
-                           val gcsCredentialConfig: GcsCredentialConfig?,
-                           formatConfig: S3FormatConfig?) : S3DestinationConfig(GCS_ENDPOINT,
+class GcsDestinationConfig(
+    bucketName: String,
+    bucketPath: String,
+    bucketRegion: String?,
+    val gcsCredentialConfig: GcsCredentialConfig,
+    formatConfig: S3FormatConfig
+) :
+    S3DestinationConfig(
+        GCS_ENDPOINT,
         bucketName!!,
         bucketPath!!,
         bucketRegion,
         S3DestinationConstants.DEFAULT_PATH_FORMAT,
-        gcsCredentialConfig.getS3CredentialConfig().orElseThrow(),
+        gcsCredentialConfig.s3CredentialConfig.orElseThrow(),
         formatConfig!!,
         null,
         null,
         false,
-        S3StorageOperations.DEFAULT_UPLOAD_THREADS) {
+        S3StorageOperations.DEFAULT_UPLOAD_THREADS
+    ) {
     override fun createS3Client(): AmazonS3 {
-        when (gcsCredentialConfig!!.credentialType) {
+        when (gcsCredentialConfig.credentialType) {
             GcsCredentialType.HMAC_KEY -> {
-                val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig?
-                val awsCreds = BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret())
+                val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig
+                val awsCreds =
+                    BasicAWSCredentials(
+                        hmacKeyCredential.hmacKeyAccessId,
+                        hmacKeyCredential.hmacKeySecret
+                    )
 
                 return AmazonS3ClientBuilder.standard()
-                        .withEndpointConfiguration(
-                                AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion))
-                        .withCredentials(AWSStaticCredentialsProvider(awsCreds))
-                        .build()
+                    .withEndpointConfiguration(
+                        AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion)
+                    )
+                    .withCredentials(AWSStaticCredentialsProvider(awsCreds))
+                    .build()
             }
-
-            else -> throw IllegalArgumentException("Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name)
+            else ->
+                throw IllegalArgumentException(
+                    "Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name
+                )
         }
     }
 
@@ -60,11 +72,12 @@ class GcsDestinationConfig(bucketName: String?,
 
         fun getGcsDestinationConfig(config: JsonNode): GcsDestinationConfig {
             return GcsDestinationConfig(
-                config["gcs_bucket_name"].asText(),
-                config["gcs_bucket_path"].asText(),
-                config["gcs_bucket_region"].asText(),
-                GcsCredentialConfigs.getCredentialConfig(config),
-                getS3FormatConfig(config))
+                config["gcs_bucket_name"].asText(),
+                config["gcs_bucket_path"].asText(),
+                config["gcs_bucket_region"].asText(),
+                GcsCredentialConfigs.getCredentialConfig(config),
+                getS3FormatConfig(config)
+            )
         }
     }
 }
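createS3Client() is where the "reuse the S3 client for GCS" note in the class comment becomes concrete: GCS's XML interoperability API accepts HMAC keys through the plain AWS SDK, pointed at the GCS endpoint. A self-contained sketch of the same wiring (the endpoint literal is an assumption here; in the real class it comes from the GCS_ENDPOINT constant):

import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder

// Sketch: build an S3-compatible client against GCS using HMAC interop credentials.
fun gcsInteropClient(hmacAccessId: String, hmacSecret: String, region: String?): AmazonS3 {
    val awsCreds = BasicAWSCredentials(hmacAccessId, hmacSecret)
    return AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(
            // Assumed value of GCS_ENDPOINT: the GCS XML interoperability endpoint.
            AwsClientBuilder.EndpointConfiguration("https://storage.googleapis.com", region)
        )
        .withCredentials(AWSStaticCredentialsProvider(awsCreds))
        .build()
}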

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsStorageOperations.kt

+13 -9

@@ -11,21 +11,25 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
-class GcsStorageOperations(nameTransformer: NamingConventionTransformer?,
-                           s3Client: AmazonS3?,
-                           s3Config: S3DestinationConfig?) : S3StorageOperations(nameTransformer!!, s3Client!!, s3Config!!) {
-    /**
-     * GCS only supports the legacy AmazonS3#doesBucketExist method.
-     */
+class GcsStorageOperations(
+    nameTransformer: NamingConventionTransformer,
+    s3Client: AmazonS3,
+    s3Config: S3DestinationConfig
+) : S3StorageOperations(nameTransformer!!, s3Client!!, s3Config!!) {
+    /** GCS only supports the legacy AmazonS3#doesBucketExist method. */
     override fun doesBucketExist(bucket: String?): Boolean {
         return s3Client.doesBucketExist(bucket)
     }
 
     /**
-     * This method is overridden because GCS doesn't accept request to delete multiple objects. The only
-     * difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject.
+     * This method is overridden because GCS doesn't accept request to delete multiple objects. The
+     * only difference is that the AmazonS3#deleteObjects method is replaced with
+     * AmazonS3#deleteObject.
      */
-    override fun cleanUpObjects(bucket: String?, keysToDelete: List<DeleteObjectsRequest.KeyVersion>) {
+    override fun cleanUpObjects(
+        bucket: String?,
+        keysToDelete: List<DeleteObjectsRequest.KeyVersion>
+    ) {
         for (keyToDelete in keysToDelete) {
             LOGGER.info("Deleting object {}", keyToDelete.key)
             s3Client.deleteObject(bucket, keyToDelete.key)
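The override exists because the GCS interop API rejects S3's batched DeleteObjects request; everything else in S3StorageOperations is reused unchanged. The contrast, as a sketch:

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectsRequest

// S3 proper: one batched request removes all keys.
fun deleteOnS3(client: AmazonS3, bucket: String, keys: List<DeleteObjectsRequest.KeyVersion>) {
    client.deleteObjects(DeleteObjectsRequest(bucket).withKeys(keys))
}

// GCS via the S3 API: no batch support, so delete per object, as the override above does.
fun deleteOnGcs(client: AmazonS3, bucket: String, keys: List<DeleteObjectsRequest.KeyVersion>) {
    keys.forEach { client.deleteObject(bucket, it.key) }
}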

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt

+43 -20

@@ -14,26 +14,31 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
 import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
 import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
 import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
 import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
 import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
 import io.airbyte.protocol.models.v0.AirbyteRecordMessage
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
+import java.io.IOException
+import java.sql.Timestamp
+import java.util.*
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.GenericData
 import org.apache.avro.generic.GenericDatumWriter
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
-import java.io.IOException
-import java.sql.Timestamp
-import java.util.*
 
-class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,
-                                              s3Client: AmazonS3,
-                                              configuredStream: ConfiguredAirbyteStream,
-                                              uploadTimestamp: Timestamp,
-                                              converter: JsonAvroConverter?,
-                                              jsonSchema: JsonNode? = null) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter {
+class GcsAvroWriter
+@JvmOverloads
+constructor(
+    config: GcsDestinationConfig,
+    s3Client: AmazonS3,
+    configuredStream: ConfiguredAirbyteStream,
+    uploadTimestamp: Timestamp,
+    converter: JsonAvroConverter?,
+    jsonSchema: JsonNode? = null
+) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter {
     private val avroRecordFactory: AvroRecordFactory
     private val uploadManager: StreamTransferManager
     private val outputStream: MultiPartOutputStream
@@ -42,30 +47,48 @@ class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,
     override val outputPath: String
 
     init {
-        val schema = if (jsonSchema == null
-        ) GcsUtils.getDefaultAvroSchema(stream.name, stream.namespace, true, false)
-        else JsonToAvroSchemaConverter().getAvroSchema(jsonSchema, stream.name,
-                stream.namespace, true, false, false, true)
+        val schema =
+            if (jsonSchema == null)
+                GcsUtils.getDefaultAvroSchema(stream.name, stream.namespace, true, false)
+            else
+                JsonToAvroSchemaConverter()
+                    .getAvroSchema(
+                        jsonSchema,
+                        stream.name,
+                        stream.namespace,
+                        true,
+                        false,
+                        false,
+                        true
+                    )
         LOGGER.info("Avro schema for stream {}: {}", stream.name, schema!!.toString(false))
 
-        val outputFilename: String = BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
+        val outputFilename: String =
+            BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
         outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
         fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)
 
-        LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.name, config.bucketName,
-                outputPath)
+        LOGGER.info(
+            "Full GCS path for stream '{}': {}/{}",
+            stream.name,
+            config.bucketName,
+            outputPath
+        )
 
         this.avroRecordFactory = AvroRecordFactory(schema, converter)
-        this.uploadManager = create(config.bucketName, outputPath, s3Client)
-                .setPartSize(DEFAULT_PART_SIZE_MB.toLong())
+        this.uploadManager =
+            create(config.bucketName, outputPath, s3Client)
+                .setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong())
                 .get()
-        // We only need one output stream as we only have one input stream. This is reasonably performant.
+        // We only need one output stream as we only have one input stream. This is reasonably
+        // performant.
        this.outputStream = uploadManager.multiPartOutputStreams[0]
 
         val formatConfig = config.formatConfig as S3AvroFormatConfig
         // The DataFileWriter always uses binary encoding.
         // If json encoding is needed in the future, use the GenericDatumWriter directly.
-        this.dataFileWriter = DataFileWriter(GenericDatumWriter<GenericData.Record>())
+        this.dataFileWriter =
+            DataFileWriter(GenericDatumWriter<GenericData.Record>())
                 .setCodec(formatConfig.codecFactory)
                 .create(schema, outputStream)
     }
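The writer chains an Avro container-file writer directly onto the multipart upload's output stream, so records are binary-encoded and uploaded without an intermediate local file. A trimmed-down sketch of that wiring, with a plain OutputStream standing in for MultiPartOutputStream:

import java.io.OutputStream
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumWriter

// Sketch: in the real writer the codec comes from S3AvroFormatConfig and the
// stream from uploadManager.multiPartOutputStreams[0].
fun openAvroWriter(schema: Schema, out: OutputStream): DataFileWriter<GenericData.Record> =
    DataFileWriter(GenericDatumWriter<GenericData.Record>())
        .setCodec(CodecFactory.nullCodec()) // no compression; the real config may choose otherwise
        .create(schema, out)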

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt

+1 -1

@@ -8,5 +8,5 @@ import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig
 import java.util.*
 
 interface GcsCredentialConfig : BlobStorageCredentialConfig<GcsCredentialType?> {
-    val s3CredentialConfig: Optional<S3CredentialConfig?>
+    val s3CredentialConfig: Optional<S3CredentialConfig>
 }
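This one-line change is the substance of the fix: Optional<S3CredentialConfig?> mixes Java's Optional with a Kotlin nullable type argument, so orElseThrow() still yields a nullable value and call sites need !!. With a non-null type argument the unwrap is already non-null, e.g.:

import java.util.Optional

// Sketch: with Optional<String?>, orElseThrow() returns String?; with
// Optional<String> it returns a plain String, no !! needed at call sites.
val cfg: Optional<String> = Optional.of("hmac-key-config")
val unwrapped: String = cfg.orElseThrow()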

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfigs.kt

+4 -1

@@ -10,7 +10,10 @@ import java.util.*
 object GcsCredentialConfigs {
     fun getCredentialConfig(config: JsonNode): GcsCredentialConfig {
         val credentialConfig = config["credential"]
-        val credentialType = GcsCredentialType.valueOf(credentialConfig["credential_type"].asText().uppercase(Locale.getDefault()))
+        val credentialType =
+            GcsCredentialType.valueOf(
+                credentialConfig["credential_type"].asText().uppercase(Locale.getDefault())
+            )
 
         if (credentialType == GcsCredentialType.HMAC_KEY) {
             return GcsHmacKeyCredentialConfig(credentialConfig)
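getCredentialConfig dispatches purely on credential.credential_type, uppercased into the GcsCredentialType enum, so "hmac_key" and "HMAC_KEY" both resolve. A standalone sketch of that dispatch (the enum is mirrored here for illustration only):

import java.util.Locale

// Mirrors GcsCredentialType for illustration; only HMAC keys are handled here.
enum class CredentialTypeSketch { HMAC_KEY }

fun parseCredentialType(raw: String): CredentialTypeSketch =
    // "hmac_key" -> HMAC_KEY; an unknown string makes valueOf throw IllegalArgumentException.
    CredentialTypeSketch.valueOf(raw.uppercase(Locale.getDefault()))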

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt

+1 -1

@@ -25,6 +25,6 @@ class GcsHmacKeyCredentialConfig : GcsCredentialConfig {
     override val credentialType: GcsCredentialType
         get() = GcsCredentialType.HMAC_KEY
 
-    override val s3CredentialConfig: Optional<S3CredentialConfig?>
+    override val s3CredentialConfig: Optional<S3CredentialConfig>
         get() = Optional.of(S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret))
 }
