Skip to content

Commit 0886ee0

Browse files
cgardens and edgao authored
Refactor state management out of BufferStrategy (#13669)
Co-authored-by: Edward Gao <[email protected]>
1 parent edb74ec commit 0886ee0

File tree

6 files changed

+73
-83
lines changed

6 files changed

+73
-83
lines changed

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,11 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
8484
private boolean hasStarted;
8585
private boolean hasClosed;
8686

87-
private AirbyteMessage lastFlushedState;
87+
// represents the last state message for which all of its records have been flushed to tmp storage in
88+
// the destination.
89+
private AirbyteMessage lastFlushedToTmpDstState;
90+
// represents the last state message whose state is waiting to be flushed to tmp storage in the
91+
// destination.
8892
private AirbyteMessage pendingState;
8993

9094
public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
@@ -103,7 +107,6 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
103107
this.isValidRecord = isValidRecord;
104108
this.streamToIgnoredRecordCount = new HashMap<>();
105109
this.bufferingStrategy = bufferingStrategy;
106-
bufferingStrategy.registerFlushAllEventHook(this::flushQueueToDestination);
107110
}
108111

109112
@Override
@@ -134,7 +137,11 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
134137
return;
135138
}
136139

137-
bufferingStrategy.addRecord(stream, message);
140+
// if the buffer flushes, update the states appropriately.
141+
if (bufferingStrategy.addRecord(stream, message)) {
142+
markStatesAsFlushedToTmpDestination();
143+
}
144+
138145
} else if (message.getType() == Type.STATE) {
139146
pendingState = message;
140147
} else {
@@ -143,9 +150,9 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
143150

144151
}
145152

146-
private void flushQueueToDestination() {
153+
private void markStatesAsFlushedToTmpDestination() {
147154
if (pendingState != null) {
148-
lastFlushedState = pendingState;
155+
lastFlushedToTmpDstState = pendingState;
149156
pendingState = null;
150157
}
151158
}
@@ -169,13 +176,14 @@ protected void close(final boolean hasFailed) throws Exception {
169176
} else {
170177
LOGGER.info("executing on success close procedure.");
171178
bufferingStrategy.flushAll();
179+
markStatesAsFlushedToTmpDestination();
172180
}
173181
bufferingStrategy.close();
174182

175183
try {
176184
// if no state was emitted (i.e. full refresh), if there were still no failures, then we can
177185
// still succeed.
178-
if (lastFlushedState == null) {
186+
if (lastFlushedToTmpDstState == null) {
179187
onClose.accept(hasFailed);
180188
} else {
181189
// if any state message flushed that means we can still go for at least a partial success.
@@ -184,8 +192,8 @@ protected void close(final boolean hasFailed) throws Exception {
184192

185193
// if onClose succeeds without exception then we can emit the state record because it means its
186194
// records were not only flushed, but committed.
187-
if (lastFlushedState != null) {
188-
outputRecordCollector.accept(lastFlushedState);
195+
if (lastFlushedToTmpDstState != null) {
196+
outputRecordCollector.accept(lastFlushedToTmpDstState);
189197
}
190198
} catch (final Exception e) {
191199
LOGGER.error("Close failed.", e);

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.integrations.destination.record_buffer;
66

7-
import io.airbyte.commons.concurrency.VoidCallable;
87
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
98
import io.airbyte.protocol.models.AirbyteMessage;
109

@@ -22,8 +21,13 @@ public interface BufferingStrategy extends AutoCloseable {
2221

2322
/**
2423
* Add a new message to the buffer while consuming streams
24+
*
25+
* @param stream - stream associated with record
26+
* @param message - message to buffer
27+
* @return true if this record causes ALL records in the buffer to flush, otherwise false.
28+
* @throws Exception throw on failure
2529
*/
26-
void addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception;
30+
boolean addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception;
2731

2832
/**
2933
* Flush buffered messages in a writer from a particular stream
@@ -40,12 +44,4 @@ public interface BufferingStrategy extends AutoCloseable {
4044
*/
4145
void clear() throws Exception;
4246

43-
/**
44-
* When all buffers are being flushed, we can signal some parent function of this event for further
45-
* processing.
46-
*
47-
* THis install such a hook to be triggered when that happens.
48-
*/
49-
void registerFlushAllEventHook(VoidCallable onFlushAllEventHook);
50-
5147
}

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.integrations.destination.record_buffer;
66

7-
import io.airbyte.commons.concurrency.VoidCallable;
87
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
98
import io.airbyte.integrations.base.sentry.AirbyteSentry;
109
import io.airbyte.integrations.destination.buffered_stream_consumer.CheckAndRemoveRecordWriter;
@@ -39,7 +38,6 @@ public class InMemoryRecordBufferingStrategy implements BufferingStrategy {
3938
private final RecordSizeEstimator recordSizeEstimator;
4039
private final long maxQueueSizeInBytes;
4140
private long bufferSizeInBytes;
42-
private VoidCallable onFlushAllEventHook;
4341

4442
public InMemoryRecordBufferingStrategy(final RecordWriter<AirbyteRecordMessage> recordWriter,
4543
final long maxQueueSizeInBytes) {
@@ -55,20 +53,24 @@ public InMemoryRecordBufferingStrategy(final RecordWriter<AirbyteRecordMessage>
5553
this.maxQueueSizeInBytes = maxQueueSizeInBytes;
5654
this.bufferSizeInBytes = 0;
5755
this.recordSizeEstimator = new RecordSizeEstimator();
58-
this.onFlushAllEventHook = null;
5956
}
6057

6158
@Override
62-
public void addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
59+
public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
60+
boolean didFlush = false;
61+
6362
final long messageSizeInBytes = recordSizeEstimator.getEstimatedByteSize(message.getRecord());
6463
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
6564
flushAll();
65+
didFlush = true;
6666
bufferSizeInBytes = 0;
6767
}
6868

6969
final List<AirbyteRecordMessage> bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>());
7070
bufferedRecords.add(message.getRecord());
7171
bufferSizeInBytes += messageSizeInBytes;
72+
73+
return didFlush;
7274
}
7375

7476
@Override
@@ -91,22 +93,13 @@ public void flushAll() throws Exception {
9193
}, Map.of("bufferSizeInBytes", bufferSizeInBytes));
9294
close();
9395
clear();
94-
95-
if (onFlushAllEventHook != null) {
96-
onFlushAllEventHook.call();
97-
}
9896
}
9997

10098
@Override
10199
public void clear() {
102100
streamBuffer = new HashMap<>();
103101
}
104102

105-
@Override
106-
public void registerFlushAllEventHook(final VoidCallable onFlushAllEventHook) {
107-
this.onFlushAllEventHook = onFlushAllEventHook;
108-
}
109-
110103
@Override
111104
public void close() throws Exception {}
112105

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.integrations.destination.record_buffer;
66

7-
import io.airbyte.commons.concurrency.VoidCallable;
87
import io.airbyte.commons.functional.CheckedBiConsumer;
98
import io.airbyte.commons.functional.CheckedBiFunction;
109
import io.airbyte.commons.string.Strings;
@@ -27,7 +26,6 @@ public class SerializedBufferingStrategy implements BufferingStrategy {
2726

2827
private final CheckedBiFunction<AirbyteStreamNameNamespacePair, ConfiguredAirbyteCatalog, SerializableBuffer, Exception> onCreateBuffer;
2928
private final CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> onStreamFlush;
30-
private VoidCallable onFlushAllEventHook;
3129

3230
private Map<AirbyteStreamNameNamespacePair, SerializableBuffer> allBuffers = new HashMap<>();
3331
private long totalBufferSizeInBytes;
@@ -40,16 +38,11 @@ public SerializedBufferingStrategy(final CheckedBiFunction<AirbyteStreamNameName
4038
this.catalog = catalog;
4139
this.onStreamFlush = onStreamFlush;
4240
this.totalBufferSizeInBytes = 0;
43-
this.onFlushAllEventHook = null;
4441
}
4542

4643
@Override
47-
public void registerFlushAllEventHook(final VoidCallable onFlushAllEventHook) {
48-
this.onFlushAllEventHook = onFlushAllEventHook;
49-
}
50-
51-
@Override
52-
public void addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
44+
public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
45+
boolean didFlush = false;
5346

5447
final SerializableBuffer streamBuffer = allBuffers.computeIfAbsent(stream, k -> {
5548
LOGGER.info("Starting a new buffer for stream {} (current state: {} in {} buffers)",
@@ -71,10 +64,28 @@ public void addRecord(final AirbyteStreamNameNamespacePair stream, final Airbyte
7164
if (totalBufferSizeInBytes >= streamBuffer.getMaxTotalBufferSizeInBytes()
7265
|| allBuffers.size() >= streamBuffer.getMaxConcurrentStreamsInBuffer()) {
7366
flushAll();
67+
didFlush = true;
7468
totalBufferSizeInBytes = 0;
7569
} else if (streamBuffer.getByteCount() >= streamBuffer.getMaxPerStreamBufferSizeInBytes()) {
7670
flushWriter(stream, streamBuffer);
71+
/*
72+
* Note: We intentionally do not mark didFlush as true in the branch of this conditional. Because
73+
* this branch flushes individual streams, there is no guarantee that it will flush records in the
74+
* same order that state messages were received. The outcome here is that records get flushed but
75+
* our updating of which state messages have been flushed falls behind.
76+
*
77+
* This is not ideal from a checkpoint point of view, because it means in the case where there is a
78+
* failure, we will not be able to report that those records that were flushed and committed were
79+
* committed because their corresponding state messages weren't marked as flushed. Thus, it weakens
80+
* checkpointing, but it does not cause a correctness issue.
81+
*
82+
* In non-failure cases, using this conditional branch relies on the state messages getting flushed
83+
* by some other means. That can be caused by the previous branch in this conditional. It is
84+
* guaranteed by the fact that we always flush all state messages at the end of a sync.
85+
*/
7786
}
87+
88+
return didFlush;
7889
}
7990

8091
@Override
@@ -99,9 +110,6 @@ public void flushAll() throws Exception {
99110
clear();
100111
}, Map.of("bufferSizeInBytes", totalBufferSizeInBytes));
101112

102-
if (onFlushAllEventHook != null) {
103-
onFlushAllEventHook.call();
104-
}
105113
totalBufferSizeInBytes = 0;
106114
}
107115

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44

55
package io.airbyte.integrations.destination.record_buffer;
66

7+
import static org.junit.jupiter.api.Assertions.assertFalse;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
79
import static org.mockito.Mockito.mock;
810
import static org.mockito.Mockito.times;
911
import static org.mockito.Mockito.verify;
1012

1113
import com.fasterxml.jackson.databind.JsonNode;
12-
import io.airbyte.commons.concurrency.VoidCallable;
1314
import io.airbyte.commons.json.Jsons;
1415
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
1516
import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter;
@@ -25,6 +26,7 @@ public class InMemoryRecordBufferingStrategyTest {
2526
// instances
2627
private static final int MAX_QUEUE_SIZE_IN_BYTES = 130;
2728

29+
@SuppressWarnings("unchecked")
2830
private final RecordWriter<AirbyteRecordMessage> recordWriter = mock(RecordWriter.class);
2931

3032
@Test
@@ -36,25 +38,19 @@ public void testBuffering() throws Exception {
3638
final AirbyteMessage message2 = generateMessage(stream2);
3739
final AirbyteMessage message3 = generateMessage(stream2);
3840
final AirbyteMessage message4 = generateMessage(stream2);
39-
final VoidCallable hook = mock(VoidCallable.class);
40-
buffering.registerFlushAllEventHook(hook);
4141

42-
buffering.addRecord(stream1, message1);
43-
buffering.addRecord(stream2, message2);
42+
assertFalse(buffering.addRecord(stream1, message1));
43+
assertFalse(buffering.addRecord(stream2, message2));
4444
// Buffer still has room
45-
verify(hook, times(0)).call();
46-
47-
buffering.addRecord(stream2, message3);
45+
assertTrue(buffering.addRecord(stream2, message3));
4846
// Buffer limit reach, flushing all messages so far before adding the new incoming one
49-
verify(hook, times(1)).call();
5047
verify(recordWriter, times(1)).accept(stream1, List.of(message1.getRecord()));
5148
verify(recordWriter, times(1)).accept(stream2, List.of(message2.getRecord()));
5249

5350
buffering.addRecord(stream2, message4);
5451

5552
// force flush to terminate test
5653
buffering.flushAll();
57-
verify(hook, times(2)).call();
5854
verify(recordWriter, times(1)).accept(stream2, List.of(message3.getRecord(), message4.getRecord()));
5955
}
6056

0 commit comments

Comments
 (0)