From cd0cfec3eed8585c27be8b80b395c03c2c59977c Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Fri, 11 Feb 2022 16:31:32 -0800 Subject: [PATCH 1/8] Avoid redundant adapter construction --- .../destination/jdbc/JdbcSqlOperations.java | 11 +++++------ .../destination/postgres/PostgresSqlOperations.java | 9 +++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java index f8e580f64d0fd..b3f0a66e22ec2 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java @@ -19,15 +19,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class JdbcSqlOperations implements SqlOperations { - private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSqlOperations.class); protected static final String SHOW_SCHEMAS = "show schemas;"; protected static final String NAME = "name"; @@ -137,7 +135,8 @@ public final void insertRecords(final JdbcDatabase database, throws Exception { AirbyteSentry.executeWithTracing("InsertRecords", () -> { - records.forEach(airbyteRecordMessage -> getDataAdapter().adapt(airbyteRecordMessage.getData())); + final Optional dataAdapter = getDataAdapter(); + dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData()))); insertRecordsInternal(database, records, schemaName, tableName); }, Map.of("schema", Objects.requireNonNullElse(schemaName, "null"), "table", tableName, "recordCount", records.size())); @@ -149,8 +148,8 @@ protected abstract void insertRecordsInternal(JdbcDatabase database, String tableName) throws Exception; - protected DataAdapter getDataAdapter() { - return new DataAdapter(j -> false, c -> c); + protected Optional getDataAdapter() { + return Optional.empty(); } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java index 734221840b981..f8aab3dff1e5e 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java @@ -15,15 +15,12 @@ import java.nio.file.Files; import java.sql.SQLException; import java.util.List; +import java.util.Optional; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PostgresSqlOperations extends JdbcSqlOperations { - private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSqlOperations.class); - @Override public void insertRecordsInternal(final JdbcDatabase database, final List records, @@ -59,8 +56,8 @@ public void insertRecordsInternal(final JdbcDatabase database, } @Override - protected DataAdapter getDataAdapter() { - return new PostgresDataAdapter(); + protected Optional getDataAdapter() { + return Optional.of(new PostgresDataAdapter()); } } From 3aa23896e5456451dab6fc02af4e138c472b25a3 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Fri, 11 Feb 2022 16:33:46 -0800 Subject: [PATCH 2/8] Remove unused logger --- .../io/airbyte/integrations/destination/jdbc/DataAdapter.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DataAdapter.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DataAdapter.java index 1bf4824d1250f..fac3ff8989d2d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DataAdapter.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DataAdapter.java @@ -8,13 +8,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.function.Function; import java.util.function.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DataAdapter { - private static final Logger LOGGER = LoggerFactory.getLogger(DataAdapter.class); - private final Predicate filterValueNode; private final Function valueNodeAdapter; From 8b9a6aaa49b4f5c22098b4039834cfc15ab726b4 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Fri, 11 Feb 2022 22:20:46 -0800 Subject: [PATCH 3/8] Avoid redundant creation of buffer map --- .../BufferedStreamConsumer.java | 46 ++++++++----------- .../destination/jdbc/JdbcSqlOperations.java | 11 +---- .../snowflake/SnowflakeDestination.java | 4 -- 3 files changed, 21 insertions(+), 40 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index d2944e2b25209..8dd6acd0155df 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -23,10 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,13 +79,13 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume private final RecordWriter recordWriter; private final CheckedConsumer onClose; private final Set streamNames; - private final List buffer; private final ConfiguredAirbyteCatalog catalog; private final CheckedFunction isValidRecord; - private final Map pairToIgnoredRecordCount; + private final Map streamToIgnoredRecordCount; private final Consumer outputRecordCollector; private final long maxQueueSizeInBytes; private long bufferSizeInBytes; + private Map> streamBuffer; private boolean hasStarted; private boolean hasClosed; @@ -112,9 +110,9 @@ public BufferedStreamConsumer(final Consumer outputRecordCollect this.catalog = catalog; this.streamNames = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); this.isValidRecord = isValidRecord; - this.buffer = new ArrayList<>(10_000); this.bufferSizeInBytes = 0; - this.pairToIgnoredRecordCount = new HashMap<>(); + this.streamToIgnoredRecordCount = new HashMap<>(); + this.streamBuffer = new HashMap<>(); } @Override @@ -123,7 +121,7 @@ protected void startTracked() throws Exception { Preconditions.checkState(!hasStarted, "Consumer has already been started."); hasStarted = true; - pairToIgnoredRecordCount.clear(); + streamToIgnoredRecordCount.clear(); LOGGER.info("{} started.", BufferedStreamConsumer.class); onStart.call(); @@ -141,7 +139,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { } if (!isValidRecord.apply(message.getRecord().getData())) { - pairToIgnoredRecordCount.put(stream, pairToIgnoredRecordCount.getOrDefault(stream, 0L) + 1L); + streamToIgnoredRecordCount.put(stream, streamToIgnoredRecordCount.getOrDefault(stream, 0L) + 1L); return; } @@ -151,15 +149,12 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData())); if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { LOGGER.info("Flushing buffer..."); - AirbyteSentry.executeWithTracing("FlushBuffer", - this::flushQueueToDestination, - Map.of("stream", stream.getName(), - "namespace", Objects.requireNonNullElse(stream.getNamespace(), "null"), - "bufferSizeInBytes", bufferSizeInBytes)); + flushQueueToDestination(bufferSizeInBytes); bufferSizeInBytes = 0; } - buffer.add(message); + final List bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>()); + bufferedRecords.add(message.getRecord()); bufferSizeInBytes += messageSizeInBytes; } else if (message.getType() == Type.STATE) { @@ -170,16 +165,13 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { } - private void flushQueueToDestination() throws Exception { - final Map> recordsByStream = buffer.stream() - .map(AirbyteMessage::getRecord) - .collect(Collectors.groupingBy(AirbyteStreamNameNamespacePair::fromRecordMessage)); - - buffer.clear(); - - for (final Map.Entry> entry : recordsByStream.entrySet()) { - recordWriter.accept(entry.getKey(), entry.getValue()); - } + private void flushQueueToDestination(long bufferSizeInBytes) throws Exception { + AirbyteSentry.executeWithTracing("FlushBuffer", () -> { + for (final Map.Entry> entry : streamBuffer.entrySet()) { + recordWriter.accept(entry.getKey(), entry.getValue()); + } + }, Map.of("bufferSizeInBytes", bufferSizeInBytes)); + streamBuffer = new HashMap<>(); if (pendingState != null) { lastFlushedState = pendingState; @@ -199,13 +191,13 @@ protected void close(final boolean hasFailed) throws Exception { Preconditions.checkState(!hasClosed, "Has already closed."); hasClosed = true; - pairToIgnoredRecordCount - .forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair)); + streamToIgnoredRecordCount.forEach((pair, count) -> + LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair)); if (hasFailed) { LOGGER.error("executing on failed close procedure."); } else { LOGGER.info("executing on success close procedure."); - flushQueueToDestination(); + flushQueueToDestination(bufferSizeInBytes); } try { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java index b3f0a66e22ec2..c16eb4935ba3e 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java @@ -61,21 +61,14 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN } protected void writeBatchToFile(final File tmpFile, final List records) throws Exception { - PrintWriter writer = null; - try { - writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8); - final var csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); - + try (final PrintWriter writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8); + final CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) { for (final AirbyteRecordMessage record : records) { final var uuid = UUID.randomUUID().toString(); final var jsonData = Jsons.serialize(formatData(record.getData())); final var emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt())); csvPrinter.printRecord(uuid, jsonData, emittedAt); } - } finally { - if (writer != null) { - writer.close(); - } } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 833605b402914..e7da160fbe0bf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -10,13 +10,9 @@ import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SnowflakeDestination extends SwitchingDestination { - private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestination.class); - enum DestinationType { COPY_S3, COPY_GCS, From 069f6100684a65d5a47e50717fb5ab40bd2ea271 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Fri, 11 Feb 2022 22:26:44 -0800 Subject: [PATCH 4/8] Decrease max batch byte size to 128 mb --- .../snowflake/SnowflakeInternalStagingConsumerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java index d3079108b4ff6..79df935ad7128 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java @@ -46,7 +46,7 @@ public class SnowflakeInternalStagingConsumerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class); - private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb + private static final long MAX_BATCH_SIZE_BYTES = 128 * 1024 * 1024; // 128mb private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString(); public AirbyteMessageConsumer create(final Consumer outputRecordCollector, From b2668ecb914d44144644f7685cf47c576f6b5afa Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Mon, 14 Feb 2022 21:27:06 -0800 Subject: [PATCH 5/8] Format code --- .../buffered_stream_consumer/BufferedStreamConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 8dd6acd0155df..56890a1870ca6 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -191,8 +191,8 @@ protected void close(final boolean hasFailed) throws Exception { Preconditions.checkState(!hasClosed, "Has already closed."); hasClosed = true; - streamToIgnoredRecordCount.forEach((pair, count) -> - LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair)); + streamToIgnoredRecordCount + .forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair)); if (hasFailed) { LOGGER.error("executing on failed close procedure."); } else { From 80d1f1019b10e70bfcb5da190f21ac66f8a3bfbc Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Mon, 14 Feb 2022 21:37:09 -0800 Subject: [PATCH 6/8] Move data adapter to an instance variable --- .../destination/jdbc/JdbcSqlOperations.java | 17 ++++++++++++----- .../postgres/PostgresSqlOperations.java | 11 ++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java index c16eb4935ba3e..9ee5011259209 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java @@ -24,11 +24,23 @@ import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") public abstract class JdbcSqlOperations implements SqlOperations { protected static final String SHOW_SCHEMAS = "show schemas;"; protected static final String NAME = "name"; + // this adapter modifies record message before inserting them to the destination + protected final Optional dataAdapter; + + protected JdbcSqlOperations() { + this.dataAdapter = Optional.empty(); + } + + protected JdbcSqlOperations(final DataAdapter dataAdapter) { + this.dataAdapter = Optional.of(dataAdapter); + } + @Override public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception { if (!isSchemaExists(database, schemaName)) { @@ -128,7 +140,6 @@ public final void insertRecords(final JdbcDatabase database, throws Exception { AirbyteSentry.executeWithTracing("InsertRecords", () -> { - final Optional dataAdapter = getDataAdapter(); dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData()))); insertRecordsInternal(database, records, schemaName, tableName); }, @@ -141,8 +152,4 @@ protected abstract void insertRecordsInternal(JdbcDatabase database, String tableName) throws Exception; - protected Optional getDataAdapter() { - return Optional.empty(); - } - } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java index f8aab3dff1e5e..27d1caa5f67cd 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.postgres; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.destination.jdbc.DataAdapter; import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.BufferedReader; @@ -15,12 +14,15 @@ import java.nio.file.Files; import java.sql.SQLException; import java.util.List; -import java.util.Optional; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; public class PostgresSqlOperations extends JdbcSqlOperations { + public PostgresSqlOperations() { + super(new PostgresDataAdapter()); + } + @Override public void insertRecordsInternal(final JdbcDatabase database, final List records, @@ -55,9 +57,4 @@ public void insertRecordsInternal(final JdbcDatabase database, }); } - @Override - protected Optional getDataAdapter() { - return Optional.of(new PostgresDataAdapter()); - } - } From d5a0c2373cc3850e9f821e0f6e9f214a14f3dd7b Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Mon, 14 Feb 2022 22:32:09 -0800 Subject: [PATCH 7/8] Bump version --- .../connectors/destination-snowflake/Dockerfile | 4 ++-- docs/integrations/destinations/snowflake.md | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index c24339b138303..a448cb43ce7e2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -ENV APPLICATION_VERSION 0.4.9 +ENV APPLICATION_VERSION 0.4.10 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.9 +LABEL io.airbyte.version=0.4.10 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index e2c429984c58a..78bfe4c4a4f42 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -217,6 +217,8 @@ Finally, you need to add read/write permissions to your bucket with that email. | Version | Date | Pull Request | Subject | |:--------|:-----------| :----- | :------ | +| 0.4.10 | 2022-02-14 | [\#10297](https://github.com/airbytehq/airbyte/pull/10297) | Halve the record buffer size to reduce memory consumption. | +| 0.4.9 | 2022-02-14 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `ExitOnOutOfMemoryError` JVM flag. | | 0.4.8 | 2022-02-01 | [\#9959](https://github.com/airbytehq/airbyte/pull/9959) | Fix null pointer exception from buffered stream consumer. | | 0.4.7 | 2022-01-29 | [\#9745](https://github.com/airbytehq/airbyte/pull/9745) | Integrate with Sentry. | | 0.4.6 | 2022-01-28 | [#9623](https://github.com/airbytehq/airbyte/pull/9623) | Add jdbc_url_params support for optional JDBC parameters | From 062de0786edfcaaacfa373144127109165e3a779 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Mon, 14 Feb 2022 23:34:47 -0800 Subject: [PATCH 8/8] Bump version in seed --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 24 +++++++++---------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 1aaa37fb58ac9..603c846cbac6d 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -185,7 +185,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.9 + dockerImageTag: 0.4.10 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg - name: MariaDB ColumnStore diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 04a925d3bfddd..bc78d65ea8038 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3817,7 +3817,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.9" +- dockerImage: "airbyte/destination-snowflake:0.4.10" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake" connectionSpecification: @@ -3899,32 +3899,30 @@ description: "The loading method used to send data to Snowflake." order: 8 oneOf: - - title: "[Recommended] Internal Staging" + - title: "Select another option" additionalProperties: false - description: "Writes large batches of records to a file, uploads the file\ - \ to Snowflake, then uses
COPY INTO table
to upload the file.\ - \ Recommended for large production workloads for better speed and scalability." + description: "Select another option" required: - "method" properties: method: type: "string" enum: - - "Internal Staging" - default: "Internal Staging" - - title: "Standard Inserts" + - "Standard" + default: "Standard" + - title: "[Recommended] Internal Staging" additionalProperties: false - description: "Uses
INSERT
statements to send batches of records\ - \ to Snowflake. Easiest (no setup) but not recommended for large production\ - \ workloads due to slow speed." + description: "Writes large batches of records to a file, uploads the file\ + \ to Snowflake, then uses
COPY INTO table
to upload the file.\ + \ Recommended for large production workloads for better speed and scalability." required: - "method" properties: method: type: "string" enum: - - "Standard" - default: "Standard" + - "Internal Staging" + default: "Internal Staging" - title: "AWS S3 Staging" additionalProperties: false description: "Writes large batches of records to a file, uploads the file\