Skip to content

🎉 Snowflake destination: reduce memory footprint #10394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void run(final String[] args) throws Exception {
integration.getClass().getSimpleName(),
parsed.getCommand().toString(),
true);
LOGGER.info("Sentry transaction event: {}", transaction.getEventId());
try {
runInternal(transaction, parsed);
transaction.finish(SpanStatus.OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.bytes.ByteUtils;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
Expand Down Expand Up @@ -85,6 +84,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private final Map<AirbyteStreamNameNamespacePair, Long> streamToIgnoredRecordCount;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final long maxQueueSizeInBytes;
private final RecordSizeEstimator recordSizeEstimator;
private long bufferSizeInBytes;
private Map<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> streamBuffer;
private String fileName;
Expand Down Expand Up @@ -128,6 +128,7 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
this.bufferSizeInBytes = 0;
this.streamToIgnoredRecordCount = new HashMap<>();
this.streamBuffer = new HashMap<>();
this.recordSizeEstimator = new RecordSizeEstimator();
}

@Override
Expand Down Expand Up @@ -158,13 +159,9 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
return;
}

// TODO use a more efficient way to compute bytes that doesn't require double serialization (records
// are serialized again when writing to
// the destination
final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
final long messageSizeInBytes = recordSizeEstimator.getEstimatedByteSize(recordMessage);
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
LOGGER.info("Flushing buffer...");
flushQueueToDestination(bufferSizeInBytes);
flushQueueToDestination();
bufferSizeInBytes = 0;
}

Expand All @@ -180,9 +177,12 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {

}

private void flushQueueToDestination(long bufferSizeInBytes) throws Exception {
private void flushQueueToDestination() throws Exception {
LOGGER.info("Flushing buffer: {} bytes", bufferSizeInBytes);

AirbyteSentry.executeWithTracing("FlushBuffer", () -> {
for (final Map.Entry<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> entry : streamBuffer.entrySet()) {
LOGGER.info("Flushing {}: {} records", entry.getKey().getName(), entry.getValue().size());
recordWriter.accept(entry.getKey(), entry.getValue());
if (checkAndRemoveRecordWriter != null) {
fileName = checkAndRemoveRecordWriter.apply(entry.getKey(), fileName);
Expand Down Expand Up @@ -215,7 +215,7 @@ protected void close(final boolean hasFailed) throws Exception {
LOGGER.error("executing on failed close procedure.");
} else {
LOGGER.info("executing on success close procedure.");
flushQueueToDestination(bufferSizeInBytes);
flushQueueToDestination();
}

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.buffered_stream_consumer;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.HashMap;
import java.util.Map;

/**
 * Estimates the byte size of record messages. To keep the memory footprint low it 1) assumes a
 * flat four bytes per serialized character (the UTF-8 upper bound), and 2) only serializes a
 * record for real once every N records per stream. Each new sample is averaged with the previous
 * estimate so that a single outlier record cannot skew the estimation.
 */
public class RecordSizeEstimator {

  // default sampling interval: one real measurement per 20 records
  private static final int DEFAULT_SAMPLE_BATCH_SIZE = 20;

  // stream name -> most recent size estimate (bytes)
  private final Map<String, Long> streamRecordSizeEstimation;
  // stream name -> records remaining before the next real measurement
  private final Map<String, Integer> streamSampleCountdown;
  // how many records are covered by one real measurement
  private final int sampleBatchSize;

  /**
   * Creates an estimator that serializes one record per {@code sampleBatchSize} records (per
   * stream) and reuses the resulting estimate for the rest of the batch.
   *
   * @param sampleBatchSize number of records covered by a single real measurement
   */
  public RecordSizeEstimator(final int sampleBatchSize) {
    this.streamRecordSizeEstimation = new HashMap<>();
    this.streamSampleCountdown = new HashMap<>();
    this.sampleBatchSize = sampleBatchSize;
  }

  /** Creates an estimator with the default sampling interval of {@value #DEFAULT_SAMPLE_BATCH_SIZE} records. */
  public RecordSizeEstimator() {
    this(DEFAULT_SAMPLE_BATCH_SIZE);
  }

  /**
   * Returns the estimated byte size of {@code recordMessage}. Only performs a real serialization
   * when the stream is new or its sampling countdown has run out; otherwise returns the cached
   * per-stream estimate.
   */
  public long getEstimatedByteSize(final AirbyteRecordMessage recordMessage) {
    final String streamName = recordMessage.getStream();
    final Integer remaining = streamSampleCountdown.get(streamName);

    if (remaining == null) {
      // first record seen for this stream: measure it for real and start a new batch
      final long measuredSize = getStringByteSize(recordMessage.getData());
      streamRecordSizeEstimation.put(streamName, measuredSize);
      streamSampleCountdown.put(streamName, sampleBatchSize - 1);
      return measuredSize;
    }

    if (remaining > 0) {
      // still within the current batch: reuse the cached estimate
      streamSampleCountdown.put(streamName, remaining - 1);
      return streamRecordSizeEstimation.get(streamName);
    }

    // batch exhausted: take a fresh measurement and blend it with the previous mean
    final long measuredSize = getStringByteSize(recordMessage.getData());
    final long blendedSize = streamRecordSizeEstimation.get(streamName) / 2 + measuredSize / 2;
    streamRecordSizeEstimation.put(streamName, blendedSize);
    streamSampleCountdown.put(streamName, sampleBatchSize - 1);
    return blendedSize;
  }

  @VisibleForTesting
  static long getStringByteSize(final JsonNode data) {
    // pessimistic UTF-8 bound: charge four bytes for every serialized character
    return 4L * Jsons.serialize(data).length();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.bytes.ByteUtils;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
Expand Down Expand Up @@ -316,7 +315,7 @@ private static List<AirbyteMessage> generateRecords(final long targetSizeInBytes
long bytesCounter = 0;
for (int i = 0;; i++) {
JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAlphabetic(7), "name", "human " + String.format("%8d", i)));
long sizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(payload));
long sizeInBytes = RecordSizeEstimator.getStringByteSize(payload);
bytesCounter += sizeInBytes;
AirbyteMessage airbyteMessage = new AirbyteMessage()
.withType(Type.RECORD)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.buffered_stream_consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link RecordSizeEstimator}: verifies that real size sampling only happens once
 * per batch, that samples are averaged with the previous estimate, and that estimates are tracked
 * independently per stream.
 */
class RecordSizeEstimatorTest {

  private static final JsonNode DATA_0 = Jsons.deserialize("{}");
  private static final JsonNode DATA_1 = Jsons.deserialize("{ \"field1\": true }");
  private static final JsonNode DATA_2 = Jsons.deserialize("{ \"field1\": 10000 }");
  private static final long DATA_0_SIZE = RecordSizeEstimator.getStringByteSize(DATA_0);
  private static final long DATA_1_SIZE = RecordSizeEstimator.getStringByteSize(DATA_1);
  private static final long DATA_2_SIZE = RecordSizeEstimator.getStringByteSize(DATA_2);

  @Test
  public void testPeriodicSampling() {
    // the estimator performs a real size sampling every 3 records
    final RecordSizeEstimator sizeEstimator = new RecordSizeEstimator(3);
    final String stream = "stream";
    final AirbyteRecordMessage record0 = new AirbyteRecordMessage().withStream(stream).withData(DATA_0);
    final AirbyteRecordMessage record1 = new AirbyteRecordMessage().withStream(stream).withData(DATA_1);
    final AirbyteRecordMessage record2 = new AirbyteRecordMessage().withStream(stream).withData(DATA_2);

    // sample record message 1 (first record of a stream is always measured for real)
    final long firstEstimation = DATA_1_SIZE;
    assertEquals(firstEstimation, sizeEstimator.getEstimatedByteSize(record1));
    // next two calls return the first sampling result
    assertEquals(firstEstimation, sizeEstimator.getEstimatedByteSize(record0));
    assertEquals(firstEstimation, sizeEstimator.getEstimatedByteSize(record0));

    // sample record message 2 (batch exhausted; new sample averaged with previous estimate)
    final long secondEstimation = firstEstimation / 2 + DATA_2_SIZE / 2;
    assertEquals(secondEstimation, sizeEstimator.getEstimatedByteSize(record2));
    // next two calls return the second sampling result
    assertEquals(secondEstimation, sizeEstimator.getEstimatedByteSize(record0));
    assertEquals(secondEstimation, sizeEstimator.getEstimatedByteSize(record0));

    // sample record message 1 again (another batch boundary, another averaged sample)
    final long thirdEstimation = secondEstimation / 2 + DATA_1_SIZE / 2;
    assertEquals(thirdEstimation, sizeEstimator.getEstimatedByteSize(record1));
    // next two calls return the third sampling result
    assertEquals(thirdEstimation, sizeEstimator.getEstimatedByteSize(record0));
    assertEquals(thirdEstimation, sizeEstimator.getEstimatedByteSize(record0));
  }

  @Test
  public void testDifferentEstimationPerStream() {
    // each stream gets its own independent initial sample, not a shared one
    final RecordSizeEstimator sizeEstimator = new RecordSizeEstimator();
    final AirbyteRecordMessage record0 = new AirbyteRecordMessage().withStream("stream1").withData(DATA_0);
    final AirbyteRecordMessage record1 = new AirbyteRecordMessage().withStream("stream2").withData(DATA_1);
    final AirbyteRecordMessage record2 = new AirbyteRecordMessage().withStream("stream3").withData(DATA_2);
    assertEquals(DATA_0_SIZE, sizeEstimator.getEstimatedByteSize(record0));
    assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record1));
    assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record2));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV APPLICATION_VERSION 0.4.12
ENV APPLICATION_VERSION 0.4.13
ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.12
LABEL io.airbyte.version=0.4.13
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void insertRecordsInternal(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final String schemaName,
final String stage) {
LOGGER.info("actual size of batch for staging: {}", records.size());
LOGGER.info("Writing {} records to {}", records.size(), stage);

if (records.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
}
}
}
LOGGER.info("Total records read: {}", recordsRead);
try {
destination.notifyEndOfStream();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public Process create(final String jobId,
IOs.writeFile(jobRoot, file.getKey(), file.getValue());
}

LOGGER.info("Creating docker job ID: {}", jobId);
final List<String> cmd = Lists.newArrayList(
"docker",
"run",
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ Finally, you need to add read/write permissions to your bucket with that email.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
| 0.4.13 | 2022-02-16 | [\#10394](https://github.com/airbytehq/airbyte/pull/10394) | Reduce memory footprint. |
| 0.4.12 | 2022-02-15 | [\#10342](https://github.com/airbytehq/airbyte/pull/10342) | Use connection pool, and fix connection leak. |
| 0.4.11 | 2022-02-14 | [\#9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to a staging file. |
| 0.4.10 | 2022-02-14 | [\#10297](https://github.com/airbytehq/airbyte/pull/10297) | Halve the record buffer size to reduce memory consumption. |
Expand Down