From 87f6596dbfa1fe550b890972170585dd21986218 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 11:46:06 -0800 Subject: [PATCH 01/19] Add detailed logging for flushing --- .../buffered_stream_consumer/BufferedStreamConsumer.java | 3 ++- .../main/java/io/airbyte/workers/DefaultReplicationWorker.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index d670455e7b45e..f3f9b33449976 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -163,7 +163,6 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { // the destination final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData())); if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { - LOGGER.info("Flushing buffer..."); flushQueueToDestination(bufferSizeInBytes); bufferSizeInBytes = 0; } @@ -181,8 +180,10 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { } private void flushQueueToDestination(long bufferSizeInBytes) throws Exception { + LOGGER.info("Flushing buffer: {} bytes", bufferSizeInBytes); AirbyteSentry.executeWithTracing("FlushBuffer", () -> { for (final Map.Entry> entry : streamBuffer.entrySet()) { + LOGGER.info("Flushing {}: {} records", entry.getKey().getName(), entry.getValue().size()); recordWriter.accept(entry.getKey(), entry.getValue()); if (checkAndRemoveRecordWriter != null) { fileName = checkAndRemoveRecordWriter.apply(entry.getKey(), fileName); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java index b3ffca261565c..66aee63c62df3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java @@ -301,6 +301,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, } } } + LOGGER.info("Total records read: {}", recordsRead); try { destination.notifyEndOfStream(); } catch (final Exception e) { From 5139b33f137c0d54c8457c51e5090d2c82ab0887 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 11:57:12 -0800 Subject: [PATCH 02/19] Log sentry transaction event id --- .../java/io/airbyte/integrations/base/IntegrationRunner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index f1cf6074daeda..fa007180851c9 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -85,6 +85,7 @@ public void run(final String[] args) throws Exception { integration.getClass().getSimpleName(), parsed.getCommand().toString(), true); + LOGGER.info("Sentry transaction event: {}", transaction.getEventId()); try { runInternal(transaction, parsed); transaction.finish(SpanStatus.OK); From 4af2bb23f19d773bb6c204260dedf124f97bc243 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 12:25:30 -0800 Subject: [PATCH 03/19] Adjust logging --- .../buffered_stream_consumer/BufferedStreamConsumer.java | 6 +++--- .../snowflake/SnowflakeStagingSqlOperations.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index f3f9b33449976..6b6262aa172d9 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -163,7 +163,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { // the destination final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData())); if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { - flushQueueToDestination(bufferSizeInBytes); + flushQueueToDestination(); bufferSizeInBytes = 0; } @@ -179,7 +179,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { } - private void flushQueueToDestination(long bufferSizeInBytes) throws Exception { + private void flushQueueToDestination() throws Exception { LOGGER.info("Flushing buffer: {} bytes", bufferSizeInBytes); AirbyteSentry.executeWithTracing("FlushBuffer", () -> { for (final Map.Entry> entry : streamBuffer.entrySet()) { @@ -216,7 +216,7 @@ protected void close(final boolean hasFailed) throws Exception { LOGGER.error("executing on failed close procedure."); } else { LOGGER.info("executing on success close procedure."); - flushQueueToDestination(bufferSizeInBytes); + flushQueueToDestination(); } try { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java index e11a5ec96b435..6ef030b42c5b6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java @@ -26,7 +26,7 @@ public void insertRecordsInternal(final JdbcDatabase database, final List records, final String schemaName, final String stage) { - LOGGER.info("actual size of batch for staging: {}", records.size()); + LOGGER.info("Writing {} records to {}", records.size(), stage); if (records.isEmpty()) { return; From be884a4cfa1a91f8210d8ccf2e2f7b9e2d9ba62a Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 12:40:17 -0800 Subject: [PATCH 04/19] Log memory usage --- .../buffered_stream_consumer/BufferedStreamConsumer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 6b6262aa172d9..92fbefb6528e7 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -181,6 +181,11 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { private void flushQueueToDestination() throws Exception { LOGGER.info("Flushing buffer: {} bytes", bufferSizeInBytes); + + // TODO: remove this before merging: + final Runtime runtime = Runtime.getRuntime(); + LOGGER.info("Memory usage: total {}, max {}, free {}", runtime.totalMemory(), runtime.maxMemory(), runtime.freeMemory()); + AirbyteSentry.executeWithTracing("FlushBuffer", () -> { for (final Map.Entry> entry : streamBuffer.entrySet()) { LOGGER.info("Flushing {}: {} records", entry.getKey().getName(), entry.getValue().size()); From 1acb7ab697f8be0e109fb801ea9f54307f5fda52 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 18:12:26 -0800 Subject: [PATCH 05/19] Add jvm monitoring --- .../destination-snowflake/Dockerfile | 2 ++ .../destination-snowflake/build.gradle | 24 +++++++++---------- .../snowflake/SnowflakeDestination.java | 24 +++++++++++++++++++ .../workers/process/DockerProcessFactory.java | 9 +++++++ 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index ef9d102b78ac3..0212434daa46c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -10,12 +10,14 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte ENV APPLICATION destination-snowflake +RUN apt-get update && apt-get install -y procps && rm -rf /var/lib/apt/lists/* # Needed for JDK17 (in turn, needed on M1 macs) - see https://github.com/snowflakedb/snowflake-jdbc/issues/589#issuecomment-983944767 ENV DESTINATION_SNOWFLAKE_OPTS "--add-opens java.base/java.nio=ALL-UNNAMED" COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar +EXPOSE 6000 RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV APPLICATION_VERSION 0.4.12 diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index ce455e15b71a7..5b97d1d03aa7e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -8,17 +8,17 @@ application { mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination' // enable when profiling applicationDefaultJvmArgs = [ - '-XX:+ExitOnOutOfMemoryError', - '-XX:MaxRAMPercentage=75.0', -// '-XX:NativeMemoryTracking=detail', -// "-Djava.rmi.server.hostname=localhost", -// '-Dcom.sun.management.jmxremote=true', -// '-Dcom.sun.management.jmxremote.port=6000', -// "-Dcom.sun.management.jmxremote.rmi.port=6000", -// '-Dcom.sun.management.jmxremote.local.only=false', -// '-Dcom.sun.management.jmxremote.authenticate=false', -// '-Dcom.sun.management.jmxremote.ssl=false', -// '-agentpath:/usr/local/YourKit-JavaProfiler-2021.3/bin/linux-x86-64/libyjpagent.so=port=10001,listen=all' + '-XX:+ExitOnOutOfMemoryError', + '-XX:MaxRAMPercentage=75.0', + '-Xmx2000m', + '-XX:NativeMemoryTracking=detail', + '-Djava.rmi.server.hostname=localhost', + '-Dcom.sun.management.jmxremote=true', + '-Dcom.sun.management.jmxremote.port=6000', + "-Dcom.sun.management.jmxremote.rmi.port=6000", + '-Dcom.sun.management.jmxremote.local.only=false', + '-Dcom.sun.management.jmxremote.authenticate=false', + '-Dcom.sun.management.jmxremote.ssl=false', ] } @@ -29,7 +29,7 @@ dependencies { implementation 'net.snowflake:snowflake-jdbc:3.13.9' implementation 'org.apache.commons:commons-csv:1.4' implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' - implementation "io.aesy:datasize:1.0.0" + implementation 'io.aesy:datasize:1.0.0' implementation project(':airbyte-config:models') implementation project(':airbyte-db:lib') diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index e7da160fbe0bf..4a4585eaac70a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -9,10 +9,19 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SnowflakeDestination extends SwitchingDestination { + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestination.class); + enum DestinationType { COPY_S3, COPY_GCS, @@ -53,8 +62,23 @@ private static Map getTypeToDestination() { } public static void main(final String[] args) throws Exception { + int mb = 1024 * 1024; + MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); + long xmx = memoryBean.getHeapMemoryUsage().getMax() / mb; + long xms = memoryBean.getHeapMemoryUsage().getInit() / mb; + LOGGER.info("Initial Memory (xms) : {} mb", xms); + LOGGER.info("Max Memory (xmx) : {} mb", xmx); + + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + service.scheduleAtFixedRate(() -> LOGGER.info( + "Used heap memory: {} mb, Used non-heap memory: {} mb", + memoryBean.getHeapMemoryUsage().getUsed() / mb, + memoryBean.getNonHeapMemoryUsage().getUsed() / mb), 0, 20, TimeUnit.SECONDS); + final Destination destination = new SnowflakeDestination(); new IntegrationRunner(destination).run(args); + + service.shutdown(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index 6184690b5ad14..8fa61605e0b9b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -101,12 +101,15 @@ public Process create(final String jobId, IOs.writeFile(jobRoot, file.getKey(), file.getValue()); } + LOGGER.info("Creating docker job ID: {}", jobId); final List cmd = Lists.newArrayList( "docker", "run", "--rm", "--init", "-i", + "-p", + "9010:9010", "-w", rebasePath(jobRoot).toString(), // rebases the job root on the job data mount "--log-driver", @@ -117,6 +120,12 @@ public Process create(final String jobId, cmd.add(networkName); } + if (imageName.startsWith("airbyte/destination-snowflake")) { + LOGGER.info("Exposing image {} port 6000", imageName); + cmd.add("-p"); + cmd.add("6000:6000"); + } + if (workspaceMountSource != null) { cmd.add("-v"); cmd.add(String.format("%s:%s", workspaceMountSource, DATA_MOUNT_DESTINATION)); From 46e4690604c0ee65a5949665850758c9c8832584 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 20:13:17 -0800 Subject: [PATCH 06/19] Remove log --- .../buffered_stream_consumer/BufferedStreamConsumer.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 92fbefb6528e7..3ca1a69a80548 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -182,10 +182,6 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { private void flushQueueToDestination() throws Exception { LOGGER.info("Flushing buffer: {} bytes", bufferSizeInBytes); - // TODO: remove this before merging: - final Runtime runtime = Runtime.getRuntime(); - LOGGER.info("Memory usage: total {}, max {}, free {}", runtime.totalMemory(), runtime.maxMemory(), runtime.freeMemory()); - AirbyteSentry.executeWithTracing("FlushBuffer", () -> { for (final Map.Entry> entry : streamBuffer.entrySet()) { LOGGER.info("Flushing {}: {} records", entry.getKey().getName(), entry.getValue().size()); From 6e4e201dbeef0ca218398ec29c6a10ef3021bc0f Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 20:16:05 -0800 Subject: [PATCH 07/19] Remove port 9010 --- .../java/io/airbyte/workers/process/DockerProcessFactory.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index 8fa61605e0b9b..a7386da66b4a5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -108,8 +108,6 @@ public Process create(final String jobId, "--rm", "--init", "-i", - "-p", - "9010:9010", "-w", rebasePath(jobRoot).toString(), // rebases the job root on the job data mount "--log-driver", From 925c91c6e93c9d539abd30c8694a141530a5c48f Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 21:27:55 -0800 Subject: [PATCH 08/19] Remove host network mode --- .../connectors/destination-snowflake/build.gradle | 1 + .../io/airbyte/workers/process/DockerProcessFactory.java | 5 ----- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 5b97d1d03aa7e..453e0f201cfe5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -12,6 +12,7 @@ application { '-XX:MaxRAMPercentage=75.0', '-Xmx2000m', '-XX:NativeMemoryTracking=detail', + '-XX:+UsePerfData', '-Djava.rmi.server.hostname=localhost', '-Dcom.sun.management.jmxremote=true', '-Dcom.sun.management.jmxremote.port=6000', diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index a7386da66b4a5..f9a85cee788a9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -113,11 +113,6 @@ public Process create(final String jobId, "--log-driver", "none"); - if (networkName != null) { - cmd.add("--network"); - cmd.add(networkName); - } - if (imageName.startsWith("airbyte/destination-snowflake")) { LOGGER.info("Exposing image {} port 6000", imageName); cmd.add("-p"); From df0b605be6afb1de522d2c6b030421aa9126fd57 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 22:24:30 -0800 Subject: [PATCH 09/19] Sample record size --- .../io/airbyte/commons/bytes/ByteUtils.java | 23 -------- .../airbyte/commons/bytes/ByteUtilsTest.java | 24 -------- .../BufferedStreamConsumer.java | 8 +-- .../RecordSizeEstimator.java | 55 +++++++++++++++++++ .../BufferedStreamConsumerTest.java | 4 +- 5 files changed, 60 insertions(+), 54 deletions(-) delete mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java delete mode 100644 airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java b/airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java deleted file mode 100644 index 1ba95a5b735d0..0000000000000 --- a/airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.bytes; - -import java.nio.charset.StandardCharsets; - -public class ByteUtils { - - /** - * Encodes this String into a sequence of bytes using the given charset. UTF-8 is based on 8-bit - * code units. Each character is encoded as 1 to 4 bytes. The first 128 Unicode code points are - * encoded as 1 byte in UTF-8. - * - * @param s - string where charset length will be counted - * @return length of bytes for charset - */ - public static long getSizeInBytesForUTF8CharSet(String s) { - return s.getBytes(StandardCharsets.UTF_8).length; - } - -} diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java deleted file mode 100644 index f8f4c1d8d9bd7..0000000000000 --- a/airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.bytes; - -import static org.junit.jupiter.api.Assertions.*; - -import java.nio.charset.StandardCharsets; -import org.apache.commons.lang3.RandomStringUtils; -import org.junit.jupiter.api.Test; - -class ByteUtilsTest { - - @Test - public void testIt() { - for (int i = 1; i < 1000; i++) { - String s = RandomStringUtils.random(i); - // for now the formula is just hardcoded to str length * 2 - assertEquals(s.getBytes(StandardCharsets.UTF_8).length, ByteUtils.getSizeInBytesForUTF8CharSet(s), "The bytes length should be equal."); - } - } - -} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 3ca1a69a80548..58dd939a7ccec 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; -import io.airbyte.commons.bytes.ByteUtils; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -85,6 +84,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume private final Map streamToIgnoredRecordCount; private final Consumer outputRecordCollector; private final long maxQueueSizeInBytes; + private final RecordSizeEstimator recordSizeEstimator; private long bufferSizeInBytes; private Map> streamBuffer; private String fileName; @@ -128,6 +128,7 @@ public BufferedStreamConsumer(final Consumer outputRecordCollect this.bufferSizeInBytes = 0; this.streamToIgnoredRecordCount = new HashMap<>(); this.streamBuffer = new HashMap<>(); + this.recordSizeEstimator = new RecordSizeEstimator(); } @Override @@ -158,10 +159,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { return; } - // TODO use a more efficient way to compute bytes that doesn't require double serialization (records - // are serialized again when writing to - // the destination - final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData())); + final long messageSizeInBytes = recordSizeEstimator.getEstimatedByteSize(recordMessage); if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { flushQueueToDestination(); bufferSizeInBytes = 0; diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java new file mode 100644 index 0000000000000..6366b9b2d5477 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java @@ -0,0 +1,55 @@ +package io.airbyte.integrations.destination.buffered_stream_consumer; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.HashMap; +import java.util.Map; + +public class RecordSizeEstimator { + + // by default, perform one estimation for every 20 records + private static final int DEFAULT_SAMPLE_BATCH_SIZE = 20; + + // latest estimated record message size for each stream + private final Map streamRecordSizes; + // number of record messages until next real sampling for each stream + private final Map streamSampleCountdown; + // number of record messages + private final int sampleBatchSize; + + /** + * The estimator will perform a real calculation once per sample batch. + * The size of the batch is determined by {@code sampleBatchSize}. + */ + public RecordSizeEstimator(final int sampleBatchSize) { + this.streamRecordSizes = new HashMap<>(); + this.streamSampleCountdown = new HashMap<>(); + this.sampleBatchSize = sampleBatchSize; + } + + public RecordSizeEstimator() { + this(DEFAULT_SAMPLE_BATCH_SIZE); + } + + public long getEstimatedByteSize(final AirbyteRecordMessage recordMessage) { + final String stream = recordMessage.getStream(); + final Integer countdown = streamSampleCountdown.get(stream); + if (countdown == null || countdown <= 0) { + final long byteSize = getEstimatedByteSize(recordMessage.getData()); + streamRecordSizes.put(stream, byteSize); + streamSampleCountdown.put(stream, sampleBatchSize); + return byteSize; + } + streamSampleCountdown.put(stream, countdown - 1); + return streamRecordSizes.get(stream); + } + + @VisibleForTesting + public long getEstimatedByteSize(final JsonNode data) { + // assume UTF-8 encoding, and each char is 4 bytes long + return Jsons.serialize(data).length() * 4L; + } + +} diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index ee52350bc1985..47d67ac7ee3e5 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.airbyte.commons.bytes.ByteUtils; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -62,6 +61,7 @@ public class BufferedStreamConsumerTest { private static final AirbyteMessage STATE_MESSAGE2 = new AirbyteMessage() .withType(Type.STATE) .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("state_message_id", 2)))); + private static final RecordSizeEstimator RECORD_SIZE_ESTIMATOR = new RecordSizeEstimator(); private BufferedStreamConsumer consumer; private VoidCallable onStart; @@ -316,7 +316,7 @@ private static List generateRecords(final long targetSizeInBytes long bytesCounter = 0; for (int i = 0;; i++) { JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAlphabetic(7), "name", "human " + String.format("%8d", i))); - long sizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(payload)); + long sizeInBytes = RECORD_SIZE_ESTIMATOR.getEstimatedByteSize(payload); bytesCounter += sizeInBytes; AirbyteMessage airbyteMessage = new AirbyteMessage() .withType(Type.RECORD) From 097ec57869a64027f5b7858aa8bb9575844e8b76 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 16 Feb 2022 23:42:08 -0800 Subject: [PATCH 10/19] Remove profiling code --- .../destination-snowflake/Dockerfile | 2 -- .../destination-snowflake/build.gradle | 22 ++++++++--------- .../snowflake/SnowflakeDestination.java | 24 ------------------- .../workers/process/DockerProcessFactory.java | 7 +++--- 4 files changed, 14 insertions(+), 41 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 0212434daa46c..ef9d102b78ac3 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -10,14 +10,12 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte ENV APPLICATION destination-snowflake -RUN apt-get update && apt-get install -y procps && rm -rf /var/lib/apt/lists/* # Needed for JDK17 (in turn, needed on M1 macs) - see https://github.com/snowflakedb/snowflake-jdbc/issues/589#issuecomment-983944767 ENV DESTINATION_SNOWFLAKE_OPTS "--add-opens java.base/java.nio=ALL-UNNAMED" COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar -EXPOSE 6000 RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV APPLICATION_VERSION 0.4.12 diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 453e0f201cfe5..3c9cc271a87a0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -6,20 +6,20 @@ plugins { application { mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination' -// enable when profiling applicationDefaultJvmArgs = [ '-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', - '-Xmx2000m', - '-XX:NativeMemoryTracking=detail', - '-XX:+UsePerfData', - '-Djava.rmi.server.hostname=localhost', - '-Dcom.sun.management.jmxremote=true', - '-Dcom.sun.management.jmxremote.port=6000', - "-Dcom.sun.management.jmxremote.rmi.port=6000", - '-Dcom.sun.management.jmxremote.local.only=false', - '-Dcom.sun.management.jmxremote.authenticate=false', - '-Dcom.sun.management.jmxremote.ssl=false', +// uncomment when profiling +// '-Xmx256m', +// '-XX:NativeMemoryTracking=detail', +// '-XX:+UsePerfData', +// '-Djava.rmi.server.hostname=localhost', +// '-Dcom.sun.management.jmxremote=true', +// '-Dcom.sun.management.jmxremote.port=6000', +// "-Dcom.sun.management.jmxremote.rmi.port=6000", +// '-Dcom.sun.management.jmxremote.local.only=false', +// '-Dcom.sun.management.jmxremote.authenticate=false', +// '-Dcom.sun.management.jmxremote.ssl=false', ] } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 4a4585eaac70a..e7da160fbe0bf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -9,19 +9,10 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SnowflakeDestination extends SwitchingDestination { - private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestination.class); - enum DestinationType { COPY_S3, COPY_GCS, @@ -62,23 +53,8 @@ private static Map getTypeToDestination() { } public static void main(final String[] args) throws Exception { - int mb = 1024 * 1024; - MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); - long xmx = memoryBean.getHeapMemoryUsage().getMax() / mb; - long xms = memoryBean.getHeapMemoryUsage().getInit() / mb; - LOGGER.info("Initial Memory (xms) : {} mb", xms); - LOGGER.info("Max Memory (xmx) : {} mb", xmx); - - ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - service.scheduleAtFixedRate(() -> LOGGER.info( - "Used heap memory: {} mb, Used non-heap memory: {} mb", - memoryBean.getHeapMemoryUsage().getUsed() / mb, - memoryBean.getNonHeapMemoryUsage().getUsed() / mb), 0, 20, TimeUnit.SECONDS); - final Destination destination = new SnowflakeDestination(); new IntegrationRunner(destination).run(args); - - service.shutdown(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index f9a85cee788a9..709a12372567c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -113,10 +113,9 @@ public Process create(final String jobId, "--log-driver", "none"); - if (imageName.startsWith("airbyte/destination-snowflake")) { - LOGGER.info("Exposing image {} port 6000", imageName); - cmd.add("-p"); - cmd.add("6000:6000"); + if (networkName != null) { + cmd.add("--network"); + cmd.add(networkName); } if (workspaceMountSource != null) { From 9f7a73d0b107d31ae3962c9bf5b0411ea7b502e3 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 00:48:40 -0800 Subject: [PATCH 11/19] Add unit tests --- .../RecordSizeEstimator.java | 6 +-- .../BufferedStreamConsumerTest.java | 3 +- .../RecordSizeEstimatorTest.java | 52 +++++++++++++++++++ 3 files changed, 56 insertions(+), 5 deletions(-) create mode 100644 airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java index 6366b9b2d5477..35c919e71971d 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java @@ -37,9 +37,9 @@ public long getEstimatedByteSize(final AirbyteRecordMessage recordMessage) { final String stream = recordMessage.getStream(); final Integer countdown = streamSampleCountdown.get(stream); if (countdown == null || countdown <= 0) { - final long byteSize = getEstimatedByteSize(recordMessage.getData()); + final long byteSize = getStringByteSize(recordMessage.getData()); streamRecordSizes.put(stream, byteSize); - streamSampleCountdown.put(stream, sampleBatchSize); + streamSampleCountdown.put(stream, sampleBatchSize - 1); return byteSize; } streamSampleCountdown.put(stream, countdown - 1); @@ -47,7 +47,7 @@ public long getEstimatedByteSize(final AirbyteRecordMessage recordMessage) { } @VisibleForTesting - public long getEstimatedByteSize(final JsonNode data) { + static long getStringByteSize(final JsonNode data) { // assume UTF-8 encoding, and each char is 4 bytes long return Jsons.serialize(data).length() * 4L; } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 47d67ac7ee3e5..fea5e1c5d7e55 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -61,7 +61,6 @@ public class BufferedStreamConsumerTest { private static final AirbyteMessage STATE_MESSAGE2 = new AirbyteMessage() .withType(Type.STATE) .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("state_message_id", 2)))); - private static final RecordSizeEstimator RECORD_SIZE_ESTIMATOR = new RecordSizeEstimator(); private BufferedStreamConsumer consumer; private VoidCallable onStart; @@ -316,7 +315,7 @@ private static List generateRecords(final long targetSizeInBytes long bytesCounter = 0; for (int i = 0;; i++) { JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAlphabetic(7), "name", "human " + String.format("%8d", i))); - long sizeInBytes = RECORD_SIZE_ESTIMATOR.getEstimatedByteSize(payload); + long sizeInBytes = RecordSizeEstimator.getStringByteSize(payload); bytesCounter += sizeInBytes; AirbyteMessage airbyteMessage = new AirbyteMessage() .withType(Type.RECORD) diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java new file mode 100644 index 0000000000000..6a972cdda54c9 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java @@ -0,0 +1,52 @@ +package io.airbyte.integrations.destination.buffered_stream_consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import org.junit.jupiter.api.Test; + +class RecordSizeEstimatorTest { + + private static final JsonNode DATA_0 = Jsons.deserialize("{}"); + private static final JsonNode DATA_1 = Jsons.deserialize("{ \"field1\": true }"); + private static final JsonNode DATA_2 = Jsons.deserialize("{ \"field1\": 10000 }"); + private static final long DATA_0_SIZE = RecordSizeEstimator.getStringByteSize(DATA_0); + private static final long DATA_1_SIZE = RecordSizeEstimator.getStringByteSize(DATA_1); + private static final long DATA_2_SIZE = RecordSizeEstimator.getStringByteSize(DATA_2); + + @Test + public void testPeriodicSampling() { + // the estimate performs a size sampling every 3 records + final RecordSizeEstimator sizeEstimator = new RecordSizeEstimator(3); + final String stream = "stream"; + final AirbyteRecordMessage record0 = new AirbyteRecordMessage().withStream(stream).withData(DATA_0); + final AirbyteRecordMessage record1 = new AirbyteRecordMessage().withStream(stream).withData(DATA_1); + final AirbyteRecordMessage record2 = new AirbyteRecordMessage().withStream(stream).withData(DATA_2); + + // sample record message 1 + assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record1)); + // next two calls return the first sampling result + assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + + // sample record message 2 + assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record2)); + // next two calls return the second sampling result + assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + } + + @Test + public void testDifferentEstimationPerStream() { + final RecordSizeEstimator sizeEstimator = new RecordSizeEstimator(); + final AirbyteRecordMessage record0 = new AirbyteRecordMessage().withStream("stream1").withData(DATA_0); + final AirbyteRecordMessage record1 = new AirbyteRecordMessage().withStream("stream2").withData(DATA_1); + final AirbyteRecordMessage record2 = new AirbyteRecordMessage().withStream("stream3").withData(DATA_2); + assertEquals(DATA_0_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record1)); + assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record2)); + } + +} From 907e684e09149a48d227a97d08677ca2373dbf81 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 00:59:52 -0800 Subject: [PATCH 12/19] Use average estimation --- .../RecordSizeEstimator.java | 22 ++++++++++++++++++- .../RecordSizeEstimatorTest.java | 21 +++++++++++++----- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java index 35c919e71971d..2c4f879fb6ca2 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java @@ -7,6 +7,12 @@ import java.util.HashMap; import java.util.Map; +/** + * This class estimate the byte size of the record message. To reduce memory footprint, + * 1) it assumes that a character is always four bytes, and 2) it only performs a sampling + * every N records. The size of the samples are averaged together to protect the estimation + * against outliers. + */ public class RecordSizeEstimator { // by default, perform one estimation for every 20 records @@ -36,12 +42,26 @@ public RecordSizeEstimator() { public long getEstimatedByteSize(final AirbyteRecordMessage recordMessage) { final String stream = recordMessage.getStream(); final Integer countdown = streamSampleCountdown.get(stream); - if (countdown == null || countdown <= 0) { + + // this is a new stream; initialize its estimation + if (countdown == null) { final long byteSize = getStringByteSize(recordMessage.getData()); streamRecordSizes.put(stream, byteSize); streamSampleCountdown.put(stream, sampleBatchSize - 1); return byteSize; } + + // this stream needs update; compute a new estimation + if (countdown <= 0) { + final long prevMeanByteSize = streamRecordSizes.get(stream); + final long currentByteSize = getStringByteSize(recordMessage.getData()); + final long newMeanByteSize = prevMeanByteSize / 2 + currentByteSize / 2; + streamRecordSizes.put(stream, newMeanByteSize); + streamSampleCountdown.put(stream, sampleBatchSize - 1); + return newMeanByteSize; + } + + // this stream does not need update; return current estimation streamSampleCountdown.put(stream, countdown - 1); return streamRecordSizes.get(stream); } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java index 6a972cdda54c9..8cc0c6f63ca89 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java @@ -26,16 +26,25 @@ public void testPeriodicSampling() { final AirbyteRecordMessage record2 = new AirbyteRecordMessage().withStream(stream).withData(DATA_2); // sample record message 1 - assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record1)); + final long firstEstimation = DATA_1_SIZE; + assertEquals(firstEstimation, sizeEstimator.getEstimatedByteSize(record1)); // next two calls return the first sampling result - assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record0)); - assertEquals(DATA_1_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(firstEstimation, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(firstEstimation, sizeEstimator.getEstimatedByteSize(record0)); // sample record message 2 - assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record2)); + final long secondEstimation = firstEstimation / 2 + DATA_2_SIZE / 2; + assertEquals(secondEstimation, sizeEstimator.getEstimatedByteSize(record2)); // next two calls return the second sampling result - assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record0)); - assertEquals(DATA_2_SIZE, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(secondEstimation, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(secondEstimation, sizeEstimator.getEstimatedByteSize(record0)); + + // sample record message 1 + final long thirdEstimation = secondEstimation / 2 + DATA_1_SIZE / 2; + assertEquals(thirdEstimation, sizeEstimator.getEstimatedByteSize(record1)); + // next two calls return the first sampling result + assertEquals(thirdEstimation, sizeEstimator.getEstimatedByteSize(record0)); + assertEquals(thirdEstimation, sizeEstimator.getEstimatedByteSize(record0)); } @Test From a0bd988b746161b773cc0142668cb24db48f2872 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 01:01:13 -0800 Subject: [PATCH 13/19] Rename variable --- .../RecordSizeEstimator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java index 2c4f879fb6ca2..d125ac3105f19 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java @@ -19,7 +19,7 @@ public class RecordSizeEstimator { private static final int DEFAULT_SAMPLE_BATCH_SIZE = 20; // latest estimated record message size for each stream - private final Map streamRecordSizes; + private final Map streamRecordSizeEstimation; // number of record messages until next real sampling for each stream private final Map streamSampleCountdown; // number of record messages @@ -30,7 +30,7 @@ public class RecordSizeEstimator { * The size of the batch is determined by {@code sampleBatchSize}. */ public RecordSizeEstimator(final int sampleBatchSize) { - this.streamRecordSizes = new HashMap<>(); + this.streamRecordSizeEstimation = new HashMap<>(); this.streamSampleCountdown = new HashMap<>(); this.sampleBatchSize = sampleBatchSize; } @@ -46,24 +46,24 @@ public long getEstimatedByteSize(final AirbyteRecordMessage recordMessage) { // this is a new stream; initialize its estimation if (countdown == null) { final long byteSize = getStringByteSize(recordMessage.getData()); - streamRecordSizes.put(stream, byteSize); + streamRecordSizeEstimation.put(stream, byteSize); streamSampleCountdown.put(stream, sampleBatchSize - 1); return byteSize; } // this stream needs update; compute a new estimation if (countdown <= 0) { - final long prevMeanByteSize = streamRecordSizes.get(stream); + final long prevMeanByteSize = streamRecordSizeEstimation.get(stream); final long currentByteSize = getStringByteSize(recordMessage.getData()); final long newMeanByteSize = prevMeanByteSize / 2 + currentByteSize / 2; - streamRecordSizes.put(stream, newMeanByteSize); + streamRecordSizeEstimation.put(stream, newMeanByteSize); streamSampleCountdown.put(stream, sampleBatchSize - 1); return newMeanByteSize; } // this stream does not need update; return current estimation streamSampleCountdown.put(stream, countdown - 1); - return streamRecordSizes.get(stream); + return streamRecordSizeEstimation.get(stream); } @VisibleForTesting From e65f8f5332dd69b1004e478da7b6c216743899db Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 01:01:41 -0800 Subject: [PATCH 14/19] Format code --- .../RecordSizeEstimator.java | 16 ++++++++++------ .../RecordSizeEstimatorTest.java | 4 ++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java index d125ac3105f19..cceada7869961 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.buffered_stream_consumer; import com.fasterxml.jackson.databind.JsonNode; @@ -8,10 +12,10 @@ import java.util.Map; /** - * This class estimate the byte size of the record message. To reduce memory footprint, - * 1) it assumes that a character is always four bytes, and 2) it only performs a sampling - * every N records. The size of the samples are averaged together to protect the estimation - * against outliers. + * This class estimate the byte size of the record message. To reduce memory footprint, 1) it + * assumes that a character is always four bytes, and 2) it only performs a sampling every N + * records. The size of the samples are averaged together to protect the estimation against + * outliers. */ public class RecordSizeEstimator { @@ -26,8 +30,8 @@ public class RecordSizeEstimator { private final int sampleBatchSize; /** - * The estimator will perform a real calculation once per sample batch. - * The size of the batch is determined by {@code sampleBatchSize}. + * The estimator will perform a real calculation once per sample batch. The size of the batch is + * determined by {@code sampleBatchSize}. */ public RecordSizeEstimator(final int sampleBatchSize) { this.streamRecordSizeEstimation = new HashMap<>(); diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java index 8cc0c6f63ca89..64e21425ef7b6 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimatorTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.buffered_stream_consumer; import static org.junit.jupiter.api.Assertions.assertEquals; From 50af309d9214135a0d2844dc0b8020c589ee330d Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 01:03:41 -0800 Subject: [PATCH 15/19] Bump version --- .../connectors/destination-snowflake/Dockerfile | 4 ++-- docs/integrations/destinations/snowflake.md | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index ef9d102b78ac3..5fb9f66fb8500 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -ENV APPLICATION_VERSION 0.4.12 +ENV APPLICATION_VERSION 0.4.13 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.12 +LABEL io.airbyte.version=0.4.13 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index d6ab229ed5171..69ecae9723e78 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -224,6 +224,7 @@ Finally, you need to add read/write permissions to your bucket with that email. | Version | Date | Pull Request | Subject | |:--------|:-----------| :----- | :------ | +| 0.4.13 | 2022-02-16 | [\#10394](https://github.com/airbytehq/airbyte/pull/10394) | Reduce memory footprint. | | 0.4.12 | 2022-02-15 | [\#10342](https://github.com/airbytehq/airbyte/pull/10342) | Use connection pool, and fix connection leak. | | 0.4.11 | 2022-02-14 | [\#9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to an staging file. | | 0.4.10 | 2022-02-14 | [\#10297](https://github.com/airbytehq/airbyte/pull/10297) | Halve the record buffer size to reduce memory consumption. | From 51d0a4facfb20b015710b5e5c638c966ca496c55 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 01:34:21 -0800 Subject: [PATCH 16/19] Revert unnecessary change --- .../destination-snowflake/build.gradle | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 3c9cc271a87a0..ce455e15b71a7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -6,20 +6,19 @@ plugins { application { mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination' +// enable when profiling applicationDefaultJvmArgs = [ - '-XX:+ExitOnOutOfMemoryError', - '-XX:MaxRAMPercentage=75.0', -// uncomment when profiling -// '-Xmx256m', -// '-XX:NativeMemoryTracking=detail', -// '-XX:+UsePerfData', -// '-Djava.rmi.server.hostname=localhost', -// '-Dcom.sun.management.jmxremote=true', -// '-Dcom.sun.management.jmxremote.port=6000', -// "-Dcom.sun.management.jmxremote.rmi.port=6000", -// '-Dcom.sun.management.jmxremote.local.only=false', -// '-Dcom.sun.management.jmxremote.authenticate=false', -// '-Dcom.sun.management.jmxremote.ssl=false', + '-XX:+ExitOnOutOfMemoryError', + '-XX:MaxRAMPercentage=75.0', +// '-XX:NativeMemoryTracking=detail', +// "-Djava.rmi.server.hostname=localhost", +// '-Dcom.sun.management.jmxremote=true', +// '-Dcom.sun.management.jmxremote.port=6000', +// "-Dcom.sun.management.jmxremote.rmi.port=6000", +// '-Dcom.sun.management.jmxremote.local.only=false', +// '-Dcom.sun.management.jmxremote.authenticate=false', +// '-Dcom.sun.management.jmxremote.ssl=false', +// '-agentpath:/usr/local/YourKit-JavaProfiler-2021.3/bin/linux-x86-64/libyjpagent.so=port=10001,listen=all' ] } @@ -30,7 +29,7 @@ dependencies { implementation 'net.snowflake:snowflake-jdbc:3.13.9' implementation 'org.apache.commons:commons-csv:1.4' implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' - implementation 'io.aesy:datasize:1.0.0' + implementation "io.aesy:datasize:1.0.0" implementation project(':airbyte-config:models') implementation project(':airbyte-db:lib') From a46b54687fc2ddd769dd9e2743c51385d26582c7 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 11:22:36 -0800 Subject: [PATCH 17/19] Update doc --- docs/integrations/destinations/snowflake.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 0af5b7cb7bc2b..aa39e657f2b7f 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -224,7 +224,7 @@ Finally, you need to add read/write permissions to your bucket with that email. | Version | Date | Pull Request | Subject | |:--------|:-----------| :----- | :------ | -| 0.4.14 | 2022-02-16 | [\#10394](https://github.com/airbytehq/airbyte/pull/10394) | Reduce memory footprint. | +| 0.4.14 | 2022-02-17 | [\#10394](https://github.com/airbytehq/airbyte/pull/10394) | Reduce memory footprint. | | 0.4.13 | 2022-02-16 | [\#10212](https://github.com/airbytehq/airbyte/pull/10212) | Execute COPY command in parallel for S3 and GCS staging | | 0.4.12 | 2022-02-15 | [\#10342](https://github.com/airbytehq/airbyte/pull/10342) | Use connection pool, and fix connection leak. | | 0.4.11 | 2022-02-14 | [\#9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to an staging file. | From 9bed378105f4d782f9fe9cb9aa35954a4ab71973 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 11:57:20 -0800 Subject: [PATCH 18/19] Fix format --- airbyte-cdk/python/airbyte_cdk/sources/streams/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index fd6b323241283..f419c4422749d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -171,7 +171,7 @@ def state_checkpoint_interval(self) -> Optional[int]: """ return None - @deprecated(version='0.1.49', reason="You should use explicit state property instead, see IncrementalMixin docs.") + @deprecated(version="0.1.49", reason="You should use explicit state property instead, see IncrementalMixin docs.") def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): """Override to extract state from the latest record. Needed to implement incremental sync. From 524f7a3a8b5f85018fa021601f8ea4b879a58c55 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 17 Feb 2022 12:54:48 -0800 Subject: [PATCH 19/19] Bump version in seed --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index d055a53ed0210..ffdedd3827683 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -185,7 +185,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.13 + dockerImageTag: 0.4.14 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg - name: MariaDB ColumnStore diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index dd5f3182b6390..9ebd2407a385b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3825,7 +3825,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.13" +- dockerImage: "airbyte/destination-snowflake:0.4.14" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake" connectionSpecification: