Skip to content

Code cleanup in abstract classes #18811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.StandardSQLTypeName;
Expand All @@ -18,8 +22,9 @@
import io.airbyte.db.bigquery.BigQuerySourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -34,7 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQuerySource extends AbstractRelationalDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {
public class BigQuerySource extends AbstractDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySource.class);
private static final String QUOTE = "`";
Expand Down Expand Up @@ -76,7 +81,7 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
checkList.add(database -> {
if (isDatasetConfigured(database)) {
database.query(String.format("select 1 from %s where 1=0",
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES")));
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
LOGGER.info("The source passed the Dataset query test!");
} else {
LOGGER.info("The Dataset query test is skipped due to not configured datasetId!");
Expand Down Expand Up @@ -140,12 +145,23 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
final CursorInfo cursorInfo,
final StandardSQLTypeName cursorFieldType) {
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
enquoteIdentifierList(columnNames),
getFullTableName(schemaName, tableName),
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString()),
cursorInfo.getCursorField()),
sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor()));
}

@Override
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final BigQueryDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));
}

@Override
public boolean isCursorType(final StandardSQLTypeName standardSQLTypeName) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_IS_NULLABLE;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
Expand All @@ -40,7 +43,7 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
Expand Down Expand Up @@ -81,7 +84,7 @@
* relational DB source which can be accessed via JDBC driver. If you are implementing a connector
* for a relational DB which has a JDBC driver, make an effort to use this class.
*/
public abstract class AbstractJdbcSource<Datatype> extends AbstractRelationalDbSource<Datatype, JdbcDatabase> implements Source {
public abstract class AbstractJdbcSource<Datatype> extends AbstractDbSource<Datatype, JdbcDatabase> implements Source {

public static final String SSL_MODE = "sslMode";

Expand Down Expand Up @@ -136,12 +139,23 @@ public AbstractJdbcSource(final String driverClass,
this.sourceOperations = sourceOperations;
}

@Override
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));
}

/**
* Configures a list of operations that can be used to check the connection to the source.
*
* @return list of consumers that run queries for the check command.
*/
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
protected List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
return ImmutableList.of(database -> {
LOGGER.info("Attempting to get metadata from the database to see if we can connect.");
database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), sourceOperations::rowToJson);
Expand Down Expand Up @@ -258,7 +272,7 @@ private JsonNode getColumnMetadata(final ResultSet resultSet) throws SQLExceptio
* @param field Essential column information returned from
* {@link AbstractJdbcSource#getColumnMetadata}.
*/
public Datatype getFieldType(final JsonNode field) {
private Datatype getFieldType(final JsonNode field) {
return sourceOperations.getFieldType(field);
}

Expand Down Expand Up @@ -515,7 +529,7 @@ public void close() {
* @param config configuration
* @return map containing relevant parsed values including location of keystore or an empty map
*/
public Map<String, String> parseSSLConfig(final JsonNode config) {
protected Map<String, String> parseSSLConfig(final JsonNode config) {
LOGGER.debug("source config: {}", config);

final Map<String, String> additionalParameters = new HashMap<>();
Expand Down Expand Up @@ -572,7 +586,7 @@ public Map<String, String> parseSSLConfig(final JsonNode config) {
* @param sslParams
* @return SSL portion of JDBC question params or and empty string
*/
public String toJDBCQueryParams(final Map<String, String> sslParams) {
protected String toJDBCQueryParams(final Map<String, String> sslParams) {
return Objects.isNull(sslParams) ? ""
: sslParams.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -80,7 +84,7 @@ public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase
LOGGER.info("Queueing query for table: {}", tableName);

final String newIdentifiers = getWrappedColumnNames(database, null, columnNames, schemaName, tableName);
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName));
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName, getQuoteString()));

LOGGER.info("Prepared SQL query for TableFullRefresh is: " + preparedSqlQuery);
return queryTable(database, preparedSqlQuery);
Expand All @@ -107,8 +111,8 @@ protected String getWrappedColumnNames(final JdbcDatabase database,
final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database
.queryMetadata(String
.format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type
enquoteIdentifierList(columnNames),
getFullTableName(schemaName, tableName)));
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));

// metadata will be null if table doesn't contain records
if (sqlServerResultSetMetaData != null) {
Expand All @@ -127,7 +131,7 @@ protected String getWrappedColumnNames(final JdbcDatabase database,
.map(
el -> hierarchyIdColumns.contains(el) ? String
.format("%s.ToString() as %s%s%s", el, identifierQuoteString, el, identifierQuoteString)
: getIdentifierWithQuoting(el))
: getIdentifierWithQuoting(el, getQuoteString()))
.toList());
} catch (final SQLException e) {
LOGGER.error("Failed to fetch metadata to prepare a proper request.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConnectionErrorException;
Expand Down Expand Up @@ -234,7 +235,7 @@ private void validateCursorFieldForIncrementalTables(
}
}

protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
private List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
final Database database) throws Exception {
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
Expand All @@ -244,7 +245,7 @@ protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
Collectors.toList()));
}

protected List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(
private List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(
final Database database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
Expand Down Expand Up @@ -287,7 +288,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
* sync mode
* @return List of AirbyteMessageIterators containing all iterators for a catalog
*/
protected List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
final Database database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
Expand Down Expand Up @@ -331,7 +332,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
* @param emittedAt Time when data was emitted from the Source database
* @return
*/
protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Database database,
private AutoCloseableIterator<AirbyteMessage> createReadIterator(final Database database,
final ConfiguredAirbyteStream airbyteStream,
final TableInfo<CommonField<DataType>> table,
final StateManager stateManager,
Expand Down Expand Up @@ -414,7 +415,7 @@ protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Databas
* @param emittedAt Time when data was emitted from the Source database
* @return AirbyteMessage Iterator that
*/
protected AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Database database,
private AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Database database,
final ConfiguredAirbyteStream airbyteStream,
final List<String> selectedDatabaseFields,
final TableInfo<CommonField<DataType>> table,
Expand Down Expand Up @@ -456,7 +457,7 @@ protected AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Datab
* @param emittedAt Time when data was emitted from the Source database
* @return AirbyteMessageIterator with all records for a database source
*/
protected AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Database database,
private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Database database,
final String streamName,
final String namespace,
final List<String> selectedDatabaseFields,
Expand All @@ -468,11 +469,11 @@ protected AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Datab
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

protected String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
private String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

public AutoCloseableIterator<AirbyteMessage> getMessageIterator(
private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
final String namespace,
Expand All @@ -493,7 +494,7 @@ public AutoCloseableIterator<AirbyteMessage> getMessageIterator(
* @return list of table/data structure info
* @throws Exception might throw an error during connection to database
*/
protected List<TableInfo<Field>> getTables(final Database database) throws Exception {
private List<TableInfo<Field>> getTables(final Database database) throws Exception {
final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(
database, tableInfos);
Expand Down Expand Up @@ -522,7 +523,7 @@ protected List<TableInfo<Field>> getTables(final Database database) throws Excep
.collect(Collectors.toList());
}

protected Field toField(final CommonField<DataType> field) {
private Field toField(final CommonField<DataType> field) {
if (getType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null
&& !field.getProperties().isEmpty()) {
final var properties = field.getProperties().stream().map(this::toField).toList();
Expand All @@ -532,7 +533,7 @@ protected Field toField(final CommonField<DataType> field) {
}
}

protected void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName,
private void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName,
final List<CommonField<DataType>> columns) {
columns.stream()
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
Expand All @@ -557,7 +558,7 @@ protected void assertColumnsWithSameNameAreSame(final String nameSpace, final St
* for SELECT-ing the table with privileges. In some cases such SELECT doesn't require (e.g. in
* Oracle DB - the schema is the user, you cannot REVOKE a privilege on a table from its owner).
*/
public <T> Set<T> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
protected <T> Set<T> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
final String schema) throws SQLException {
return Collections.emptySet();
}
Expand Down Expand Up @@ -585,7 +586,7 @@ public <T> Set<T> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
*
* @return list of consumers that run queries for the check command.
*/
public abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(JsonNode config)
protected abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(JsonNode config)
throws Exception;

/**
Expand All @@ -601,7 +602,7 @@ public abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(Js
*
* @return set of system namespaces(schemas) to be excluded
*/
public abstract Set<String> getExcludedInternalNameSpaces();
protected abstract Set<String> getExcludedInternalNameSpaces();

/**
* Discover all available tables in the source database.
Expand Down Expand Up @@ -653,7 +654,7 @@ protected abstract Map<String, List<String>> discoverPrimaryKeys(Database databa
* @param tableName target table
* @return iterator with read data
*/
public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
protected abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
final List<String> columnNames,
final String schemaName,
final String tableName);
Expand All @@ -666,7 +667,7 @@ public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Data
*
* @return iterator with read data
*/
public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
protected abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
List<String> columnNames,
String schemaName,
String tableName,
Expand All @@ -685,7 +686,7 @@ protected int getStateEmissionFrequency() {
/**
* @return list of fields that could be used as cursors
*/
public abstract boolean isCursorType(DataType type);
protected abstract boolean isCursorType(DataType type);

private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception {
final Database database = createDatabase(sourceConfig);
Expand Down
Loading