🐞 Postgres source: fix bug in intermediate state emission #15496

Merged
merged 17 commits into from
Aug 10, 2022
@@ -762,7 +762,7 @@
 - name: Postgres
   sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
   dockerRepository: airbyte/source-postgres
-  dockerImageTag: 1.0.0
+  dockerImageTag: 1.0.1
   documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
   icon: postgresql.svg
   sourceType: database
@@ -7140,7 +7140,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-postgres:1.0.0"
+- dockerImage: "airbyte/source-postgres:1.0.1"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
     connectionSpecification:
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.0
+LABEL io.airbyte.version=1.0.1
 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.0
+LABEL io.airbyte.version=1.0.1
 LABEL io.airbyte.name=airbyte/source-postgres
@@ -13,6 +13,7 @@
 import io.airbyte.protocol.models.AirbyteStateMessage;
 import io.airbyte.protocol.models.JsonSchemaPrimitive;
 import java.util.Iterator;
+import java.util.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -27,10 +28,16 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
   private final JsonSchemaPrimitive cursorType;
   private final int stateEmissionFrequency;

+  private final String initialCursor;
   private String maxCursor;
-  private AirbyteMessage intermediateStateMessage;
   private boolean hasEmittedFinalState;
-  private int recordCount;
+
+  // The intermediateStateMessage is set to the latest state message.
+  // For every stateEmissionFrequency messages, emitIntermediateState is set to true and
+  // the latest intermediateStateMessage will be emitted.
+  private int totalRecordCount = 0;
+  private boolean emitIntermediateState = false;
+  private AirbyteMessage intermediateStateMessage = null;
Contributor review comment:

Why set intermediateStateMessage = null explicitly here, when the previous code declared intermediateStateMessage without an initializer? Is it mainly to make the default value explicit?

@tuliren (Contributor Author), Aug 10, 2022:

Yes. I just want to make it explicit because these values are set and reset repeatedly.
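
For readers following the thread, the way these fields interact can be modeled outside Airbyte. The sketch below is a simplified stand-in, not the actual StateDecoratingIterator: the class name `DeferredStateSketch`, the method `run`, and the `R:`/`S:` string encoding are hypothetical, cursors are compared as plain strings, and a null cursor is assumed to sort before every value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified model of deferred intermediate state emission. */
public class DeferredStateSketch {

  /**
   * Walks cursor values (assumed sorted ascending) and returns the emitted sequence:
   * "R:<cursor>" stands for a record message, "S:<cursor>" for a state message.
   */
  public static List<String> run(final List<String> cursors, final String initialCursor, final int frequency) {
    final List<String> out = new ArrayList<>();
    final Iterator<String> it = cursors.iterator();
    String maxCursor = initialCursor;
    String pendingState = null;  // latest cursor value that is safe to checkpoint
    boolean emitPending = false; // set to true once every `frequency` records
    int totalRecordCount = 0;

    while (it.hasNext()) {
      if (emitPending && pendingState != null) {
        // A state is emitted in place of a record on this step.
        out.add("S:" + pendingState);
        pendingState = null;
        emitPending = false;
        continue;
      }

      totalRecordCount++;
      final String cursor = it.next();
      if (maxCursor == null || maxCursor.compareTo(cursor) < 0) {
        // The cursor advanced, so every record carrying the previous value has been seen.
        // Skip the first record (nothing to checkpoint yet) and the last one
        // (the final state after the loop covers it).
        if (frequency > 0 && !Objects.equals(maxCursor, initialCursor) && it.hasNext()) {
          pendingState = maxCursor;
        }
        maxCursor = cursor;
      }
      if (frequency > 0 && totalRecordCount % frequency == 0) {
        emitPending = true;
      }
      out.add("R:" + cursor);
    }

    out.add("S:" + maxCursor); // the final state is always emitted exactly once
    return out;
  }
}
```

Under these assumptions, `run(List.of("abc", "def", "ghi", "jkl", "xyz"), null, 2)` yields `R:abc, R:def, S:abc, R:ghi, R:jkl, S:ghi, R:xyz, S:xyz`, which mirrors the order asserted in testStateEmissionFrequency2.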


   /**
    * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every
@@ -49,6 +56,7 @@ public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
     this.pair = pair;
     this.cursorField = cursorField;
     this.cursorType = cursorType;
+    this.initialCursor = initialCursor;
     this.maxCursor = initialCursor;
     this.stateEmissionFrequency = stateEmissionFrequency;
   }
@@ -60,36 +68,41 @@ private String getCursorCandidate(final AirbyteMessage message) {

   @Override
   protected AirbyteMessage computeNext() {
-    if (intermediateStateMessage != null) {
-      final AirbyteMessage message = intermediateStateMessage;
-      intermediateStateMessage = null;
-      return message;
-    } else if (messageIterator.hasNext()) {
-      recordCount++;
+    if (messageIterator.hasNext()) {
+      if (emitIntermediateState && intermediateStateMessage != null) {
+        final AirbyteMessage message = intermediateStateMessage;
+        intermediateStateMessage = null;
+        emitIntermediateState = false;
+        return message;
+      }
+
+      totalRecordCount++;
       final AirbyteMessage message = messageIterator.next();
       if (message.getRecord().getData().hasNonNull(cursorField)) {
         final String cursorCandidate = getCursorCandidate(message);
         if (IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType) < 0) {
+          if (stateEmissionFrequency > 0 && !Objects.equals(maxCursor, initialCursor) && messageIterator.hasNext()) {
+            // Only emit an intermediate state when it is not the first or last record message,
+            // because the last state message will be taken care of in a different branch.
+            intermediateStateMessage = createStateMessage(false);
+          }
           maxCursor = cursorCandidate;
         }
       }

-      if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) {
-        // Mark the state as final in case this intermediate state happens to be the last one.
-        // This is not necessary, but avoid sending the final states twice and prevent any edge case.
-        final boolean isFinalState = !messageIterator.hasNext();
-        intermediateStateMessage = emitStateMessage(isFinalState);
+      if (stateEmissionFrequency > 0 && totalRecordCount % stateEmissionFrequency == 0) {
+        emitIntermediateState = true;
       }

       return message;
     } else if (!hasEmittedFinalState) {
-      return emitStateMessage(true);
+      return createStateMessage(true);
     } else {
       return endOfData();
     }
   }

-  public AirbyteMessage emitStateMessage(final boolean isFinalState) {
+  public AirbyteMessage createStateMessage(final boolean isFinalState) {
     final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor);
     LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}",
         pair,
@@ -45,10 +45,18 @@ class StateDecoratingIteratorTest {
   private static final AirbyteMessage RECORD_MESSAGE_2 = createRecordMessage(RECORD_VALUE_2);
   private static final AirbyteMessage STATE_MESSAGE_2 = createStateMessage(RECORD_VALUE_2);

-  private static final String RECORD_VALUE_3 = "xyz";
+  private static final String RECORD_VALUE_3 = "ghi";
   private static final AirbyteMessage RECORD_MESSAGE_3 = createRecordMessage(RECORD_VALUE_3);
   private static final AirbyteMessage STATE_MESSAGE_3 = createStateMessage(RECORD_VALUE_3);

+  private static final String RECORD_VALUE_4 = "jkl";
+  private static final AirbyteMessage RECORD_MESSAGE_4 = createRecordMessage(RECORD_VALUE_4);
+  private static final AirbyteMessage STATE_MESSAGE_4 = createStateMessage(RECORD_VALUE_4);
+
+  private static final String RECORD_VALUE_5 = "xyz";
+  private static final AirbyteMessage RECORD_MESSAGE_5 = createRecordMessage(RECORD_VALUE_5);
+  private static final AirbyteMessage STATE_MESSAGE_5 = createStateMessage(RECORD_VALUE_5);
+
   private static AirbyteMessage createRecordMessage(final String recordValue) {
     return new AirbyteMessage()
         .withType(Type.RECORD)
@@ -73,6 +81,8 @@ void setup() {
     when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState());
     when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2)).thenReturn(STATE_MESSAGE_2.getState());
     when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3)).thenReturn(STATE_MESSAGE_3.getState());
+    when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4)).thenReturn(STATE_MESSAGE_4.getState());
+    when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5)).thenReturn(STATE_MESSAGE_5.getState());

     when(stateManager.getOriginalCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty());
     when(stateManager.getOriginalCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty());
@@ -106,13 +116,13 @@ void testWithInitialCursor() {
         stateManager,
         NAME_NAMESPACE_PAIR,
         UUID_FIELD_NAME,
-        RECORD_VALUE_3,
+        RECORD_VALUE_5,
         JsonSchemaPrimitive.STRING,
         0);

     assertEquals(RECORD_MESSAGE_1, iterator.next());
     assertEquals(RECORD_MESSAGE_2, iterator.next());
-    assertEquals(STATE_MESSAGE_3, iterator.next());
+    assertEquals(STATE_MESSAGE_5, iterator.next());
     assertFalse(iterator.hasNext());
   }

@@ -179,8 +189,8 @@ void testUnicodeNull() {

   @Test
   @DisplayName("When initial cursor is null, and emit state for every record")
-  void testStateEmission1() {
-    messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3);
+  void testStateEmissionFrequency1() {
+    messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
     final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
         messageIterator,
         stateManager,
@@ -191,19 +201,27 @@ void testStateEmission1() {
         1);

     assertEquals(RECORD_MESSAGE_1, iterator1.next());
-    assertEquals(STATE_MESSAGE_1, iterator1.next());
+    // should emit state 1, but it is unclear whether there will be more
+    // records with the same cursor value, so no state is ready for emission
     assertEquals(RECORD_MESSAGE_2, iterator1.next());
-    assertEquals(STATE_MESSAGE_2, iterator1.next());
+    // emit state 1 because it is the latest state ready for emission
+    assertEquals(STATE_MESSAGE_1, iterator1.next());
     assertEquals(RECORD_MESSAGE_3, iterator1.next());
-    // final state message should only be emitted once
+    assertEquals(STATE_MESSAGE_2, iterator1.next());
+    assertEquals(RECORD_MESSAGE_4, iterator1.next());
     assertEquals(STATE_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_5, iterator1.next());
+    // state 4 is not emitted because there is no more record and only
+    // the final state should be emitted at this point; also the final
+    // state should only be emitted once
+    assertEquals(STATE_MESSAGE_5, iterator1.next());
     assertFalse(iterator1.hasNext());
   }

   @Test
   @DisplayName("When initial cursor is null, and emit state for every 2 records")
-  void testStateEmission2() {
-    messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3);
+  void testStateEmissionFrequency2() {
+    messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
     final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
         messageIterator,
         stateManager,
@@ -215,16 +233,74 @@ void testStateEmission2() {

     assertEquals(RECORD_MESSAGE_1, iterator1.next());
     assertEquals(RECORD_MESSAGE_2, iterator1.next());
-    assertEquals(STATE_MESSAGE_2, iterator1.next());
+    // emit state 1 because it is the latest state ready for emission
+    assertEquals(STATE_MESSAGE_1, iterator1.next());
     assertEquals(RECORD_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_4, iterator1.next());
+    // emit state 3 because it is the latest state ready for emission
     assertEquals(STATE_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_5, iterator1.next());
+    assertEquals(STATE_MESSAGE_5, iterator1.next());
     assertFalse(iterator1.hasNext());
   }

   @Test
   @DisplayName("When initial cursor is not null")
-  void testStateEmission3() {
-    messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3);
+  void testStateEmissionWhenInitialCursorIsNotNull() {
+    messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
+    final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
+        messageIterator,
+        stateManager,
+        NAME_NAMESPACE_PAIR,
+        UUID_FIELD_NAME,
+        RECORD_VALUE_1,
+        JsonSchemaPrimitive.STRING,
+        1);
+
+    assertEquals(RECORD_MESSAGE_2, iterator1.next());
+    assertEquals(RECORD_MESSAGE_3, iterator1.next());
+    assertEquals(STATE_MESSAGE_2, iterator1.next());
+    assertEquals(RECORD_MESSAGE_4, iterator1.next());
+    assertEquals(STATE_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_5, iterator1.next());
+    assertEquals(STATE_MESSAGE_5, iterator1.next());
+    assertFalse(iterator1.hasNext());
+  }
+
+  /**
+   * Incremental syncs will sort the table with the cursor field, and emit the max cursor for every N
+   * records. The purpose is to emit the states frequently, so that if any transient failure occurs
+   * during a long sync, the next run does not need to start from the beginning, but can resume from
+   * the last successful intermediate state committed on the destination. The next run will start with
+   * `cursorField > cursor`. However, it is possible that there are multiple records with the same
+   * cursor value. If the intermediate state is emitted before all these records have been synced to
+   * the destination, some of these records may be lost.
+   * <p/>
+   * Here is an example:
+   *
+   * <pre>
+   * | Record ID | Cursor Field | Other Field | Note                          |
+   * | --------- | ------------ | ----------- | ----------------------------- |
+   * | 1         | F1=16        | F2="abc"    |                               |
+   * | 2         | F1=16        | F2="def"    | <- state emission and failure |
+   * | 3         | F1=16        | F2="ghi"    |                               |
+   * </pre>
+   *
+   * If the intermediate state is emitted for record 2 and the sync fails immediately such that the
+   * cursor value `16` is committed, but only records 1 and 2 are actually synced, the next run will
+   * start with `F1 > 16` and skip record 3.
+   * <p/>
+   * So intermediate state emission should only happen when all records with the same cursor value
+   * have been synced to the destination. Reference: https://github.com/airbytehq/airbyte/issues/15427
+   */
+  @Test
+  @DisplayName("When there are multiple records with the same cursor value")
+  void testStateEmissionForRecordsSharingSameCursorValue() {
+    messageIterator = MoreIterators.of(
+        RECORD_MESSAGE_2, RECORD_MESSAGE_2,
+        RECORD_MESSAGE_3, RECORD_MESSAGE_3, RECORD_MESSAGE_3,
+        RECORD_MESSAGE_4,
+        RECORD_MESSAGE_5, RECORD_MESSAGE_5);
     final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
         messageIterator,
         stateManager,
@@ -235,9 +311,19 @@ void testStateEmission3() {
         1);

     assertEquals(RECORD_MESSAGE_2, iterator1.next());
+    assertEquals(RECORD_MESSAGE_2, iterator1.next());
+    assertEquals(RECORD_MESSAGE_3, iterator1.next());
+    // state 2 is the latest state ready for emission because
+    // all records with the same cursor value have been emitted
     assertEquals(STATE_MESSAGE_2, iterator1.next());
     assertEquals(RECORD_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_4, iterator1.next());
     assertEquals(STATE_MESSAGE_3, iterator1.next());
+    assertEquals(RECORD_MESSAGE_5, iterator1.next());
+    assertEquals(STATE_MESSAGE_4, iterator1.next());
+    assertEquals(RECORD_MESSAGE_5, iterator1.next());
+    assertEquals(STATE_MESSAGE_5, iterator1.next());
     assertFalse(iterator1.hasNext());
   }
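
The data-loss scenario described in the Javadoc of testStateEmissionForRecordsSharingSameCursorValue can be checked with a toy model. This is only an illustrative sketch under stated assumptions: `ResumeSketch` and `lostIds` are hypothetical names that do not exist in the Airbyte code base, rows are plain `{recordId, cursorValue}` pairs, and the resumed run is assumed to read strictly `cursorValue > committedCursor`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of resuming an incremental sync after a failure. */
public class ResumeSketch {

  /**
   * rows[i] = {recordId, cursorValue}, sorted by cursor value.
   * The first syncedCount rows reached the destination before the failure.
   * The next run only reads rows with cursorValue > committedCursor.
   * Returns the ids of rows that never reach the destination.
   */
  public static List<Integer> lostIds(final int[][] rows, final int syncedCount, final int committedCursor) {
    final List<Integer> lost = new ArrayList<>();
    for (int i = 0; i < rows.length; i++) {
      final boolean alreadySynced = i < syncedCount;
      final boolean readOnResume = rows[i][1] > committedCursor;
      if (!alreadySynced && !readOnResume) {
        lost.add(rows[i][0]);
      }
    }
    return lost;
  }
}
```

With the three rows from the Javadoc table (`{1, 16}, {2, 16}, {3, 16}`), two rows synced, and cursor `16` committed, `lostIds` returns `[3]`. If the committed cursor is still an earlier value such as `15`, because the state for `16` was held back, the result is empty: records 1 and 2 are read again, but nothing is skipped.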

6 changes: 4 additions & 2 deletions docs/integrations/sources/postgres.md
@@ -371,18 +371,20 @@ Possible solutions include:

 | Version | Date | Pull Request | Subject |
 |:--------| :--- | :--- |:----------------------------------------------------------------------------------------------------------------|
+| 1.0.1 | 2022-08-10 | [15496](https://github.com/airbytehq/airbyte/pull/15496) | Fix state emission in incremental sync |
+| | 2022-08-10 | [15481](https://github.com/airbytehq/airbyte/pull/15481) | Fix data handling from WAL logs in CDC mode |
 | 1.0.0 | 2022-08-05 | [15380](https://github.com/airbytehq/airbyte/pull/15380) | Change connector label to generally_available |
 | 0.4.44 | 2022-08-05 | [15342](https://github.com/airbytehq/airbyte/pull/15342) | Adjust titles and descriptions in spec.json |
 | 0.4.43 | 2022-08-03 | [15226](https://github.com/airbytehq/airbyte/pull/15226) | Make connectionTimeoutMs configurable through JDBC url parameters |
 | 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time |
 | 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC |
-| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently |
+| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead) |
 | 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps |
 | | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers |
 | 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
 | 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. |
 | 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
-| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
+| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
 | 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
 | 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. |
 | 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |