Skip to content

Commit 0d795d5

Browse files
authored
Destination Snowflake: set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud (#19302)
* [19250] Destination Snowflake: set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud
1 parent 75d1353 commit 0d795d5

File tree

17 files changed

+78
-86
lines changed

17 files changed

+78
-86
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@
306306
- name: Snowflake
307307
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
308308
dockerRepository: airbyte/destination-snowflake
309-
dockerImageTag: 0.4.39
309+
dockerImageTag: 0.4.40
310310
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
311311
icon: snowflake.svg
312312
normalizationRepository: airbyte/normalization-snowflake

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5408,7 +5408,7 @@
54085408
supported_destination_sync_modes:
54095409
- "overwrite"
54105410
- "append"
5411-
- dockerImage: "airbyte/destination-snowflake:0.4.39"
5411+
- dockerImage: "airbyte/destination-snowflake:0.4.40"
54125412
spec:
54135413
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
54145414
connectionSpecification:

airbyte-integrations/connectors/destination-snowflake/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
2020

2121
ENV ENABLE_SENTRY true
2222

23-
LABEL io.airbyte.version=0.4.39
23+
LABEL io.airbyte.version=0.4.40
2424
LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
application {
8-
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination'
8+
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestinationRunner'
99
// enable when profiling
1010
applicationDefaultJvmArgs = [
1111
'-XX:+ExitOnOutOfMemoryError',
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.snowflake;
6+
7+
public class OssCloudEnvVarConsts {
8+
9+
public static final String AIRBYTE_OSS = "airbyte_oss";
10+
public static final String AIRBYTE_CLOUD = "airbyte_cloud";
11+
12+
}

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyAzureBlobStorageDestination.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323

2424
public class SnowflakeCopyAzureBlobStorageDestination extends CopyDestination {
2525

26+
private final String airbyteEnvironment;
27+
28+
public SnowflakeCopyAzureBlobStorageDestination(final String airbyteEnvironment) {
29+
this.airbyteEnvironment = airbyteEnvironment;
30+
}
31+
2632
@Override
2733
public AirbyteMessageConsumer getConsumer(final JsonNode config,
2834
final ConfiguredAirbyteCatalog catalog,
@@ -52,7 +58,7 @@ public ExtendedNameTransformer getNameTransformer() {
5258

5359
@Override
5460
public DataSource getDataSource(final JsonNode config) {
55-
return SnowflakeDatabase.createDataSource(config);
61+
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
5662
}
5763

5864
@Override

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class SnowflakeDatabase {
5656
private static final String CONNECTION_STRING_IDENTIFIER_KEY = "application";
5757
private static final String CONNECTION_STRING_IDENTIFIER_VAL = "Airbyte_Connector";
5858

59-
public static HikariDataSource createDataSource(final JsonNode config) {
59+
public static HikariDataSource createDataSource(final JsonNode config, final String airbyteEnvironment) {
6060
final HikariDataSource dataSource = new HikariDataSource();
6161

6262
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?",
@@ -129,7 +129,7 @@ public static HikariDataSource createDataSource(final JsonNode config) {
129129

130130
// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
131131
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
132-
properties.put("application", "airbyte");
132+
properties.put("application", airbyteEnvironment); // see envs in OssCloudEnvVarConsts class
133133
// Needed for JDK17 - see
134134
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
135135
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
package io.airbyte.integrations.destination.snowflake;
66

7-
import io.airbyte.integrations.base.Destination;
8-
import io.airbyte.integrations.base.IntegrationRunner;
97
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
108
import java.util.concurrent.Executors;
119
import java.util.concurrent.ScheduledExecutorService;
@@ -21,14 +19,9 @@ enum DestinationType {
2119
INTERNAL_STAGING
2220
}
2321

24-
public SnowflakeDestination() {
25-
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig, SnowflakeDestinationResolver.getTypeToDestination());
26-
}
27-
28-
public static void main(final String[] args) throws Exception {
29-
final Destination destination = new SnowflakeDestination();
30-
new IntegrationRunner(destination).run(args);
31-
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
22+
public SnowflakeDestination(final String airbyteEnvironment) {
23+
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig,
24+
SnowflakeDestinationResolver.getTypeToDestination(airbyteEnvironment));
3225
}
3326

3427
}

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationResolver.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ public static boolean isAzureBlobCopy(final JsonNode config) {
3737
&& config.get("loading_method").has("azure_blob_storage_account_name");
3838
}
3939

40-
public static Map<DestinationType, Destination> getTypeToDestination() {
41-
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination();
42-
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination();
43-
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination();
44-
final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination();
40+
public static Map<DestinationType, Destination> getTypeToDestination(
41+
final String airbyteEnvironment) {
42+
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
43+
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
44+
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);
45+
final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination(airbyteEnvironment);
4546

4647
return ImmutableMap.of(
4748
DestinationType.COPY_S3, s3StagingDestination,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.snowflake;
6+
7+
import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE;
8+
9+
import io.airbyte.integrations.base.adaptive.AdaptiveDestinationRunner;
10+
11+
public class SnowflakeDestinationRunner {
12+
13+
public static void main(final String[] args) throws Exception {
14+
AdaptiveDestinationRunner.baseOnEnv()
15+
.withOssDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS))
16+
.withCloudDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_CLOUD))
17+
.run(args);
18+
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
19+
}
20+
21+
}

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@
4040
public class SnowflakeGcsStagingDestination extends AbstractJdbcDestination implements Destination {
4141

4242
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeGcsStagingDestination.class);
43+
private String airbyteEnvironment;
4344

44-
public SnowflakeGcsStagingDestination() {
45-
this(new SnowflakeSQLNameTransformer());
45+
public SnowflakeGcsStagingDestination(final String airbyteEnvironment) {
46+
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
4647
}
4748

48-
public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer) {
49+
public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
4950
super("", nameTransformer, new SnowflakeSqlOperations());
51+
this.airbyteEnvironment = airbyteEnvironment;
5052
}
5153

5254
@Override
@@ -101,7 +103,7 @@ public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOExcep
101103

102104
@Override
103105
protected DataSource getDataSource(final JsonNode config) {
104-
return SnowflakeDatabase.createDataSource(config);
106+
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
105107
}
106108

107109
@Override

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestination.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {
3030

3131
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
32+
private String airbyteEnvironment;
3233

33-
public SnowflakeInternalStagingDestination() {
34-
this(new SnowflakeSQLNameTransformer());
34+
public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {
35+
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
3536
}
3637

37-
public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer) {
38+
public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer, final String airbyteEnvironment) {
3839
super("", nameTransformer, new SnowflakeInternalStagingSqlOperations(nameTransformer));
40+
this.airbyteEnvironment = airbyteEnvironment;
3941
}
4042

4143
@Override
@@ -79,7 +81,7 @@ private static void attemptStageOperations(final String outputSchema,
7981

8082
@Override
8183
protected DataSource getDataSource(final JsonNode config) {
82-
return SnowflakeDatabase.createDataSource(config);
84+
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
8385
}
8486

8587
@Override

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
public class SnowflakeS3StagingDestination extends AbstractJdbcDestination implements Destination {
3535

3636
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StagingDestination.class);
37+
private String airbyteEnvironment;
3738

38-
public SnowflakeS3StagingDestination() {
39-
this(new SnowflakeSQLNameTransformer());
39+
public SnowflakeS3StagingDestination(final String airbyteEnvironment) {
40+
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
4041
}
4142

42-
public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer) {
43+
public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
4344
super("", nameTransformer, new SnowflakeSqlOperations());
45+
this.airbyteEnvironment = airbyteEnvironment;
4446
}
4547

4648
@Override
@@ -93,7 +95,7 @@ private static void attemptStageOperations(final String outputSchema,
9395

9496
@Override
9597
protected DataSource getDataSource(final JsonNode config) {
96-
return SnowflakeDatabase.createDataSource(config);
98+
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
9799
}
98100

99101
@Override

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ void testCheckFailsWithInvalidPermissions() throws Exception {
3333
// this connector should be updated with multiple credentials, each with a clear purpose (valid,
3434
// invalid: insufficient permissions, invalid: wrong password, etc..)
3535
final JsonNode credentialsJsonString = Jsons.deserialize(Files.readString(Paths.get("secrets/config.json")));
36-
final AirbyteConnectionStatus check = new SnowflakeDestination().check(credentialsJsonString);
36+
final AirbyteConnectionStatus check = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString);
3737
assertEquals(AirbyteConnectionStatus.Status.FAILED, check.getStatus());
3838
}
3939

4040
@Test
4141
public void testInvalidSchemaName() throws Exception {
4242
final JsonNode config = getConfig();
4343
final String schema = config.get("schema").asText();
44-
final DataSource dataSource = SnowflakeDatabase.createDataSource(config);
44+
final DataSource dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
4545
try {
4646
final JdbcDatabase database = SnowflakeDatabase.getDatabase(dataSource);
4747
assertDoesNotThrow(() -> syncWithNamingResolver(database, schema));

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
173173
this.config = Jsons.clone(getStaticConfig());
174174
((ObjectNode) config).put("schema", schemaName);
175175

176-
dataSource = SnowflakeDatabase.createDataSource(config);
176+
dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
177177
database = SnowflakeDatabase.getDatabase(dataSource);
178178
database.execute(createSchemaQuery);
179179
}
@@ -223,7 +223,7 @@ public void testBackwardCompatibilityAfterAddingOauth() {
223223
@Test
224224
void testCheckWithKeyPairAuth() throws Exception {
225225
final JsonNode credentialsJsonString = Jsons.deserialize(IOs.readFile(Path.of("secrets/config_key_pair.json")));
226-
final AirbyteConnectionStatus check = new SnowflakeDestination().check(credentialsJsonString);
226+
final AirbyteConnectionStatus check = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString);
227227
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, check.getStatus());
228228
}
229229

docs/integrations/destinations/snowflake.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ Now that you have set up the Snowflake destination connector, check out the foll
275275
276276
| Version | Date | Pull Request | Subject |
277277
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
278+
| 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
278279
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
279280
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
280281
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |

0 commit comments

Comments
 (0)