Skip to content

Set app_id in snowflake source connector based on oss/cloud #19314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.source.snowflake.SnowflakeSource'
mainClass = 'io.airbyte.integrations.source.snowflake.SnowflakeSourceRunner'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class SnowflakeDataSourceUtils {
public static final String OAUTH_METHOD = "OAuth";
public static final String USERNAME_PASSWORD_METHOD = "username/password";
public static final String UNRECOGNIZED = "Unrecognized";
public static final String AIRBYTE_OSS = "airbyte_oss";
public static final String AIRBYTE_CLOUD = "airbyte_cloud";
private static final String JDBC_CONNECTION_STRING =
"role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s&application=Airbyte_Connector";
"role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s&application=%s";

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDataSourceUtils.class);
private static final int PAUSE_BETWEEN_TOKEN_REFRESH_MIN = 7; // snowflake access token's TTL is 10min and can't be modified
Expand All @@ -53,9 +55,9 @@ public class SnowflakeDataSourceUtils {
* @param config source config JSON
* @return datasource
*/
public static HikariDataSource createDataSource(final JsonNode config) {
public static HikariDataSource createDataSource(final JsonNode config, final String airbyteEnvironment) {
final HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(buildJDBCUrl(config));
dataSource.setJdbcUrl(buildJDBCUrl(config, airbyteEnvironment));

if (config.has("credentials")) {
final JsonNode credentials = config.get("credentials");
Expand Down Expand Up @@ -130,7 +132,7 @@ public static String getAccessTokenUsingRefreshToken(final String hostName,
}
}

public static String buildJDBCUrl(final JsonNode config) {
public static String buildJDBCUrl(final JsonNode config, final String airbyteEnvironment) {
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?",
config.get(JdbcUtils.HOST_KEY).asText()));

Expand All @@ -143,7 +145,8 @@ public static String buildJDBCUrl(final JsonNode config) {
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
"JSON",
true));
true,
airbyteEnvironment));

// https://docs.snowflake.com/en/user-guide/jdbc-configure.html#jdbc-driver-connection-string
if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import java.io.IOException;
Expand All @@ -37,16 +36,11 @@ public class SnowflakeSource extends AbstractJdbcSource<JDBCType> implements Sou
public static final String DRIVER_CLASS = DatabaseDriver.SNOWFLAKE.getDriverClassName();
public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);

public SnowflakeSource() {
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new SnowflakeSourceOperations());
}
private final String airbyteEnvironment;

public static void main(final String[] args) throws Exception {
final Source source = new SnowflakeSource();
LOGGER.info("starting source: {}", SnowflakeSource.class);
new IntegrationRunner(source).run(args);
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
LOGGER.info("completed source: {}", SnowflakeSource.class);
public SnowflakeSource(final String airbyteEnvironment) {
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new SnowflakeSourceOperations());
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand All @@ -59,14 +53,14 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {

@Override
protected DataSource createDataSource(final JsonNode config) {
final DataSource dataSource = SnowflakeDataSourceUtils.createDataSource(config);
final DataSource dataSource = SnowflakeDataSourceUtils.createDataSource(config, airbyteEnvironment);
dataSources.add(dataSource);
return dataSource;
}

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final String jdbcUrl = SnowflakeDataSourceUtils.buildJDBCUrl(config);
final String jdbcUrl = SnowflakeDataSourceUtils.buildJDBCUrl(config, airbyteEnvironment);

if (config.has("credentials")) {
final JsonNode credentials = config.get("credentials");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.airbyte.integrations.source.snowflake;

import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_CLOUD;
import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_OSS;
import static io.airbyte.integrations.source.snowflake.SnowflakeSource.SCHEDULED_EXECUTOR_SERVICE;

import io.airbyte.integrations.base.adaptive.AdaptiveSourceRunner;

public class SnowflakeSourceRunner {

public static void main(final String[] args) throws Exception {
AdaptiveSourceRunner.baseOnEnv()
.withOssSource(() -> new SnowflakeSource(AIRBYTE_OSS))
.withCloudSource(() -> new SnowflakeSource(AIRBYTE_CLOUD))
.run(args);
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, I totally forgot about this on the destination-snowflake PR

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_OSS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -102,7 +103,7 @@ public String getDriverClass() {

@Override
public AbstractJdbcSource<JDBCType> getJdbcSource() {
return new SnowflakeSource();
return new SnowflakeSource(AIRBYTE_OSS);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@ class SnowflakeDataSourceUtilsTest {
}
""";
private final String expectedJdbcUrl =
"jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE&schema=SOURCE_SCHEMA&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=Airbyte_Connector";
"jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE&schema=SOURCE_SCHEMA&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=airbyte_oss";

@Test
void testBuildJDBCUrl() {
JsonNode expectedConfig = Jsons.deserialize(config);
final JsonNode expectedConfig = Jsons.deserialize(config);

String jdbcURL = SnowflakeDataSourceUtils.buildJDBCUrl(expectedConfig);
final String jdbcURL = SnowflakeDataSourceUtils.buildJDBCUrl(expectedConfig, SnowflakeDataSourceUtils.AIRBYTE_OSS);

assertEquals(expectedJdbcUrl, jdbcURL);
}

@Test
void testBuildJDBCUrlWithParams() {
JsonNode expectedConfig = Jsons.deserialize(config);
String params = "someParameter1&param2=someParameter2";
final JsonNode expectedConfig = Jsons.deserialize(config);
final String params = "someParameter1&param2=someParameter2";
((ObjectNode) expectedConfig).put("jdbc_url_params", params);

String jdbcURL = SnowflakeDataSourceUtils.buildJDBCUrl(expectedConfig);
final String jdbcURL = SnowflakeDataSourceUtils.buildJDBCUrl(expectedConfig, SnowflakeDataSourceUtils.AIRBYTE_OSS);

assertEquals(expectedJdbcUrl + "&" + params, jdbcURL);
}
Expand Down