Skip to content

🐛Destination-Snowflake: updated check method to handle more errors #18970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
Expand All @@ -21,8 +22,10 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -84,11 +87,38 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

/**
* This method is deprecated. It verifies table creation, but not insert right to a newly created
* table. Use attemptTableOperations with the attemptInsert argument instead.
*/
@Deprecated
public static void attemptSQLCreateAndDropTableOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOps)
throws Exception {
attemptTableOperations(outputSchema, database, namingResolver, sqlOps, false);
}

/**
* Verifies if provided creds has enough permissions. Steps are: 1. Create schema if not exists. 2.
* Create test table. 3. Insert dummy record to newly created table if "attemptInsert" set to true.
* 4. Delete table created on step 2.
*
* @param outputSchema - schema to tests against.
* @param database - database to tests against.
* @param namingResolver - naming resolver.
* @param sqlOps - SqlOperations object
* @param attemptInsert - set true if need to make attempt to insert dummy records to newly created
* table. Set false to skip insert step.
* @throws Exception
*/
public static void attemptTableOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOps,
final boolean attemptInsert)
throws Exception {
// verify we have write permissions on the target schema by creating a table with a random name,
// then dropping that table
try {
Expand All @@ -100,7 +130,14 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
sqlOps.createSchemaIfNotExists(database, outputSchema);
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName);
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
// verify if user has permission to make SQL INSERT queries
try {
if (attemptInsert) {
sqlOps.insertRecords(database, List.of(getDummyRecord()), outputSchema, outputTableName);
}
} finally {
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
}
} catch (final SQLException e) {
if (Objects.isNull(e.getCause()) || !(e.getCause() instanceof SQLException)) {
throw new ConnectionErrorException(e.getSQLState(), e.getErrorCode(), e.getMessage(), e);
Expand All @@ -113,6 +150,19 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
}
}

/**
* Generates a dummy AirbyteRecordMessage with random values.
*
* @return AirbyteRecordMessage object with dummy values that may be used to test insert permission.
*/
private static AirbyteRecordMessage getDummyRecord() {
final JsonNode dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }");
return new AirbyteRecordMessage()
.withStream("stream1")
.withData(dummyDataToInsert)
.withEmittedAt(1602637589000L);
}

protected DataSource getDataSource(final JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);
return DataSourceFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
Expand Down Expand Up @@ -68,7 +69,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
final JdbcDatabase database = getDatabase(dataSource);
final var nameTransformer = getNameTransformer();
final var outputSchema = nameTransformer.convertStreamName(config.get(schemaFieldName).asText());
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
performCreateInsertTestOnDestination(outputSchema, database, nameTransformer);

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final ConnectionErrorException ex) {
Expand All @@ -92,4 +93,11 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

protected void performCreateInsertTestOnDestination(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer nameTransformer)
throws Exception {
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ dependencies {
// this is a configuration to make mockito work with final classes
testImplementation 'org.mockito:mockito-inline:2.13.0'


integrationTestJavaImplementation project(':airbyte-commons-worker')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-snowflake')
integrationTestJavaImplementation 'org.apache.commons:commons-lang3:3.11'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
Expand Down Expand Up @@ -63,6 +65,15 @@ public SqlOperations getSqlOperations() {
return new SnowflakeSqlOperations();
}

@Override
protected void performCreateInsertTestOnDestination(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer nameTransformer)
throws Exception {
AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer,
getSqlOperations(), true);
}

private String getConfiguredSchema(final JsonNode config) {
return config.get("schema").asText();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations);

attemptTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations,
true);
attemptWriteAndDeleteGcsObject(gcsConfig, outputSchema);

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = nameTransformer.getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
attemptTableOperations(outputSchema, database, nameTransformer,
snowflakeInternalStagingSqlOperations, true);
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeInternalStagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
Expand All @@ -63,10 +64,10 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

private static void attemptSQLCreateAndDropStages(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeInternalStagingSqlOperations sqlOperations)
private static void attemptStageOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeInternalStagingSqlOperations sqlOperations)
throws Exception {

// verify we have permissions to create/drop stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
attemptTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations,
true);
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
Expand All @@ -77,7 +78,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

private static void attemptSQLCreateAndDropStages(final String outputSchema,
private static void attemptStageOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeS3StagingSqlOperations sqlOperations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.snowflake;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -14,6 +15,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
Expand Down Expand Up @@ -41,6 +43,11 @@
public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new SnowflakeSQLNameTransformer();
protected static final String NO_ACTIVE_WAREHOUSE_ERR_MSG =
"No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.";

protected static final String NO_USER_PRIVILEGES_ERR_MSG =
"Schema 'TEXT_SCHEMA' already exists, but current role has no privileges on it.";

// this config is based on the static config, and it contains a random
// schema name that is different for each test run
Expand Down Expand Up @@ -178,6 +185,30 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
DataSourceFactory.close(dataSource);
}

@Test
public void testCheckWithNoActiveWarehouseConnection() throws Exception {
// Config to user(creds) that has no warehouse assigned
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/internal_staging_config_no_active_warehouse.json")));

StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);

assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_ACTIVE_WAREHOUSE_ERR_MSG);
}

@Test
public void testCheckWithNoTextSchemaPermissionConnection() throws Exception {
// Config to user (creds) that has no permission to schema
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/config_no_text_schema_permission.json")));

StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);

assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_USER_PRIVILEGES_ERR_MSG);
}

@Test
public void testBackwardCompatibilityAfterAddingOauth() {
final JsonNode deprecatedStyleConfig = Jsons.clone(config);
Expand Down