
Commit 53c22de

benmoriceau and davinchia authored and committed
✨ Switch redshift staging to async mode (airbytehq#28619)

* Async snowflake
* Use async in destination implementation
* Format
* Switch redshift to async mode
* Remove old unused consumer creation
* Add new version
* Fix non-staging mode
* Change switching to use the get serialized consumer
* Automated Commit - Format and Process Resources Changes
* Test
* Automated Commit - Format and Process Resources Changes
* Use method
* Test smaller buffer
* Test smaller buffer for redshift
* Automated Commit - Format and Process Resources Changes
* Bigger ratio
* Remove snowflake changes
* Implement the new interface
* Automated Commit - Format and Process Resources Changes
* Push ratio to 0.8
* Smaller optimal buffer size
* Automated Commit - Format and Process Resources Changes
* Bigger buffer
* Use a buffer of 10 MB
* Use a buffer of 75 MB
* Test reduced lib threads
* Add flags for remote profiler
* Part size to match the async part size
* Part size to 100 MB
* Restore default
* Try with 1 thread
* Go back to default
* Clean up
* Bump version
* Restore gradle
* Re-add VM capture
* Test reduced allowed buffer size
* Use all the memory available
* Only 3 threads for the lib
* Automated Commit - Format and Process Resources Changes
* Test with 1
* Automated Commit - Format and Process Resources Changes
* Add local log line
* Do not use all RAM for heap
* Fix build
* Clean up
* Clean up
* Update airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/AsyncFlush.java (Co-authored-by: Davin Chia <[email protected]>)

---------

Co-authored-by: Davin Chia <[email protected]>
Co-authored-by: benmoriceau <[email protected]>
1 parent c7c1ae1 commit 53c22de

File tree

5 files changed: +34, -14 lines


airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/AsyncFlush.java

Lines changed: 18 additions & 6 deletions
@@ -34,19 +34,36 @@ class AsyncFlush implements DestinationFlushFunction {

   private final ConfiguredAirbyteCatalog catalog;
   private final TypeAndDedupeOperationValve typerDeduperValve;
   private final TyperDeduper typerDeduper;
+  private final long optimalBatchSizeBytes;

   public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
                     final StagingOperations stagingOperations,
                     final JdbcDatabase database,
                     final ConfiguredAirbyteCatalog catalog,
                     final TypeAndDedupeOperationValve typerDeduperValve,
                     final TyperDeduper typerDeduper) {
+    this(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, 50 * 1024 * 1024);
+  }
+
+  public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
+                    final StagingOperations stagingOperations,
+                    final JdbcDatabase database,
+                    final ConfiguredAirbyteCatalog catalog,
+                    final TypeAndDedupeOperationValve typerDeduperValve,
+                    final TyperDeduper typerDeduper,
+                    // In general, this size is chosen to improve the performance of lower memory connectors. With 1 Gi of
+                    // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
+                    // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
+                    // freed earlier similar to a sliding window effect
+                    long optimalBatchSizeBytes) {
     this.streamDescToWriteConfig = streamDescToWriteConfig;
     this.stagingOperations = stagingOperations;
     this.database = database;
     this.catalog = catalog;
     this.typerDeduperValve = typerDeduperValve;
     this.typerDeduper = typerDeduper;
+    this.optimalBatchSizeBytes = optimalBatchSizeBytes;
   }

@@ -110,12 +127,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

   @Override
   public long getOptimalBatchSizeBytes() {
-    // todo(ryankfu): this should be per-destination specific. currently this is for Snowflake.
-    // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of
-    // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
-    // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
-    // freed earlier similar to a sliding window effect
-    return 50 * 1024 * 1024;
+    return optimalBatchSizeBytes;
   }

 }
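The diff above replaces a hard-coded 50 MiB batch size with a constructor parameter while keeping the old constructor as a delegating overload, so existing callers are unaffected. A minimal standalone sketch of that pattern (class and member names here are illustrative, not the actual Airbyte types):

```java
// Hypothetical sketch of the two-constructor pattern in the diff above.
public class AsyncFlushSketch {

    // Default matches the previously hard-coded value: 50 MiB.
    private static final long DEFAULT_OPTIMAL_BATCH_SIZE_BYTES = 50L * 1024 * 1024;

    private final long optimalBatchSizeBytes;

    // Old signature: delegates to the new one with the historical default,
    // so existing call sites keep their behavior.
    public AsyncFlushSketch() {
        this(DEFAULT_OPTIMAL_BATCH_SIZE_BYTES);
    }

    // New signature: a lower-memory destination can pass a smaller batch size
    // so flushes happen earlier and queue memory is freed sooner.
    public AsyncFlushSketch(final long optimalBatchSizeBytes) {
        this.optimalBatchSizeBytes = optimalBatchSizeBytes;
    }

    public long getOptimalBatchSizeBytes() {
        return optimalBatchSizeBytes;
    }

    public static void main(final String[] args) {
        System.out.println(new AsyncFlushSketch().getOptimalBatchSizeBytes());                  // 52428800
        System.out.println(new AsyncFlushSketch(10L * 1024 * 1024).getOptimalBatchSizeBytes()); // 10485760
    }
}
```

Delegating overloads like this let a later change (e.g. a per-destination tuning pass) pick a different size without a breaking API change.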

airbyte-integrations/connectors/destination-redshift/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ ENV APPLICATION destination-redshift

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.6.3
+LABEL io.airbyte.version=0.6.4
 LABEL io.airbyte.name=airbyte/destination-redshift

 ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"

airbyte-integrations/connectors/destination-redshift/metadata.yaml

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 0.6.3
+  dockerImageTag: 0.6.4
   dockerRepository: airbyte/destination-redshift
   githubIssueLabel: destination-redshift
   icon: redshift.svg

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java

Lines changed: 11 additions & 4 deletions
@@ -22,6 +22,7 @@
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
 import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
 import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
 import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
 import io.airbyte.integrations.base.ssh.SshWrappedDestination;
@@ -46,6 +47,8 @@
 import java.util.Map;
 import java.util.function.Consumer;
 import javax.sql.DataSource;
+
+import org.apache.commons.lang3.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -136,15 +139,20 @@ public JsonNode toJdbcConfig(final JsonNode config) {
   }

   @Override
+  @Deprecated
   public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                             final ConfiguredAirbyteCatalog catalog,
                                             final Consumer<AirbyteMessage> outputRecordCollector) {
+    throw new NotImplementedException("Should use the getSerializedMessageConsumer instead");
+  }
+
+  @Override
+  public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
     final EncryptionConfig encryptionConfig =
-        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
     final JsonNode s3Options = findS3Options(config);
     final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
     final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);
-
     if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
       LOGGER.warn("""
           Increasing the number of file buffers past {} can lead to increased performance but
@@ -153,12 +161,11 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
           """, FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
     }

-    return new StagingConsumerFactory().create(
+    return new StagingConsumerFactory().createAsync(
         outputRecordCollector,
         getDatabase(getDataSource(config)),
         new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
         getNamingResolver(),
-        CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, numberOfFileBuffers)),
         config,
         catalog,
         isPurgeStagingData(s3Options),
docs/integrations/destinations/redshift.md

Lines changed: 3 additions & 2 deletions
@@ -155,7 +155,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c
 ## Changelog

 | Version | Date       | Pull Request                                               | Subject                      |
-| :------ | :--------- | :--------------------------------------------------------- | :--------------------------- |
+|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------|
+| 0.6.4   | 2023-08-10 | [\#28619](https://github.com/airbytehq/airbyte/pull/28619) | Use async method for staging |
 | 0.6.3   | 2023-08-07 | [\#29188](https://github.com/airbytehq/airbyte/pull/29188) | Internal code refactoring |
 | 0.6.2   | 2023-07-24 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Add hooks in preparation for destinations v2 implementation |
 | 0.6.1   | 2023-07-14 | [\#28345](https://github.com/airbytehq/airbyte/pull/28345) | Increment patch to trigger a rebuild |
@@ -175,7 +176,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
 | 0.3.55  | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging |
 | 0.3.54  | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
 | 0.3.53  | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Flatten JSON arrays to fix maximum size check for SUPER field |
-| 0.3.52  | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
+| 0.3.52  | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
 | 0.3.51  | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
 | 0.3.50  | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
 | 0.3.49  | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
