From 9ae6e7f27bef3a8dfc91b7812c466ad9cd43db74 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 20 Jul 2022 16:30:47 -0700 Subject: [PATCH 1/8] Add order by clause in incremental query --- .../integrations/source/bigquery/BigQuerySource.java | 4 ++-- .../integrations/source/jdbc/AbstractJdbcSource.java | 11 +++++++---- .../MongoDbSource.java | 4 ++-- .../integrations/source/mssql/MssqlSource.java | 8 ++++---- .../source/relationaldb/AbstractDbSource.java | 12 ++---------- 5 files changed, 17 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index 4ec30fa121408..44223992a3c8a 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -138,12 +138,12 @@ public AutoCloseableIterator queryTableIncremental(final BigQueryDatab final String tableName, final String cursorField, final StandardSQLTypeName cursorFieldType, - final String cursor) { + final String cursorValue) { return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?", enquoteIdentifierList(columnNames), getFullTableName(schemaName, tableName), cursorField), - sourceOperations.getQueryParameter(cursorFieldType, cursor)); + sourceOperations.getQueryParameter(cursorFieldType, cursorValue)); } private AutoCloseableIterator queryTableWithParams(final BigQueryDatabase database, diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 
d119dc173c2fe..bcbbbc8791b3b 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -266,21 +266,24 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final String tableName, final String cursorField, final Datatype cursorFieldType, - final String cursor) { + final String cursorValue) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String sql = String.format("SELECT %s FROM %s WHERE %s > ?", + final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField); + final String sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s ASC", sourceOperations.enquoteIdentifierList(connection, columnNames), sourceOperations .getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), - sourceOperations.enquoteIdentifier(connection, cursorField)); + quotedCursorField, + quotedCursorField + ); final PreparedStatement preparedStatement = connection.prepareStatement(sql); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor); + sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue); LOGGER.info("Executing query for table: {}", tableName); return preparedStatement; }, diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 6a9a9f8fd633a..f91d92416384f 100644 --- 
a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -186,8 +186,8 @@ public AutoCloseableIterator queryTableIncremental(final MongoDatabase final String tableName, final String cursorField, final BsonType cursorFieldType, - final String cursor) { - final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursor)); + final String cursorValue) { + final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursorValue)); return queryTable(database, columnNames, tableName, greaterComparison); } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 264742a0e554d..3052558ceb14b 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -97,7 +97,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final String tableName, final String cursorField, final JDBCType cursorFieldType, - final String cursor) { + final String cursorValue) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { @@ -115,7 +115,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql); final PreparedStatement preparedStatement = connection.prepareStatement(sql); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor); + sourceOperations.setStatementField(preparedStatement, 
1, cursorFieldType, cursorValue); LOGGER.info("Executing query for table: {}", tableName); return preparedStatement; }, @@ -279,8 +279,8 @@ protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabas final List queryResponse = database.queryJsons(connection -> { boolean isAzureSQL = false; - try (Statement stmt = connection.createStatement(); - ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) { + try (final Statement stmt = connection.createStatement(); + final ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) { isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1)); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 995a49a1f4d6a..f1d81f64cc207 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -239,8 +239,7 @@ protected AutoCloseableIterator createReadIterator(final Databas airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } - final JsonSchemaPrimitive cursorType = IncrementalUtils - .getCursorType(airbyteStream, cursorField); + final JsonSchemaPrimitive cursorType = IncrementalUtils.getCursorType(airbyteStream, cursorField); iterator = AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator( autoCloseableIterator, @@ -494,13 +493,6 @@ public abstract AutoCloseableIterator queryTableFullRefresh(final Data * Read incremental data from a table. 
Incremental read should returns only records where cursor * column value is bigger than cursor. * - * @param database source database - * @param columnNames interested column names - * @param schemaName table namespace - * @param tableName target table - * @param cursorField cursor field name - * @param cursorFieldType cursor field type - * @param cursor cursor value * @return iterator with read data */ public abstract AutoCloseableIterator queryTableIncremental(Database database, @@ -509,7 +501,7 @@ public abstract AutoCloseableIterator queryTableIncremental(Database d String tableName, String cursorField, DataType cursorFieldType, - String cursor); + String cursorValue); private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception { final Database database = createDatabase(sourceConfig); From f42d9a5abbe404325232d3b23f40afc1f674b61e Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 21 Jul 2022 21:15:33 -0700 Subject: [PATCH 2/8] Support emitting intermediate states --- .../source/jdbc/AbstractJdbcSource.java | 14 ++--- .../source/postgres/PostgresSource.java | 12 +++-- .../source/relationaldb/AbstractDbSource.java | 18 +++++-- .../relationaldb/StateDecoratingIterator.java | 53 +++++++++++++------ .../StateDecoratingIteratorTest.java | 15 ++++-- 5 files changed, 76 insertions(+), 36 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index bcbbbc8791b3b..6a7e5576104b4 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -274,15 +274,15 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase connection -> { 
LOGGER.info("Preparing query for table: {}", tableName); final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField); - final String sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s ASC", + final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s > ?", sourceOperations.enquoteIdentifierList(connection, columnNames), - sourceOperations - .getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), - quotedCursorField, - quotedCursorField - ); + sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), + quotedCursorField)); + if (getStateEmissionFrequency() > 0) { + sql.append(String.format(" ORDER BY %s ASC", quotedCursorField)); + } - final PreparedStatement preparedStatement = connection.prepareStatement(sql); + final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue); LOGGER.info("Executing query for table: {}", tableName); return preparedStatement; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index e56359504e4bb..4bebf33e73694 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -5,8 +5,6 @@ package io.airbyte.integrations.source.postgres; import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; 
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_MODE; import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL; @@ -18,7 +16,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Sets; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; @@ -73,6 +70,7 @@ public class PostgresSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class); + private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000; public static final String CDC_LSN = "_ab_cdc_lsn"; public static final String DATABASE_KEY = "database"; @@ -82,6 +80,7 @@ public class PostgresSource extends AbstractJdbcSource implements Sour public static final String PORT_KEY = "port"; public static final String SCHEMAS_KEY = "schemas"; public static final String USERNAME_KEY = "username"; + static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName(); static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of( "ssl", "true", @@ -94,7 +93,6 @@ public static Source sshWrappedSource() { } PostgresSource() { - super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations()); this.featureFlags = new EnvVariableFeatureFlags(); } @@ -127,7 +125,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) { if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) { additionalParameters.add("sslmode=disable"); } else { - var parametersList = obtainConnectionOptions(config.get(PARAM_SSL_MODE)) + final var parametersList = obtainConnectionOptions(config.get(PARAM_SSL_MODE)) .entrySet() .stream() .map(e -> e.getKey() 
+ "=" + e.getValue()) @@ -416,6 +414,10 @@ protected AirbyteStateType getSupportedStateType(final JsonNode config) { return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; } + protected int getStateEmissionFrequency() { + return INTERMEDIATE_STATE_EMISSION_FREQUENCY; + } + public static void main(final String[] args) throws Exception { final Source source = PostgresSource.sshWrappedSource(); LOGGER.info("starting source: {}", PostgresSource.class); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index f1d81f64cc207..fb3bedba29a26 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -247,7 +247,8 @@ protected AutoCloseableIterator createReadIterator(final Databas pair, cursorField, cursorOptional.orElse(null), - cursorType), + cursorType, + getStateEmissionFrequency()), airbyteMessageIterator); } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); @@ -490,8 +491,10 @@ public abstract AutoCloseableIterator queryTableFullRefresh(final Data final String tableName); /** - * Read incremental data from a table. Incremental read should returns only records where cursor - * column value is bigger than cursor. + * Read incremental data from a table. Incremental read should return only records where cursor + * column value is bigger than cursor. Note that if the connector needs to emit intermediate + * state (i.e. 
{@link AbstractDbSource#getStateEmissionFrequency} > 0), the incremental query + * must be sorted by the cursor field. * * @return iterator with read data */ @@ -503,6 +506,15 @@ public abstract AutoCloseableIterator queryTableIncremental(Database d DataType cursorFieldType, String cursorValue); + /** + * When larger than 0, the incremental iterator will emit intermediate state for every N records. + * Please note that if intermediate state emission is enabled, the incremental query must be + * ordered by the cursor field. + */ + protected int getStateEmissionFrequency() { + return 0; + } + private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception { final Database database = createDatabase(sourceConfig); database.setSourceConfig(sourceConfig); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 7eabaad9eb312..35eba91543972 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -25,32 +25,41 @@ public class StateDecoratingIterator extends AbstractIterator im private final AirbyteStreamNameNamespacePair pair; private final String cursorField; private final JsonSchemaPrimitive cursorType; + private final int stateEmissionFrequency; private String maxCursor; - private boolean hasEmittedState; + private boolean hasEmittedFinalState; + private int recordCount; + /** + * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every stateEmissionFrequency records. 
+ * Only emit intermediate states if the records are sorted by the cursor field. + */ public StateDecoratingIterator(final Iterator messageIterator, final StateManager stateManager, final AirbyteStreamNameNamespacePair pair, final String cursorField, final String initialCursor, - final JsonSchemaPrimitive cursorType) { + final JsonSchemaPrimitive cursorType, + final int stateEmissionFrequency) { this.messageIterator = messageIterator; this.stateManager = stateManager; this.pair = pair; this.cursorField = cursorField; this.cursorType = cursorType; this.maxCursor = initialCursor; + this.stateEmissionFrequency = stateEmissionFrequency; } private String getCursorCandidate(final AirbyteMessage message) { - String cursorCandidate = message.getRecord().getData().get(cursorField).asText(); + final String cursorCandidate = message.getRecord().getData().get(cursorField).asText(); return (cursorCandidate != null ? cursorCandidate.replaceAll("\u0000", "") : null); } @Override protected AirbyteMessage computeNext() { if (messageIterator.hasNext()) { + ++recordCount; final AirbyteMessage message = messageIterator.next(); if (message.getRecord().getData().hasNonNull(cursorField)) { final String cursorCandidate = getCursorCandidate(message); @@ -59,24 +68,36 @@ protected AirbyteMessage computeNext() { } } - return message; - } else if (!hasEmittedState) { - final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); - LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor {}, cursor field: {}, new cursor: {}", - pair, - stateManager.getOriginalCursorField(pair).orElse(null), - stateManager.getOriginalCursor(pair).orElse(null), - stateManager.getCursorField(pair).orElse(null), - stateManager.getCursor(pair).orElse(null)); - if (stateManager.getCursor(pair).isEmpty()) { - LOGGER.warn("Cursor was for stream {} was null. 
This stream will replicate all records on the next run", pair); + if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) { + emitStateMessage(); + if (!messageIterator.hasNext()) { + hasEmittedFinalState = true; + } } - hasEmittedState = true; - return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + return message; + } else if (!hasEmittedFinalState) { + final AirbyteMessage finalStateMessage = emitStateMessage(); + hasEmittedFinalState = true; + return finalStateMessage; } else { return endOfData(); } } + public AirbyteMessage emitStateMessage() { + final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); + LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}", + pair, + stateManager.getOriginalCursorField(pair).orElse(null), + stateManager.getOriginalCursor(pair).orElse(null), + stateManager.getCursorField(pair).orElse(null), + stateManager.getCursor(pair).orElse(null)); + if (stateManager.getCursor(pair).isEmpty()) { + LOGGER.warn("Cursor was for stream {} was null. 
This stream will replicate all records on the next run", pair); + } + + return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index e464a95e40fac..3e935f1b4c469 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -72,7 +72,8 @@ void testWithoutInitialCursor() { NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); assertEquals(RECORD_MESSAGE1, iterator.next()); assertEquals(RECORD_MESSAGE2, iterator.next()); @@ -90,7 +91,8 @@ void testWithInitialCursor() { NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, "xyz", - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); assertEquals(RECORD_MESSAGE1, iterator.next()); assertEquals(RECORD_MESSAGE2, iterator.next()); @@ -112,7 +114,8 @@ void testCursorFieldIsEmpty() { NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); assertEquals(recordMessage, iterator.next()); // null because no records with a cursor field were replicated for the stream. 
@@ -130,7 +133,8 @@ void testEmptyStream() { NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); assertEquals(stateMessage, iterator.next().getState()); assertFalse(iterator.hasNext()); @@ -147,7 +151,8 @@ void testUnicodeNull() { NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); assertEquals(RECORD_MESSAGE3, iterator.next()); assertEquals(stateMessage, iterator.next().getState()); From 6b36d9c74bdb2eaf3b4f5a442429249db1e7466b Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 21 Jul 2022 21:18:21 -0700 Subject: [PATCH 3/8] Add comment --- .../io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 6a7e5576104b4..48b35b1bb7fd5 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -278,6 +278,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase sourceOperations.enquoteIdentifierList(connection, columnNames), sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), quotedCursorField)); + // if the connector emits intermediate states, the incremental query must be sorted by the cursor field if (getStateEmissionFrequency() > 0) { sql.append(String.format(" ORDER BY %s ASC", quotedCursorField)); } From 5eb415fefc9db5cc8b51427dde77239a05e3f88f Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 21 Jul 2022 21:30:38 -0700 Subject: [PATCH 4/8] Log state warning only for final state emission --- 
.../relationaldb/StateDecoratingIterator.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 35eba91543972..d3142e4a9581f 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -32,8 +32,8 @@ public class StateDecoratingIterator extends AbstractIterator im private int recordCount; /** - * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every stateEmissionFrequency records. - * Only emit intermediate states if the records are sorted by the cursor field. + * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every stateEmissionFrequency records. Only emit + * intermediate states if the records are sorted by the cursor field. 
*/ public StateDecoratingIterator(final Iterator messageIterator, final StateManager stateManager, @@ -59,7 +59,7 @@ private String getCursorCandidate(final AirbyteMessage message) { @Override protected AirbyteMessage computeNext() { if (messageIterator.hasNext()) { - ++recordCount; + recordCount++; final AirbyteMessage message = messageIterator.next(); if (message.getRecord().getData().hasNonNull(cursorField)) { final String cursorCandidate = getCursorCandidate(message); @@ -69,23 +69,19 @@ protected AirbyteMessage computeNext() { } if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) { - emitStateMessage(); - if (!messageIterator.hasNext()) { - hasEmittedFinalState = true; - } + final boolean isFinalState = !messageIterator.hasNext(); + emitStateMessage(isFinalState); } return message; } else if (!hasEmittedFinalState) { - final AirbyteMessage finalStateMessage = emitStateMessage(); - hasEmittedFinalState = true; - return finalStateMessage; + return emitStateMessage(true); } else { return endOfData(); } } - public AirbyteMessage emitStateMessage() { + public AirbyteMessage emitStateMessage(final boolean isFinalState) { final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}", pair, @@ -93,8 +89,12 @@ public AirbyteMessage emitStateMessage() { stateManager.getOriginalCursor(pair).orElse(null), stateManager.getCursorField(pair).orElse(null), stateManager.getCursor(pair).orElse(null)); - if (stateManager.getCursor(pair).isEmpty()) { - LOGGER.warn("Cursor was for stream {} was null. This stream will replicate all records on the next run", pair); + + if (isFinalState) { + hasEmittedFinalState = true; + if (stateManager.getCursor(pair).isEmpty()) { + LOGGER.warn("Cursor was for stream {} was null. 
This stream will replicate all records on the next run", pair); + } } return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); From 1eacb08b9584e3f745fe54852846558c422871aa Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 21 Jul 2022 21:34:59 -0700 Subject: [PATCH 5/8] Format code --- .../source/jdbc/AbstractJdbcSource.java | 13 +++++++------ .../source/postgres/CdcPostgresSourceTest.java | 3 ++- .../source/postgres/PostgresSourceTest.java | 3 +-- .../source/relationaldb/AbstractDbSource.java | 10 +++++----- .../relationaldb/StateDecoratingIterator.java | 5 +++-- 5 files changed, 18 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 48b35b1bb7fd5..8ae030c486777 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -278,7 +278,8 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase sourceOperations.enquoteIdentifierList(connection, columnNames), sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), quotedCursorField)); - // if the connector emits intermediate states, the incremental query must be sorted by the cursor field + // if the connector emits intermediate states, the incremental query must be sorted by the cursor + // field if (getStateEmissionFrequency() > 0) { sql.append(String.format(" ORDER BY %s ASC", quotedCursorField)); } @@ -323,8 +324,8 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { } /** - * Retrieves connection_properties from config and also validates if custom - * jdbc_url parameters overlap with the 
default properties + * Retrieves connection_properties from config and also validates if custom jdbc_url parameters + * overlap with the default properties * * @param config A configuration used to check Jdbc connection * @return A mapping of connection properties @@ -344,7 +345,7 @@ protected Map getConnectionProperties(final JsonNode config) { * @throws IllegalArgumentException */ private void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, - final Map defaultParameters) { + final Map defaultParameters) { for (final String key : defaultParameters.keySet()) { if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key); @@ -355,8 +356,8 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map firstBatchIterator = getSource() .read(getConfig(), CONFIGURED_CATALOG, null); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index e9f5a949a0d39..4bf12964e89ea 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -481,8 +481,7 @@ void testGetDefaultConnectionPropertiesWithSsl() { final Map defaultConnectionProperties = new PostgresSource().getDefaultConnectionProperties(config); assertEquals(defaultConnectionProperties, ImmutableMap.of( "ssl", "true", - "sslmode", "require" - )); + "sslmode", "require")); }; @Test diff --git 
a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index fb3bedba29a26..b2acd993825a2 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -492,9 +492,9 @@ public abstract AutoCloseableIterator queryTableFullRefresh(final Data /** * Read incremental data from a table. Incremental read should return only records where cursor - * column value is bigger than cursor. Note that if the connector needs to emit intermediate - * state (i.e. {@link AbstractDbSource#getStateEmissionFrequency} > 0), the incremental query - * must be sorted by the cursor field. + * column value is bigger than cursor. Note that if the connector needs to emit intermediate state + * (i.e. {@link AbstractDbSource#getStateEmissionFrequency} > 0), the incremental query must be + * sorted by the cursor field. * * @return iterator with read data */ @@ -508,8 +508,8 @@ public abstract AutoCloseableIterator queryTableIncremental(Database d /** * When larger than 0, the incremental iterator will emit intermediate state for every N records. - * Please note that if intermediate state emission is enabled, the incremental query must be - * ordered by the cursor field. + * Please note that if intermediate state emission is enabled, the incremental query must be ordered + * by the cursor field. 
*/ protected int getStateEmissionFrequency() { return 0; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index d3142e4a9581f..27828afcd3186 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -32,8 +32,9 @@ public class StateDecoratingIterator extends AbstractIterator im private int recordCount; /** - * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every stateEmissionFrequency records. Only emit - * intermediate states if the records are sorted by the cursor field. + * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every + * stateEmissionFrequency records. Only emit intermediate states if the records are sorted by + * the cursor field. 
*/ public StateDecoratingIterator(final Iterator messageIterator, final StateManager stateManager, From 098c26c7535209d75c865deb2efbf48be5861cac Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Fri, 22 Jul 2022 17:08:49 -0700 Subject: [PATCH 6/8] Add unit tests --- .../relationaldb/StateDecoratingIterator.java | 11 +- .../StateDecoratingIteratorTest.java | 156 +++++++++++++----- 2 files changed, 128 insertions(+), 39 deletions(-) diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 27828afcd3186..d2880e26a3cdd 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -28,6 +28,7 @@ public class StateDecoratingIterator extends AbstractIterator im private final int stateEmissionFrequency; private String maxCursor; + private AirbyteMessage intermediateStateMessage; private boolean hasEmittedFinalState; private int recordCount; @@ -59,7 +60,11 @@ private String getCursorCandidate(final AirbyteMessage message) { @Override protected AirbyteMessage computeNext() { - if (messageIterator.hasNext()) { + if (intermediateStateMessage != null) { + final AirbyteMessage message = intermediateStateMessage; + intermediateStateMessage = null; + return message; + } else if (messageIterator.hasNext()) { recordCount++; final AirbyteMessage message = messageIterator.next(); if (message.getRecord().getData().hasNonNull(cursorField)) { @@ -70,8 +75,10 @@ protected AirbyteMessage computeNext() { } if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) { + // Mark the state as final in 
case this intermediate state happens to be the last one. + // This is not strictly necessary, but it avoids sending the final state twice and guards against edge cases. final boolean isFinalState = !messageIterator.hasNext(); - emitStateMessage(isFinalState); + intermediateStateMessage = emitStateMessage(isFinalState); } return message; diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index 3e935f1b4c469..460070dfdd9c9 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -33,29 +34,48 @@ class StateDecoratingIteratorTest { private static final String STREAM_NAME = "shoes"; private static final AirbyteStreamNameNamespacePair NAME_NAMESPACE_PAIR = new AirbyteStreamNameNamespacePair(STREAM_NAME, NAMESPACE); private static final String UUID_FIELD_NAME = "ascending_inventory_uuid"; - private static final AirbyteMessage RECORD_MESSAGE1 = new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "abc")))); - private static final AirbyteMessage RECORD_MESSAGE2 = new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "def")))); - - private static final
AirbyteMessage RECORD_MESSAGE3 = new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "abc\u0000")))); + + private static final AirbyteMessage EMPTY_STATE_MESSAGE = new AirbyteMessage().withType(Type.STATE); + + private static final String RECORD_VALUE_1 = "abc"; + private static final AirbyteMessage RECORD_MESSAGE_1 = createRecordMessage(RECORD_VALUE_1); + private static final AirbyteMessage STATE_MESSAGE_1 = createStateMessage(RECORD_VALUE_1); + + private static final String RECORD_VALUE_2 = "def"; + private static final AirbyteMessage RECORD_MESSAGE_2 = createRecordMessage(RECORD_VALUE_2); + private static final AirbyteMessage STATE_MESSAGE_2 = createStateMessage(RECORD_VALUE_2); + + private static final String RECORD_VALUE_3 = "xyz"; + private static final AirbyteMessage RECORD_MESSAGE_3 = createRecordMessage(RECORD_VALUE_3); + private static final AirbyteMessage STATE_MESSAGE_3 = createStateMessage(RECORD_VALUE_3); + + private static AirbyteMessage createRecordMessage(final String recordValue) { + return new AirbyteMessage() + .withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, recordValue)))); + } + + private static AirbyteMessage createStateMessage(final String recordValue) { + return new AirbyteMessage() + .withType(Type.STATE) + .withState(new AirbyteStateMessage() + .withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue)))); + } private static Iterator messageIterator; private StateManager stateManager; - private AirbyteStateMessage stateMessage; @BeforeEach void setup() { - messageIterator = MoreIterators.of(RECORD_MESSAGE1, RECORD_MESSAGE2); + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); + stateManager = mock(StateManager.class); - stateMessage = mock(AirbyteStateMessage.class); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, 
null)).thenReturn(EMPTY_STATE_MESSAGE.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2)).thenReturn(STATE_MESSAGE_2.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3)).thenReturn(STATE_MESSAGE_3.getState()); + when(stateManager.getOriginalCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); when(stateManager.getOriginalCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); when(stateManager.getCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); @@ -64,8 +84,6 @@ void setup() { @Test void testWithoutInitialCursor() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "def")).thenReturn(stateMessage); - final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, stateManager, @@ -75,39 +93,35 @@ void testWithoutInitialCursor() { JsonSchemaPrimitive.STRING, 0); - assertEquals(RECORD_MESSAGE1, iterator.next()); - assertEquals(RECORD_MESSAGE2, iterator.next()); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(RECORD_MESSAGE_1, iterator.next()); + assertEquals(RECORD_MESSAGE_2, iterator.next()); + assertEquals(STATE_MESSAGE_2, iterator.next()); assertFalse(iterator.hasNext()); } @Test void testWithInitialCursor() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "xyz")).thenReturn(stateMessage); - final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, stateManager, NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, - "xyz", + RECORD_VALUE_3, JsonSchemaPrimitive.STRING, 0); - assertEquals(RECORD_MESSAGE1, iterator.next()); - assertEquals(RECORD_MESSAGE2, iterator.next()); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(RECORD_MESSAGE_1, iterator.next()); + assertEquals(RECORD_MESSAGE_2, iterator.next()); + assertEquals(STATE_MESSAGE_3, iterator.next()); 
assertFalse(iterator.hasNext()); } @Test void testCursorFieldIsEmpty() { - final AirbyteMessage recordMessage = Jsons.clone(RECORD_MESSAGE1); + final AirbyteMessage recordMessage = Jsons.clone(RECORD_MESSAGE_1); ((ObjectNode) recordMessage.getRecord().getData()).remove(UUID_FIELD_NAME); final Iterator messageStream = MoreIterators.of(recordMessage); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "xyz")).thenReturn(stateMessage); - final StateDecoratingIterator iterator = new StateDecoratingIterator( messageStream, stateManager, @@ -125,8 +139,6 @@ void testCursorFieldIsEmpty() { @Test void testEmptyStream() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null)).thenReturn(stateMessage); - final StateDecoratingIterator iterator = new StateDecoratingIterator( Collections.emptyIterator(), stateManager, @@ -136,14 +148,20 @@ void testEmptyStream() { JsonSchemaPrimitive.STRING, 0); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(EMPTY_STATE_MESSAGE, iterator.next()); assertFalse(iterator.hasNext()); } @Test void testUnicodeNull() { - messageIterator = MoreIterators.of(RECORD_MESSAGE3); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "abc")).thenReturn(stateMessage); + final String recordValueWithNull = "abc\u0000"; + final AirbyteMessage recordMessageWithNull = createRecordMessage(recordValueWithNull); + + // UTF8 null \u0000 is removed from the cursor value in the state message + final AirbyteMessage stateMessageWithNull = STATE_MESSAGE_1; + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, recordValueWithNull)).thenReturn(stateMessageWithNull.getState()); + + messageIterator = MoreIterators.of(recordMessageWithNull); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, @@ -154,9 +172,73 @@ void testUnicodeNull() { JsonSchemaPrimitive.STRING, 0); - assertEquals(RECORD_MESSAGE3, iterator.next()); - assertEquals(stateMessage, iterator.next().getState()); + 
assertEquals(recordMessageWithNull, iterator.next()); + assertEquals(stateMessageWithNull, iterator.next()); assertFalse(iterator.hasNext()); } + @Test + @DisplayName("When initial cursor is null, and emit state for every record") + void testStateEmission1() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + null, + JsonSchemaPrimitive.STRING, + 1); + + assertEquals(RECORD_MESSAGE_1, iterator1.next()); + assertEquals(STATE_MESSAGE_1, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + // final state message should only be emitted once + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + + @Test + @DisplayName("When initial cursor is null, and emit state for every 2 records") + void testStateEmission2() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + null, + JsonSchemaPrimitive.STRING, + 2); + + assertEquals(RECORD_MESSAGE_1, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + + @Test + @DisplayName("When initial cursor is not null") + void testStateEmission3() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + RECORD_VALUE_1, + 
JsonSchemaPrimitive.STRING, + 1); + + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + } From a729462a170eb51343ca3b92bd282fb436f6b08a Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Fri, 22 Jul 2022 17:10:54 -0700 Subject: [PATCH 7/8] Define message iterator in each test case --- .../source/relationaldb/StateDecoratingIteratorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index 460070dfdd9c9..8f16a7d5a11ff 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -68,8 +68,6 @@ private static AirbyteMessage createStateMessage(final String recordValue) { @BeforeEach void setup() { - messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); - stateManager = mock(StateManager.class); when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null)).thenReturn(EMPTY_STATE_MESSAGE.getState()); when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState()); @@ -84,6 +82,7 @@ void setup() { @Test void testWithoutInitialCursor() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, stateManager, @@ -101,6 +100,7 @@ void testWithoutInitialCursor() { @Test 
void testWithInitialCursor() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, stateManager, From 55bf9fbf6f6842f9c51002e3c5421b79c41e7089 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Sat, 23 Jul 2022 00:28:13 -0700 Subject: [PATCH 8/8] Fix compilation error --- .../io/airbyte/integrations/source/postgres/PostgresSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 1e0cfef7d4a7d..bc3616b0a8956 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Sets; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags;