diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index 4ec30fa121408..44223992a3c8a 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -138,12 +138,12 @@ public AutoCloseableIterator queryTableIncremental(final BigQueryDatab final String tableName, final String cursorField, final StandardSQLTypeName cursorFieldType, - final String cursor) { + final String cursorValue) { return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?", enquoteIdentifierList(columnNames), getFullTableName(schemaName, tableName), cursorField), - sourceOperations.getQueryParameter(cursorFieldType, cursor)); + sourceOperations.getQueryParameter(cursorFieldType, cursorValue)); } private AutoCloseableIterator queryTableWithParams(final BigQueryDatabase database, diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index ea58d1cb01083..6a0220331f2c7 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -266,21 +266,26 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final String tableName, final String cursorField, final Datatype cursorFieldType, - final String cursor) { + final String cursorValue) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String sql = String.format("SELECT %s FROM %s WHERE %s > ?", + final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField); + final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s > ?", sourceOperations.enquoteIdentifierList(connection, columnNames), - sourceOperations - .getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), - sourceOperations.enquoteIdentifier(connection, cursorField)); + sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), + quotedCursorField)); + // if the connector emits intermediate states, the incremental query must be sorted by the cursor + // field + if (getStateEmissionFrequency() > 0) { + sql.append(String.format(" ORDER BY %s ASC", quotedCursorField)); + } - final PreparedStatement preparedStatement = connection.prepareStatement(sql); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor); + final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); + sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue); LOGGER.info("Executing query for table: {}", tableName); return preparedStatement; }, @@ -340,7 +345,7 @@ protected Map getConnectionProperties(final JsonNode config) { * @throws IllegalArgumentException */ protected static void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, - final Map defaultParameters) { + final Map defaultParameters) { for (final String key : defaultParameters.keySet()) { if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 45568922714dd..93eb23ff092ab 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -182,8 +182,8 @@ public AutoCloseableIterator queryTableIncremental(final MongoDatabase final String tableName, final String cursorField, final BsonType cursorFieldType, - final String cursor) { - final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursor)); + final String cursorValue) { + final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursorValue)); return queryTable(database, columnNames, tableName, greaterComparison); } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 077c11b1a0c06..2830156a2dc3b 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -95,7 +95,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final String tableName, final String cursorField, final JDBCType cursorFieldType, - final String cursor) { + final String cursorValue) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { @@ -113,7 +113,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql); final PreparedStatement preparedStatement = connection.prepareStatement(sql); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor); + sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue); LOGGER.info("Executing query for table: {}", tableName); return preparedStatement; }, diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index a8da51ae7d4dd..d04c62dadbd3f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -72,6 +72,8 @@ public class PostgresSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class); + private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000; + static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName(); static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of( "ssl", "true", @@ -84,7 +86,6 @@ public static Source sshWrappedSource() { } PostgresSource() { - super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations()); this.featureFlags = new EnvVariableFeatureFlags(); } @@ -408,6 +409,10 @@ protected AirbyteStateType getSupportedStateType(final JsonNode config) { return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; } + protected int getStateEmissionFrequency() { + return INTERMEDIATE_STATE_EMISSION_FREQUENCY; + } + public static void main(final String[] args) throws Exception { final Source source = PostgresSource.sshWrappedSource(); LOGGER.info("starting source: {}", PostgresSource.class); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 995a49a1f4d6a..b2acd993825a2 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -239,8 +239,7 @@ protected AutoCloseableIterator createReadIterator(final Databas airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } - final JsonSchemaPrimitive cursorType = IncrementalUtils - .getCursorType(airbyteStream, cursorField); + final JsonSchemaPrimitive cursorType = IncrementalUtils.getCursorType(airbyteStream, cursorField); iterator = AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator( autoCloseableIterator, @@ -248,7 +247,8 @@ protected AutoCloseableIterator createReadIterator(final Databas pair, cursorField, cursorOptional.orElse(null), - cursorType), + cursorType, + getStateEmissionFrequency()), airbyteMessageIterator); } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); @@ -491,16 +491,11 @@ public abstract AutoCloseableIterator queryTableFullRefresh(final Data final String tableName); /** - * Read incremental data from a table. Incremental read should returns only records where cursor - * column value is bigger than cursor. + * Read incremental data from a table. Incremental read should return only records where cursor + * column value is bigger than cursor. Note that if the connector needs to emit intermediate state + * (i.e. {@link AbstractDbSource#getStateEmissionFrequency} > 0), the incremental query must be + * sorted by the cursor field. * - * @param database source database - * @param columnNames interested column names - * @param schemaName table namespace - * @param tableName target table - * @param cursorField cursor field name - * @param cursorFieldType cursor field type - * @param cursor cursor value * @return iterator with read data */ public abstract AutoCloseableIterator queryTableIncremental(Database database, @@ -509,7 +504,16 @@ public abstract AutoCloseableIterator queryTableIncremental(Database d String tableName, String cursorField, DataType cursorFieldType, - String cursor); + String cursorValue); + + /** + * When larger than 0, the incremental iterator will emit intermediate state for every N records. + * Please note that if intermediate state emission is enabled, the incremental query must be ordered + * by the cursor field. + */ + protected int getStateEmissionFrequency() { + return 0; + } private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception { final Database database = createDatabase(sourceConfig); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 7eabaad9eb312..d2880e26a3cdd 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -25,32 +25,47 @@ public class StateDecoratingIterator extends AbstractIterator im private final AirbyteStreamNameNamespacePair pair; private final String cursorField; private final JsonSchemaPrimitive cursorType; + private final int stateEmissionFrequency; private String maxCursor; - private boolean hasEmittedState; + private AirbyteMessage intermediateStateMessage; + private boolean hasEmittedFinalState; + private int recordCount; + /** + * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every + * stateEmissionFrequency records. Only emit intermediate states if the records are sorted by + * the cursor field. + */ public StateDecoratingIterator(final Iterator messageIterator, final StateManager stateManager, final AirbyteStreamNameNamespacePair pair, final String cursorField, final String initialCursor, - final JsonSchemaPrimitive cursorType) { + final JsonSchemaPrimitive cursorType, + final int stateEmissionFrequency) { this.messageIterator = messageIterator; this.stateManager = stateManager; this.pair = pair; this.cursorField = cursorField; this.cursorType = cursorType; this.maxCursor = initialCursor; + this.stateEmissionFrequency = stateEmissionFrequency; } private String getCursorCandidate(final AirbyteMessage message) { - String cursorCandidate = message.getRecord().getData().get(cursorField).asText(); + final String cursorCandidate = message.getRecord().getData().get(cursorField).asText(); return (cursorCandidate != null ? cursorCandidate.replaceAll("\u0000", "") : null); } @Override protected AirbyteMessage computeNext() { - if (messageIterator.hasNext()) { + if (intermediateStateMessage != null) { + final AirbyteMessage message = intermediateStateMessage; + intermediateStateMessage = null; + return message; + } else if (messageIterator.hasNext()) { + recordCount++; final AirbyteMessage message = messageIterator.next(); if (message.getRecord().getData().hasNonNull(cursorField)) { final String cursorCandidate = getCursorCandidate(message); @@ -59,24 +74,38 @@ protected AirbyteMessage computeNext() { } } - return message; - } else if (!hasEmittedState) { - final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); - LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor {}, cursor field: {}, new cursor: {}", - pair, - stateManager.getOriginalCursorField(pair).orElse(null), - stateManager.getOriginalCursor(pair).orElse(null), - stateManager.getCursorField(pair).orElse(null), - stateManager.getCursor(pair).orElse(null)); - if (stateManager.getCursor(pair).isEmpty()) { - LOGGER.warn("Cursor was for stream {} was null. This stream will replicate all records on the next run", pair); + if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) { + // Mark the state as final in case this intermediate state happens to be the last one. + // This is not necessary, but avoid sending the final states twice and prevent any edge case. + final boolean isFinalState = !messageIterator.hasNext(); + intermediateStateMessage = emitStateMessage(isFinalState); } - hasEmittedState = true; - return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + return message; + } else if (!hasEmittedFinalState) { + return emitStateMessage(true); } else { return endOfData(); } } + public AirbyteMessage emitStateMessage(final boolean isFinalState) { + final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); + LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}", + pair, + stateManager.getOriginalCursorField(pair).orElse(null), + stateManager.getOriginalCursor(pair).orElse(null), + stateManager.getCursorField(pair).orElse(null), + stateManager.getCursor(pair).orElse(null)); + + if (isFinalState) { + hasEmittedFinalState = true; + if (stateManager.getCursor(pair).isEmpty()) { + LOGGER.warn("Cursor was for stream {} was null. This stream will replicate all records on the next run", pair); + } + } + + return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index e464a95e40fac..8f16a7d5a11ff 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -33,29 +34,46 @@ class StateDecoratingIteratorTest { private static final String STREAM_NAME = "shoes"; private static final AirbyteStreamNameNamespacePair NAME_NAMESPACE_PAIR = new AirbyteStreamNameNamespacePair(STREAM_NAME, NAMESPACE); private static final String UUID_FIELD_NAME = "ascending_inventory_uuid"; - private static final AirbyteMessage RECORD_MESSAGE1 = new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "abc")))); - private static final AirbyteMessage RECORD_MESSAGE2 = new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "def")))); - - private static final AirbyteMessage RECORD_MESSAGE3 = new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "abc\u0000")))); + + private static final AirbyteMessage EMPTY_STATE_MESSAGE = new AirbyteMessage().withType(Type.STATE); + + private static final String RECORD_VALUE_1 = "abc"; + private static final AirbyteMessage RECORD_MESSAGE_1 = createRecordMessage(RECORD_VALUE_1); + private static final AirbyteMessage STATE_MESSAGE_1 = createStateMessage(RECORD_VALUE_1); + + private static final String RECORD_VALUE_2 = "def"; + private static final AirbyteMessage RECORD_MESSAGE_2 = createRecordMessage(RECORD_VALUE_2); + private static final AirbyteMessage STATE_MESSAGE_2 = createStateMessage(RECORD_VALUE_2); + + private static final String RECORD_VALUE_3 = "xyz"; + private static final AirbyteMessage RECORD_MESSAGE_3 = createRecordMessage(RECORD_VALUE_3); + private static final AirbyteMessage STATE_MESSAGE_3 = createStateMessage(RECORD_VALUE_3); + + private static AirbyteMessage createRecordMessage(final String recordValue) { + return new AirbyteMessage() + .withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, recordValue)))); + } + + private static AirbyteMessage createStateMessage(final String recordValue) { + return new AirbyteMessage() + .withType(Type.STATE) + .withState(new AirbyteStateMessage() + .withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue)))); + } private static Iterator messageIterator; private StateManager stateManager; - private AirbyteStateMessage stateMessage; @BeforeEach void setup() { - messageIterator = MoreIterators.of(RECORD_MESSAGE1, RECORD_MESSAGE2); stateManager = mock(StateManager.class); - stateMessage = mock(AirbyteStateMessage.class); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null)).thenReturn(EMPTY_STATE_MESSAGE.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2)).thenReturn(STATE_MESSAGE_2.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3)).thenReturn(STATE_MESSAGE_3.getState()); + when(stateManager.getOriginalCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); when(stateManager.getOriginalCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); when(stateManager.getCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); @@ -64,55 +82,54 @@ void setup() { @Test void testWithoutInitialCursor() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "def")).thenReturn(stateMessage); - + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, stateManager, NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); - assertEquals(RECORD_MESSAGE1, iterator.next()); - assertEquals(RECORD_MESSAGE2, iterator.next()); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(RECORD_MESSAGE_1, iterator.next()); + assertEquals(RECORD_MESSAGE_2, iterator.next()); + assertEquals(STATE_MESSAGE_2, iterator.next()); assertFalse(iterator.hasNext()); } @Test void testWithInitialCursor() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "xyz")).thenReturn(stateMessage); - + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, stateManager, NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, - "xyz", - JsonSchemaPrimitive.STRING); + RECORD_VALUE_3, + JsonSchemaPrimitive.STRING, + 0); - assertEquals(RECORD_MESSAGE1, iterator.next()); - assertEquals(RECORD_MESSAGE2, iterator.next()); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(RECORD_MESSAGE_1, iterator.next()); + assertEquals(RECORD_MESSAGE_2, iterator.next()); + assertEquals(STATE_MESSAGE_3, iterator.next()); assertFalse(iterator.hasNext()); } @Test void testCursorFieldIsEmpty() { - final AirbyteMessage recordMessage = Jsons.clone(RECORD_MESSAGE1); + final AirbyteMessage recordMessage = Jsons.clone(RECORD_MESSAGE_1); ((ObjectNode) recordMessage.getRecord().getData()).remove(UUID_FIELD_NAME); final Iterator messageStream = MoreIterators.of(recordMessage); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "xyz")).thenReturn(stateMessage); - final StateDecoratingIterator iterator = new StateDecoratingIterator( messageStream, stateManager, NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); assertEquals(recordMessage, iterator.next()); // null because no records with a cursor field were replicated for the stream. @@ -122,24 +139,29 @@ void testCursorFieldIsEmpty() { @Test void testEmptyStream() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null)).thenReturn(stateMessage); - final StateDecoratingIterator iterator = new StateDecoratingIterator( Collections.emptyIterator(), stateManager, NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(EMPTY_STATE_MESSAGE, iterator.next()); assertFalse(iterator.hasNext()); } @Test void testUnicodeNull() { - messageIterator = MoreIterators.of(RECORD_MESSAGE3); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "abc")).thenReturn(stateMessage); + final String recordValueWithNull = "abc\u0000"; + final AirbyteMessage recordMessageWithNull = createRecordMessage(recordValueWithNull); + + // UTF8 null \u0000 is removed from the cursor value in the state message + final AirbyteMessage stateMessageWithNull = STATE_MESSAGE_1; + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, recordValueWithNull)).thenReturn(stateMessageWithNull.getState()); + + messageIterator = MoreIterators.of(recordMessageWithNull); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, @@ -147,11 +169,76 @@ void testUnicodeNull() { NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, null, - JsonSchemaPrimitive.STRING); + JsonSchemaPrimitive.STRING, + 0); - assertEquals(RECORD_MESSAGE3, iterator.next()); - assertEquals(stateMessage, iterator.next().getState()); + assertEquals(recordMessageWithNull, iterator.next()); + assertEquals(stateMessageWithNull, iterator.next()); assertFalse(iterator.hasNext()); } + @Test + @DisplayName("When initial cursor is null, and emit state for every record") + void testStateEmission1() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + null, + JsonSchemaPrimitive.STRING, + 1); + + assertEquals(RECORD_MESSAGE_1, iterator1.next()); + assertEquals(STATE_MESSAGE_1, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + // final state message should only be emitted once + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + + @Test + @DisplayName("When initial cursor is null, and emit state for every 2 records") + void testStateEmission2() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + null, + JsonSchemaPrimitive.STRING, + 2); + + assertEquals(RECORD_MESSAGE_1, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + + @Test + @DisplayName("When initial cursor is not null") + void testStateEmission3() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + RECORD_VALUE_1, + JsonSchemaPrimitive.STRING, + 1); + + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + }