Skip to content

🎉 JDBC sources: emit cursor counts #15535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 49 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
32cb097
Add cursor_record_count to db stream state
tuliren Aug 9, 2022
ad10bf7
Add cursor record count to cursor info
tuliren Aug 9, 2022
340ba3e
Emit max cursor record count
tuliren Aug 9, 2022
cd497bb
Add original cursor record count
tuliren Aug 11, 2022
c80eaee
Unify logging format
tuliren Aug 11, 2022
8c95bcf
Add backward compatible methods
tuliren Aug 12, 2022
8db6c21
Merge branch 'master' into liren/emit-record-count
tuliren Sep 6, 2022
2ff5bf1
Update unit tests for state decorating iterator
tuliren Aug 12, 2022
2c5be9c
Update test (not done yet)
tuliren Oct 10, 2022
23b4953
Merge branch 'master' of github.com:airbytehq/airbyte into liren/emit…
tuliren Oct 10, 2022
e17adeb
Fix one more unit test
tuliren Oct 10, 2022
04df4fd
Change where clause operator according to record count
tuliren Oct 10, 2022
7b2f949
Add branch for null cursor
tuliren Oct 10, 2022
27327cd
Skip saving record count when it is 0
tuliren Oct 10, 2022
9537714
Fix log wording
tuliren Oct 10, 2022
bb936a2
Set mock record count in test
tuliren Oct 10, 2022
3248cf5
Check cursor value instead of cursor info
tuliren Oct 10, 2022
6d80323
Fix source jdbc test
tuliren Oct 10, 2022
1b27a96
Read record count from state
tuliren Oct 10, 2022
7bcf668
Fix tests
tuliren Oct 10, 2022
28ed907
Add an acceptance test case
tuliren Oct 11, 2022
57c01e6
Merge branch 'master' into liren/emit-record-count
tuliren Oct 11, 2022
ab22b55
Fix npe
tuliren Oct 11, 2022
7ec2a7c
Change record count from int to long to avoid type conversion
tuliren Oct 11, 2022
34676c8
Fix references
tuliren Oct 11, 2022
2a5bbd4
Fix oracle container
tuliren Oct 11, 2022
fdfd6e6
Use uppercase for snowflake
tuliren Oct 11, 2022
3665f2c
Use uppercase for db2
tuliren Oct 11, 2022
8b0952f
Fix and use uppercase
tuliren Oct 11, 2022
f6086cc
Update test case to include the edge case
tuliren Oct 11, 2022
0a6ac5e
Merge branch 'master' of github.com:airbytehq/airbyte into liren/emit…
tuliren Oct 13, 2022
b8ade77
Format code
tuliren Oct 13, 2022
446909d
Remove extra assertion in clickhouse
tuliren Oct 13, 2022
59b308f
Merge ms sql incremental query method
tuliren Oct 13, 2022
1d88be9
Log query for debugging
tuliren Oct 13, 2022
cbd2221
Clean up name_and_timestamp table
tuliren Oct 13, 2022
e4e230c
Fix db2 tests
tuliren Oct 13, 2022
fe04161
Fix mssql tests
tuliren Oct 13, 2022
43a7173
Fix oracle tests
tuliren Oct 13, 2022
645a272
Fix oracle tests
tuliren Oct 13, 2022
9d84d2d
Fix cockroachdb tests
tuliren Oct 13, 2022
b9e592d
Fix snowflake tests
tuliren Oct 13, 2022
c44c18a
Add changelog
tuliren Oct 13, 2022
b05c68f
Fix mssql tests
tuliren Oct 14, 2022
bd1fe12
Fix db2-strict-encrypt tests
tuliren Oct 14, 2022
4811679
Fix oracle-strict-encrypt tests
tuliren Oct 14, 2022
2da3eae
Bump postgres version
tuliren Oct 14, 2022
4764b0c
Fix oracle-strict-encrypt tests
tuliren Oct 14, 2022
750e002
auto-bump connector version [ci skip]
octavia-squidington-iii Oct 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
Expand Down Expand Up @@ -136,14 +137,13 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
final List<String> columnNames,
final String schemaName,
final String tableName,
final String cursorField,
final StandardSQLTypeName cursorFieldType,
final String cursorValue) {
final CursorInfo cursorInfo,
final StandardSQLTypeName cursorFieldType) {
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
enquoteIdentifierList(columnNames),
getFullTableName(schemaName, tableName),
cursorField),
sourceOperations.getQueryParameter(cursorFieldType, cursorValue));
cursorInfo.getCursorField()),
sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
config.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText()));

boolean isAdditionalParamsExists =
final boolean isAdditionalParamsExists =
config.get(JdbcUtils.JDBC_URL_PARAMS_KEY) != null && !config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText().isEmpty();
List<String> params = new ArrayList<>();
final List<String> params = new ArrayList<>();
// assume ssl if not explicitly mentioned.
if (isSsl) {
params.add(SSL_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ protected int getStateEmissionFrequency() {
return INTERMEDIATE_STATE_EMISSION_FREQUENCY;
}

@Override
protected String getCountColumnName() {
return "RECORD_COUNT";
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges() {
return connection -> connection.prepareStatement(
"SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,9 @@ public AbstractJdbcSource<JDBCType> getJdbcSource() {
return new Db2Source();
}

@Override
protected String getConcurrentTestTableName() {
return "NAME_AND_TIMESTAMP";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_IS_NULLABLE;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
Expand All @@ -42,6 +41,7 @@
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.CommonField;
Expand All @@ -50,6 +50,7 @@
import io.airbyte.protocol.models.JsonSchemaType;
import java.net.MalformedURLException;
import java.net.URI;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -329,28 +330,44 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
final List<String> columnNames,
final String schemaName,
final String tableName,
final String cursorField,
final Datatype cursorFieldType,
final String cursorValue) {
final CursorInfo cursorInfo,
final Datatype cursorFieldType) {
LOGGER.info("Queueing query for table: {}", tableName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<JsonNode> stream = database.unsafeQuery(
connection -> {
LOGGER.info("Preparing query for table: {}", tableName);
final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField);
final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s > ?",
final String fullTableName = sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName);
final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorInfo.getCursorField());

final String operator;
if (cursorInfo.getCursorRecordCount() <= 0L) {
operator = ">";
} else {
final long actualRecordCount = getActualCursorRecordCount(
connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor());
LOGGER.info("Table {} cursor count: expected {}, actual {}", tableName, cursorInfo.getCursorRecordCount(), actualRecordCount);
if (actualRecordCount == cursorInfo.getCursorRecordCount()) {
operator = ">";
} else {
operator = ">=";
}
}

final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s %s ?",
sourceOperations.enquoteIdentifierList(connection, columnNames),
sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName),
quotedCursorField));
fullTableName,
quotedCursorField,
operator));
// if the connector emits intermediate states, the incremental query must be sorted by the cursor
// field
if (getStateEmissionFrequency() > 0) {
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField));
}

final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString());
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue);
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor());
LOGGER.info("Executing query for table: {}", tableName);
return preparedStatement;
},
Expand All @@ -362,6 +379,39 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
});
}

protected String getCountColumnName() {
return "record_count";
}

private long getActualCursorRecordCount(final Connection connection,
final String fullTableName,
final String quotedCursorField,
final Datatype cursorFieldType,
final String cursor) throws SQLException {
final String columnName = getCountColumnName();
final PreparedStatement cursorRecordStatement;
if (cursor == null) {
final String cursorRecordQuery = String.format("SELECT COUNT(*) AS %s FROM %s WHERE %s IS NULL",
columnName,
fullTableName,
quotedCursorField);
cursorRecordStatement = connection.prepareStatement(cursorRecordQuery);
} else {
final String cursorRecordQuery = String.format("SELECT COUNT(*) AS %s FROM %s WHERE %s = ?",
columnName,
fullTableName,
quotedCursorField);
cursorRecordStatement = connection.prepareStatement(cursorRecordQuery);;
sourceOperations.setStatementField(cursorRecordStatement, 1, cursorFieldType, cursor);
}
final ResultSet resultSet = cursorRecordStatement.executeQuery();
if (resultSet.next()) {
return resultSet.getLong(columnName);
} else {
return 0L;
}
}

protected DataSource createDataSource(final JsonNode config) {
final JsonNode jdbcConfig = toDatabaseConfig(config);
final DataSource dataSource = DataSourceFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("5");
.withCursor("5")
.withCursorRecordCount(1L);
expectedMessages.addAll(createExpectedTestMessages(List.of(state)));
return expectedMessages;
}
Expand Down Expand Up @@ -763,7 +764,8 @@ void testReadMultipleTablesIncrementally() throws Exception {
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("3"),
.withCursor("3")
.withCursorRecordCount(1L),
new DbStreamState()
.withStreamName(streamName2)
.withStreamNamespace(namespace)
Expand All @@ -775,12 +777,14 @@ void testReadMultipleTablesIncrementally() throws Exception {
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("3"),
.withCursor("3")
.withCursorRecordCount(1L),
new DbStreamState()
.withStreamName(streamName2)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("3"));
.withCursor("3")
.withCursorRecordCount(1L));

final List<AirbyteMessage> expectedMessagesFirstSync = new ArrayList<>(getTestMessages());
expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams1.get(0), expectedStateStreams1));
Expand All @@ -789,6 +793,7 @@ void testReadMultipleTablesIncrementally() throws Exception {

setEmittedAtToNull(actualMessagesFirstSync);

assertEquals(expectedMessagesFirstSync, actualMessagesFirstSync);
assertEquals(expectedMessagesFirstSync.size(), actualMessagesFirstSync.size());
assertTrue(expectedMessagesFirstSync.containsAll(actualMessagesFirstSync));
assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync));
Expand Down Expand Up @@ -818,6 +823,112 @@ protected void incrementalCursorCheck(
expectedRecordMessages);
}

protected String getConcurrentTestTableName() {
return "name_and_timestamp";
}

// See https://github.com/airbytehq/airbyte/issues/14732 for rationale and details.
@Test
void testIncrementalWithConcurrentInsertion() throws Exception {
final String namespace = getDefaultNamespace();
final String tableName = getConcurrentTestTableName();
final String fullyQualifiedTableName = getFullyQualifiedTableName(tableName);

// 1st sync
database.execute(ctx -> {
ctx.createStatement().execute(createTableQuery(fullyQualifiedTableName, "name VARCHAR(200) NOT NULL, timestamp TIMESTAMP NOT NULL", ""));
ctx.createStatement().execute(String.format("INSERT INTO %s (name, timestamp) VALUES ('a', '2021-01-01 00:00:00')", fullyQualifiedTableName));
ctx.createStatement().execute(String.format("INSERT INTO %s (name, timestamp) VALUES ('b', '2021-01-01 00:00:00')", fullyQualifiedTableName));
});

final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
tableName,
namespace,
Field.of("name", JsonSchemaType.STRING),
Field.of("timestamp", JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)))));
configuredCatalog.getStreams().forEach(airbyteStream -> {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(List.of("timestamp"));
airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
});

final List<AirbyteMessage> firstSyncActualMessages = MoreIterators.toList(
source.read(config, configuredCatalog, createEmptyState(tableName, namespace)));

// cursor after 1st sync: 2021-01-01 00:00:00, count 2
final Optional<AirbyteMessage> firstSyncStateOptional = firstSyncActualMessages.stream().filter(r -> r.getType() == Type.STATE).findFirst();
assertTrue(firstSyncStateOptional.isPresent());
final JsonNode firstSyncState = getStateData(firstSyncStateOptional.get(), tableName);
assertTrue(firstSyncState.get("cursor").asText().contains("2021-01-01"));
assertTrue(firstSyncState.get("cursor").asText().contains("00:00:00"));
assertEquals(2L, firstSyncState.get("cursor_record_count").asLong());

final List<String> firstSyncNames = firstSyncActualMessages.stream()
.filter(r -> r.getType() == Type.RECORD)
.map(r -> r.getRecord().getData().get("name").asText())
.toList();
assertEquals(List.of("a", "b"), firstSyncNames);

// 2nd sync
database.execute(ctx -> {
ctx.createStatement().execute(String.format("INSERT INTO %s (name, timestamp) VALUES ('c', '2021-01-02 00:00:00')", fullyQualifiedTableName));
});

final List<AirbyteMessage> secondSyncActualMessages = MoreIterators.toList(
source.read(config, configuredCatalog, createState(tableName, namespace, firstSyncState)));

// cursor after 2nd sync: 2021-01-02 00:00:00, count 1
final Optional<AirbyteMessage> secondSyncStateOptional = secondSyncActualMessages.stream().filter(r -> r.getType() == Type.STATE).findFirst();
assertTrue(secondSyncStateOptional.isPresent());
final JsonNode secondSyncState = getStateData(secondSyncStateOptional.get(), tableName);
assertTrue(secondSyncState.get("cursor").asText().contains("2021-01-02"));
assertTrue(secondSyncState.get("cursor").asText().contains("00:00:00"));
assertEquals(1L, secondSyncState.get("cursor_record_count").asLong());

final List<String> secondSyncNames = secondSyncActualMessages.stream()
.filter(r -> r.getType() == Type.RECORD)
.map(r -> r.getRecord().getData().get("name").asText())
.toList();
assertEquals(List.of("c"), secondSyncNames);

// 3rd sync has records with duplicated cursors
database.execute(ctx -> {
ctx.createStatement().execute(String.format("INSERT INTO %s (name, timestamp) VALUES ('d', '2021-01-02 00:00:00')", fullyQualifiedTableName));
ctx.createStatement().execute(String.format("INSERT INTO %s (name, timestamp) VALUES ('e', '2021-01-02 00:00:00')", fullyQualifiedTableName));
ctx.createStatement().execute(String.format("INSERT INTO %s (name, timestamp) VALUES ('f', '2021-01-03 00:00:00')", fullyQualifiedTableName));
});

final List<AirbyteMessage> thirdSyncActualMessages = MoreIterators.toList(
source.read(config, configuredCatalog, createState(tableName, namespace, secondSyncState)));

// Cursor after 3rd sync is: 2021-01-03 00:00:00, count 1.
final Optional<AirbyteMessage> thirdSyncStateOptional = thirdSyncActualMessages.stream().filter(r -> r.getType() == Type.STATE).findFirst();
assertTrue(thirdSyncStateOptional.isPresent());
final JsonNode thirdSyncState = getStateData(thirdSyncStateOptional.get(), tableName);
assertTrue(thirdSyncState.get("cursor").asText().contains("2021-01-03"));
assertTrue(thirdSyncState.get("cursor").asText().contains("00:00:00"));
assertEquals(1L, thirdSyncState.get("cursor_record_count").asLong());

// The c, d, e, f are duplicated records from this sync, because the cursor
// record count in the database is different from that in the state.
final List<String> thirdSyncExpectedNames = thirdSyncActualMessages.stream()
.filter(r -> r.getType() == Type.RECORD)
.map(r -> r.getRecord().getData().get("name").asText())
.toList();
assertEquals(List.of("c", "d", "e", "f"), thirdSyncExpectedNames);
}

private JsonNode getStateData(final AirbyteMessage airbyteMessage, final String streamName) {
for (final JsonNode stream : airbyteMessage.getState().getData().get("streams")) {
if (stream.get("stream_name").asText().equals(streamName)) {
return stream;
}
}
throw new IllegalArgumentException("Stream not found in state message: " + streamName);
}

private void incrementalCursorCheck(
final String initialCursorField,
final String cursorField,
Expand Down Expand Up @@ -849,7 +960,8 @@ private void incrementalCursorCheck(
.withStreamName(airbyteStream.getStream().getName())
.withStreamNamespace(airbyteStream.getStream().getNamespace())
.withCursorField(List.of(initialCursorField))
.withCursor(initialCursorValue);
.withCursor(initialCursorValue)
.withCursorRecordCount(1L);

final List<AirbyteMessage> actualMessages = MoreIterators
.toList(source.read(config, configuredCatalog, Jsons.jsonNode(createState(List.of(dbStreamState)))));
Expand All @@ -861,7 +973,8 @@ private void incrementalCursorCheck(
.withStreamName(airbyteStream.getStream().getName())
.withStreamNamespace(airbyteStream.getStream().getNamespace())
.withCursorField(List.of(cursorField))
.withCursor(endCursorValue));
.withCursor(endCursorValue)
.withCursorRecordCount(1L));
final List<AirbyteMessage> expectedMessages = new ArrayList<>(expectedRecordMessages);
expectedMessages.addAll(createExpectedTestMessages(expectedStreams));

Expand Down Expand Up @@ -1082,6 +1195,27 @@ protected JsonNode createEmptyState(final String streamName, final String stream
}
}

protected JsonNode createState(final String streamName, final String streamNamespace, final JsonNode stateData) {
if (supportsPerStream()) {
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(streamName).withNamespace(streamNamespace))
.withStreamState(stateData));
return Jsons.jsonNode(List.of(airbyteStateMessage));
} else {
final DbState dbState = new DbState().withStreams(List.of(
new DbStreamState()
.withStreamName(streamName)
.withStreamNamespace(streamNamespace)
.withCursor(stateData.get("cursor").asText())
.withCursorField(List.of(stateData.get("cursor_field").asText()))
.withCursorRecordCount(stateData.get("cursor_record_count").asLong())));
return Jsons.jsonNode(dbState);
}
}

/**
* Extracts the state component from the provided {@link AirbyteMessage} based on the value returned
* by {@link #supportsPerStream()}.
Expand Down
Loading