Skip to content

Commit 19ef4c3

Browse files
Destination S3-V2: Bug Fix: File xfer uses part size for part, not file size
1 parent e2edfd7 commit 19ef4c3

File tree

1 file changed

+6
-1
lines changed
  • airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage

1 file changed

+6
-1
lines changed

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationFile
1616
import io.airbyte.cdk.load.message.MultiProducerChannel
1717
import io.airbyte.cdk.load.message.object_storage.LoadablePart
1818
import io.airbyte.cdk.load.write.FileBatchAccumulator
19+
import io.github.oshai.kotlinlogging.KotlinLogging
1920
import java.io.File
2021
import java.nio.file.Path
2122

@@ -28,6 +29,8 @@ class FilePartAccumulator(
2829
private val stream: DestinationStream,
2930
private val outputQueue: MultiProducerChannel<BatchEnvelope<*>>,
3031
) : FileBatchAccumulator {
32+
val log = KotlinLogging.logger {}
33+
3134
override suspend fun processFilePart(file: DestinationFile, index: Long) {
3235
val key =
3336
Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}")
@@ -44,8 +47,9 @@ class FilePartAccumulator(
4447

4548
while (true) {
4649
val bytePart =
47-
ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt())
50+
ByteArray(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt())
4851
val read = fileInputStream.read(bytePart)
52+
log.info { "Read $read bytes from file" }
4953

5054
if (read == -1) {
5155
val filePart: ByteArray? = null
@@ -62,6 +66,7 @@ class FilePartAccumulator(
6266
handleFilePart(batch, stream.descriptor, index)
6367
}
6468
}
69+
fileInputStream.close()
6570
localFile.delete()
6671
}
6772

0 commit comments

Comments
 (0)