Skip to content

Code cleanup in abstract classes #18811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.StandardSQLTypeName;
Expand All @@ -18,8 +22,9 @@
import io.airbyte.db.bigquery.BigQuerySourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -34,7 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQuerySource extends AbstractRelationalDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {
public class BigQuerySource extends AbstractDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySource.class);
private static final String QUOTE = "`";
Expand Down Expand Up @@ -76,7 +81,7 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
checkList.add(database -> {
if (isDatasetConfigured(database)) {
database.query(String.format("select 1 from %s where 1=0",
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES")));
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
LOGGER.info("The source passed the Dataset query test!");
} else {
LOGGER.info("The Dataset query test is skipped due to not configured datasetId!");
Expand Down Expand Up @@ -140,12 +145,23 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
final CursorInfo cursorInfo,
final StandardSQLTypeName cursorFieldType) {
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
enquoteIdentifierList(columnNames),
getFullTableName(schemaName, tableName),
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString()),
cursorInfo.getCursorField()),
sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor()));
}

@Override
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final BigQueryDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));
}

@Override
public boolean isCursorType(final StandardSQLTypeName standardSQLTypeName) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_IS_NULLABLE;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
Expand All @@ -40,7 +43,7 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
Expand Down Expand Up @@ -81,7 +84,7 @@
* relational DB source which can be accessed via JDBC driver. If you are implementing a connector
* for a relational DB which has a JDBC driver, make an effort to use this class.
*/
public abstract class AbstractJdbcSource<Datatype> extends AbstractRelationalDbSource<Datatype, JdbcDatabase> implements Source {
public abstract class AbstractJdbcSource<Datatype> extends AbstractDbSource<Datatype, JdbcDatabase> implements Source {

public static final String SSL_MODE = "sslMode";

Expand Down Expand Up @@ -136,12 +139,23 @@ public AbstractJdbcSource(final String driverClass,
this.sourceOperations = sourceOperations;
}

@Override
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));
}

/**
* Configures a list of operations that can be used to check the connection to the source.
*
* @return list of consumers that run queries for the check command.
*/
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
protected List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
return ImmutableList.of(database -> {
LOGGER.info("Attempting to get metadata from the database to see if we can connect.");
database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), sourceOperations::rowToJson);
Expand Down Expand Up @@ -258,7 +272,7 @@ private JsonNode getColumnMetadata(final ResultSet resultSet) throws SQLExceptio
* @param field Essential column information returned from
* {@link AbstractJdbcSource#getColumnMetadata}.
*/
public Datatype getFieldType(final JsonNode field) {
private Datatype getFieldType(final JsonNode field) {
return sourceOperations.getFieldType(field);
}

Expand Down Expand Up @@ -515,7 +529,7 @@ public void close() {
* @param config configuration
* @return map containing relevant parsed values including location of keystore or an empty map
*/
public Map<String, String> parseSSLConfig(final JsonNode config) {
protected Map<String, String> parseSSLConfig(final JsonNode config) {
LOGGER.debug("source config: {}", config);

final Map<String, String> additionalParameters = new HashMap<>();
Expand Down Expand Up @@ -572,7 +586,7 @@ public Map<String, String> parseSSLConfig(final JsonNode config) {
* @param sslParams
* @return SSL portion of JDBC question params or and empty string
*/
public String toJDBCQueryParams(final Map<String, String> sslParams) {
protected String toJDBCQueryParams(final Map<String, String> sslParams) {
return Objects.isNull(sslParams) ? ""
: sslParams.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -80,7 +84,7 @@ public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase
LOGGER.info("Queueing query for table: {}", tableName);

final String newIdentifiers = getWrappedColumnNames(database, null, columnNames, schemaName, tableName);
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName));
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName, getQuoteString()));

LOGGER.info("Prepared SQL query for TableFullRefresh is: " + preparedSqlQuery);
return queryTable(database, preparedSqlQuery);
Expand All @@ -107,8 +111,8 @@ protected String getWrappedColumnNames(final JdbcDatabase database,
final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database
.queryMetadata(String
.format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type
enquoteIdentifierList(columnNames),
getFullTableName(schemaName, tableName)));
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));

// metadata will be null if table doesn't contain records
if (sqlServerResultSetMetaData != null) {
Expand All @@ -127,7 +131,7 @@ protected String getWrappedColumnNames(final JdbcDatabase database,
.map(
el -> hierarchyIdColumns.contains(el) ? String
.format("%s.ToString() as %s%s%s", el, identifierQuoteString, el, identifierQuoteString)
: getIdentifierWithQuoting(el))
: getIdentifierWithQuoting(el, getQuoteString()))
.toList());
} catch (final SQLException e) {
LOGGER.error("Failed to fetch metadata to prepare a proper request.", e);
Expand Down
Loading