Skip to content

Commit 4c4a105

Browse files
S3 Destination: Descreased thread allocation & memory ratio for AsyncConsumer (#43714)
1 parent b775298 commit 4c4a105

File tree

7 files changed

+29
-17
lines changed

7 files changed

+29
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.44.4
1+
version=0.44.9

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ private val LOGGER = KotlinLogging.logger {}
2121
abstract class BaseS3Destination
2222
protected constructor(
2323
protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory(),
24-
protected val environment: Map<String, String> = System.getenv()
24+
protected val environment: Map<String, String> = System.getenv(),
25+
private val memoryRatio: Double = 0.5,
26+
private val nThreads: Int = 5
2527
) : BaseConnector(), Destination {
2628
private val nameTransformer: NamingConventionTransformer = S3NameTransformer()
2729

@@ -74,7 +76,9 @@ protected constructor(
7476
outputRecordCollector,
7577
S3StorageOperations(nameTransformer, s3Config.getS3Client(), s3Config),
7678
s3Config,
77-
catalog
79+
catalog,
80+
memoryRatio,
81+
nThreads
7882
)
7983
}
8084

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt

+9-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException
2424
import io.airbyte.commons.json.Jsons
2525
import io.airbyte.protocol.models.v0.*
2626
import io.github.oshai.kotlinlogging.KotlinLogging
27+
import java.util.concurrent.Executors
2728
import java.util.function.Consumer
2829
import java.util.function.Function
2930
import org.joda.time.DateTime
@@ -158,7 +159,9 @@ class S3ConsumerFactory {
158159
outputRecordCollector: Consumer<AirbyteMessage>,
159160
storageOps: S3StorageOperations,
160161
s3Config: S3DestinationConfig,
161-
catalog: ConfiguredAirbyteCatalog
162+
catalog: ConfiguredAirbyteCatalog,
163+
memoryRatio: Double,
164+
nThreads: Int
162165
): SerializedAirbyteMessageConsumer {
163166
val writeConfigs = createWriteConfigs(storageOps, s3Config, catalog)
164167
// Buffer creation function: yields a file buffer that converts
@@ -190,7 +193,11 @@ class S3ConsumerFactory {
190193
// S3 has no concept of default namespace
191194
// In the "namespace from destination case", the namespace
192195
// is simply omitted from the path.
193-
BufferManager(defaultNamespace = null)
196+
BufferManager(
197+
defaultNamespace = null,
198+
maxMemory = (Runtime.getRuntime().maxMemory() * memoryRatio).toLong()
199+
),
200+
workerPool = Executors.newFixedThreadPool(nThreads)
194201
)
195202
}
196203

airbyte-integrations/connectors/destination-s3/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.44.0'
7+
cdkVersionRequired = '0.44.9'
88
features = ['db-destinations', 's3-destinations']
9-
useLocalCdk = true // TODO: Version CDK, bump required version, and set this to false
9+
useLocalCdk = false // TODO: Version CDK, bump required version, and set this to false
1010
}
1111

1212
airbyteJavaConnector.addCdkDependencies()

airbyte-integrations/connectors/destination-s3/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: file
33
connectorType: destination
44
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
5-
dockerImageTag: 0.6.6
5+
dockerImageTag: 0.6.7
66
dockerRepository: airbyte/destination-s3
77
githubIssueLabel: destination-s3
88
icon: s3.svg

airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory
1010
import io.airbyte.cdk.integrations.destination.s3.StorageProvider
1111

1212
open class S3Destination : BaseS3Destination {
13-
constructor()
13+
constructor() : super(nThreads = 2, memoryRatio = 0.5)
1414

1515
@VisibleForTesting
1616
constructor(

docs/integrations/destinations/s3.md

+9-8
Original file line numberDiff line numberDiff line change
@@ -514,14 +514,15 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou
514514

515515
| Version | Date | Pull Request | Subject |
516516
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
517-
| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 |
518-
| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. |
519-
| 0.6.4 | 2024-04-16 | [42006](https://github.com/airbytehq/airbyte/pull/42006) | remove unnecessary zookeeper dependency |
520-
| 0.6.3 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | convert all production code to kotlin |
521-
| 0.6.2 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume role auth |
522-
| 0.6.1 | 2024-04-08 | [37546](https://github.com/airbytehq/airbyte/pull/37546) | Adapt to CDK 0.30.8; |
523-
| 0.6.0 | 2024-04-08 | [36869](https://github.com/airbytehq/airbyte/pull/36869) | Adapt to CDK 0.29.8; Kotlin converted code. |
524-
| 0.5.9 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
517+
| 0.6.7 | 2024-08-11 | [43713](https://github.com/airbytehq/airbyte/issues/43713) | Decreased memory ratio (0.7 -> 0.5) and thread allocation (5 -> 1) for async S3 uploads. |
518+
| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 |
519+
| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. |
520+
| 0.6.4 | 2024-04-16 | [42006](https://github.com/airbytehq/airbyte/pull/42006) | remove unnecessary zookeeper dependency |
521+
| 0.6.3 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | convert all production code to kotlin |
522+
| 0.6.2 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume role auth |
523+
| 0.6.1 | 2024-04-08 | [37546](https://github.com/airbytehq/airbyte/pull/37546) | Adapt to CDK 0.30.8; |
524+
| 0.6.0 | 2024-04-08 | [36869](https://github.com/airbytehq/airbyte/pull/36869) | Adapt to CDK 0.29.8; Kotlin converted code. |
525+
| 0.5.9 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
525526
| 0.5.8 | 2024-01-03 | [#33924](https://github.com/airbytehq/airbyte/pull/33924) | Add new ap-southeast-3 AWS region |
526527
| 0.5.7 | 2023-12-28 | [#33788](https://github.com/airbytehq/airbyte/pull/33788) | Thread-safe fix for file part names |
527528
| 0.5.6 | 2023-12-08 | [#33263](https://github.com/airbytehq/airbyte/pull/33263) | (incorrect filename format, do not use) Adopt java CDK version 0.7.0. |

0 commit comments

Comments
 (0)