✨Switch redshift staging to async mode #28619

Merged: 78 commits merged on Aug 14, 2023
Changes from 75 commits
e6f0ba0
Async snowflake
benmoriceau Jul 19, 2023
0ce1aac
Use async in destination implenentation
benmoriceau Jul 20, 2023
51fbe49
Format
benmoriceau Jul 20, 2023
4263a79
Merge branch 'master' into bmoric/async-snowflake
benmoriceau Jul 21, 2023
978cfd2
Merge branch 'master' into bmoric/async-snowflake
davinchia Jul 21, 2023
5aa307b
Switch redshif to asyn mode
benmoriceau Jul 24, 2023
6a083d1
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Jul 24, 2023
b1b9343
Remove old unused consumer creation
benmoriceau Jul 24, 2023
fd8acfb
Add new version
benmoriceau Jul 24, 2023
57844fd
Merge branch 'bmoric/async-snowflake' of github.com:airbytehq/airbyte…
benmoriceau Jul 25, 2023
5123f87
Merge branch 'master' into bmoric/async-snowflake
benmoriceau Jul 25, 2023
f15ca3f
Merge branch 'master' into bmoric/async-snowflake
benmoriceau Jul 25, 2023
e5745b9
Merge branch 'master' into bmoric/async-snowflake
benmoriceau Jul 25, 2023
e8bb0f4
Merge branch 'bmoric/async-snowflake' into bmoric/async-redshift
benmoriceau Jul 25, 2023
c01e7ef
Fix non staging mode
benmoriceau Jul 27, 2023
7a1ae97
Change switcing to use the get serialized consumer
benmoriceau Jul 27, 2023
73643b3
Automated Commit - Format and Process Resources Changes
benmoriceau Jul 27, 2023
f110532
Test
benmoriceau Jul 27, 2023
ccada79
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Jul 27, 2023
cd30db3
Automated Commit - Format and Process Resources Changes
benmoriceau Jul 27, 2023
a707c68
Use method
benmoriceau Jul 27, 2023
36df2a1
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Jul 27, 2023
eb6d6e0
Test smaller buffer
benmoriceau Jul 31, 2023
65fdd53
Test smaller buffer for redshift
benmoriceau Jul 31, 2023
6a3564b
Automated Commit - Format and Process Resources Changes
benmoriceau Jul 31, 2023
4bcd309
Bigger ratio
benmoriceau Jul 31, 2023
d3d341d
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Jul 31, 2023
803c615
Remove snowflake changes
benmoriceau Jul 31, 2023
0f2a0f8
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Jul 31, 2023
1b88cd5
Implement the new interface
benmoriceau Jul 31, 2023
3038b81
Automated Commit - Format and Process Resources Changes
benmoriceau Jul 31, 2023
a80af5e
push ratio to 0.8
benmoriceau Jul 31, 2023
ccbe015
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Jul 31, 2023
81238ee
Smaller Optimal buffer size
benmoriceau Jul 31, 2023
387f8cf
Automated Commit - Format and Process Resources Changes
benmoriceau Jul 31, 2023
5573fd9
Bigger buffer
benmoriceau Jul 31, 2023
84daced
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Jul 31, 2023
0ae6b96
Use a buffer of 10 Mb
benmoriceau Jul 31, 2023
80d5b95
Use a buffer of 75 Mb
benmoriceau Jul 31, 2023
5d9f8c1
Merge branch 'master' into bmoric/async-redshift
benmoriceau Jul 31, 2023
57b39b8
Test reduce lib thread
benmoriceau Aug 1, 2023
08a2939
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Aug 1, 2023
731fd2a
Add flags for remote profiler.
davinchia Aug 1, 2023
4e16ec6
Part size to match the async part size
benmoriceau Aug 1, 2023
105712c
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Aug 1, 2023
6fa7de4
Part size to 100 Mb
benmoriceau Aug 1, 2023
508dd8c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Aug 1, 2023
8f9f590
restore default
benmoriceau Aug 1, 2023
de8cd71
Try with 1 thread
benmoriceau Aug 1, 2023
abff487
Merge branch 'master' into bmoric/async-redshift
benmoriceau Aug 1, 2023
f5a37f1
Go back to default
benmoriceau Aug 1, 2023
7ce1582
Clean up
benmoriceau Aug 2, 2023
25ff279
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Aug 2, 2023
32ab872
Bump version
benmoriceau Aug 2, 2023
bb4b154
Merge branch 'master' into bmoric/async-redshift
benmoriceau Aug 2, 2023
5f2d069
Merge branch 'master' into bmoric/async-redshift
benmoriceau Aug 2, 2023
4c40626
Restore gradle
benmoriceau Aug 4, 2023
63e66f6
Re-add vm capture
benmoriceau Aug 4, 2023
0165f9e
Test reduce allowed buffer size
benmoriceau Aug 4, 2023
5c6547e
Use all the memory available
benmoriceau Aug 4, 2023
d2dc6f4
only 3 threads for the lib
benmoriceau Aug 4, 2023
1e7b4b7
Automated Commit - Format and Process Resources Changes
benmoriceau Aug 4, 2023
a5c33a5
test with 1
benmoriceau Aug 4, 2023
6c7abe9
Merge branch 'bmoric/async-redshift' of github.com:airbytehq/airbyte …
benmoriceau Aug 4, 2023
19766d6
Automated Commit - Format and Process Resources Changes
benmoriceau Aug 4, 2023
0f3d5cb
Add local log ling.
davinchia Aug 4, 2023
b837d20
Do not use all RAM for heap.
davinchia Aug 5, 2023
74d6c61
Merge branch 'master' into bmoric/async-redshift
benmoriceau Aug 8, 2023
086d050
Merge branch 'master' into bmoric/async-redshift
benmoriceau Aug 8, 2023
d3357da
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Aug 10, 2023
78a70db
Fix build
benmoriceau Aug 10, 2023
7de0a1b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Aug 11, 2023
e2b54dd
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Aug 11, 2023
c96d32c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/asy…
benmoriceau Aug 14, 2023
0fa168c
Clean up
benmoriceau Aug 14, 2023
2da179d
Clean up
benmoriceau Aug 14, 2023
f2feec4
Update airbyte-integrations/bases/bases-destination-jdbc/src/main/jav…
benmoriceau Aug 14, 2023
97ac3fb
Automated Commit - Format and Process Resources Changes
benmoriceau Aug 14, 2023
@@ -34,19 +34,35 @@ class AsyncFlush implements DestinationFlushFunction {
   private final ConfiguredAirbyteCatalog catalog;
   private final TypeAndDedupeOperationValve typerDeduperValve;
   private final TyperDeduper typerDeduper;
+  private final long optimalBatchSizeBytes;

   public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
                     final StagingOperations stagingOperations,
                     final JdbcDatabase database,
                     final ConfiguredAirbyteCatalog catalog,
                     final TypeAndDedupeOperationValve typerDeduperValve,
                     final TyperDeduper typerDeduper) {
+    this(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, 50 * 1024 * 1024);
+  }
+
+  public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
+                    final StagingOperations stagingOperations,
+                    final JdbcDatabase database,
+                    final ConfiguredAirbyteCatalog catalog,
+                    final TypeAndDedupeOperationValve typerDeduperValve,
+                    final TyperDeduper typerDeduper,
+                    // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of
+                    // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
+                    // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
+                    // freed earlier similar to a sliding window effect
+                    long optimalBatchSizeBytes) {
     this.streamDescToWriteConfig = streamDescToWriteConfig;
     this.stagingOperations = stagingOperations;
     this.database = database;
     this.catalog = catalog;
     this.typerDeduperValve = typerDeduperValve;
     this.typerDeduper = typerDeduper;
+    this.optimalBatchSizeBytes = optimalBatchSizeBytes;
   }

   @Override
@@ -110,12 +126,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

   @Override
   public long getOptimalBatchSizeBytes() {
-    // todo(ryankfu): this should be per-destination specific. currently this is for Snowflake.
-    // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of
-    // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
-    // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
-    // freed earlier similar to a sliding window effect
-    return 50 * 1024 * 1024;
+    return optimalBatchSizeBytes;
   }

 }
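
The two hunks above move the hard-coded 50 MiB batch size into a constructor parameter and keep the old value as the default through constructor delegation. The sketch below illustrates that pattern in isolation. `BatchSizedFlusher`, `splitIntoBatches`, and the record sizes are hypothetical names and values chosen for illustration; this is not Airbyte's async framework, only a rough picture of why a smaller batch size lets memory be released in smaller steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a flush function with a configurable batch size.
class BatchSizedFlusher {

  // Same value as the default in the diff: 50 * 1024 * 1024 = 52,428,800 bytes.
  // The long literal avoids int overflow if the constant is ever raised past 2 GiB.
  static final long DEFAULT_BATCH_SIZE_BYTES = 50L * 1024 * 1024;

  private final long optimalBatchSizeBytes;

  // Default constructor delegates to the parameterized one, as in the PR.
  BatchSizedFlusher() {
    this(DEFAULT_BATCH_SIZE_BYTES);
  }

  BatchSizedFlusher(final long optimalBatchSizeBytes) {
    if (optimalBatchSizeBytes <= 0) {
      throw new IllegalArgumentException("batch size must be positive");
    }
    this.optimalBatchSizeBytes = optimalBatchSizeBytes;
  }

  long getOptimalBatchSizeBytes() {
    return optimalBatchSizeBytes;
  }

  // Groups record sizes into batches whose totals stay at or below the batch size.
  // A record larger than the batch size ends up alone in its own batch.
  List<List<Long>> splitIntoBatches(final List<Long> recordSizesBytes) {
    final List<List<Long>> batches = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (final long size : recordSizesBytes) {
      if (!current.isEmpty() && currentBytes + size > optimalBatchSizeBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}

class BatchSizedFlusherDemo {
  public static void main(final String[] args) {
    System.out.println(new BatchSizedFlusher().getOptimalBatchSizeBytes());
    final BatchSizedFlusher small = new BatchSizedFlusher(100);
    System.out.println(small.splitIntoBatches(List.of(60L, 30L, 20L, 100L, 5L)));
  }
}
```

With a limit of 100 bytes, the five made-up records are grouped as `[[60, 30], [20], [100], [5]]`, so each group could be flushed and released before the next one fills.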
@@ -46,7 +46,7 @@ ENV APPLICATION destination-redshift

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.6.3
+LABEL io.airbyte.version=0.6.4
 LABEL io.airbyte.name=airbyte/destination-redshift

 ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 0.6.3
+  dockerImageTag: 0.6.4
   dockerRepository: airbyte/destination-redshift
   githubIssueLabel: destination-redshift
   icon: redshift.svg
@@ -70,6 +70,7 @@ public ConnectorSpecification spec() throws Exception {
   }

   public static void main(final String[] args) throws Exception {
+    LOGGER.info("====== Davin's Local");
     final Destination destination = new RedshiftDestination();
     LOGGER.info("starting destination: {}", RedshiftDestination.class);
     new IntegrationRunner(destination).run(args);
@@ -22,6 +22,7 @@
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
 import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
 import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
 import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
 import io.airbyte.integrations.base.ssh.SshWrappedDestination;
@@ -46,6 +47,8 @@
 import java.util.Map;
 import java.util.function.Consumer;
 import javax.sql.DataSource;
+
+import org.apache.commons.lang3.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -136,15 +139,20 @@ public JsonNode toJdbcConfig(final JsonNode config) {
   }

   @Override
+  @Deprecated
   public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                             final ConfiguredAirbyteCatalog catalog,
                                             final Consumer<AirbyteMessage> outputRecordCollector) {
+    throw new NotImplementedException("Should use the getSerializedMessageConsumer instead");
+  }
+
+  @Override
+  public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
     final EncryptionConfig encryptionConfig =
-        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
     final JsonNode s3Options = findS3Options(config);
     final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
     final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);

     if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
Contributor:

We can also get rid of the file buffers option since the Async framework accounts for this. I think we should do so in a follow-up PR and patch version bump. @edgao any preference?

Contributor Author:

If Edward is ok, I can do it after merging this

Contributor (@edgao, Aug 14, 2023):

followup pr works for me!

       LOGGER.warn("""
                   Increasing the number of file buffers past {} can lead to increased performance but
@@ -153,12 +161,11 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
                   """, FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
     }

-    return new StagingConsumerFactory().create(
+    return new StagingConsumerFactory().createAsync(
         outputRecordCollector,
         getDatabase(getDataSource(config)),
         new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
         getNamingResolver(),
-        CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, numberOfFileBuffers)),
         config,
         catalog,
         isPurgeStagingData(s3Options),
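
The destination change above retires the old consumer entry point by making it throw and routes all callers to the serialized (async) consumer. The sketch below shows that shape with simplified, hypothetical types: `SketchDestination`, `SketchMessageConsumer`, `SketchSerializedConsumer`, and `AsyncOnlyDestination` are illustrative names, not Airbyte interfaces, and the JDK's `UnsupportedOperationException` stands in for the commons-lang3 `NotImplementedException` that the PR uses, so the example runs without extra dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the two consumer types named in the diff.
interface SketchMessageConsumer {
  void accept(String message);
}

interface SketchSerializedConsumer {
  void accept(String serializedMessage, int sizeInBytes);
}

// Hypothetical destination contract with a legacy and a serialized entry point.
interface SketchDestination {
  @Deprecated
  SketchMessageConsumer getConsumer(Consumer<String> outputRecordCollector);

  SketchSerializedConsumer getSerializedMessageConsumer(Consumer<String> outputRecordCollector);
}

class AsyncOnlyDestination implements SketchDestination {

  @Override
  @Deprecated
  public SketchMessageConsumer getConsumer(final Consumer<String> outputRecordCollector) {
    // Assumption: a JDK exception replaces NotImplementedException to stay dependency-free.
    throw new UnsupportedOperationException("Should use the getSerializedMessageConsumer instead");
  }

  @Override
  public SketchSerializedConsumer getSerializedMessageConsumer(final Consumer<String> outputRecordCollector) {
    // Forwards each serialized message, annotated with its size, to the collector.
    return (serializedMessage, sizeInBytes) ->
        outputRecordCollector.accept(serializedMessage + " (" + sizeInBytes + " bytes)");
  }
}

class AsyncOnlyDestinationDemo {
  public static void main(final String[] args) {
    final List<String> collected = new ArrayList<>();
    final SketchDestination destination = new AsyncOnlyDestination();
    destination.getSerializedMessageConsumer(collected::add).accept("{\"type\":\"RECORD\"}", 17);
    System.out.println(collected);
    try {
      destination.getConsumer(collected::add);
    } catch (final UnsupportedOperationException e) {
      System.out.println("legacy path rejected: " + e.getMessage());
    }
  }
}
```

Throwing from the legacy method makes any remaining caller fail loudly at runtime instead of silently using the old synchronous path, at the cost of deferring the error until that call is actually made.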
5 changes: 3 additions & 2 deletions docs/integrations/destinations/redshift.md
@@ -155,7 +155,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c
 ## Changelog

 | Version | Date | Pull Request | Subject |
-| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.6.4 | 2023-08-10 | [\#28619](https://github.com/airbytehq/airbyte/pull/28619) | Use async method for staging |
 | 0.6.3 | 2023-08-07 | [\#29188](https://github.com/airbytehq/airbyte/pull/29188) | Internal code refactoring |
 | 0.6.2 | 2023-07-24 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Add hooks in preparation for destinations v2 implementation |
 | 0.6.1 | 2023-07-14 | [\#28345](https://github.com/airbytehq/airbyte/pull/28345) | Increment patch to trigger a rebuild |
@@ -175,7 +176,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
 | 0.3.55 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging |
 | 0.3.54 | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
 | 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Flatten JSON arrays to fix maximum size check for SUPER field |
-| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
+| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
 | 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
 | 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
 | 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |