Skip to content

Commit 91513f5

Browse files
VitaliiMaltsev and vmaltsev authored
Destination Snowflake: update check method to verify permissions for staging (#8781)
* Destination Snowflake: update check method to verify permissions for stages * fix for jdk 17 * fix for jdk 17 * fix with ci secrets * fix with ci secrets * removed snowflake secrets from ci_credentials.sh * bump version Co-authored-by: vmaltsev <[email protected]>
1 parent 3740f25 commit 91513f5

File tree

7 files changed

+41
-3
lines changed

7 files changed

+41
-3
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
33
"name": "Snowflake",
44
"dockerRepository": "airbyte/destination-snowflake",
5-
"dockerImageTag": "0.3.19",
5+
"dockerImageTag": "0.3.21",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake",
77
"icon": "snowflake.svg"
88
}

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@
173173
- name: Snowflake
174174
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
175175
dockerRepository: airbyte/destination-snowflake
176-
dockerImageTag: 0.3.20
176+
dockerImageTag: 0.3.21
177177
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
178178
icon: snowflake.svg
179179
- name: MariaDB ColumnStore

airbyte-integrations/connectors/destination-snowflake/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
1818

1919
RUN tar xf ${APPLICATION}.tar --strip-components=1
2020

21-
LABEL io.airbyte.version=0.3.20
21+
LABEL io.airbyte.version=0.3.21
2222
LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public static Connection getConnection(final JsonNode config) throws SQLExceptio
4242
// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
4343
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
4444
properties.put("application", "airbyte");
45+
// Needed for JDK17 - see https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
46+
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");
4547

4648
return DriverManager.getConnection(connectUrl, properties);
4749
}

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,49 @@
1010
import io.airbyte.integrations.base.AirbyteMessageConsumer;
1111
import io.airbyte.integrations.base.Destination;
1212
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
13+
import io.airbyte.protocol.models.AirbyteConnectionStatus;
1314
import io.airbyte.protocol.models.AirbyteMessage;
1415
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import java.util.UUID;
1520
import java.util.function.Consumer;
1621

1722
public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {
1823

24+
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
25+
1926
public SnowflakeInternalStagingDestination() {
2027
super("", new SnowflakeSQLNameTransformer(), new SnowflakeStagingSqlOperations());
2128
}
2229

30+
@Override
31+
public AirbyteConnectionStatus check(JsonNode config) {
32+
SnowflakeSQLNameTransformer nameTransformer = new SnowflakeSQLNameTransformer();
33+
SnowflakeStagingSqlOperations snowflakeStagingSqlOperations = new SnowflakeStagingSqlOperations();
34+
try (final JdbcDatabase database = getDatabase(config)) {
35+
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
36+
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations);
37+
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations);
38+
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
39+
} catch (final Exception e) {
40+
LOGGER.error("Exception while checking connection: ", e);
41+
return new AirbyteConnectionStatus()
42+
.withStatus(AirbyteConnectionStatus.Status.FAILED)
43+
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
44+
}
45+
}
46+
47+
private static void attemptSQLCreateAndDropStages(String outputSchema, JdbcDatabase database, SnowflakeSQLNameTransformer namingResolver, SnowflakeStagingSqlOperations sqlOperations) throws Exception {
48+
49+
// verify we have permissions to create/drop stage
50+
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
51+
String stageName = namingResolver.getStageName(outputSchema, outputTableName);;
52+
sqlOperations.createStageIfNotExists(database, stageName);
53+
sqlOperations.dropStageIfExists(database,stageName);
54+
}
55+
2356
@Override
2457
protected JdbcDatabase getDatabase(final JsonNode config) {
2558
return SnowflakeDatabase.getDatabase(config);

docs/integrations/destinations/snowflake.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ By default, Airbyte uses batches of `INSERT` commands to add data to a temporary
152152

153153
Internal named stages are storage location objects within a Snowflake database/schema. Because they are database objects, the same security permissions apply as with any other database objects. No need to provide additional properties for internal staging.
154154

155+
**Operating on a stage also requires the USAGE privilege on the parent database and schema.**
156+
155157
### AWS S3
156158

157159
For AWS S3, you will need to create a bucket and provide credentials to access the bucket. We recommend creating a bucket that is only used for Airbyte to stage data to Snowflake. Airbyte needs read/write access to interact with this bucket.
@@ -194,6 +196,7 @@ Finally, you need to add read/write permissions to your bucket with that email.
194196

195197
| Version | Date | Pull Request | Subject |
196198
| :------ | :-------- | :----- | :------ |
199+
| 0.3.21 | 2021-12-15 | [#8781](https://github.com/airbytehq/airbyte/pull/8781) | Updated check method to verify permissions to create/drop stage for internal staging; compatibility fix for Java 17 |
197200
| 0.3.20 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management; compatibility fix for Java 17 |
198201
| 0.3.19 | 2021-12-06 | [#8528](https://github.com/airbytehq/airbyte/pull/8528) | Set Internal Staging as default choice |
199202
| 0.3.18 | 2021-11-26 | [#8253](https://github.com/airbytehq/airbyte/pull/8253) | Snowflake Internal Staging Support |

tools/bin/ci_credentials.sh

Whitespace-only changes.

0 commit comments

Comments
 (0)