Skip to content

🎉 Postgres source: emit state messages more frequently for incremental sync #14903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
final String tableName,
final String cursorField,
final StandardSQLTypeName cursorFieldType,
final String cursor) {
final String cursorValue) {
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
enquoteIdentifierList(columnNames),
getFullTableName(schemaName, tableName),
cursorField),
sourceOperations.getQueryParameter(cursorFieldType, cursor));
sourceOperations.getQueryParameter(cursorFieldType, cursorValue));
}

private AutoCloseableIterator<JsonNode> queryTableWithParams(final BigQueryDatabase database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,21 +266,26 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
final String tableName,
final String cursorField,
final Datatype cursorFieldType,
final String cursor) {
final String cursorValue) {
LOGGER.info("Queueing query for table: {}", tableName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<JsonNode> stream = database.unsafeQuery(
connection -> {
LOGGER.info("Preparing query for table: {}", tableName);
final String sql = String.format("SELECT %s FROM %s WHERE %s > ?",
final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField);
final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s > ?",
sourceOperations.enquoteIdentifierList(connection, columnNames),
sourceOperations
.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName),
sourceOperations.enquoteIdentifier(connection, cursorField));
sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName),
quotedCursorField));
// if the connector emits intermediate states, the incremental query must be sorted by the cursor
// field
if (getStateEmissionFrequency() > 0) {
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a dumb question but are we sure ORDER BY %s ASC would be compatible with all the AbstractJDBC based sources? I assume it should be!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. This is basic ANSI SQL syntax, so I think all of them should follow it. But I will double check.

}

final PreparedStatement preparedStatement = connection.prepareStatement(sql);
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor);
final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString());
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue);
LOGGER.info("Executing query for table: {}", tableName);
return preparedStatement;
},
Expand Down Expand Up @@ -340,7 +345,7 @@ protected Map<String, String> getConnectionProperties(final JsonNode config) {
* @throws IllegalArgumentException
*/
protected static void assertCustomParametersDontOverwriteDefaultParameters(final Map<String, String> customParameters,
final Map<String, String> defaultParameters) {
final Map<String, String> defaultParameters) {
for (final String key : defaultParameters.keySet()) {
if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) {
throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final MongoDatabase
final String tableName,
final String cursorField,
final BsonType cursorFieldType,
final String cursor) {
final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursor));
final String cursorValue) {
final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursorValue));
return queryTable(database, columnNames, tableName, greaterComparison);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
final String tableName,
final String cursorField,
final JDBCType cursorFieldType,
final String cursor) {
final String cursorValue) {
LOGGER.info("Queueing query for table: {}", tableName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
Expand All @@ -113,7 +113,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql);

final PreparedStatement preparedStatement = connection.prepareStatement(sql);
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor);
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue);
LOGGER.info("Executing query for table: {}", tableName);
return preparedStatement;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;

static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();
static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"ssl", "true",
Expand All @@ -84,7 +86,6 @@ public static Source sshWrappedSource() {
}

PostgresSource() {

super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations());
this.featureFlags = new EnvVariableFeatureFlags();
}
Expand Down Expand Up @@ -408,6 +409,10 @@ protected AirbyteStateType getSupportedStateType(final JsonNode config) {
return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

protected int getStateEmissionFrequency() {
return INTERMEDIATE_STATE_EMISSION_FREQUENCY;
}

public static void main(final String[] args) throws Exception {
final Source source = PostgresSource.sshWrappedSource();
LOGGER.info("starting source: {}", PostgresSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,16 @@ protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Databas
airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
}

final JsonSchemaPrimitive cursorType = IncrementalUtils
.getCursorType(airbyteStream, cursorField);
final JsonSchemaPrimitive cursorType = IncrementalUtils.getCursorType(airbyteStream, cursorField);

iterator = AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator(
autoCloseableIterator,
stateManager,
pair,
cursorField,
cursorOptional.orElse(null),
cursorType),
cursorType,
getStateEmissionFrequency()),
airbyteMessageIterator);
} else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
Expand Down Expand Up @@ -491,16 +491,11 @@ public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Data
final String tableName);

/**
* Read incremental data from a table. Incremental read should returns only records where cursor
* column value is bigger than cursor.
* Read incremental data from a table. Incremental read should return only records where cursor
* column value is bigger than cursor. Note that if the connector needs to emit intermediate state
* (i.e. {@link AbstractDbSource#getStateEmissionFrequency} > 0), the incremental query must be
* sorted by the cursor field.
*
* @param database source database
* @param columnNames interested column names
* @param schemaName table namespace
* @param tableName target table
* @param cursorField cursor field name
* @param cursorFieldType cursor field type
* @param cursor cursor value
* @return iterator with read data
*/
public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
Expand All @@ -509,7 +504,16 @@ public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database d
String tableName,
String cursorField,
DataType cursorFieldType,
String cursor);
String cursorValue);

/**
* When larger than 0, the incremental iterator will emit intermediate state for every N records.
* Please note that if intermediate state emission is enabled, the incremental query must be ordered
* by the cursor field.
*/
protected int getStateEmissionFrequency() {
return 0;
}

private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception {
final Database database = createDatabase(sourceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,47 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
private final AirbyteStreamNameNamespacePair pair;
private final String cursorField;
private final JsonSchemaPrimitive cursorType;
private final int stateEmissionFrequency;

private String maxCursor;
private boolean hasEmittedState;
private AirbyteMessage intermediateStateMessage;
private boolean hasEmittedFinalState;
private int recordCount;

/**
* @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every
* stateEmissionFrequency records. Only emit intermediate states if the records are sorted by
* the cursor field.
*/
public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
final StateManager stateManager,
final AirbyteStreamNameNamespacePair pair,
final String cursorField,
final String initialCursor,
final JsonSchemaPrimitive cursorType) {
final JsonSchemaPrimitive cursorType,
final int stateEmissionFrequency) {
this.messageIterator = messageIterator;
this.stateManager = stateManager;
this.pair = pair;
this.cursorField = cursorField;
this.cursorType = cursorType;
this.maxCursor = initialCursor;
this.stateEmissionFrequency = stateEmissionFrequency;
}

private String getCursorCandidate(final AirbyteMessage message) {
String cursorCandidate = message.getRecord().getData().get(cursorField).asText();
final String cursorCandidate = message.getRecord().getData().get(cursorField).asText();
return (cursorCandidate != null ? cursorCandidate.replaceAll("\u0000", "") : null);
}

@Override
protected AirbyteMessage computeNext() {
if (messageIterator.hasNext()) {
if (intermediateStateMessage != null) {
final AirbyteMessage message = intermediateStateMessage;
intermediateStateMessage = null;
return message;
} else if (messageIterator.hasNext()) {
recordCount++;
final AirbyteMessage message = messageIterator.next();
if (message.getRecord().getData().hasNonNull(cursorField)) {
final String cursorCandidate = getCursorCandidate(message);
Expand All @@ -59,24 +74,38 @@ protected AirbyteMessage computeNext() {
}
}

return message;
} else if (!hasEmittedState) {
final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor);
LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor {}, cursor field: {}, new cursor: {}",
pair,
stateManager.getOriginalCursorField(pair).orElse(null),
stateManager.getOriginalCursor(pair).orElse(null),
stateManager.getCursorField(pair).orElse(null),
stateManager.getCursor(pair).orElse(null));
if (stateManager.getCursor(pair).isEmpty()) {
LOGGER.warn("Cursor was for stream {} was null. This stream will replicate all records on the next run", pair);
if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) {
// Mark the state as final in case this intermediate state happens to be the last one.
// This is not necessary, but avoid sending the final states twice and prevent any edge case.
final boolean isFinalState = !messageIterator.hasNext();
intermediateStateMessage = emitStateMessage(isFinalState);
}

hasEmittedState = true;
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
return message;
} else if (!hasEmittedFinalState) {
return emitStateMessage(true);
} else {
return endOfData();
}
}

public AirbyteMessage emitStateMessage(final boolean isFinalState) {
final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor);
LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}",
pair,
stateManager.getOriginalCursorField(pair).orElse(null),
stateManager.getOriginalCursor(pair).orElse(null),
stateManager.getCursorField(pair).orElse(null),
stateManager.getCursor(pair).orElse(null));

if (isFinalState) {
hasEmittedFinalState = true;
if (stateManager.getCursor(pair).isEmpty()) {
LOGGER.warn("Cursor was for stream {} was null. This stream will replicate all records on the next run", pair);
}
}

return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}

}
Loading