|
12 | 12 | import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;
|
13 | 13 |
|
14 | 14 | import com.fasterxml.jackson.databind.JsonNode;
|
15 |
| -import com.google.common.annotations.VisibleForTesting; |
16 | 15 | import io.airbyte.cdk.db.factory.DataSourceFactory;
|
17 | 16 | import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
|
18 | 17 | import io.airbyte.cdk.db.jdbc.JdbcDatabase;
|
|
30 | 29 | import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
|
31 | 30 | import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
|
32 | 31 | import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator;
|
33 |
| -import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; |
34 | 32 | import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption;
|
35 | 33 | import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
|
36 | 34 | import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig;
|
@@ -225,14 +223,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
|
225 | 223 | : new NoEncryption();
|
226 | 224 | final JsonNode s3Options = findS3Options(config);
|
227 | 225 | final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
|
228 |
| - final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options); |
229 |
| - if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) { |
230 |
| - LOGGER.warn(""" |
231 |
| - Increasing the number of file buffers past {} can lead to increased performance but |
232 |
| - leads to increased memory usage. If the number of file buffers exceeds the number |
233 |
| - of streams {} this will create more buffers than necessary, leading to nonexistent gains |
234 |
| - """, FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size()); |
235 |
| - } |
236 | 226 |
|
237 | 227 | final String defaultNamespace = config.get("schema").asText();
|
238 | 228 | for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
|
@@ -287,26 +277,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
|
287 | 277 | .createAsync();
|
288 | 278 | }
|
289 | 279 |
|
290 |
| - /** |
291 |
| - * Retrieves user configured file buffer amount so as long it doesn't exceed the maximum number of |
292 |
| - * file buffers and sets the minimum number to the default |
293 |
| - * <p> |
294 |
| - * NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause as this hard limit has |
295 |
| - * not been thoroughly load tested across all instance sizes |
296 |
| - * |
297 |
| - * @param config user configurations |
298 |
| - * @return number of file buffers if configured otherwise default |
299 |
| - */ |
300 |
| - @VisibleForTesting |
301 |
| - public int getNumberOfFileBuffers(final JsonNode config) { |
302 |
| - int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER; |
303 |
| - if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) { |
304 |
| - numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER); |
305 |
| - } |
306 |
| - // Only allows for values 10 <= numOfFileBuffers <= 50 |
307 |
| - return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER); |
308 |
| - } |
309 |
| - |
310 | 280 | private boolean isPurgeStagingData(final JsonNode config) {
|
311 | 281 | return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
|
312 | 282 | }
|
|
0 commit comments