Skip to content

Commit 3ce8aed

Browse files
tulirenrobbinhan
authored andcommitted
Add comments about intermediate state emission (airbytehq#16262)
* Add comments about intermediate state emission * Adjust wording * Format code
1 parent 9a7f3b5 commit 3ce8aed

File tree

2 files changed

+35
-13
lines changed

2 files changed

+35
-13
lines changed

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java

+24-7
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,30 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
2727
private final AirbyteStreamNameNamespacePair pair;
2828
private final String cursorField;
2929
private final JsonSchemaPrimitive cursorType;
30-
private final int stateEmissionFrequency;
3130

3231
private final String initialCursor;
3332
private String maxCursor;
3433
private boolean hasEmittedFinalState;
3534

36-
// The intermediateStateMessage is set to the latest state message.
37-
// For every stateEmissionFrequency messages, emitIntermediateState is set to true and
38-
// the latest intermediateStateMessage will be emitted.
35+
/**
36+
* These parameters are for intermediate state message emission. We can emit an intermediate state
37+
* when the following two conditions are met.
38+
* <p/>
39+
* 1. The records are sorted by the cursor field. This is true when {@code stateEmissionFrequency} >
40+
* 0. This logic is guaranteed in {@code AbstractJdbcSource#queryTableIncremental}, in which an
41+
* "ORDER BY" clause is appended to the SQL query if {@code stateEmissionFrequency} > 0.
42+
* <p/>
43+
* 2. There is a cursor value that is ready for emission. A cursor value is "ready" if there is no
44+
* more record with the same value. We cannot emit a cursor at will, because there may be multiple
45+
* records with the same cursor value. If we emit a cursor ignoring this condition, should the sync
46+
* fail right after the emission, the next sync may skip some records with the same cursor value due
47+
* to "WHERE cursor_field > cursor" in {@code AbstractJdbcSource#queryTableIncremental}.
48+
* <p/>
49+
* The {@code intermediateStateMessage} is set to the latest state message that is ready for
50+
* emission. For every {@code stateEmissionFrequency} messages, {@code emitIntermediateState} is set
51+
* to true and the latest "ready" state will be emitted in the next {@code computeNext} call.
52+
*/
53+
private final int stateEmissionFrequency;
3954
private int totalRecordCount = 0;
4055
private boolean emitIntermediateState = false;
4156
private AirbyteMessage intermediateStateMessage = null;
@@ -47,9 +62,11 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
4762
* @param cursorField Path to the comparator field used to track the records read so far
4863
* @param initialCursor name of the initial cursor column
4964
* @param cursorType ENUM type of primitive values that can be used as a cursor for checkpointing
50-
* @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every
51-
* stateEmissionFrequency records. Only emit intermediate states if the records are sorted by
52-
* the cursor field.
65+
* @param stateEmissionFrequency If larger than 0, the records are sorted by the cursor field, and
66+
* intermediate states will be emitted for every {@code stateEmissionFrequency} records. The
67+
* order of the records is guaranteed in {@code AbstractJdbcSource#queryTableIncremental}, in
68+
* which an "ORDER BY" clause is appended to the SQL query if {@code stateEmissionFrequency}
69+
* > 0.
5370
*/
5471
public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
5572
final StateManager stateManager,

airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static org.junit.jupiter.api.Assertions.assertEquals;
88
import static org.junit.jupiter.api.Assertions.assertFalse;
99
import static org.junit.jupiter.api.Assertions.assertNull;
10-
import static org.junit.jupiter.api.Assertions.assertThrows;
1110
import static org.mockito.Mockito.mock;
1211
import static org.mockito.Mockito.when;
1312

@@ -75,6 +74,7 @@ private static AirbyteMessage createStateMessage(final String recordValue) {
7574

7675
private Iterator<AirbyteMessage> createExceptionIterator() {
7776
return new Iterator<AirbyteMessage>() {
77+
7878
final Iterator<AirbyteMessage> internalMessageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2,
7979
RECORD_MESSAGE_2, RECORD_MESSAGE_3);
8080

@@ -88,7 +88,8 @@ public AirbyteMessage next() {
8888
if (internalMessageIterator.hasNext()) {
8989
return internalMessageIterator.next();
9090
} else {
91-
// this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a SQLException is thrown and wrapped in
91+
// this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a
92+
// SQLException is thrown and wrapped in
9293
// StreamingJdbcDatabase#tryAdvance
9394
throw new RuntimeException(new SQLException("Connection marked broken because of SQLSTATE(080006)", "08006"));
9495
}
@@ -186,10 +187,12 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() {
186187
1);
187188
assertEquals(RECORD_MESSAGE_1, iterator.next());
188189
assertEquals(RECORD_MESSAGE_2, iterator.next());
189-
// continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the condition of "ready"
190+
// continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the
191+
// condition of "ready"
190192
assertEquals(RECORD_MESSAGE_2, iterator.next());
191193
assertEquals(RECORD_MESSAGE_3, iterator.next());
192-
// emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the frequency minimum of 1 record
194+
// emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the
195+
// frequency minimum of 1 record
193196
assertEquals(STATE_MESSAGE_2, iterator.next());
194197
// no further records to read since Exception was caught above and marked iterator as endOfData()
195198
assertFalse(iterator.hasNext());
@@ -210,8 +213,10 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyZero() {
210213
assertEquals(RECORD_MESSAGE_2, iterator.next());
211214
assertEquals(RECORD_MESSAGE_2, iterator.next());
212215
assertEquals(RECORD_MESSAGE_3, iterator.next());
213-
// since stateEmission is not set to emit frequently, this will catch the error but not emit state message since it wasn't in a ready state
214-
// of having a frequency > 0 but will prevent an exception from causing the iterator to fail by marking iterator as endOfData()
216+
// since stateEmission is not set to emit frequently, this will catch the error but not emit state
217+
// message since it wasn't in a ready state
218+
// of having a frequency > 0 but will prevent an exception from causing the iterator to fail by
219+
// marking iterator as endOfData()
215220
assertFalse(iterator.hasNext());
216221
}
217222

0 commit comments

Comments
 (0)