From bef31d16789221b595b2d54f96d52fce4abec94d Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 30 Jan 2025 11:12:47 -0800 Subject: [PATCH] Destination S3-V2: Bug Fix: File xfer uses part size for part, not file size --- .../cdk/load/write/object_storage/FilePartAccumulator.kt | 7 ++++++- .../load/write/object_storage/PartToObjectAccumulator.kt | 2 +- .../load/write/object_storage/FilePartAccumulatorTest.kt | 4 ++-- .../connectors/destination-s3/build.gradle | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt index cbf0199082dff..874f484bb2367 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -16,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.object_storage.LoadablePart import io.airbyte.cdk.load.write.FileBatchAccumulator +import io.github.oshai.kotlinlogging.KotlinLogging import java.io.File import java.nio.file.Path @@ -28,6 +29,8 @@ class FilePartAccumulator( private val stream: DestinationStream, private val outputQueue: MultiProducerChannel>, ) : FileBatchAccumulator { + val log = KotlinLogging.logger {} + override suspend fun processFilePart(file: DestinationFile, index: Long) { val key = Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") @@ -44,8 +47,9 @@ class FilePartAccumulator( while (true) { val bytePart = - ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + ByteArray(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt()) val read = fileInputStream.read(bytePart) + log.info { "Read $read bytes from file" } if (read == -1) { val filePart: ByteArray? = null @@ -62,6 +66,7 @@ class FilePartAccumulator( handleFilePart(batch, stream.descriptor, index) } } + fileInputStream.close() localFile.delete() } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt index 1dba37ab3ee46..defb28688c527 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt @@ -48,7 +48,7 @@ class PartToObjectAccumulator>( val streamingUpload = upload.streamingUpload.await() log.info { - "Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (empty=${batch.part.isEmpty}; final=${batch.part.isFinal})" + "Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (size=${batch.part.bytes?.size}; final=${batch.part.isFinal})" } // Upload provided bytes and update indexes. diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt index 25d232bd33281..b13994faa0f0e 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt @@ -50,7 +50,7 @@ class FilePartAccumulatorTest { fun testFilePartAccumulatorExactlyPartSize() = runTest { val finalDirectory = "finalDirectory" every { pathFactory.getFinalDirectory(stream) } returns finalDirectory - val file = createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + val file = createFile(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt()) val index = 21L val fileMessage = createFileMessage(file) @@ -64,7 +64,7 @@ class FilePartAccumulatorTest { val finalDirectory = "finalDirectory" every { pathFactory.getFinalDirectory(stream) } returns finalDirectory val file = - createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt() + 1000) + createFile(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt() + 1000) val index = 21L val fileMessage = createFileMessage(file) diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index 1f26ebb2331d8..dc91cb3f4b368 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteBulkConnector { core = 'load' toolkits = ['load-s3', 'load-avro', 'load-aws'] - cdk = '0.296' + cdk = 'local' } application {