diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5ea4c5006542e..fc8cf2578bb4f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -836,7 +836,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.15 + dockerImageTag: 1.0.16 documentationUrl: https://docs.airbyte.com/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index d6e64732c8836..405a7a8989e2d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8596,7 +8596,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.15" +- dockerImage: "airbyte/source-postgres:1.0.16" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index dce577a068f84..f15a35ac7caf3 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -19,6 +19,7 @@ import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import 
io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource; +import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.JsonSchemaType; @@ -136,14 +137,13 @@ public AutoCloseableIterator queryTableIncremental(final BigQueryDatab final List columnNames, final String schemaName, final String tableName, - final String cursorField, - final StandardSQLTypeName cursorFieldType, - final String cursorValue) { + final CursorInfo cursorInfo, + final StandardSQLTypeName cursorFieldType) { return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?", enquoteIdentifierList(columnNames), getFullTableName(schemaName, tableName), - cursorField), - sourceOperations.getQueryParameter(cursorFieldType, cursorValue)); + cursorInfo.getCursorField()), + sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor())); } @Override diff --git a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java index 31b1cfabec040..1e5ca5b4202d5 100644 --- a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java +++ b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java @@ -91,9 +91,9 @@ public JsonNode toDatabaseConfig(final JsonNode config) { config.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText())); - boolean isAdditionalParamsExists = + final boolean isAdditionalParamsExists = config.get(JdbcUtils.JDBC_URL_PARAMS_KEY) != null && !config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText().isEmpty(); - List params = new ArrayList<>(); + final 
List params = new ArrayList<>(); // assume ssl if not explicitly mentioned. if (isSsl) { params.add(SSL_MODE); diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java index 976066cf0c327..18504b4e7c061 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java @@ -331,7 +331,8 @@ void testReadOneTableIncrementallyTwice() throws Exception { .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("5"))))))); + .withCursor("5") + .withCursorRecordCount(1L))))))); setEmittedAtToNull(actualMessagesSecondSync); @@ -463,7 +464,8 @@ void testReadMultipleTablesIncrementally() throws Exception { .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("3"), + .withCursor("3") + .withCursorRecordCount(1L), new DbStreamState() .withStreamName(streamName2) .withStreamNamespace(namespace) @@ -481,12 +483,14 @@ void testReadMultipleTablesIncrementally() throws Exception { .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("3"), + .withCursor("3") + .withCursorRecordCount(1L), new DbStreamState() .withStreamName(streamName2) .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("3"))))))); + .withCursor("3") + .withCursorRecordCount(1L))))))); setEmittedAtToNull(actualMessagesFirstSync); diff --git 
a/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java index b1a7cbd4768fe..8ff36d97b9db8 100644 --- a/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java @@ -63,14 +63,16 @@ static void init() throws IOException, InterruptedException { TABLE_NAME_COMPOSITE_PK = "FULL_NAME_COMPOSITE_PK"; TABLE_NAME_WITHOUT_CURSOR_TYPE = "TABLE_NAME_WITHOUT_CURSOR_TYPE"; TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE = "TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE"; + TABLE_NAME_AND_TIMESTAMP = "NAME_AND_TIMESTAMP"; TEST_TABLES = ImmutableSet - .of(TABLE_NAME, TABLE_NAME_WITHOUT_PK, TABLE_NAME_COMPOSITE_PK); + .of(TABLE_NAME, TABLE_NAME_WITHOUT_PK, TABLE_NAME_COMPOSITE_PK, TABLE_NAME_AND_TIMESTAMP); COL_ID = "ID"; COL_NAME = "NAME"; COL_UPDATED_AT = "UPDATED_AT"; COL_FIRST_NAME = "FIRST_NAME"; COL_LAST_NAME = "LAST_NAME"; COL_LAST_NAME_WITH_SPACE = "LAST NAME"; + COL_TIMESTAMP = "TIMESTAMP"; // In Db2 PK columns must be declared with NOT NULL statement. 
COLUMN_CLAUSE_WITH_PK = "id INTEGER NOT NULL, name VARCHAR(200), updated_at DATE"; COLUMN_CLAUSE_WITH_COMPOSITE_PK = "first_name VARCHAR(200) NOT NULL, last_name VARCHAR(200) NOT NULL, updated_at DATE"; diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java index de55925b1bc94..773d784f28d7d 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java @@ -111,6 +111,11 @@ protected int getStateEmissionFrequency() { return INTERMEDIATE_STATE_EMISSION_FREQUENCY; } + @Override + protected String getCountColumnName() { + return "RECORD_COUNT"; + } + private CheckedFunction getPrivileges() { return connection -> connection.prepareStatement( "SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER"); diff --git a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java index 406d27c4a647c..cd99b0ab5da60 100644 --- a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java @@ -41,14 +41,16 @@ static void init() { TABLE_NAME_COMPOSITE_PK = "FULL_NAME_COMPOSITE_PK"; TABLE_NAME_WITHOUT_CURSOR_TYPE = "TABLE_NAME_WITHOUT_CURSOR_TYPE"; TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE = "TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE"; + TABLE_NAME_AND_TIMESTAMP = "NAME_AND_TIMESTAMP"; 
TEST_TABLES = ImmutableSet - .of(TABLE_NAME, TABLE_NAME_WITHOUT_PK, TABLE_NAME_COMPOSITE_PK); + .of(TABLE_NAME, TABLE_NAME_WITHOUT_PK, TABLE_NAME_COMPOSITE_PK, TABLE_NAME_AND_TIMESTAMP); COL_ID = "ID"; COL_NAME = "NAME"; COL_UPDATED_AT = "UPDATED_AT"; COL_FIRST_NAME = "FIRST_NAME"; COL_LAST_NAME = "LAST_NAME"; COL_LAST_NAME_WITH_SPACE = "LAST NAME"; + COL_TIMESTAMP = "TIMESTAMP"; // In Db2 PK columns must be declared with NOT NULL statement. COLUMN_CLAUSE_WITH_PK = "id INTEGER NOT NULL, name VARCHAR(200), updated_at DATE"; COLUMN_CLAUSE_WITH_COMPOSITE_PK = "first_name VARCHAR(200) NOT NULL, last_name VARCHAR(200) NOT NULL, updated_at DATE"; diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 59611c601fe4a..3c694621e23a5 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -41,6 +41,7 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource; +import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.protocol.models.CommonField; @@ -49,6 +50,7 @@ import io.airbyte.protocol.models.JsonSchemaType; import java.net.MalformedURLException; import java.net.URI; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -328,20 +330,37 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final List 
columnNames, final String schemaName, final String tableName, - final String cursorField, - final Datatype cursorFieldType, - final String cursorValue) { + final CursorInfo cursorInfo, + final Datatype cursorFieldType) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField); - final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s > ?", - sourceOperations.enquoteIdentifierList(connection, columnNames), - sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), - quotedCursorField)); + final String fullTableName = sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName); + final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorInfo.getCursorField()); + + final String operator; + if (cursorInfo.getCursorRecordCount() <= 0L) { + operator = ">"; + } else { + final long actualRecordCount = getActualCursorRecordCount( + connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); + LOGGER.info("Table {} cursor count: expected {}, actual {}", tableName, cursorInfo.getCursorRecordCount(), actualRecordCount); + if (actualRecordCount == cursorInfo.getCursorRecordCount()) { + operator = ">"; + } else { + operator = ">="; + } + } + + final String wrappedColumnNames = getWrappedColumnNames(database, connection, columnNames, schemaName, tableName); + final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s %s ?", + wrappedColumnNames, + fullTableName, + quotedCursorField, + operator)); // if the connector emits intermediate states, the incremental query must be sorted by the cursor // field if (getStateEmissionFrequency() > 0) { @@ 
-349,8 +368,8 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase } final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue); - LOGGER.info("Executing query for table: {}", tableName); + LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement); + sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); return preparedStatement; }, sourceOperations::rowToJson); @@ -361,6 +380,51 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase }); } + /** + * Some databases need special column names in the query. + */ + protected String getWrappedColumnNames(final JdbcDatabase database, + final Connection connection, + final List columnNames, + final String schemaName, + final String tableName) throws SQLException { + return sourceOperations.enquoteIdentifierList(connection, columnNames); + } + + protected String getCountColumnName() { + return "record_count"; + } + + private long getActualCursorRecordCount(final Connection connection, + final String fullTableName, + final String quotedCursorField, + final Datatype cursorFieldType, + final String cursor) + throws SQLException { + final String columnName = getCountColumnName(); + final PreparedStatement cursorRecordStatement; + if (cursor == null) { + final String cursorRecordQuery = String.format("SELECT COUNT(*) AS %s FROM %s WHERE %s IS NULL", + columnName, + fullTableName, + quotedCursorField); + cursorRecordStatement = connection.prepareStatement(cursorRecordQuery); + } else { + final String cursorRecordQuery = String.format("SELECT COUNT(*) AS %s FROM %s WHERE %s = ?", + columnName, + fullTableName, + quotedCursorField); + cursorRecordStatement = connection.prepareStatement(cursorRecordQuery);; + sourceOperations.setStatementField(cursorRecordStatement, 1, cursorFieldType, cursor); + } + final ResultSet 
resultSet = cursorRecordStatement.executeQuery(); + if (resultSet.next()) { + return resultSet.getLong(columnName); + } else { + return 0L; + } + } + protected DataSource createDataSource(final JsonNode config) { final JsonNode jdbcConfig = toDatabaseConfig(config); final DataSource dataSource = DataSourceFactory.create( diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 54a10f609e3f4..2522e7352622a 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -92,6 +92,8 @@ public abstract class JdbcSourceAcceptanceTest { public static String TABLE_NAME_COMPOSITE_PK = "full_name_composite_pk"; public static String TABLE_NAME_WITHOUT_CURSOR_TYPE = "table_without_cursor_type"; public static String TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE = "table_with_null_cursor_type"; + // this table is used in testing incremental sync with concurrent insertions + public static String TABLE_NAME_AND_TIMESTAMP = "name_and_timestamp"; public static String COL_ID = "id"; public static String COL_NAME = "name"; @@ -100,6 +102,8 @@ public abstract class JdbcSourceAcceptanceTest { public static String COL_LAST_NAME = "last_name"; public static String COL_LAST_NAME_WITH_SPACE = "last name"; public static String COL_CURSOR = "cursor_field"; + public static String COL_TIMESTAMP = "timestamp"; + public static String COL_TIMESTAMP_TYPE = "TIMESTAMP"; public static Number ID_VALUE_1 = 1; public static Number ID_VALUE_2 = 2; public static Number ID_VALUE_3 = 3; @@ -116,6 +120,8 @@ public abstract class JdbcSourceAcceptanceTest { 
public static String INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES(0);"; public static String CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s VARCHAR(20));"; public static String INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES('Hello world :)');"; + public static String INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY = "INSERT INTO %s (name, timestamp) VALUES ('%s', '%s')"; + public JsonNode config; public DataSource dataSource; public JdbcDatabase database; @@ -707,7 +713,8 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("5"); + .withCursor("5") + .withCursorRecordCount(1L); expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return expectedMessages; } @@ -763,7 +770,8 @@ void testReadMultipleTablesIncrementally() throws Exception { .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("3"), + .withCursor("3") + .withCursorRecordCount(1L), new DbStreamState() .withStreamName(streamName2) .withStreamNamespace(namespace) @@ -775,12 +783,14 @@ void testReadMultipleTablesIncrementally() throws Exception { .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("3"), + .withCursor("3") + .withCursorRecordCount(1L), new DbStreamState() .withStreamName(streamName2) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("3")); + .withCursor("3") + .withCursorRecordCount(1L)); final List expectedMessagesFirstSync = new ArrayList<>(getTestMessages()); expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams1.get(0), expectedStateStreams1)); @@ -818,6 +828,111 @@ protected void incrementalCursorCheck( expectedRecordMessages); } + // See https://github.com/airbytehq/airbyte/issues/14732 for rationale and details. 
+ @Test + void testIncrementalWithConcurrentInsertion() throws Exception { + final String namespace = getDefaultNamespace(); + final String fullyQualifiedTableName = getFullyQualifiedTableName(TABLE_NAME_AND_TIMESTAMP); + final String columnDefinition = String.format("name VARCHAR(200) NOT NULL, timestamp %s NOT NULL", COL_TIMESTAMP_TYPE); + + // 1st sync + database.execute(ctx -> { + ctx.createStatement().execute(createTableQuery(fullyQualifiedTableName, columnDefinition, "")); + ctx.createStatement().execute(String.format(INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "a", "2021-01-01 00:00:00")); + ctx.createStatement().execute(String.format(INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "b", "2021-01-01 00:00:00")); + }); + + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog( + new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + TABLE_NAME_AND_TIMESTAMP, + namespace, + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_TIMESTAMP, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE))))); + configuredCatalog.getStreams().forEach(airbyteStream -> { + airbyteStream.setSyncMode(SyncMode.INCREMENTAL); + airbyteStream.setCursorField(List.of(COL_TIMESTAMP)); + airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND); + }); + + final List firstSyncActualMessages = MoreIterators.toList( + source.read(config, configuredCatalog, createEmptyState(TABLE_NAME_AND_TIMESTAMP, namespace))); + + // cursor after 1st sync: 2021-01-01 00:00:00, count 2 + final Optional firstSyncStateOptional = firstSyncActualMessages.stream().filter(r -> r.getType() == Type.STATE).findFirst(); + assertTrue(firstSyncStateOptional.isPresent()); + final JsonNode firstSyncState = getStateData(firstSyncStateOptional.get(), TABLE_NAME_AND_TIMESTAMP); + assertEquals(firstSyncState.get("cursor_field").elements().next().asText(), COL_TIMESTAMP); + 
assertTrue(firstSyncState.get("cursor").asText().contains("2021-01-01")); + assertTrue(firstSyncState.get("cursor").asText().contains("00:00:00")); + assertEquals(2L, firstSyncState.get("cursor_record_count").asLong()); + + final List firstSyncNames = firstSyncActualMessages.stream() + .filter(r -> r.getType() == Type.RECORD) + .map(r -> r.getRecord().getData().get(COL_NAME).asText()) + .toList(); + assertEquals(List.of("a", "b"), firstSyncNames); + + // 2nd sync + database.execute(ctx -> { + ctx.createStatement().execute(String.format(INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "c", "2021-01-02 00:00:00")); + }); + + final List secondSyncActualMessages = MoreIterators.toList( + source.read(config, configuredCatalog, createState(TABLE_NAME_AND_TIMESTAMP, namespace, firstSyncState))); + + // cursor after 2nd sync: 2021-01-02 00:00:00, count 1 + final Optional secondSyncStateOptional = secondSyncActualMessages.stream().filter(r -> r.getType() == Type.STATE).findFirst(); + assertTrue(secondSyncStateOptional.isPresent()); + final JsonNode secondSyncState = getStateData(secondSyncStateOptional.get(), TABLE_NAME_AND_TIMESTAMP); + assertEquals(secondSyncState.get("cursor_field").elements().next().asText(), COL_TIMESTAMP); + assertTrue(secondSyncState.get("cursor").asText().contains("2021-01-02")); + assertTrue(secondSyncState.get("cursor").asText().contains("00:00:00")); + assertEquals(1L, secondSyncState.get("cursor_record_count").asLong()); + + final List secondSyncNames = secondSyncActualMessages.stream() + .filter(r -> r.getType() == Type.RECORD) + .map(r -> r.getRecord().getData().get(COL_NAME).asText()) + .toList(); + assertEquals(List.of("c"), secondSyncNames); + + // 3rd sync has records with duplicated cursors + database.execute(ctx -> { + ctx.createStatement().execute(String.format(INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "d", "2021-01-02 00:00:00")); + 
ctx.createStatement().execute(String.format(INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "e", "2021-01-02 00:00:00")); + ctx.createStatement().execute(String.format(INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "f", "2021-01-03 00:00:00")); + }); + + final List thirdSyncActualMessages = MoreIterators.toList( + source.read(config, configuredCatalog, createState(TABLE_NAME_AND_TIMESTAMP, namespace, secondSyncState))); + + // Cursor after 3rd sync is: 2021-01-03 00:00:00, count 1. + final Optional thirdSyncStateOptional = thirdSyncActualMessages.stream().filter(r -> r.getType() == Type.STATE).findFirst(); + assertTrue(thirdSyncStateOptional.isPresent()); + final JsonNode thirdSyncState = getStateData(thirdSyncStateOptional.get(), TABLE_NAME_AND_TIMESTAMP); + assertEquals(thirdSyncState.get("cursor_field").elements().next().asText(), COL_TIMESTAMP); + assertTrue(thirdSyncState.get("cursor").asText().contains("2021-01-03")); + assertTrue(thirdSyncState.get("cursor").asText().contains("00:00:00")); + assertEquals(1L, thirdSyncState.get("cursor_record_count").asLong()); + + // The c, d, e, f are duplicated records from this sync, because the cursor + // record count in the database is different from that in the state. 
+ final List thirdSyncExpectedNames = thirdSyncActualMessages.stream() + .filter(r -> r.getType() == Type.RECORD) + .map(r -> r.getRecord().getData().get(COL_NAME).asText()) + .toList(); + assertEquals(List.of("c", "d", "e", "f"), thirdSyncExpectedNames); + } + + private JsonNode getStateData(final AirbyteMessage airbyteMessage, final String streamName) { + for (final JsonNode stream : airbyteMessage.getState().getData().get("streams")) { + if (stream.get("stream_name").asText().equals(streamName)) { + return stream; + } + } + throw new IllegalArgumentException("Stream not found in state message: " + streamName); + } + private void incrementalCursorCheck( final String initialCursorField, final String cursorField, @@ -849,7 +964,8 @@ private void incrementalCursorCheck( .withStreamName(airbyteStream.getStream().getName()) .withStreamNamespace(airbyteStream.getStream().getNamespace()) .withCursorField(List.of(initialCursorField)) - .withCursor(initialCursorValue); + .withCursor(initialCursorValue) + .withCursorRecordCount(1L); final List actualMessages = MoreIterators .toList(source.read(config, configuredCatalog, Jsons.jsonNode(createState(List.of(dbStreamState))))); @@ -861,7 +977,8 @@ private void incrementalCursorCheck( .withStreamName(airbyteStream.getStream().getName()) .withStreamNamespace(airbyteStream.getStream().getNamespace()) .withCursorField(List.of(cursorField)) - .withCursor(endCursorValue)); + .withCursor(endCursorValue) + .withCursorRecordCount(1L)); final List expectedMessages = new ArrayList<>(expectedRecordMessages); expectedMessages.addAll(createExpectedTestMessages(expectedStreams)); @@ -1082,6 +1199,28 @@ protected JsonNode createEmptyState(final String streamName, final String stream } } + protected JsonNode createState(final String streamName, final String streamNamespace, final JsonNode stateData) { + if (supportsPerStream()) { + final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + 
.withStream( + new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withName(streamName).withNamespace(streamNamespace)) + .withStreamState(stateData)); + return Jsons.jsonNode(List.of(airbyteStateMessage)); + } else { + final List cursorFields = MoreIterators.toList(stateData.get("cursor_field").elements()).stream().map(JsonNode::asText).toList(); + final DbState dbState = new DbState().withStreams(List.of( + new DbStreamState() + .withStreamName(streamName) + .withStreamNamespace(streamNamespace) + .withCursor(stateData.get("cursor").asText()) + .withCursorField(cursorFields) + .withCursorRecordCount(stateData.get("cursor_record_count").asLong()))); + return Jsons.jsonNode(dbState); + } + } + /** * Extracts the state component from the provided {@link AirbyteMessage} based on the value returned * by {@link #supportsPerStream()}. diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 45d8ba7004456..cd09d9b22eef8 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -24,6 +24,7 @@ import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.relationaldb.AbstractDbSource; +import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.JsonSchemaType; @@ -179,10 +180,9 @@ public AutoCloseableIterator queryTableIncremental(final MongoDatabase final List columnNames, final String schemaName, final String tableName, - 
final String cursorField, - final BsonType cursorFieldType, - final String cursorValue) { - final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursorValue)); + final CursorInfo cursorInfo, + final BsonType cursorFieldType) { + final Bson greaterComparison = gt(cursorInfo.getCursorField(), MongoUtils.getBsonValue(cursorFieldType, cursorInfo.getCursor())); return queryTable(database, columnNames, tableName, greaterComparison); } diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mssql/MssqlStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mssql/MssqlStrictEncryptJdbcSourceAcceptanceTest.java index b160b66eaa37e..d741e616d607b 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mssql/MssqlStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mssql/MssqlStrictEncryptJdbcSourceAcceptanceTest.java @@ -38,6 +38,10 @@ public class MssqlStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAccept @BeforeAll static void init() { + // In mssql, timestamp is generated automatically, so we need to use + // the datetime type instead so that we can set the value manually. 
+ COL_TIMESTAMP_TYPE = "DATETIME"; + dbContainer = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense(); dbContainer.start(); } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 84d440940d6e4..76bcf8ed7bbd9 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -17,7 +17,6 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; @@ -37,6 +36,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.SyncMode; import java.io.File; +import java.sql.Connection; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -49,7 +49,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; -import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,54 +79,13 @@ public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase final String tableName) { LOGGER.info("Queueing query for table: {}", tableName); - final List newIdentifiersList = getWrappedColumn(database, - columnNames, - schemaName, tableName, "\""); - final String preparedSqlQuery = String - .format("SELECT %s FROM %s", String.join(",", newIdentifiersList), - getFullTableName(schemaName, tableName)); + final String newIdentifiers = getWrappedColumnNames(database, null, columnNames, 
schemaName, tableName); + final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName)); LOGGER.info("Prepared SQL query for TableFullRefresh is: " + preparedSqlQuery); return queryTable(database, preparedSqlQuery); } - @Override - public AutoCloseableIterator queryTableIncremental(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final String cursorField, - final JDBCType cursorFieldType, - final String cursorValue) { - LOGGER.info("Queueing query for table: {}", tableName); - return AutoCloseableIterators.lazyIterator(() -> { - try { - final Stream stream = database.unsafeQuery( - connection -> { - LOGGER.info("Preparing query for table: {}", tableName); - - final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString(); - final List newColumnNames = getWrappedColumn(database, columnNames, schemaName, tableName, identifierQuoteString); - - final String sql = String.format("SELECT %s FROM %s WHERE %s > ?", - String.join(",", newColumnNames), - sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), - sourceOperations.enquoteIdentifier(connection, cursorField)); - LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql); - - final PreparedStatement preparedStatement = connection.prepareStatement(sql); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue); - LOGGER.info("Executing query for table: {}", tableName); - return preparedStatement; - }, - sourceOperations::rowToJson); - return AutoCloseableIterators.fromStream(stream); - } catch (final SQLException e) { - throw new RuntimeException(e); - } - }); - } - /** * There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be * converted to a nvarchar(4000) data type by calling the ToString() method. 
So we make a separate @@ -137,13 +95,15 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase * * @return the list with Column names updated to handle functions (if nay) properly */ - private List getWrappedColumn(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final String enquoteSymbol) { + @Override + protected String getWrappedColumnNames(final JdbcDatabase database, + final Connection connection, + final List columnNames, + final String schemaName, + final String tableName) { final List hierarchyIdColumns = new ArrayList<>(); try { + final String identifierQuoteString = database.getMetaData().getIdentifierQuoteString(); final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database .queryMetadata(String .format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type @@ -159,20 +119,20 @@ private List getWrappedColumn(final JdbcDatabase database, } } + // iterate through names and replace Hierarchyid field for query is with toString() function + // Eventually would get columns like this: testColumn.toString as "testColumn" + // toString function in SQL server is the only way to get human readable value, but not mssql + // specific HEX value + return String.join(", ", columnNames.stream() + .map( + el -> hierarchyIdColumns.contains(el) ? 
String + .format("%s.ToString() as %s%s%s", el, identifierQuoteString, el, identifierQuoteString) + : getIdentifierWithQuoting(el)) + .toList()); } catch (final SQLException e) { LOGGER.error("Failed to fetch metadata to prepare a proper request.", e); + throw new RuntimeException(e); } - - // iterate through names and replace Hierarchyid field for query is with toString() function - // Eventually would get columns like this: testColumn.toString as "testColumn" - // toString function in SQL server is the only way to get human readable value, but not mssql - // specific HEX value - return columnNames.stream() - .map( - el -> hierarchyIdColumns.contains(el) ? String - .format("%s.ToString() as %s%s%s", el, enquoteSymbol, el, enquoteSymbol) - : getIdentifierWithQuoting(el)) - .collect(toList()); } @Override @@ -245,15 +205,15 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception { } @Override - public List>> discoverInternal(JdbcDatabase database) throws Exception { + public List>> discoverInternal(final JdbcDatabase database) throws Exception { final List>> internals = super.discoverInternal(database); if (schemas != null && !schemas.isEmpty()) { // process explicitly filtered (from UI) schemas - List>> resultInternals = internals + final List>> resultInternals = internals .stream() .filter(this::isTableInRequestedSchema) .toList(); - for (TableInfo> info : resultInternals) { + for (final TableInfo> info : resultInternals) { LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName()); } return resultInternals; @@ -263,7 +223,7 @@ public List>> discoverInternal(JdbcDatabase data } } - private boolean isTableInRequestedSchema(TableInfo> tableInfo) { + private boolean isTableInRequestedSchema(final TableInfo> tableInfo) { return schemas .stream() .anyMatch(schema -> schema.equals(tableInfo.getNameSpace())); diff --git 
a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java index a72a5991e7b58..f4f5cd2aeeef6 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java @@ -38,6 +38,10 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { @BeforeAll static void init() { + // In mssql, timestamp is generated automatically, so we need to use + // the datetime type instead so that we can set the value manually. + COL_TIMESTAMP_TYPE = "DATETIME"; + dbContainer = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense(); dbContainer.start(); } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java index 51767bd1a8e01..ba22c3d1e9249 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java @@ -231,7 +231,8 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("5"); + .withCursor("5") 
+ .withCursorRecordCount(1L); expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return expectedMessages; } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index 82bca2a517d90..b50caaef548c3 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -53,7 +53,8 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter"); props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); - // For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are specifically defined in the replication_method + // For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are + // specifically defined in the replication_method // config. 
if (sourceConfig.get("replication_method").has("server_time_zone")) { final String serverTimeZone = sourceConfig.get("replication_method").get("server_time_zone").asText(); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java index 303efaa43242c..682d4186c54a5 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.source.mysql.helpers; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.db.jdbc.JdbcDatabase; import java.time.Duration; diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index fdbe4551a8d2b..e8eec3bafac93 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -284,7 +284,8 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("5"); + .withCursor("5") + .withCursorRecordCount(1L); expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return 
expectedMessages; } diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java index bdee5069be372..d7f713671d6cc 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java @@ -69,12 +69,14 @@ static void init() { TABLE_NAME_WITH_SPACES = "ID AND NAME"; TABLE_NAME_WITHOUT_PK = "ID_AND_NAME_WITHOUT_PK"; TABLE_NAME_COMPOSITE_PK = "FULL_NAME_COMPOSITE_PK"; + TABLE_NAME_AND_TIMESTAMP = "NAME_AND_TIMESTAMP"; COL_ID = "ID"; COL_NAME = "NAME"; COL_UPDATED_AT = "UPDATED_AT"; COL_FIRST_NAME = "FIRST_NAME"; COL_LAST_NAME = "LAST_NAME"; COL_LAST_NAME_WITH_SPACE = "LAST NAME"; + COL_TIMESTAMP = "TIMESTAMP"; ID_VALUE_1 = new BigDecimal(1); ID_VALUE_2 = new BigDecimal(2); ID_VALUE_3 = new BigDecimal(3); @@ -84,6 +86,7 @@ static void init() { INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES(to_clob('clob data'))"; CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s VARCHAR(20))"; INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES('Hello world :)')"; + INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY = "INSERT INTO %s (name, timestamp) VALUES ('%s', TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS'))"; ORACLE_DB = new AirbyteOracleTestContainer() .withUsername("test") @@ -380,7 +383,8 @@ void testReadOneTableIncrementallyTwice() throws Exception { .withStreamName(streamName) 
.withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("5"))))))); + .withCursor("5") + .withCursorRecordCount(1L))))))); setEmittedAtToNull(actualMessagesSecondSync); diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index dcaf1803f431e..89ce3406fd758 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -74,12 +74,14 @@ static void init() { TABLE_NAME_WITH_SPACES = "ID AND NAME"; TABLE_NAME_WITHOUT_PK = "ID_AND_NAME_WITHOUT_PK"; TABLE_NAME_COMPOSITE_PK = "FULL_NAME_COMPOSITE_PK"; + TABLE_NAME_AND_TIMESTAMP = "NAME_AND_TIMESTAMP"; COL_ID = "ID"; COL_NAME = "NAME"; COL_UPDATED_AT = "UPDATED_AT"; COL_FIRST_NAME = "FIRST_NAME"; COL_LAST_NAME = "LAST_NAME"; COL_LAST_NAME_WITH_SPACE = "LAST NAME"; + COL_TIMESTAMP = "TIMESTAMP"; ID_VALUE_1 = new BigDecimal(1); ID_VALUE_2 = new BigDecimal(2); ID_VALUE_3 = new BigDecimal(3); @@ -89,6 +91,7 @@ static void init() { INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES(to_clob('clob data'))"; CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s VARCHAR(20))"; INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES('Hello world :)')"; + INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY = "INSERT INTO %s (name, timestamp) VALUES ('%s', TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS'))"; ORACLE_DB = new AirbyteOracleTestContainer() .withEnv("NLS_DATE_FORMAT", "YYYY-MM-DD") @@ -282,7 +285,8 @@ void testReadOneTableIncrementallyTwice() throws Exception { 
.withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("5"))))))); + .withCursor("5") + .withCursorRecordCount(1L))))))); setEmittedAtToNull(actualMessagesSecondSync); diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 5cc9e8cb9e8e7..ff44c65d602e5 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.15 +LABEL io.airbyte.version=1.0.16 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 8bb028fafeaa3..7984704fde708 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.15 +LABEL io.airbyte.version=1.0.16 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index e09667d7dcdf8..c47aaa9497fc7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -446,7 +446,8 @@ protected List 
getExpectedAirbyteMessagesSecondSync(final String .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("5"); + .withCursor("5") + .withCursorRecordCount(1L); expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return expectedMessages; } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index da1eb7c241fbd..4cebc03732953 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -312,11 +312,17 @@ protected AutoCloseableIterator createReadIterator(final Databas // this is where the bifurcation between full refresh and incremental if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { final String cursorField = IncrementalUtils.getCursorField(airbyteStream); - final Optional cursorOptional = stateManager.getCursor(pair); + final Optional cursorInfo = stateManager.getCursorInfo(pair); final AutoCloseableIterator airbyteMessageIterator; - if (cursorOptional.isPresent()) { - airbyteMessageIterator = getIncrementalStream(database, airbyteStream, selectedDatabaseFields, table, cursorOptional.get(), emittedAt); + if (cursorInfo.map(CursorInfo::getCursor).isPresent()) { + airbyteMessageIterator = getIncrementalStream( + database, + airbyteStream, + selectedDatabaseFields, + table, + cursorInfo.get(), + emittedAt); } else { // if no cursor is present then this is the first read for is the same as doing a full refresh read. 
airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); @@ -329,7 +335,7 @@ protected AutoCloseableIterator createReadIterator(final Databas stateManager, pair, cursorField, - cursorOptional.orElse(null), + cursorInfo.map(CursorInfo::getCursor).orElse(null), cursorType, getStateEmissionFrequency()), airbyteMessageIterator); @@ -356,7 +362,7 @@ protected AutoCloseableIterator createReadIterator(final Databas * @param airbyteStream represents an ingestion source (e.g. API endpoint or database table) * @param selectedDatabaseFields subset of database fields selected for replication * @param table information in tabular format - * @param cursor state of where to start the sync from + * @param cursorInfo state of where to start the sync from * @param emittedAt Time when data was emitted from the Source database * @return AirbyteMessage Iterator that */ @@ -364,7 +370,7 @@ protected AutoCloseableIterator getIncrementalStream(final Datab final ConfiguredAirbyteStream airbyteStream, final List selectedDatabaseFields, final TableInfo> table, - final String cursor, + final CursorInfo cursorInfo, final Instant emittedAt) { final String streamName = airbyteStream.getStream().getName(); final String namespace = airbyteStream.getStream().getNamespace(); @@ -383,9 +389,8 @@ protected AutoCloseableIterator getIncrementalStream(final Datab selectedDatabaseFields, table.getNameSpace(), table.getName(), - cursorField, - cursorType, - cursor); + cursorInfo, + cursorType); return getMessageIterator(queryIterator, streamName, namespace, emittedAt.toEpochMilli()); } @@ -606,9 +611,8 @@ public abstract AutoCloseableIterator queryTableIncremental(Database d List columnNames, String schemaName, String tableName, - String cursorField, - DataType cursorFieldType, - String cursorValue); + CursorInfo cursorInfo, + DataType cursorFieldType); /** * When larger than 0, the incremental iterator will emit intermediate state for every 
N records. diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CursorInfo.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CursorInfo.java index fc796c73d2b99..4122e95f4ef0f 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CursorInfo.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CursorInfo.java @@ -10,18 +10,31 @@ public class CursorInfo { private final String originalCursorField; private final String originalCursor; + private final long originalCursorRecordCount; private final String cursorField; private String cursor; + private long cursorRecordCount; public CursorInfo(final String originalCursorField, final String originalCursor, final String cursorField, final String cursor) { + this(originalCursorField, originalCursor, 0L, cursorField, cursor, 0L); + } + + public CursorInfo(final String originalCursorField, + final String originalCursor, + final long originalCursorRecordCount, + final String cursorField, + final String cursor, + final long cursorRecordCount) { this.originalCursorField = originalCursorField; this.originalCursor = originalCursor; + this.originalCursorRecordCount = originalCursorRecordCount; this.cursorField = cursorField; this.cursor = cursor; + this.cursorRecordCount = cursorRecordCount; } public String getOriginalCursorField() { @@ -32,6 +45,10 @@ public String getOriginalCursor() { return originalCursor; } + public long getOriginalCursorRecordCount() { + return originalCursorRecordCount; + } + public String getCursorField() { return cursorField; } @@ -40,12 +57,21 @@ public String getCursor() { return cursor; } + public long getCursorRecordCount() { + return cursorRecordCount; + } + @SuppressWarnings("UnusedReturnValue") public CursorInfo setCursor(final 
String cursor) { this.cursor = cursor; return this; } + public CursorInfo setCursorRecordCount(final long cursorRecordCount) { + this.cursorRecordCount = cursorRecordCount; + return this; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -55,14 +81,17 @@ public boolean equals(final Object o) { return false; } final CursorInfo that = (CursorInfo) o; - return Objects.equals(originalCursorField, that.originalCursorField) && Objects - .equals(originalCursor, that.originalCursor) - && Objects.equals(cursorField, that.cursorField) && Objects.equals(cursor, that.cursor); + return Objects.equals(originalCursorField, that.originalCursorField) + && Objects.equals(originalCursor, that.originalCursor) + && Objects.equals(originalCursorRecordCount, that.originalCursorRecordCount) + && Objects.equals(cursorField, that.cursorField) + && Objects.equals(cursor, that.cursor) + && Objects.equals(cursorRecordCount, that.cursorRecordCount); } @Override public int hashCode() { - return Objects.hash(originalCursorField, originalCursor, cursorField, cursor); + return Objects.hash(originalCursorField, originalCursor, originalCursorRecordCount, cursorField, cursor, cursorRecordCount); } @Override @@ -70,8 +99,10 @@ public String toString() { return "CursorInfo{" + "originalCursorField='" + originalCursorField + '\'' + ", originalCursor='" + originalCursor + '\'' + + ", originalCursorRecordCount='" + originalCursorRecordCount + '\'' + ", cursorField='" + cursorField + '\'' + ", cursor='" + cursor + '\'' + + ", cursorRecordCount='" + cursorRecordCount + '\'' + '}'; } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 605c3aca8eba8..c77a61fb9845f 100644 --- 
a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -30,6 +30,7 @@ public class StateDecoratingIterator extends AbstractIterator im private final String initialCursor; private String maxCursor; + private long maxCursorRecordCount = 0L; private boolean hasEmittedFinalState; /** @@ -125,13 +126,17 @@ protected AirbyteMessage computeNext() { final AirbyteMessage message = messageIterator.next(); if (message.getRecord().getData().hasNonNull(cursorField)) { final String cursorCandidate = getCursorCandidate(message); - if (IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType) < 0) { + final int cursorComparison = IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType); + if (cursorComparison < 0) { if (stateEmissionFrequency > 0 && !Objects.equals(maxCursor, initialCursor) && messageIterator.hasNext()) { // Only emit an intermediate state when it is not the first or last record message, // because the last state message will be taken care of in a different branch. 
intermediateStateMessage = createStateMessage(false); } maxCursor = cursorCandidate; + maxCursorRecordCount = 1L; + } else if (cursorComparison == 0) { + maxCursorRecordCount++; } } @@ -183,18 +188,21 @@ protected final Optional getIntermediateMessage() { * @return AirbyteMessage which includes information on state of records read so far */ public AirbyteMessage createStateMessage(final boolean isFinalState) { - final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); - LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}", + final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor, maxCursorRecordCount); + final Optional cursorInfo = stateManager.getCursorInfo(pair); + LOGGER.info("State report for stream {} - original: {} = {} (count {}) -> latest: {} = {} (count {})", pair, - stateManager.getOriginalCursorField(pair).orElse(null), - stateManager.getOriginalCursor(pair).orElse(null), - stateManager.getCursorField(pair).orElse(null), - stateManager.getCursor(pair).orElse(null)); + cursorInfo.map(CursorInfo::getOriginalCursorField).orElse(null), + cursorInfo.map(CursorInfo::getOriginalCursor).orElse(null), + cursorInfo.map(CursorInfo::getOriginalCursorRecordCount).orElse(null), + cursorInfo.map(CursorInfo::getCursorField).orElse(null), + cursorInfo.map(CursorInfo::getCursor).orElse(null), + cursorInfo.map(CursorInfo::getCursorRecordCount).orElse(null)); if (isFinalState) { hasEmittedFinalState = true; if (stateManager.getCursor(pair).isEmpty()) { - LOGGER.warn("Cursor was for stream {} was null. This stream will replicate all records on the next run", pair); + LOGGER.warn("Cursor for stream {} was null. 
This stream will replicate all records on the next run", pair); } } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java index dec78ec39facf..df8d1200b5b5e 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java @@ -40,6 +40,8 @@ public abstract class AbstractStateManager implements StateManager { * the connector's state. * @param cursorFieldFunction A {@link Function} that extracts the cursor field name from a stream * stored in the connector's state. + * @param cursorRecordCountFunction A {@link Function} that extracts the cursor record count for a + * stream stored in the connector's state. * @param namespacePairFunction A {@link Function} that generates a * {@link AirbyteStreamNameNamespacePair} that identifies each stream in the connector's * state. 
@@ -48,8 +50,9 @@ public AbstractStateManager(final ConfiguredAirbyteCatalog catalog, final Supplier> streamSupplier, final Function cursorFunction, final Function> cursorFieldFunction, + final Function cursorRecordCountFunction, final Function namespacePairFunction) { - cursorManager = new CursorManager(catalog, streamSupplier, cursorFunction, cursorFieldFunction, namespacePairFunction); + cursorManager = new CursorManager(catalog, streamSupplier, cursorFunction, cursorFieldFunction, cursorRecordCountFunction, namespacePairFunction); } @Override diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java index 2fabade977264..ca5c0504d9e9b 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -47,6 +48,8 @@ public class CursorManager { * the connector's state. * @param cursorFieldFunction A {@link Function} that extracts the cursor field name from a stream * stored in the connector's state. + * @param cursorRecordCountFunction A {@link Function} that extracts the cursor record count for a + * stream stored in the connector's state. * @param namespacePairFunction A {@link Function} that generates a * {@link AirbyteStreamNameNamespacePair} that identifies each stream in the connector's * state. 
@@ -55,8 +58,10 @@ public CursorManager(final ConfiguredAirbyteCatalog catalog, final Supplier> streamSupplier, final Function cursorFunction, final Function> cursorFieldFunction, + final Function cursorRecordCountFunction, final Function namespacePairFunction) { - pairToCursorInfo = createCursorInfoMap(catalog, streamSupplier, cursorFunction, cursorFieldFunction, namespacePairFunction); + pairToCursorInfo = createCursorInfoMap( + catalog, streamSupplier, cursorFunction, cursorFieldFunction, cursorRecordCountFunction, namespacePairFunction); } /** @@ -70,6 +75,8 @@ public CursorManager(final ConfiguredAirbyteCatalog catalog, * the connector's state. * @param cursorFieldFunction A {@link Function} that extracts the cursor field name from a stream * stored in the connector's state. + * @param cursorRecordCountFunction A {@link Function} that extracts the cursor record count for a + * stream stored in the connector's state. * @param namespacePairFunction A {@link Function} that generates a * {@link AirbyteStreamNameNamespacePair} that identifies each stream in the connector's * state. 
@@ -81,13 +88,14 @@ protected Map createCursorInfoMap( final Supplier> streamSupplier, final Function cursorFunction, final Function> cursorFieldFunction, + final Function cursorRecordCountFunction, final Function namespacePairFunction) { final Set allStreamNames = catalog.getStreams() .stream() .map(ConfiguredAirbyteStream::getStream) .map(AirbyteStreamNameNamespacePair::fromAirbyteSteam) .collect(Collectors.toSet()); - allStreamNames.addAll(streamSupplier.get().stream().map(namespacePairFunction).filter(n -> n != null).collect(Collectors.toSet())); + allStreamNames.addAll(streamSupplier.get().stream().map(namespacePairFunction).filter(Objects::nonNull).collect(Collectors.toSet())); final Map localMap = new HashMap<>(); final Map pairToState = streamSupplier.get() @@ -99,7 +107,8 @@ protected Map createCursorInfoMap( for (final AirbyteStreamNameNamespacePair pair : allStreamNames) { final Optional stateOptional = Optional.ofNullable(pairToState.get(pair)); final Optional streamOptional = Optional.ofNullable(pairToConfiguredAirbyteStream.get(pair)); - localMap.put(pair, createCursorInfoForStream(pair, stateOptional, streamOptional, cursorFunction, cursorFieldFunction)); + localMap.put(pair, + createCursorInfoForStream(pair, stateOptional, streamOptional, cursorFunction, cursorFieldFunction, cursorRecordCountFunction)); } return localMap; @@ -118,6 +127,8 @@ protected Map createCursorInfoMap( * associated with the stream. * @param cursorFieldFunction A {@link Function} that provides the cursor field name for the cursor * stored in the state associated with the stream. + * @param cursorRecordCountFunction A {@link Function} that extracts the cursor record count for a + * stream stored in the connector's state. * @return A {@link CursorInfo} object based on the data currently stored in the connector's state * for the given stream. 
*/ @@ -127,15 +138,18 @@ protected CursorInfo createCursorInfoForStream(final AirbyteStreamNameNamespaceP final Optional stateOptional, final Optional streamOptional, final Function cursorFunction, - final Function> cursorFieldFunction) { + final Function> cursorFieldFunction, + final Function cursorRecordCountFunction) { final String originalCursorField = stateOptional .map(cursorFieldFunction) .flatMap(f -> f.size() > 0 ? Optional.of(f.get(0)) : Optional.empty()) .orElse(null); final String originalCursor = stateOptional.map(cursorFunction).orElse(null); + final long originalCursorRecordCount = stateOptional.map(cursorRecordCountFunction).orElse(0L); final String cursor; final String cursorField; + final long cursorRecordCount; // if cursor field is set in catalog. if (streamOptional.map(ConfiguredAirbyteStream::getCursorField).isPresent()) { @@ -148,19 +162,23 @@ protected CursorInfo createCursorInfoForStream(final AirbyteStreamNameNamespaceP // if cursor field in catalog and state are the same. if (stateOptional.map(cursorFieldFunction).equals(streamOptional.map(ConfiguredAirbyteStream::getCursorField))) { cursor = stateOptional.map(cursorFunction).orElse(null); - LOGGER.info("Found matching cursor in state. Stream: {}. Cursor Field: {} Value: {}", pair, cursorField, cursor); + cursorRecordCount = stateOptional.map(cursorRecordCountFunction).orElse(0L); + LOGGER.info("Found matching cursor in state. Stream: {}. Cursor Field: {} Value: {} Count: {}", + pair, cursorField, cursor, cursorRecordCount); // if cursor field in catalog and state are different. } else { cursor = null; + cursorRecordCount = 0L; LOGGER.info( - "Found cursor field. Does not match previous cursor field. Stream: {}. Original Cursor Field: {}. New Cursor Field: {}. Resetting cursor value.", - pair, originalCursorField, cursorField); + "Found cursor field. Does not match previous cursor field. Stream: {}. Original Cursor Field: {} (count {}). New Cursor Field: {}. 
Resetting cursor value.", + pair, originalCursorField, originalCursorRecordCount, cursorField); } // if cursor field is not set in state but is set in catalog. } else { LOGGER.info("No cursor field set in catalog but not present in state. Stream: {}, New Cursor Field: {}. Resetting cursor value", pair, cursorField); cursor = null; + cursorRecordCount = 0L; } // if cursor field is not set in catalog. } else { @@ -169,9 +187,10 @@ protected CursorInfo createCursorInfoForStream(final AirbyteStreamNameNamespaceP pair, originalCursorField, originalCursor); cursorField = null; cursor = null; + cursorRecordCount = 0L; } - return new CursorInfo(originalCursorField, originalCursor, cursorField, cursor); + return new CursorInfo(originalCursorField, originalCursor, originalCursorRecordCount, cursorField, cursor, cursorRecordCount); } /** diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java index ff4cf6de52a57..ee170e5d518c8 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java @@ -6,6 +6,7 @@ import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FIELD_FUNCTION; import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FUNCTION; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_RECORD_COUNT_FUNCTION; import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION; import io.airbyte.commons.json.Jsons; @@ -55,6 +56,7 @@ public 
GlobalStateManager(final AirbyteStateMessage airbyteStateMessage, final C getStreamsSupplier(airbyteStateMessage), CURSOR_FUNCTION, CURSOR_FIELD_FUNCTION, + CURSOR_RECORD_COUNT_FUNCTION, NAME_NAMESPACE_PAIR_FUNCTION); this.cdcStateManager = new CdcStateManager(extractCdcState(airbyteStateMessage), extractStreams(airbyteStateMessage)); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java index 1847faafe60b7..a1e147e76d055 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java @@ -4,17 +4,16 @@ package io.airbyte.integrations.source.relationaldb.state; -import com.google.common.base.Preconditions; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CdcStateManager; -import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.function.Function; import org.slf4j.Logger; @@ -44,6 +43,9 @@ public class LegacyStateManager extends AbstractStateManager> CURSOR_FIELD_FUNCTION = DbStreamState::getCursorField; + private static final Function CURSOR_RECORD_COUNT_FUNCTION = + 
stream -> Objects.requireNonNullElse(stream.getCursorRecordCount(), 0L); + /** * {@link Function} that creates an {@link AirbyteStreamNameNamespacePair} from the stream state. */ @@ -70,9 +72,10 @@ public class LegacyStateManager extends AbstractStateManager dbState.getStreams(), + dbState::getStreams, CURSOR_FUNCTION, CURSOR_FIELD_FUNCTION, + CURSOR_RECORD_COUNT_FUNCTION, NAME_NAMESPACE_PAIR_FUNCTION); this.cdcStateManager = new CdcStateManager(dbState.getCdcState(), AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog)); @@ -99,11 +102,14 @@ public AirbyteStateMessage toState(final Optional cursorInfo = getCursorInfo(pair); - Preconditions.checkState(cursorInfo.isPresent(), "Could not find cursor information for stream: " + pair); - cursorInfo.get().setCursor(cursor); + return super.updateAndEmit(pair, cursor, cursorRecordCount); } return toState(Optional.ofNullable(pair)); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java index 40fa957c71b53..130a520e98b23 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java @@ -52,6 +52,11 @@ public class StateGeneratorUtils { } }; + public static final Function CURSOR_RECORD_COUNT_FUNCTION = stream -> { + final Optional dbStreamState = StateGeneratorUtils.extractState(stream); + return dbStreamState.map(DbStreamState::getCursorRecordCount).orElse(0L); + }; + /** * {@link Function} that creates an {@link AirbyteStreamNameNamespacePair} from the stream state. 
*/ @@ -120,11 +125,15 @@ public static DbState generateDbState(final Map 0L) { + state.setCursorRecordCount(cursorInfo.getCursorRecordCount()); + } + return state; } /** diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java index a4234454b06fd..3039758f97465 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java @@ -140,10 +140,17 @@ default AirbyteStateMessage emit(final Optional * manager. */ default AirbyteStateMessage updateAndEmit(final AirbyteStreamNameNamespacePair pair, final String cursor) { + return updateAndEmit(pair, cursor, 0L); + } + + default AirbyteStateMessage updateAndEmit(final AirbyteStreamNameNamespacePair pair, final String cursor, final long cursorRecordCount) { final Optional cursorInfo = getCursorInfo(pair); Preconditions.checkState(cursorInfo.isPresent(), "Could not find cursor information for stream: " + pair); - LOGGER.debug("Updating cursor value for {} to {}...", pair, cursor); cursorInfo.get().setCursor(cursor); + if (cursorRecordCount > 0L) { + cursorInfo.get().setCursorRecordCount(cursorRecordCount); + } + LOGGER.debug("Updating cursor value for {} to {} (count {})...", pair, cursor, cursorRecordCount); return emit(Optional.ofNullable(pair)); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java index 701fc099edcce..2d1cd66673d18 100644 
--- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java @@ -6,6 +6,7 @@ import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FIELD_FUNCTION; import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FUNCTION; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_RECORD_COUNT_FUNCTION; import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION; import io.airbyte.commons.json.Jsons; @@ -25,7 +26,7 @@ /** * Per-stream implementation of the {@link StateManager} interface. - * + *

* This implementation generates a state object for each stream detected in catalog/map of known * streams to cursor information stored in this manager. */ @@ -44,9 +45,10 @@ public class StreamStateManager extends AbstractStateManager airbyteStateMessages, final ConfiguredAirbyteCatalog catalog) { super(catalog, - () -> airbyteStateMessages.stream().map(a -> a.getStream()).collect(Collectors.toList()), + () -> airbyteStateMessages.stream().map(AirbyteStateMessage::getStream).collect(Collectors.toList()), CURSOR_FUNCTION, CURSOR_FIELD_FUNCTION, + CURSOR_RECORD_COUNT_FUNCTION, NAME_NAMESPACE_PAIR_FUNCTION); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/resources/db_models/db_models.yaml b/airbyte-integrations/connectors/source-relational-db/src/main/resources/db_models/db_models.yaml index 28af27c8046bf..ba6e769cac668 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/resources/db_models/db_models.yaml +++ b/airbyte-integrations/connectors/source-relational-db/src/main/resources/db_models/db_models.yaml @@ -46,3 +46,6 @@ definitions: cursor: description: string representation of the last value recorded for the cursor. type: string + cursor_record_count: + description: number of records that have the cursor value. 
+ type: integer diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index 7532ca2517b5f..088f46fdfd5be 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -73,7 +73,7 @@ private static AirbyteMessage createStateMessage(final String recordValue) { } private Iterator createExceptionIterator() { - return new Iterator() { + return new Iterator<>() { final Iterator internalMessageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_2, RECORD_MESSAGE_3); @@ -104,17 +104,14 @@ public AirbyteMessage next() { @BeforeEach void setup() { stateManager = mock(StateManager.class); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null)).thenReturn(EMPTY_STATE_MESSAGE.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2)).thenReturn(STATE_MESSAGE_2.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3)).thenReturn(STATE_MESSAGE_3.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4)).thenReturn(STATE_MESSAGE_4.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5)).thenReturn(STATE_MESSAGE_5.getState()); - - when(stateManager.getOriginalCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); - when(stateManager.getOriginalCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); - 
when(stateManager.getCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); - when(stateManager.getCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null, 0)).thenReturn(EMPTY_STATE_MESSAGE.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1, 1L)).thenReturn(STATE_MESSAGE_1.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2, 1L)).thenReturn(STATE_MESSAGE_2.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3, 1L)).thenReturn(STATE_MESSAGE_3.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4, 1L)).thenReturn(STATE_MESSAGE_4.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5, 1L)).thenReturn(STATE_MESSAGE_5.getState()); + + when(stateManager.getCursorInfo(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); } @Test @@ -137,6 +134,10 @@ void testWithoutInitialCursor() { @Test void testWithInitialCursor() { + // record 1 and 2 has smaller cursor value, so at the end, the initial cursor is emitted with 0 + // record count + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5, 0L)).thenReturn(STATE_MESSAGE_5.getState()); + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); final StateDecoratingIterator iterator = new StateDecoratingIterator( messageIterator, @@ -177,6 +178,12 @@ void testCursorFieldIsEmpty() { @Test void testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() { final Iterator exceptionIterator = createExceptionIterator(); + + // The mock record count matches the number of records returned by the exception iterator. 
+ when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1, 1L)).thenReturn(STATE_MESSAGE_1.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2, 2L)).thenReturn(STATE_MESSAGE_2.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3, 1L)).thenReturn(STATE_MESSAGE_3.getState()); + final StateDecoratingIterator iterator = new StateDecoratingIterator( exceptionIterator, stateManager, @@ -242,7 +249,7 @@ void testUnicodeNull() { // UTF8 null \u0000 is removed from the cursor value in the state message final AirbyteMessage stateMessageWithNull = STATE_MESSAGE_1; - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, recordValueWithNull)).thenReturn(stateMessageWithNull.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, recordValueWithNull, 1L)).thenReturn(stateMessageWithNull.getState()); messageIterator = MoreIterators.of(recordMessageWithNull); @@ -364,11 +371,17 @@ void testStateEmissionWhenInitialCursorIsNotNull() { * start with `F1 > 16` and skip record 3. *

* So intermediate state emission should only happen when all records with the same cursor value has - * been synced to destination. Reference: https://github.com/airbytehq/airbyte/issues/15427 + * been synced to destination. Reference: + * link */ @Test @DisplayName("When there are multiple records with the same cursor value") void testStateEmissionForRecordsSharingSameCursorValue() { + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2, 2L)).thenReturn(STATE_MESSAGE_2.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3, 3L)).thenReturn(STATE_MESSAGE_3.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4, 1L)).thenReturn(STATE_MESSAGE_4.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5, 2L)).thenReturn(STATE_MESSAGE_5.getState()); + messageIterator = MoreIterators.of( RECORD_MESSAGE_2, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_3, RECORD_MESSAGE_3, diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java index 67b7fddc23f52..5f85d99be4d8e 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java @@ -7,6 +7,7 @@ import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.CURSOR; import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.CURSOR_FIELD1; import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.CURSOR_FIELD2; +import static 
io.airbyte.integrations.source.relationaldb.state.StateTestConstants.CURSOR_RECORD_COUNT; import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.NAME_NAMESPACE_PAIR1; import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.NAME_NAMESPACE_PAIR2; import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.getCatalog; @@ -19,6 +20,7 @@ import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import java.util.Collections; import java.util.Optional; +import java.util.function.Function; import org.junit.jupiter.api.Test; /** @@ -26,16 +28,25 @@ */ public class CursorManagerTest { + private static final Function CURSOR_RECORD_COUNT_FUNCTION = stream -> { + if (stream.getCursorRecordCount() != null) { + return stream.getCursorRecordCount(); + } else { + return 0L; + } + }; + @Test void testCreateCursorInfoCatalogAndStateSameCursorField() { final CursorManager cursorManager = createCursorManager(CURSOR_FIELD1, CURSOR, NAME_NAMESPACE_PAIR1); final CursorInfo actual = cursorManager.createCursorInfoForStream( NAME_NAMESPACE_PAIR1, - getState(CURSOR_FIELD1, CURSOR), + getState(CURSOR_FIELD1, CURSOR, CURSOR_RECORD_COUNT), getStream(CURSOR_FIELD1), DbStreamState::getCursor, - DbStreamState::getCursorField); - assertEquals(new CursorInfo(CURSOR_FIELD1, CURSOR, CURSOR_FIELD1, CURSOR), actual); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); + assertEquals(new CursorInfo(CURSOR_FIELD1, CURSOR, CURSOR_RECORD_COUNT, CURSOR_FIELD1, CURSOR, CURSOR_RECORD_COUNT), actual); } @Test @@ -46,7 +57,8 @@ void testCreateCursorInfoCatalogAndStateSameCursorFieldButNoCursor() { getState(CURSOR_FIELD1, null), getStream(CURSOR_FIELD1), DbStreamState::getCursor, - DbStreamState::getCursorField); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); assertEquals(new CursorInfo(CURSOR_FIELD1, null, CURSOR_FIELD1, null), actual); } @@ -58,7 +70,8 @@ void 
testCreateCursorInfoCatalogAndStateChangeInCursorFieldName() { getState(CURSOR_FIELD1, CURSOR), getStream(CURSOR_FIELD2), DbStreamState::getCursor, - DbStreamState::getCursorField); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); assertEquals(new CursorInfo(CURSOR_FIELD1, CURSOR, CURSOR_FIELD2, null), actual); } @@ -70,7 +83,8 @@ void testCreateCursorInfoCatalogAndNoState() { Optional.empty(), getStream(CURSOR_FIELD1), DbStreamState::getCursor, - DbStreamState::getCursorField); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); assertEquals(new CursorInfo(null, null, CURSOR_FIELD1, null), actual); } @@ -82,7 +96,8 @@ void testCreateCursorInfoStateAndNoCatalog() { getState(CURSOR_FIELD1, CURSOR), Optional.empty(), DbStreamState::getCursor, - DbStreamState::getCursorField); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); assertEquals(new CursorInfo(CURSOR_FIELD1, CURSOR, null, null), actual); } @@ -95,7 +110,8 @@ void testCreateCursorInfoNoCatalogAndNoState() { Optional.empty(), Optional.empty(), DbStreamState::getCursor, - DbStreamState::getCursorField); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); assertEquals(new CursorInfo(null, null, null, null), actual); } @@ -107,7 +123,8 @@ void testCreateCursorInfoStateAndCatalogButNoCursorField() { getState(CURSOR_FIELD1, CURSOR), getStream(null), DbStreamState::getCursor, - DbStreamState::getCursorField); + DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION); assertEquals(new CursorInfo(CURSOR_FIELD1, CURSOR, null, null), actual); } @@ -134,6 +151,7 @@ private CursorManager createCursorManager(final String cursorFiel () -> Collections.singleton(dbStreamState), DbStreamState::getCursor, DbStreamState::getCursorField, + CURSOR_RECORD_COUNT_FUNCTION, s -> nameNamespacePair); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java 
b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java index 3ef46156c5dc0..7df8b7c4ee5eb 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java @@ -135,7 +135,7 @@ void testToStateFromLegacyState() { .withGlobal(expectedGlobalState) .withType(AirbyteStateType.GLOBAL); - final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a"); + final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a", 1L); assertEquals(expected, actualFirstEmission); } @@ -167,7 +167,8 @@ void testToState() { .withStreamName(STREAM_NAME1) .withStreamNamespace(NAMESPACE) .withCursorField(List.of(CURSOR_FIELD1)) - .withCursor("a"), + .withCursor("a") + .withCursorRecordCount(1L), new DbStreamState() .withStreamName(STREAM_NAME2) .withStreamNamespace(NAMESPACE) @@ -204,7 +205,7 @@ void testToState() { .withGlobal(expectedGlobalState) .withType(AirbyteStateType.GLOBAL); - final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a"); + final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a", 1L); assertEquals(expected, actualFirstEmission); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java index e939c9aea87dd..1e6ac72d25b3f 100644 --- 
a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java @@ -28,10 +28,10 @@ public final class StateTestConstants { public static final String CURSOR_FIELD1 = "year"; public static final String CURSOR_FIELD2 = "generation"; public static final String CURSOR = "2000"; + public static final long CURSOR_RECORD_COUNT = 19L; private StateTestConstants() {} - @SuppressWarnings("SameParameterValue") public static Optional getState(final String cursorField, final String cursor) { return Optional.of(new DbStreamState() .withStreamName(STREAM_NAME1) @@ -39,6 +39,14 @@ public static Optional getState(final String cursorField, final S .withCursor(cursor)); } + public static Optional getState(final String cursorField, final String cursor, final long cursorRecordCount) { + return Optional.of(new DbStreamState() + .withStreamName(STREAM_NAME1) + .withCursorField(Lists.newArrayList(cursorField)) + .withCursor(cursor) + .withCursorRecordCount(cursorRecordCount)); + } + public static Optional getCatalog(final String cursorField) { return Optional.of(new ConfiguredAirbyteCatalog() .withStreams(List.of(getStream(cursorField).orElse(null)))); diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java index 4b6876987fe48..7484202848b99 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java +++ 
b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java @@ -106,7 +106,8 @@ void testToState() { .withStreamName(STREAM_NAME1) .withStreamNamespace(NAMESPACE) .withCursorField(List.of(CURSOR_FIELD1)) - .withCursor("a"), + .withCursor("a") + .withCursorRecordCount(1L), new DbStreamState() .withStreamName(STREAM_NAME2) .withStreamNamespace(NAMESPACE) diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java index 0224c01b6792e..fc7a755390221 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java @@ -94,6 +94,11 @@ protected int getStateEmissionFrequency() { return INTERMEDIATE_STATE_EMISSION_FREQUENCY; } + @Override + protected String getCountColumnName() { + return "RECORD_COUNT"; + } + private JsonNode buildOAuthConfig(final JsonNode config, final String jdbcUrl) { final String accessToken; final var credentials = config.get("credentials"); diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java index 60c2406032b29..c3e21bbd75ef0 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java +++ 
b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java @@ -57,12 +57,14 @@ static void init() { TABLE_NAME_WITH_SPACES = "ID AND NAME"; TABLE_NAME_WITHOUT_PK = "ID_AND_NAME_WITHOUT_PK"; TABLE_NAME_COMPOSITE_PK = "FULL_NAME_COMPOSITE_PK"; + TABLE_NAME_AND_TIMESTAMP = "NAME_AND_TIMESTAMP"; COL_ID = "ID"; COL_NAME = "NAME"; COL_UPDATED_AT = "UPDATED_AT"; COL_FIRST_NAME = "FIRST_NAME"; COL_LAST_NAME = "LAST_NAME"; COL_LAST_NAME_WITH_SPACE = "LAST NAME"; + COL_TIMESTAMP = "TIMESTAMP"; ID_VALUE_1 = new BigDecimal(1); ID_VALUE_2 = new BigDecimal(2); ID_VALUE_3 = new BigDecimal(3); @@ -165,6 +167,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); } + @Override protected List getTestMessages() { return List.of( new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) @@ -216,7 +219,8 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withStreamName(streamName) .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) - .withCursor("5"); + .withCursor("5") + .withCursorRecordCount(1L); expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return expectedMessages; } diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CommonField.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CommonField.java index c5b096f786d40..61bc0de2af5da 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CommonField.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CommonField.java @@ -57,4 +57,9 @@ public List> getProperties() { return properties; } + @Override + public String toString() { + return String.format("CommonField{name='%s', type=%s, properties=%s}", name, type, properties); + } + } diff --git 
a/docs/integrations/sources/alloydb.md b/docs/integrations/sources/alloydb.md index 289a79262ae45..fce3378b4c406 100644 --- a/docs/integrations/sources/alloydb.md +++ b/docs/integrations/sources/alloydb.md @@ -331,14 +331,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------| +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Align with Postgres source v.1.0.15 | | 1.0.0 | 2022-09-15 | [16776](https://github.com/airbytehq/airbyte/pull/16776) | Align with strict-encrypt version | | 0.1.0 | 2022-09-05 | [16323](https://github.com/airbytehq/airbyte/pull/16323) | Initial commit. Based on source-postgres v.1.0.7 | - - -## Changelog (Strict Encrypt) - -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------| -| 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Align with Postgres source v.1.0.15 | -| 1.0.0 | 2022-09-15 | [16776](https://github.com/airbytehq/airbyte/pull/16776) | Initial commit.
Based on source-postgres-strict-encrypt | diff --git a/docs/integrations/sources/bigquery.md b/docs/integrations/sources/bigquery.md index 75881e800c2d6..d8a6968701da3 100644 --- a/docs/integrations/sources/bigquery.md +++ b/docs/integrations/sources/bigquery.md @@ -88,6 +88,7 @@ Once you've configured BigQuery as a source, delete the Service Account Key from | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.2.2 | 2022-09-22 | [16902](https://github.com/airbytehq/airbyte/pull/16902) | Source BigQuery: added user agent header | | 0.2.1 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | | 0.2.0 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | diff --git a/docs/integrations/sources/clickhouse.md b/docs/integrations/sources/clickhouse.md index b9fa131fc2d5b..07df7cb267baa 100644 --- a/docs/integrations/sources/clickhouse.md +++ b/docs/integrations/sources/clickhouse.md @@ -96,6 +96,7 @@ Using this feature requires additional configuration, when creating the source.
| Version | Date | Pull Request | Subject | |:---| :--- |:---------------------------------------------------------|:---------------------------------------------------------------------------| +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.1.14 | 2022-09-27 | [17031](https://github.com/airbytehq/airbyte/pull/17031) | Added custom jdbc url parameters field | | 0.1.13 | 2022-09-01 | [16238](https://github.com/airbytehq/airbyte/pull/16238) | Emit state messages more frequently | | 0.1.9 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | diff --git a/docs/integrations/sources/cockroachdb.md b/docs/integrations/sources/cockroachdb.md index 1538c0221a040..b6d6f5cc0c716 100644 --- a/docs/integrations/sources/cockroachdb.md +++ b/docs/integrations/sources/cockroachdb.md @@ -95,6 +95,7 @@ Your database user should now be ready for use with Airbyte.
| Version | Date | Pull Request | Subject | |:--------|:-----------| :--- | :--- | +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.1.18 | 2022-09-01 | [16394](https://github.com/airbytehq/airbyte/pull/16394) | Added custom jdbc properties field | | 0.1.17 | 2022-09-01 | [16238](https://github.com/airbytehq/airbyte/pull/16238) | Emit state messages more frequently | | 0.1.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | diff --git a/docs/integrations/sources/db2.md b/docs/integrations/sources/db2.md index 974a57a93c78f..54207cef61e7c 100644 --- a/docs/integrations/sources/db2.md +++ b/docs/integrations/sources/db2.md @@ -60,6 +60,7 @@ You can also enter your own password for the keystore, but if you don't, the pas | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.1.16 | 2022-09-06 | [16354](https://github.com/airbytehq/airbyte/pull/16354) | Add custom JDBC params | | 0.1.15 | 2022-09-01 | [16238](https://github.com/airbytehq/airbyte/pull/16238) | Emit state messages more frequently | | 0.1.14 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 14088fafa4219..2d24d7e9c9527 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -341,6 +341,7 @@ WHERE actor_definition_id
='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------| :----------------------------------------------------- |:-------------------------------------------------------------------------------------------------------| +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.4.20 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | | 0.4.19 | 2022-09-05 | [16002](https://github.com/airbytehq/airbyte/pull/16002) | Added ability to specify schemas for discovery during setting connector up | | 0.4.18 | 2022-09-03 | [14910](https://github.com/airbytehq/airbyte/pull/14910) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 3d84b175170b7..ee84f6747f26d 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -251,6 +251,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura ## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 1.0.4 | 2022-10-11 | [17815](https://github.com/airbytehq/airbyte/pull/17815) | Expose setting server timezone for CDC syncs | | 1.0.3 | 2022-10-07 |
[17236](https://github.com/airbytehq/airbyte/pull/17236) | Fix large table issue by fetch size | | 1.0.2 | 2022-10-03 | [17170](https://github.com/airbytehq/airbyte/pull/17170) | Make initial CDC waiting time configurable | diff --git a/docs/integrations/sources/oracle.md b/docs/integrations/sources/oracle.md index 5d5233780e0b7..d7935841a078d 100644 --- a/docs/integrations/sources/oracle.md +++ b/docs/integrations/sources/oracle.md @@ -132,6 +132,7 @@ Airbyte has the ability to connect to the Oracle source with 3 network connectiv | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:------------------------------------------------| +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.3.21 | 2022-09-01 | [16238](https://github.com/airbytehq/airbyte/pull/16238) | Emit state messages more frequently | | 0.3.20 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | | 0.3.19 | 2022-08-03 | [14953](https://github.com/airbytehq/airbyte/pull/14953) | Use Service Name to connect to database | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index fd6b229a30984..4c0669ced2cec 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -398,7 +398,8 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.0.15 | 2022-10-11 |
[17782](https://github.com/airbytehq/airbyte/pull/17782) | Handle 24:00:00 value for Time column | +| 1.0.16 | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | +| 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Handle 24:00:00 value for Time column | | 1.0.14 | 2022-10-03 | [17515](https://github.com/airbytehq/airbyte/pull/17515) | Fix an issue preventing connection using client certificate | | 1.0.13 | 2022-10-01 | [17459](https://github.com/airbytehq/airbyte/pull/17459) | Upgrade debezium version to 1.9.6 from 1.9.2 | | 1.0.12 | 2022-09-27 | [17299](https://github.com/airbytehq/airbyte/pull/17299) | Improve error handling for strict-encrypt postgres source | diff --git a/docs/integrations/sources/redshift.md b/docs/integrations/sources/redshift.md index 85e3f4c6b9e8e..2311a930376ac 100644 --- a/docs/integrations/sources/redshift.md +++ b/docs/integrations/sources/redshift.md @@ -54,6 +54,7 @@ All Redshift connections are encrypted using SSL | Version | Date | Pull Request | Subject | |:--------|:-----------| :----- |:----------------------------------------------------------------------------------------------------------| +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.3.14 | 2022-09-01 | [16258](https://github.com/airbytehq/airbyte/pull/16258) | Emit state messages more frequently | | 0.3.13 | 2022-05-25 | | Added JDBC URL params | | 0.3.12 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | diff --git a/docs/integrations/sources/snowflake.md
b/docs/integrations/sources/snowflake.md index b72970bdf0343..4004a08c2e077 100644 --- a/docs/integrations/sources/snowflake.md +++ b/docs/integrations/sources/snowflake.md @@ -122,6 +122,7 @@ To read more please check official [Snowflake documentation](https://docs.snowfl | Version | Date | Pull Request | Subject | |:----------| :--- | :--- | :--- | +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.1.24 | 2022-09-26 | [17144](https://github.com/airbytehq/airbyte/pull/17144) | Fixed bug with incorrect date-time datatypes handling | | 0.1.23 | 2022-09-26 | [17116](https://github.com/airbytehq/airbyte/pull/17116) | added connection string identifier | | 0.1.22 | 2022-09-21 | [16766](https://github.com/airbytehq/airbyte/pull/16766) | Update JDBC Driver version to 3.13.22 | diff --git a/docs/integrations/sources/tidb.md b/docs/integrations/sources/tidb.md index 79582e758c434..e026f886c618b 100644 --- a/docs/integrations/sources/tidb.md +++ b/docs/integrations/sources/tidb.md @@ -128,6 +128,7 @@ Now that you have set up the TiDB source connector, check out the following TiDB | Version | Date | Pull Request | Subject | | :------ | :--- | :----------- | ------- | +| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/15535) | Update incremental query to avoid missing data when new data is inserted at the same time as a sync starts under non-CDC incremental mode | | 0.2.1 | 2022-09-01 | [16238](https://github.com/airbytehq/airbyte/pull/16238) | Emit state messages more frequently | | 0.2.0 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | | 0.1.5 | 2022-07-25 | [14996](https://github.com/airbytehq/airbyte/pull/14996) | Removed additionalProperties:false from spec |