Skip to content

Refactor state management out of BufferStrategy #13669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private boolean hasStarted;
private boolean hasClosed;

private AirbyteMessage lastFlushedState;
// represents the last state message for which all of it records have been flushed to tmp storage in
// the destination.
private AirbyteMessage lastFlushedToTmpDstState;
// presents the last state message whose state is waitint to be flushed to tmp storage in the
// destination.
private AirbyteMessage pendingState;

public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
Expand All @@ -103,7 +107,6 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
this.isValidRecord = isValidRecord;
this.streamToIgnoredRecordCount = new HashMap<>();
this.bufferingStrategy = bufferingStrategy;
bufferingStrategy.registerFlushAllEventHook(this::flushQueueToDestination);
}

@Override
Expand Down Expand Up @@ -134,7 +137,11 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
return;
}

bufferingStrategy.addRecord(stream, message);
// if the buffer flushes, update the states appropriately.
if (bufferingStrategy.addRecord(stream, message)) {
markStatesAsFlushesToTmpDestination();
}

} else if (message.getType() == Type.STATE) {
pendingState = message;
} else {
Expand All @@ -143,9 +150,9 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {

}

private void flushQueueToDestination() {
private void markStatesAsFlushesToTmpDestination() {
if (pendingState != null) {
lastFlushedState = pendingState;
lastFlushedToTmpDstState = pendingState;
pendingState = null;
}
}
Expand All @@ -169,13 +176,14 @@ protected void close(final boolean hasFailed) throws Exception {
} else {
LOGGER.info("executing on success close procedure.");
bufferingStrategy.flushAll();
markStatesAsFlushesToTmpDestination();
}
bufferingStrategy.close();

try {
// if no state was emitted (i.e. full refresh), if there were still no failures, then we can
// still succeed.
if (lastFlushedState == null) {
if (lastFlushedToTmpDstState == null) {
onClose.accept(hasFailed);
} else {
// if any state message flushed that means we can still go for at least a partial success.
Expand All @@ -184,8 +192,8 @@ protected void close(final boolean hasFailed) throws Exception {

// if onClose succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
if (lastFlushedState != null) {
outputRecordCollector.accept(lastFlushedState);
if (lastFlushedToTmpDstState != null) {
outputRecordCollector.accept(lastFlushedToTmpDstState);
}
} catch (final Exception e) {
LOGGER.error("Close failed.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.record_buffer;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteMessage;

Expand All @@ -22,8 +21,13 @@ public interface BufferingStrategy extends AutoCloseable {

/**
* Add a new message to the buffer while consuming streams
*
* @param stream - stream associated with record
* @param message - message to buffer
* @return true if this record cause the buffer to flush, otherwise false.
* @throws Exception throw on failure
*/
void addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception;
boolean addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception;

/**
* Flush buffered messages in a writer from a particular stream
Expand All @@ -40,12 +44,4 @@ public interface BufferingStrategy extends AutoCloseable {
*/
void clear() throws Exception;

/**
* When all buffers are being flushed, we can signal some parent function of this event for further
* processing.
*
* THis install such a hook to be triggered when that happens.
*/
void registerFlushAllEventHook(VoidCallable onFlushAllEventHook);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the callback pattern that doesn't seem idiomatic. the fact that the event hook is set after the instantiation of object is a code smell.

Copy link
Contributor

@ChristopheDuong ChristopheDuong Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the "flush all event" is indeed a code smell to be looked at, thank you for investigating this.

The buffering strategy refactor was already a pretty large PR on handling AirbyteMessage differently, handling the state messages in a good manner was not necessarily the primary priority there... So the hook "pattern" was introduced/hacked to avoid diverging too much from the previous behavior of destinations on that matter.


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.record_buffer;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.destination.buffered_stream_consumer.CheckAndRemoveRecordWriter;
Expand Down Expand Up @@ -39,7 +38,6 @@ public class InMemoryRecordBufferingStrategy implements BufferingStrategy {
private final RecordSizeEstimator recordSizeEstimator;
private final long maxQueueSizeInBytes;
private long bufferSizeInBytes;
private VoidCallable onFlushAllEventHook;

public InMemoryRecordBufferingStrategy(final RecordWriter<AirbyteRecordMessage> recordWriter,
final long maxQueueSizeInBytes) {
Expand All @@ -55,20 +53,24 @@ public InMemoryRecordBufferingStrategy(final RecordWriter<AirbyteRecordMessage>
this.maxQueueSizeInBytes = maxQueueSizeInBytes;
this.bufferSizeInBytes = 0;
this.recordSizeEstimator = new RecordSizeEstimator();
this.onFlushAllEventHook = null;
}

@Override
public void addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
boolean didFlush = false;

final long messageSizeInBytes = recordSizeEstimator.getEstimatedByteSize(message.getRecord());
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
flushAll();
didFlush = true;
bufferSizeInBytes = 0;
}

final List<AirbyteRecordMessage> bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>());
bufferedRecords.add(message.getRecord());
bufferSizeInBytes += messageSizeInBytes;

return didFlush;
}

@Override
Expand All @@ -91,22 +93,13 @@ public void flushAll() throws Exception {
}, Map.of("bufferSizeInBytes", bufferSizeInBytes));
close();
clear();

if (onFlushAllEventHook != null) {
Copy link
Contributor Author

@cgardens cgardens Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the fact that all buffers need to replicate this behavior is a smell.

onFlushAllEventHook.call();
}
}

@Override
public void clear() {
streamBuffer = new HashMap<>();
}

@Override
public void registerFlushAllEventHook(final VoidCallable onFlushAllEventHook) {
this.onFlushAllEventHook = onFlushAllEventHook;
}

@Override
public void close() throws Exception {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.record_buffer;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.string.Strings;
Expand All @@ -27,7 +26,6 @@ public class SerializedBufferingStrategy implements BufferingStrategy {

private final CheckedBiFunction<AirbyteStreamNameNamespacePair, ConfiguredAirbyteCatalog, SerializableBuffer, Exception> onCreateBuffer;
private final CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> onStreamFlush;
private VoidCallable onFlushAllEventHook;

private Map<AirbyteStreamNameNamespacePair, SerializableBuffer> allBuffers = new HashMap<>();
private long totalBufferSizeInBytes;
Expand All @@ -40,16 +38,11 @@ public SerializedBufferingStrategy(final CheckedBiFunction<AirbyteStreamNameName
this.catalog = catalog;
this.onStreamFlush = onStreamFlush;
this.totalBufferSizeInBytes = 0;
this.onFlushAllEventHook = null;
}

@Override
public void registerFlushAllEventHook(final VoidCallable onFlushAllEventHook) {
this.onFlushAllEventHook = onFlushAllEventHook;
}

@Override
public void addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
boolean didFlush = false;

final SerializableBuffer streamBuffer = allBuffers.computeIfAbsent(stream, k -> {
LOGGER.info("Starting a new buffer for stream {} (current state: {} in {} buffers)",
Expand All @@ -71,10 +64,15 @@ public void addRecord(final AirbyteStreamNameNamespacePair stream, final Airbyte
if (totalBufferSizeInBytes >= streamBuffer.getMaxTotalBufferSizeInBytes()
|| allBuffers.size() >= streamBuffer.getMaxConcurrentStreamsInBuffer()) {
flushAll();
didFlush = true;
totalBufferSizeInBytes = 0;
} else if (streamBuffer.getByteCount() >= streamBuffer.getMaxPerStreamBufferSizeInBytes()) {
flushWriter(stream, streamBuffer);
// cannot mark didFlush here, because there is no guarantee that all records for a given state have
// been flushed.
}

return didFlush;
}

@Override
Expand All @@ -99,9 +97,6 @@ public void flushAll() throws Exception {
clear();
}, Map.of("bufferSizeInBytes", totalBufferSizeInBytes));

if (onFlushAllEventHook != null) {
onFlushAllEventHook.call();
}
totalBufferSizeInBytes = 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.mockito.Mockito.verify;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter;
Expand All @@ -36,25 +35,19 @@ public void testBuffering() throws Exception {
final AirbyteMessage message2 = generateMessage(stream2);
final AirbyteMessage message3 = generateMessage(stream2);
final AirbyteMessage message4 = generateMessage(stream2);
final VoidCallable hook = mock(VoidCallable.class);
buffering.registerFlushAllEventHook(hook);

buffering.addRecord(stream1, message1);
buffering.addRecord(stream2, message2);
// Buffer still has room
verify(hook, times(0)).call();

buffering.addRecord(stream2, message3);
// Buffer limit reach, flushing all messages so far before adding the new incoming one
verify(hook, times(1)).call();
verify(recordWriter, times(1)).accept(stream1, List.of(message1.getRecord()));
verify(recordWriter, times(1)).accept(stream2, List.of(message2.getRecord()));

buffering.addRecord(stream2, message4);

// force flush to terminate test
buffering.flushAll();
verify(hook, times(2)).call();
verify(recordWriter, times(1)).accept(stream2, List.of(message3.getRecord(), message4.getRecord()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
Expand All @@ -39,7 +38,6 @@ public class SerializedBufferingStrategyTest {
private final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
private final CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> perStreamFlushHook =
mock(CheckedBiConsumer.class);
private final VoidCallable flushAllHook = mock(VoidCallable.class);

private final SerializableBuffer recordWriter1 = mock(SerializableBuffer.class);
private final SerializableBuffer recordWriter2 = mock(SerializableBuffer.class);
Expand Down Expand Up @@ -73,15 +71,13 @@ public void testPerStreamThresholdFlush() throws Exception {
final AirbyteMessage message3 = generateMessage(stream2);
final AirbyteMessage message4 = generateMessage(stream2);
final AirbyteMessage message5 = generateMessage(stream2);
buffering.registerFlushAllEventHook(flushAllHook);

when(recordWriter1.getByteCount()).thenReturn(10L); // one record in recordWriter1
buffering.addRecord(stream1, message1);
when(recordWriter2.getByteCount()).thenReturn(10L); // one record in recordWriter2
buffering.addRecord(stream2, message2);

// Total and per stream Buffers still have room
verify(flushAllHook, times(0)).call();
verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(0)).accept(stream2, recordWriter2);

Expand All @@ -91,7 +87,6 @@ public void testPerStreamThresholdFlush() throws Exception {
buffering.addRecord(stream2, message4);

// The buffer limit is now reached for stream2, flushing that single stream only
verify(flushAllHook, times(0)).call();
verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2);

Expand All @@ -100,7 +95,6 @@ public void testPerStreamThresholdFlush() throws Exception {

// force flush to terminate test
buffering.flushAll();
verify(flushAllHook, times(1)).call();
verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(2)).accept(stream2, recordWriter2);
}
Expand All @@ -119,12 +113,10 @@ public void testTotalStreamThresholdFlush() throws Exception {
final AirbyteMessage message4 = generateMessage(stream1);
final AirbyteMessage message5 = generateMessage(stream2);
final AirbyteMessage message6 = generateMessage(stream3);
buffering.registerFlushAllEventHook(flushAllHook);

buffering.addRecord(stream1, message1);
buffering.addRecord(stream2, message2);
// Total and per stream Buffers still have room
verify(flushAllHook, times(0)).call();
verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(0)).accept(stream2, recordWriter2);
verify(perStreamFlushHook, times(0)).accept(stream3, recordWriter3);
Expand All @@ -135,15 +127,13 @@ public void testTotalStreamThresholdFlush() throws Exception {
when(recordWriter2.getByteCount()).thenReturn(20L); // second record in recordWriter2
buffering.addRecord(stream2, message5);
// Buffer limit reached for total streams, flushing all streams
verify(flushAllHook, times(1)).call();
verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2);
verify(perStreamFlushHook, times(1)).accept(stream3, recordWriter3);

buffering.addRecord(stream3, message6);
// force flush to terminate test
buffering.flushAll();
verify(flushAllHook, times(2)).call();
verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2);
verify(perStreamFlushHook, times(2)).accept(stream3, recordWriter3);
Expand All @@ -162,20 +152,17 @@ public void testConcurrentStreamThresholdFlush() throws Exception {
final AirbyteMessage message3 = generateMessage(stream3);
final AirbyteMessage message4 = generateMessage(stream4);
final AirbyteMessage message5 = generateMessage(stream1);
buffering.registerFlushAllEventHook(flushAllHook);

buffering.addRecord(stream1, message1);
buffering.addRecord(stream2, message2);
buffering.addRecord(stream3, message3);
// Total and per stream Buffers still have room
verify(flushAllHook, times(0)).call();
verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(0)).accept(stream2, recordWriter2);
verify(perStreamFlushHook, times(0)).accept(stream3, recordWriter3);

buffering.addRecord(stream4, message4);
// Buffer limit reached for concurrent streams, flushing all streams
verify(flushAllHook, times(1)).call();
verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2);
verify(perStreamFlushHook, times(1)).accept(stream3, recordWriter3);
Expand All @@ -184,7 +171,6 @@ public void testConcurrentStreamThresholdFlush() throws Exception {
buffering.addRecord(stream1, message5);
// force flush to terminate test
buffering.flushAll();
verify(flushAllHook, times(2)).call();
verify(perStreamFlushHook, times(2)).accept(stream1, recordWriter1);
verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2);
verify(perStreamFlushHook, times(1)).accept(stream3, recordWriter3);
Expand Down