Skip to content

Destination Snowflake: set JDBC application env variable depending on env - airbyte_oss or airbyte_cloud #19302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.version=0.4.40
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination'
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestinationRunner'
// enable when profiling
applicationDefaultJvmArgs = [
'-XX:+ExitOnOutOfMemoryError',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.snowflake;

/**
 * Constants identifying the Airbyte deployment environment (OSS vs. Cloud).
 *
 * <p>These values are passed to the Snowflake JDBC driver as the {@code application}
 * connection property (see {@code SnowflakeDatabase#createDataSource}) so that Airbyte
 * traffic can be attributed per environment on the Snowflake side.
 */
public final class OssCloudEnvVarConsts {

  public static final String AIRBYTE_OSS = "airbyte_oss";
  public static final String AIRBYTE_CLOUD = "airbyte_cloud";

  // Utility holder for constants; never meant to be instantiated.
  private OssCloudEnvVarConsts() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

public class SnowflakeCopyAzureBlobStorageDestination extends CopyDestination {

private final String airbyteEnvironment;

public SnowflakeCopyAzureBlobStorageDestination(final String airbyteEnvironment) {
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
Expand Down Expand Up @@ -52,7 +58,7 @@ public ExtendedNameTransformer getNameTransformer() {

@Override
public DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class SnowflakeDatabase {
private static final String CONNECTION_STRING_IDENTIFIER_KEY = "application";
private static final String CONNECTION_STRING_IDENTIFIER_VAL = "Airbyte_Connector";

public static HikariDataSource createDataSource(final JsonNode config) {
public static HikariDataSource createDataSource(final JsonNode config, final String airbyteEnvironment) {
final HikariDataSource dataSource = new HikariDataSource();

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?",
Expand Down Expand Up @@ -129,7 +129,7 @@ public static HikariDataSource createDataSource(final JsonNode config) {

// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
properties.put("application", "airbyte");
properties.put("application", airbyteEnvironment); // see envs in OssCloudEnvVarConsts class
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package io.airbyte.integrations.destination.snowflake;

import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -21,14 +19,9 @@ enum DestinationType {
INTERNAL_STAGING
}

public SnowflakeDestination() {
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig, SnowflakeDestinationResolver.getTypeToDestination());
}

public static void main(final String[] args) throws Exception {
final Destination destination = new SnowflakeDestination();
new IntegrationRunner(destination).run(args);
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
public SnowflakeDestination(final String airbyteEnvironment) {
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig,
SnowflakeDestinationResolver.getTypeToDestination(airbyteEnvironment));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public static boolean isAzureBlobCopy(final JsonNode config) {
&& config.get("loading_method").has("azure_blob_storage_account_name");
}

public static Map<DestinationType, Destination> getTypeToDestination() {
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination();
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination();
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination();
final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination();
public static Map<DestinationType, Destination> getTypeToDestination(
final String airbyteEnvironment) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be on the same line as above, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, it started looking like that after the "gradlew format" command (this command is also run in CI). I tried to make it a single line manually twice, but the automatic format sets it as you see now. Have no idea why

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, we can keep this change and it appears you're already in the publication process plus it's not a functional requirement. Thanks for the context though

final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);
final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination(airbyteEnvironment);

return ImmutableMap.of(
DestinationType.COPY_S3, s3StagingDestination,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing

/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, thanks


import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE;

import io.airbyte.integrations.base.adaptive.AdaptiveDestinationRunner;

public class SnowflakeDestinationRunner {

public static void main(final String[] args) throws Exception {
AdaptiveDestinationRunner.baseOnEnv()
.withOssDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blocking, maybe could have these variables imported at the top to keep in sync with Snowflake Source

import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_CLOUD;
import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_OSS;

Copy link
Contributor Author

@etsybaev etsybaev Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In one of the comments above, I was asked to downgrade it from the Base level to the destination one :) I already published the connector but will have a look in my next PR. At the moment when I started working on this PR we didn't have these vars. Thanks

.withCloudDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_CLOUD))
.run(args);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably also needs to do a SCHEDULED_EXECUTOR_SERVICE.shutdownNow(); similar to Akash's PR https://github.com/airbytehq/airbyte/pull/19314/files#diff-d6c8766f9065e3ddfc335cd448642c552b312f2d1feac08e62793aec3716df12R16

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated, thanks!

SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
public class SnowflakeGcsStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeGcsStagingDestination.class);
private String airbyteEnvironment;

public SnowflakeGcsStagingDestination() {
this(new SnowflakeSQLNameTransformer());
public SnowflakeGcsStagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}

public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer) {
public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeSqlOperations());
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand Down Expand Up @@ -101,7 +103,7 @@ public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOExcep

@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
private String airbyteEnvironment;

public SnowflakeInternalStagingDestination() {
this(new SnowflakeSQLNameTransformer());
public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}

public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer) {
public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeInternalStagingSqlOperations(nameTransformer));
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand Down Expand Up @@ -79,7 +81,7 @@ private static void attemptStageOperations(final String outputSchema,

@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
public class SnowflakeS3StagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StagingDestination.class);
private String airbyteEnvironment;

public SnowflakeS3StagingDestination() {
this(new SnowflakeSQLNameTransformer());
public SnowflakeS3StagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}

public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer) {
public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeSqlOperations());
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand Down Expand Up @@ -93,7 +95,7 @@ private static void attemptStageOperations(final String outputSchema,

@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ void testCheckFailsWithInvalidPermissions() throws Exception {
// this connector should be updated with multiple credentials, each with a clear purpose (valid,
// invalid: insufficient permissions, invalid: wrong password, etc..)
final JsonNode credentialsJsonString = Jsons.deserialize(Files.readString(Paths.get("secrets/config.json")));
final AirbyteConnectionStatus check = new SnowflakeDestination().check(credentialsJsonString);
final AirbyteConnectionStatus check = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString);
assertEquals(AirbyteConnectionStatus.Status.FAILED, check.getStatus());
}

@Test
public void testInvalidSchemaName() throws Exception {
final JsonNode config = getConfig();
final String schema = config.get("schema").asText();
final DataSource dataSource = SnowflakeDatabase.createDataSource(config);
final DataSource dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
try {
final JdbcDatabase database = SnowflakeDatabase.getDatabase(dataSource);
assertDoesNotThrow(() -> syncWithNamingResolver(database, schema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
this.config = Jsons.clone(getStaticConfig());
((ObjectNode) config).put("schema", schemaName);

dataSource = SnowflakeDatabase.createDataSource(config);
dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
database = SnowflakeDatabase.getDatabase(dataSource);
database.execute(createSchemaQuery);
}
Expand Down Expand Up @@ -223,7 +223,7 @@ public void testBackwardCompatibilityAfterAddingOauth() {
@Test
void testCheckWithKeyPairAuth() throws Exception {
final JsonNode credentialsJsonString = Jsons.deserialize(IOs.readFile(Path.of("secrets/config_key_pair.json")));
final AirbyteConnectionStatus check = new SnowflakeDestination().check(credentialsJsonString);
final AirbyteConnectionStatus check = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, check.getStatus());
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ Now that you have set up the Snowflake destination connector, check out the foll

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set JDBC application env variable depending on env - airbyte_oss or airbyte_cloud |
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |
Expand Down