Skip to content

Commit 089c405

Browse files
authored
🐛Destination-Snowflake: updated check method to handle more errors (#18970)
* [16833] Destination-Snowflake: updated check method to handle "No Active Warehouse" error or user has incorrect permissions
1 parent c8a00ea commit 089c405

File tree

12 files changed

+123
-18
lines changed

12 files changed

+123
-18
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@
306306
- name: Snowflake
307307
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
308308
dockerRepository: airbyte/destination-snowflake
309-
dockerImageTag: 0.4.38
309+
dockerImageTag: 0.4.39
310310
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
311311
icon: snowflake.svg
312312
normalizationRepository: airbyte/normalization-snowflake

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5408,7 +5408,7 @@
54085408
supported_destination_sync_modes:
54095409
- "overwrite"
54105410
- "append"
5411-
- dockerImage: "airbyte/destination-snowflake:0.4.38"
5411+
- dockerImage: "airbyte/destination-snowflake:0.4.39"
54125412
spec:
54135413
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
54145414
connectionSpecification:

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import com.fasterxml.jackson.databind.JsonNode;
1010
import io.airbyte.commons.exceptions.ConnectionErrorException;
11+
import io.airbyte.commons.json.Jsons;
1112
import io.airbyte.commons.map.MoreMaps;
1213
import io.airbyte.db.factory.DataSourceFactory;
1314
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
@@ -21,8 +22,10 @@
2122
import io.airbyte.protocol.models.AirbyteConnectionStatus;
2223
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
2324
import io.airbyte.protocol.models.AirbyteMessage;
25+
import io.airbyte.protocol.models.AirbyteRecordMessage;
2426
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
2527
import java.sql.SQLException;
28+
import java.util.List;
2629
import java.util.Map;
2730
import java.util.Objects;
2831
import java.util.UUID;
@@ -84,11 +87,38 @@ public AirbyteConnectionStatus check(final JsonNode config) {
8487
}
8588
}
8689

90+
/**
91+
* This method is deprecated. It verifies table creation, but not insert right to a newly created
92+
* table. Use attemptTableOperations with the attemptInsert argument instead.
93+
*/
94+
@Deprecated
8795
public static void attemptSQLCreateAndDropTableOperations(final String outputSchema,
8896
final JdbcDatabase database,
8997
final NamingConventionTransformer namingResolver,
9098
final SqlOperations sqlOps)
9199
throws Exception {
100+
attemptTableOperations(outputSchema, database, namingResolver, sqlOps, false);
101+
}
102+
103+
/**
104+
* Verifies if provided creds has enough permissions. Steps are: 1. Create schema if not exists. 2.
105+
* Create test table. 3. Insert dummy record to newly created table if "attemptInsert" set to true.
106+
* 4. Delete table created on step 2.
107+
*
108+
* @param outputSchema - schema to tests against.
109+
* @param database - database to tests against.
110+
* @param namingResolver - naming resolver.
111+
* @param sqlOps - SqlOperations object
112+
* @param attemptInsert - set true if need to make attempt to insert dummy records to newly created
113+
* table. Set false to skip insert step.
114+
* @throws Exception
115+
*/
116+
public static void attemptTableOperations(final String outputSchema,
117+
final JdbcDatabase database,
118+
final NamingConventionTransformer namingResolver,
119+
final SqlOperations sqlOps,
120+
final boolean attemptInsert)
121+
throws Exception {
92122
// verify we have write permissions on the target schema by creating a table with a random name,
93123
// then dropping that table
94124
try {
@@ -100,7 +130,14 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
100130
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
101131
sqlOps.createSchemaIfNotExists(database, outputSchema);
102132
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName);
103-
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
133+
// verify if user has permission to make SQL INSERT queries
134+
try {
135+
if (attemptInsert) {
136+
sqlOps.insertRecords(database, List.of(getDummyRecord()), outputSchema, outputTableName);
137+
}
138+
} finally {
139+
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
140+
}
104141
} catch (final SQLException e) {
105142
if (Objects.isNull(e.getCause()) || !(e.getCause() instanceof SQLException)) {
106143
throw new ConnectionErrorException(e.getSQLState(), e.getErrorCode(), e.getMessage(), e);
@@ -113,6 +150,19 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
113150
}
114151
}
115152

153+
/**
154+
* Generates a dummy AirbyteRecordMessage with random values.
155+
*
156+
* @return AirbyteRecordMessage object with dummy values that may be used to test insert permission.
157+
*/
158+
private static AirbyteRecordMessage getDummyRecord() {
159+
final JsonNode dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }");
160+
return new AirbyteRecordMessage()
161+
.withStream("stream1")
162+
.withData(dummyDataToInsert)
163+
.withEmittedAt(1602637589000L);
164+
}
165+
116166
protected DataSource getDataSource(final JsonNode config) {
117167
final JsonNode jdbcConfig = toJdbcConfig(config);
118168
return DataSourceFactory.create(

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
1515
import io.airbyte.integrations.base.Destination;
1616
import io.airbyte.integrations.destination.ExtendedNameTransformer;
17+
import io.airbyte.integrations.destination.NamingConventionTransformer;
1718
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
1819
import io.airbyte.integrations.destination.jdbc.SqlOperations;
1920
import io.airbyte.protocol.models.AirbyteConnectionStatus;
@@ -68,7 +69,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
6869
final JdbcDatabase database = getDatabase(dataSource);
6970
final var nameTransformer = getNameTransformer();
7071
final var outputSchema = nameTransformer.convertStreamName(config.get(schemaFieldName).asText());
71-
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
72+
performCreateInsertTestOnDestination(outputSchema, database, nameTransformer);
7273

7374
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
7475
} catch (final ConnectionErrorException ex) {
@@ -92,4 +93,11 @@ public AirbyteConnectionStatus check(final JsonNode config) {
9293
}
9394
}
9495

96+
protected void performCreateInsertTestOnDestination(final String outputSchema,
97+
final JdbcDatabase database,
98+
final NamingConventionTransformer nameTransformer)
99+
throws Exception {
100+
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
101+
}
102+
95103
}

airbyte-integrations/connectors/destination-snowflake/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
2020

2121
ENV ENABLE_SENTRY true
2222

23-
LABEL io.airbyte.version=0.4.38
23+
LABEL io.airbyte.version=0.4.39
2424
LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ dependencies {
4747
// this is a configuration to make mockito work with final classes
4848
testImplementation 'org.mockito:mockito-inline:2.13.0'
4949

50-
50+
integrationTestJavaImplementation project(':airbyte-commons-worker')
5151
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
5252
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-snowflake')
5353
integrationTestJavaImplementation 'org.apache.commons:commons-lang3:3.11'

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyAzureBlobStorageDestination.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import io.airbyte.db.jdbc.JdbcDatabase;
1111
import io.airbyte.integrations.base.AirbyteMessageConsumer;
1212
import io.airbyte.integrations.destination.ExtendedNameTransformer;
13+
import io.airbyte.integrations.destination.NamingConventionTransformer;
14+
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
1315
import io.airbyte.integrations.destination.jdbc.SqlOperations;
1416
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
1517
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
@@ -63,6 +65,15 @@ public SqlOperations getSqlOperations() {
6365
return new SnowflakeSqlOperations();
6466
}
6567

68+
@Override
69+
protected void performCreateInsertTestOnDestination(final String outputSchema,
70+
final JdbcDatabase database,
71+
final NamingConventionTransformer nameTransformer)
72+
throws Exception {
73+
AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer,
74+
getSqlOperations(), true);
75+
}
76+
6677
private String getConfiguredSchema(final JsonNode config) {
6778
return config.get("schema").asText();
6879
}

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
6060
try {
6161
final JdbcDatabase database = getDatabase(dataSource);
6262
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
63-
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations);
63+
64+
attemptTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations,
65+
true);
6466
attemptWriteAndDeleteGcsObject(gcsConfig, outputSchema);
6567

6668
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
4646
try {
4747
final JdbcDatabase database = getDatabase(dataSource);
4848
final String outputSchema = nameTransformer.getIdentifier(config.get("schema").asText());
49-
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
50-
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
49+
attemptTableOperations(outputSchema, database, nameTransformer,
50+
snowflakeInternalStagingSqlOperations, true);
51+
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
5152
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
5253
} catch (final Exception e) {
5354
LOGGER.error("Exception while checking connection: ", e);
@@ -63,10 +64,10 @@ public AirbyteConnectionStatus check(final JsonNode config) {
6364
}
6465
}
6566

66-
private static void attemptSQLCreateAndDropStages(final String outputSchema,
67-
final JdbcDatabase database,
68-
final NamingConventionTransformer namingResolver,
69-
final SnowflakeInternalStagingSqlOperations sqlOperations)
67+
private static void attemptStageOperations(final String outputSchema,
68+
final JdbcDatabase database,
69+
final NamingConventionTransformer namingResolver,
70+
final SnowflakeInternalStagingSqlOperations sqlOperations)
7071
throws Exception {
7172

7273
// verify we have permissions to create/drop stage

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
6060
try {
6161
final JdbcDatabase database = getDatabase(dataSource);
6262
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
63-
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
64-
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
63+
attemptTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations,
64+
true);
65+
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
6566
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
6667
} catch (final Exception e) {
6768
LOGGER.error("Exception while checking connection: ", e);
@@ -77,7 +78,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
7778
}
7879
}
7980

80-
private static void attemptSQLCreateAndDropStages(final String outputSchema,
81+
private static void attemptStageOperations(final String outputSchema,
8182
final JdbcDatabase database,
8283
final NamingConventionTransformer namingResolver,
8384
final SnowflakeS3StagingSqlOperations sqlOperations)

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.destination.snowflake;
66

7+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
78
import static org.junit.jupiter.api.Assertions.assertEquals;
89
import static org.junit.jupiter.api.Assertions.assertTrue;
910

@@ -14,6 +15,7 @@
1415
import io.airbyte.commons.json.Jsons;
1516
import io.airbyte.commons.resources.MoreResources;
1617
import io.airbyte.commons.string.Strings;
18+
import io.airbyte.config.StandardCheckConnectionOutput;
1719
import io.airbyte.config.StandardCheckConnectionOutput.Status;
1820
import io.airbyte.db.factory.DataSourceFactory;
1921
import io.airbyte.db.jdbc.JdbcDatabase;
@@ -41,6 +43,11 @@
4143
public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest {
4244

4345
private static final NamingConventionTransformer NAME_TRANSFORMER = new SnowflakeSQLNameTransformer();
46+
protected static final String NO_ACTIVE_WAREHOUSE_ERR_MSG =
47+
"No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.";
48+
49+
protected static final String NO_USER_PRIVILEGES_ERR_MSG =
50+
"Schema 'TEXT_SCHEMA' already exists, but current role has no privileges on it.";
4451

4552
// this config is based on the static config, and it contains a random
4653
// schema name that is different for each test run
@@ -178,6 +185,30 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
178185
DataSourceFactory.close(dataSource);
179186
}
180187

188+
@Test
189+
public void testCheckWithNoActiveWarehouseConnection() throws Exception {
190+
// Config to user(creds) that has no warehouse assigned
191+
final JsonNode config = Jsons.deserialize(IOs.readFile(
192+
Path.of("secrets/internal_staging_config_no_active_warehouse.json")));
193+
194+
StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
195+
196+
assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
197+
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_ACTIVE_WAREHOUSE_ERR_MSG);
198+
}
199+
200+
@Test
201+
public void testCheckWithNoTextSchemaPermissionConnection() throws Exception {
202+
// Config to user (creds) that has no permission to schema
203+
final JsonNode config = Jsons.deserialize(IOs.readFile(
204+
Path.of("secrets/config_no_text_schema_permission.json")));
205+
206+
StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
207+
208+
assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
209+
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_USER_PRIVILEGES_ERR_MSG);
210+
}
211+
181212
@Test
182213
public void testBackwardCompatibilityAfterAddingOauth() {
183214
final JsonNode deprecatedStyleConfig = Jsons.clone(config);

docs/integrations/destinations/snowflake.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,9 @@ Now that you have set up the Snowflake destination connector, check out the foll
275275
276276
| Version | Date | Pull Request | Subject |
277277
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
278-
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
279-
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |
278+
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
279+
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
280+
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |
280281
| 0.4.36 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
281282
| 0.4.35 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields). |
282283
| 0.4.34 | 2022-07-23 | [\#14388](https://github.com/airbytehq/airbyte/pull/14388) | Add support for key pair authentication |

0 commit comments

Comments
 (0)