Commit 6301cfa

🎉 Destination snowflake: reduce memory consumption (#10297)
* Avoid redundant adapter construction
* Remove unused logger
* Avoid redundant creation of buffer map
* Decrease max batch byte size to 128 MB
* Format code
* Move data adapter to an instance variable
* Bump version
* Bump version in seed
1 parent ed276f4 commit 6301cfa
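The main structural change in this commit: instead of accumulating raw AirbyteMessage objects in one flat list and regrouping them by stream on every flush, the consumer now groups records per stream as they arrive. A minimal sketch of the idea, with simplified stand-in types rather than the real Airbyte classes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiConsumer;

    // Illustrative sketch only; the real implementation is BufferedStreamConsumer below.
    public class PerStreamBuffer {

      private final Map<String, List<String>> streamBuffer = new HashMap<>();

      public void accept(final String streamName, final String record) {
        // Group at insert time: computeIfAbsent allocates each stream's list exactly once.
        streamBuffer.computeIfAbsent(streamName, k -> new ArrayList<>()).add(record);
      }

      public void flush(final BiConsumer<String, List<String>> recordWriter) {
        // Write out each stream's batch, then empty the map in place, so no
        // second, regrouped copy of the buffer is ever built.
        streamBuffer.forEach(recordWriter);
        streamBuffer.clear();
      }
    }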

File tree

10 files changed (+54, -77 lines)


airbyte-config/init/src/main/resources/seed/destination_definitions.yaml (+1, -1)

@@ -185,7 +185,7 @@
 - name: Snowflake
   destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.9
+  dockerImageTag: 0.4.10
   documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
   icon: snowflake.svg
 - name: MariaDB ColumnStore

airbyte-config/init/src/main/resources/seed/destination_specs.yaml (+11, -13)

@@ -3817,7 +3817,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-snowflake:0.4.9"
+- dockerImage: "airbyte/destination-snowflake:0.4.10"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
     connectionSpecification:
@@ -3899,32 +3899,30 @@
             description: "The loading method used to send data to Snowflake."
             order: 8
             oneOf:
-            - title: "[Recommended] Internal Staging"
+            - title: "Select another option"
               additionalProperties: false
-              description: "Writes large batches of records to a file, uploads the file\
-                \ to Snowflake, then uses <pre>COPY INTO table</pre> to upload the file.\
-                \ Recommended for large production workloads for better speed and scalability."
+              description: "Select another option"
               required:
               - "method"
               properties:
                 method:
                   type: "string"
                   enum:
-                  - "Internal Staging"
-                  default: "Internal Staging"
-            - title: "Standard Inserts"
+                  - "Standard"
+                  default: "Standard"
+            - title: "[Recommended] Internal Staging"
               additionalProperties: false
-              description: "Uses <pre>INSERT</pre> statements to send batches of records\
-                \ to Snowflake. Easiest (no setup) but not recommended for large production\
-                \ workloads due to slow speed."
+              description: "Writes large batches of records to a file, uploads the file\
+                \ to Snowflake, then uses <pre>COPY INTO table</pre> to upload the file.\
+                \ Recommended for large production workloads for better speed and scalability."
               required:
               - "method"
               properties:
                 method:
                   type: "string"
                   enum:
-                  - "Standard"
-                  default: "Standard"
+                  - "Internal Staging"
+                  default: "Internal Staging"
             - title: "AWS S3 Staging"
               additionalProperties: false
               description: "Writes large batches of records to a file, uploads the file\

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java (+18, -26)

@@ -23,10 +23,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -81,13 +79,13 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer
   private final RecordWriter recordWriter;
   private final CheckedConsumer<Boolean, Exception> onClose;
   private final Set<AirbyteStreamNameNamespacePair> streamNames;
-  private final List<AirbyteMessage> buffer;
   private final ConfiguredAirbyteCatalog catalog;
   private final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord;
-  private final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount;
+  private final Map<AirbyteStreamNameNamespacePair, Long> streamToIgnoredRecordCount;
   private final Consumer<AirbyteMessage> outputRecordCollector;
   private final long maxQueueSizeInBytes;
   private long bufferSizeInBytes;
+  private Map<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> streamBuffer;

   private boolean hasStarted;
   private boolean hasClosed;

@@ -112,9 +110,9 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
     this.catalog = catalog;
     this.streamNames = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
     this.isValidRecord = isValidRecord;
-    this.buffer = new ArrayList<>(10_000);
     this.bufferSizeInBytes = 0;
-    this.pairToIgnoredRecordCount = new HashMap<>();
+    this.streamToIgnoredRecordCount = new HashMap<>();
+    this.streamBuffer = new HashMap<>();
   }

   @Override
@@ -123,7 +121,7 @@ protected void startTracked() throws Exception {
     Preconditions.checkState(!hasStarted, "Consumer has already been started.");
     hasStarted = true;

-    pairToIgnoredRecordCount.clear();
+    streamToIgnoredRecordCount.clear();
     LOGGER.info("{} started.", BufferedStreamConsumer.class);

     onStart.call();
@@ -141,7 +139,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
     }

     if (!isValidRecord.apply(message.getRecord().getData())) {
-      pairToIgnoredRecordCount.put(stream, pairToIgnoredRecordCount.getOrDefault(stream, 0L) + 1L);
+      streamToIgnoredRecordCount.put(stream, streamToIgnoredRecordCount.getOrDefault(stream, 0L) + 1L);
       return;
     }

@@ -151,15 +149,12 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
       final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
       if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
         LOGGER.info("Flushing buffer...");
-        AirbyteSentry.executeWithTracing("FlushBuffer",
-            this::flushQueueToDestination,
-            Map.of("stream", stream.getName(),
-                "namespace", Objects.requireNonNullElse(stream.getNamespace(), "null"),
-                "bufferSizeInBytes", bufferSizeInBytes));
+        flushQueueToDestination(bufferSizeInBytes);
         bufferSizeInBytes = 0;
       }

-      buffer.add(message);
+      final List<AirbyteRecordMessage> bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>());
+      bufferedRecords.add(message.getRecord());
       bufferSizeInBytes += messageSizeInBytes;

     } else if (message.getType() == Type.STATE) {
@@ -170,16 +165,13 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {

   }

-  private void flushQueueToDestination() throws Exception {
-    final Map<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> recordsByStream = buffer.stream()
-        .map(AirbyteMessage::getRecord)
-        .collect(Collectors.groupingBy(AirbyteStreamNameNamespacePair::fromRecordMessage));
-
-    buffer.clear();
-
-    for (final Map.Entry<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> entry : recordsByStream.entrySet()) {
-      recordWriter.accept(entry.getKey(), entry.getValue());
-    }
+  private void flushQueueToDestination(long bufferSizeInBytes) throws Exception {
+    AirbyteSentry.executeWithTracing("FlushBuffer", () -> {
+      for (final Map.Entry<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> entry : streamBuffer.entrySet()) {
+        recordWriter.accept(entry.getKey(), entry.getValue());
+      }
+    }, Map.of("bufferSizeInBytes", bufferSizeInBytes));
+    streamBuffer = new HashMap<>();

     if (pendingState != null) {
       lastFlushedState = pendingState;
@@ -199,13 +191,13 @@ protected void close(final boolean hasFailed) throws Exception {
     Preconditions.checkState(!hasClosed, "Has already closed.");
     hasClosed = true;

-    pairToIgnoredRecordCount
+    streamToIgnoredRecordCount
        .forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair));
     if (hasFailed) {
       LOGGER.error("executing on failed close procedure.");
     } else {
       LOGGER.info("executing on success close procedure.");
-      flushQueueToDestination();
+      flushQueueToDestination(bufferSizeInBytes);
     }

     try {
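For contrast with the diff above: the removed flushQueueToDestination allocated a brand-new map of per-stream lists from the flat buffer on each flush via Collectors.groupingBy, so every buffered record was briefly referenced from two collections at once, and the grouping work was redone on every flush. A standalone sketch of that removed pattern, using a hypothetical Message record in place of AirbyteMessage:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class GroupAtFlush {

      // Hypothetical stand-in for AirbyteMessage.
      record Message(String stream, String data) {}

      // While collect(...) runs, the flat buffer and the grouped map are both live:
      // the Message objects themselves are shared, but a second full set of lists
      // and references is allocated on every flush, only to be discarded right after.
      static Map<String, List<Message>> groupByStream(final List<Message> buffer) {
        return buffer.stream().collect(Collectors.groupingBy(Message::stream));
      }
    }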

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DataAdapter.java (-4)

@@ -8,13 +8,9 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class DataAdapter {

-  private static final Logger LOGGER = LoggerFactory.getLogger(DataAdapter.class);
-
   private final Predicate<JsonNode> filterValueNode;
   private final Function<JsonNode, JsonNode> valueNodeAdapter;
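For context, a DataAdapter pairs a predicate (which JSON value nodes to rewrite) with a function (how to rewrite them), and its adapt(...) method applies the pair to a record's data. A hedged construction example (the NUL-stripping rule is purely illustrative, not a claim about what PostgresDataAdapter actually does):

    import com.fasterxml.jackson.databind.node.TextNode;
    import io.airbyte.integrations.destination.jdbc.DataAdapter;

    public class NulStrippingAdapterExample {

      // Hypothetical rule: drop NUL characters from text values before insert.
      static DataAdapter create() {
        return new DataAdapter(
            node -> node.isTextual() && node.textValue().contains("\u0000"), // filterValueNode
            node -> TextNode.valueOf(node.textValue().replace("\u0000", ""))); // valueNodeAdapter
      }
    }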

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java (+16, -17)

@@ -19,18 +19,28 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

+@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
 public abstract class JdbcSqlOperations implements SqlOperations {

-  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSqlOperations.class);
   protected static final String SHOW_SCHEMAS = "show schemas;";
   protected static final String NAME = "name";

+  // this adapter modifies record messages before inserting them into the destination
+  protected final Optional<DataAdapter> dataAdapter;
+
+  protected JdbcSqlOperations() {
+    this.dataAdapter = Optional.empty();
+  }
+
+  protected JdbcSqlOperations(final DataAdapter dataAdapter) {
+    this.dataAdapter = Optional.of(dataAdapter);
+  }
+
   @Override
   public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
     if (!isSchemaExists(database, schemaName)) {
@@ -63,21 +73,14 @@ public String createTableQuery(final JdbcDatabase database, final String schemaName,
   }

   protected void writeBatchToFile(final File tmpFile, final List<AirbyteRecordMessage> records) throws Exception {
-    PrintWriter writer = null;
-    try {
-      writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
-      final var csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
-
+    try (final PrintWriter writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
+        final CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) {
       for (final AirbyteRecordMessage record : records) {
         final var uuid = UUID.randomUUID().toString();
         final var jsonData = Jsons.serialize(formatData(record.getData()));
         final var emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt()));
         csvPrinter.printRecord(uuid, jsonData, emittedAt);
       }
-    } finally {
-      if (writer != null) {
-        writer.close();
-      }
     }
   }

@@ -137,7 +140,7 @@ public final void insertRecords(final JdbcDatabase database,
       throws Exception {
     AirbyteSentry.executeWithTracing("InsertRecords",
         () -> {
-          records.forEach(airbyteRecordMessage -> getDataAdapter().adapt(airbyteRecordMessage.getData()));
+          dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData())));
          insertRecordsInternal(database, records, schemaName, tableName);
        },
        Map.of("schema", Objects.requireNonNullElse(schemaName, "null"), "table", tableName, "recordCount", records.size()));
@@ -149,8 +152,4 @@ protected abstract void insertRecordsInternal(JdbcDatabase database,
       String tableName)
       throws Exception;

-  protected DataAdapter getDataAdapter() {
-    return new DataAdapter(j -> false, c -> c);
-  }
-
 }
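Two things change in this file: writeBatchToFile moves to try-with-resources, so the PrintWriter and CSVPrinter are closed even when an exception is thrown, and the overridable getDataAdapter() factory (which built a fresh no-op DataAdapter on every insertRecords call) becomes a final Optional field injected once through the constructor. A sketch of that injection pattern in isolation, with hypothetical names:

    import java.util.List;
    import java.util.Optional;
    import java.util.function.UnaryOperator;

    // Hypothetical minimal version of the constructor-injection pattern.
    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
    abstract class Operations {

      // Empty by default; created once at construction instead of per batch.
      protected final Optional<UnaryOperator<String>> adapter;

      protected Operations() {
        this.adapter = Optional.empty();
      }

      protected Operations(final UnaryOperator<String> adapter) {
        this.adapter = Optional.of(adapter);
      }

      void insert(final List<String> records) {
        // No-op when absent; the single shared adapter instance when present.
        adapter.ifPresent(records::replaceAll);
      }
    }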

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java (+3, -9)

@@ -5,7 +5,6 @@
 package io.airbyte.integrations.destination.postgres;

 import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.integrations.destination.jdbc.DataAdapter;
 import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.io.BufferedReader;
@@ -17,12 +16,12 @@
 import java.util.List;
 import org.postgresql.copy.CopyManager;
 import org.postgresql.core.BaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class PostgresSqlOperations extends JdbcSqlOperations {

-  private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSqlOperations.class);
+  public PostgresSqlOperations() {
+    super(new PostgresDataAdapter());
+  }

   @Override
   public void insertRecordsInternal(final JdbcDatabase database,
@@ -58,9 +57,4 @@ public void insertRecordsInternal(final JdbcDatabase database,
     });
   }

-  @Override
-  protected DataAdapter getDataAdapter() {
-    return new PostgresDataAdapter();
-  }
-
 }

airbyte-integrations/connectors/destination-snowflake/Dockerfile (+2, -2)

@@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

 RUN tar xf ${APPLICATION}.tar --strip-components=1

-ENV APPLICATION_VERSION 0.4.9
+ENV APPLICATION_VERSION 0.4.10
 ENV ENABLE_SENTRY true

-LABEL io.airbyte.version=0.4.9
+LABEL io.airbyte.version=0.4.10
 LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java (-4)

@@ -10,13 +10,9 @@
 import io.airbyte.integrations.base.IntegrationRunner;
 import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
 import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {

-  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestination.class);
-
   enum DestinationType {
     COPY_S3,
     COPY_GCS,
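SwitchingDestination chooses a concrete destination implementation from the parsed config at runtime; DestinationType above enumerates the candidates (the list is truncated in this diff view). A rough sketch of that kind of dispatch; the "method" strings come from the spec diff earlier in this commit, while the enum names INTERNAL_STAGING and INSERT are assumptions for illustration, not the verified SnowflakeDestination logic:

    import com.fasterxml.jackson.databind.JsonNode;

    class LoadingMethodSwitch {

      enum DestinationType { COPY_S3, COPY_GCS, INTERNAL_STAGING, INSERT }

      // Hypothetical dispatch on the loading_method.method field of the config.
      static DestinationType fromConfig(final JsonNode config) {
        return switch (config.path("loading_method").path("method").asText("Standard")) {
          case "Internal Staging" -> DestinationType.INTERNAL_STAGING;
          default -> DestinationType.INSERT; // S3/GCS cases omitted for brevity
        };
      }
    }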

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java (+1, -1)

@@ -46,7 +46,7 @@ public class SnowflakeInternalStagingConsumerFactory {

   private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

-  private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
+  private static final long MAX_BATCH_SIZE_BYTES = 128 * 1024 * 1024; // 128mb
   private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();

   public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
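Sanity-checking the constant change: the old expression 1024 * 1024 * 1024 / 4 evaluates to 268,435,456 bytes (256 MiB) and the new 128 * 1024 * 1024 to 134,217,728 bytes (128 MiB), so the staging batch threshold is exactly halved, which is what the changelog entry below means by halving the record buffer size:

    public class BatchSizeCheck {

      public static void main(final String[] args) {
        System.out.println(1024 * 1024 * 1024 / 4); // 268435456 bytes = 256 MiB (old)
        System.out.println(128 * 1024 * 1024);      // 134217728 bytes = 128 MiB (new)
      }
    }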

docs/integrations/destinations/snowflake.md (+2)

@@ -217,6 +217,8 @@ Finally, you need to add read/write permissions to your bucket with that email.

 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :----- | :------ |
+| 0.4.10 | 2022-02-14 | [\#10297](https://github.com/airbytehq/airbyte/pull/10297) | Halve the record buffer size to reduce memory consumption. |
+| 0.4.9 | 2022-02-14 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `ExitOnOutOfMemoryError` JVM flag. |
 | 0.4.8 | 2022-02-01 | [\#9959](https://github.com/airbytehq/airbyte/pull/9959) | Fix null pointer exception from buffered stream consumer. |
 | 0.4.7 | 2022-01-29 | [\#9745](https://github.com/airbytehq/airbyte/pull/9745) | Integrate with Sentry. |
 | 0.4.6 | 2022-01-28 | [#9623](https://github.com/airbytehq/airbyte/pull/9623) | Add jdbc_url_params support for optional JDBC parameters |
