-
Notifications
You must be signed in to change notification settings - Fork 4.6k
✨Switch redshift staging to async mode #28619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 75 commits
e6f0ba0
0ce1aac
51fbe49
4263a79
978cfd2
5aa307b
6a083d1
b1b9343
fd8acfb
57844fd
5123f87
f15ca3f
e5745b9
e8bb0f4
c01e7ef
7a1ae97
73643b3
f110532
ccada79
cd30db3
a707c68
36df2a1
eb6d6e0
65fdd53
6a3564b
4bcd309
d3d341d
803c615
0f2a0f8
1b88cd5
3038b81
a80af5e
ccbe015
81238ee
387f8cf
5573fd9
84daced
0ae6b96
80d5b95
5d9f8c1
57b39b8
08a2939
731fd2a
4e16ec6
105712c
6fa7de4
508dd8c
8f9f590
de8cd71
abff487
f5a37f1
7ce1582
25ff279
32ab872
bb4b154
5f2d069
4c40626
63e66f6
0165f9e
5c6547e
d2dc6f4
1e7b4b7
a5c33a5
6c7abe9
19766d6
0f3d5cb
b837d20
74d6c61
086d050
d3357da
78a70db
7de0a1b
e2b54dd
c96d32c
0fa168c
2da179d
f2feec4
97ac3fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import io.airbyte.integrations.base.AirbyteMessageConsumer; | ||
import io.airbyte.integrations.base.AirbyteTraceMessageUtility; | ||
import io.airbyte.integrations.base.Destination; | ||
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer; | ||
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper; | ||
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve; | ||
import io.airbyte.integrations.base.ssh.SshWrappedDestination; | ||
|
@@ -46,6 +47,8 @@ | |
import java.util.Map; | ||
import java.util.function.Consumer; | ||
import javax.sql.DataSource; | ||
|
||
import org.apache.commons.lang3.NotImplementedException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -136,15 +139,20 @@ public JsonNode toJdbcConfig(final JsonNode config) { | |
} | ||
|
||
/**
 * Deprecated consumer factory method that is intentionally left unimplemented here.
 *
 * The body does not read any of its arguments; it unconditionally throws and the exception
 * message directs callers to {@code getSerializedMessageConsumer} instead.
 *
 * @param config not used by this implementation
 * @param catalog not used by this implementation
 * @param outputRecordCollector not used by this implementation
 * @return never returns normally
 * @throws NotImplementedException always
 * @deprecated use {@code getSerializedMessageConsumer} instead
 */
@Override | ||
@Deprecated | ||
public AirbyteMessageConsumer getConsumer(final JsonNode config, | ||
final ConfiguredAirbyteCatalog catalog, | ||
final Consumer<AirbyteMessage> outputRecordCollector) { | ||
// Fail fast so any remaining caller of the deprecated method is pointed at its replacement.
throw new NotImplementedException("Should use the getSerializedMessageConsumer instead"); | ||
} | ||
|
||
@Override | ||
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) throws Exception { | ||
final EncryptionConfig encryptionConfig = | ||
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption(); | ||
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption(); | ||
final JsonNode s3Options = findS3Options(config); | ||
final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options); | ||
final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options); | ||
|
||
if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: We can also get rid of the file buffers option, since the Async framework accounts for this. I think we should do so in a follow-up PR with a patch version bump. @edgao any preference? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: If Edward is OK with it, I can do it after merging this. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: A follow-up PR works for me! |
||
LOGGER.warn(""" | ||
Increasing the number of file buffers past {} can lead to increased performance but | ||
|
@@ -153,12 +161,11 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, | |
""", FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size()); | ||
} | ||
|
||
return new StagingConsumerFactory().create( | ||
return new StagingConsumerFactory().createAsync( | ||
outputRecordCollector, | ||
getDatabase(getDataSource(config)), | ||
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig), | ||
getNamingResolver(), | ||
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, numberOfFileBuffers)), | ||
config, | ||
catalog, | ||
isPurgeStagingData(s3Options), | ||
|
Uh oh!
There was an error while loading. Please reload this page.