diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index c7ae54a0ed815..d8ec4e9a85978 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -84,7 +84,11 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume private boolean hasStarted; private boolean hasClosed; - private AirbyteMessage lastFlushedState; + // represents the last state message for which all of its records have been flushed to tmp storage in + // the destination. + private AirbyteMessage lastFlushedToTmpDstState; + // represents the last state message whose state is waiting to be flushed to tmp storage in the + // destination. private AirbyteMessage pendingState; public BufferedStreamConsumer(final Consumer outputRecordCollector, @@ -103,7 +107,6 @@ public BufferedStreamConsumer(final Consumer outputRecordCollect this.isValidRecord = isValidRecord; this.streamToIgnoredRecordCount = new HashMap<>(); this.bufferingStrategy = bufferingStrategy; - bufferingStrategy.registerFlushAllEventHook(this::flushQueueToDestination); } @Override @@ -134,7 +137,11 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { return; } - bufferingStrategy.addRecord(stream, message); + // if the buffer flushes, update the states appropriately. 
+ if (bufferingStrategy.addRecord(stream, message)) { + markStatesAsFlushedToTmpDestination(); + } + } else if (message.getType() == Type.STATE) { pendingState = message; } else { @@ -143,9 +150,9 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { } - private void flushQueueToDestination() { + private void markStatesAsFlushedToTmpDestination() { if (pendingState != null) { - lastFlushedState = pendingState; + lastFlushedToTmpDstState = pendingState; pendingState = null; } } @@ -169,13 +176,14 @@ protected void close(final boolean hasFailed) throws Exception { } else { LOGGER.info("executing on success close procedure."); bufferingStrategy.flushAll(); + markStatesAsFlushedToTmpDestination(); } bufferingStrategy.close(); try { // if no state was emitted (i.e. full refresh), if there were still no failures, then we can // still succeed. - if (lastFlushedState == null) { + if (lastFlushedToTmpDstState == null) { onClose.accept(hasFailed); } else { // if any state message flushed that means we can still go for at least a partial success. @@ -184,8 +192,8 @@ protected void close(final boolean hasFailed) throws Exception { // if onClose succeeds without exception then we can emit the state record because it means its // records were not only flushed, but committed. 
- if (lastFlushedState != null) { - outputRecordCollector.accept(lastFlushedState); + if (lastFlushedToTmpDstState != null) { + outputRecordCollector.accept(lastFlushedToTmpDstState); } } catch (final Exception e) { LOGGER.error("Close failed.", e); diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java index 85f0d0022763c..b638906666282 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.record_buffer; -import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteMessage; @@ -22,8 +21,13 @@ public interface BufferingStrategy extends AutoCloseable { /** * Add a new message to the buffer while consuming streams + * + * @param stream - stream associated with record + * @param message - message to buffer + * @return true if this record causes ALL records in the buffer to flush, otherwise false. + * @throws Exception thrown on failure */ - void addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception; + boolean addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception; /** * Flush buffered messages in a writer from a particular stream @@ -40,12 +44,4 @@ public interface BufferingStrategy extends AutoCloseable { */ void clear() throws Exception; - /** - * When all buffers are being flushed, we can signal some parent function of this event for further - * processing. - * - * THis install such a hook to be triggered when that happens. 
- */ - void registerFlushAllEventHook(VoidCallable onFlushAllEventHook); - } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java index d01454b500eed..50f01ceece6bd 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.record_buffer; -import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.sentry.AirbyteSentry; import io.airbyte.integrations.destination.buffered_stream_consumer.CheckAndRemoveRecordWriter; @@ -39,7 +38,6 @@ public class InMemoryRecordBufferingStrategy implements BufferingStrategy { private final RecordSizeEstimator recordSizeEstimator; private final long maxQueueSizeInBytes; private long bufferSizeInBytes; - private VoidCallable onFlushAllEventHook; public InMemoryRecordBufferingStrategy(final RecordWriter recordWriter, final long maxQueueSizeInBytes) { @@ -55,20 +53,24 @@ public InMemoryRecordBufferingStrategy(final RecordWriter this.maxQueueSizeInBytes = maxQueueSizeInBytes; this.bufferSizeInBytes = 0; this.recordSizeEstimator = new RecordSizeEstimator(); - this.onFlushAllEventHook = null; } @Override - public void addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception { + public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception { + boolean didFlush = false; + final long messageSizeInBytes = 
recordSizeEstimator.getEstimatedByteSize(message.getRecord()); if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { flushAll(); + didFlush = true; bufferSizeInBytes = 0; } final List bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>()); bufferedRecords.add(message.getRecord()); bufferSizeInBytes += messageSizeInBytes; + + return didFlush; } @Override @@ -91,10 +93,6 @@ public void flushAll() throws Exception { }, Map.of("bufferSizeInBytes", bufferSizeInBytes)); close(); clear(); - - if (onFlushAllEventHook != null) { - onFlushAllEventHook.call(); - } } @Override @@ -102,11 +100,6 @@ public void clear() { streamBuffer = new HashMap<>(); } - @Override - public void registerFlushAllEventHook(final VoidCallable onFlushAllEventHook) { - this.onFlushAllEventHook = onFlushAllEventHook; - } - @Override public void close() throws Exception {} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java index ee4b5b4417506..4ae15e7bdb598 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.record_buffer; -import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.string.Strings; @@ -27,7 +26,6 @@ public class SerializedBufferingStrategy implements BufferingStrategy { private final CheckedBiFunction onCreateBuffer; private final CheckedBiConsumer onStreamFlush; - private VoidCallable 
onFlushAllEventHook; private Map allBuffers = new HashMap<>(); private long totalBufferSizeInBytes; @@ -40,16 +38,11 @@ public SerializedBufferingStrategy(final CheckedBiFunction { LOGGER.info("Starting a new buffer for stream {} (current state: {} in {} buffers)", @@ -71,10 +64,28 @@ public void addRecord(final AirbyteStreamNameNamespacePair stream, final Airbyte if (totalBufferSizeInBytes >= streamBuffer.getMaxTotalBufferSizeInBytes() || allBuffers.size() >= streamBuffer.getMaxConcurrentStreamsInBuffer()) { flushAll(); + didFlush = true; totalBufferSizeInBytes = 0; } else if (streamBuffer.getByteCount() >= streamBuffer.getMaxPerStreamBufferSizeInBytes()) { flushWriter(stream, streamBuffer); + /* + * Note: We intentionally do not mark didFlush as true in this branch of the conditional. Because + * this branch flushes individual streams, there is no guarantee that it will flush records in the + * same order that state messages were received. The outcome here is that records get flushed but + * our updating of which state messages have been flushed falls behind. + * + * This is not ideal from a checkpoint point of view, because it means in the case where there is a + * failure, we will not be able to report that those records that were flushed and committed were + * committed because their corresponding state messages weren't marked as flushed. Thus, it weakens + * checkpointing, but it does not cause a correctness issue. + * + * In non-failure cases, using this conditional branch relies on the state messages getting flushed + * by some other means. That can be caused by the previous branch in this conditional. It is + * guaranteed by the fact that we always flush all state messages at the end of a sync. 
+ */ } + + return didFlush; } @Override @@ -99,9 +110,6 @@ public void flushAll() throws Exception { clear(); }, Map.of("bufferSizeInBytes", totalBufferSizeInBytes)); - if (onFlushAllEventHook != null) { - onFlushAllEventHook.call(); - } totalBufferSizeInBytes = 0; } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java index bc1029f952931..330b3c998e11c 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java @@ -4,12 +4,13 @@ package io.airbyte.integrations.destination.record_buffer; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter; @@ -25,6 +26,7 @@ public class InMemoryRecordBufferingStrategyTest { // instances private static final int MAX_QUEUE_SIZE_IN_BYTES = 130; + @SuppressWarnings("unchecked") private final RecordWriter recordWriter = mock(RecordWriter.class); @Test @@ -36,17 +38,12 @@ public void testBuffering() throws Exception { final AirbyteMessage message2 = generateMessage(stream2); final AirbyteMessage message3 = generateMessage(stream2); final AirbyteMessage message4 = 
generateMessage(stream2); - final VoidCallable hook = mock(VoidCallable.class); - buffering.registerFlushAllEventHook(hook); - buffering.addRecord(stream1, message1); - buffering.addRecord(stream2, message2); + assertFalse(buffering.addRecord(stream1, message1)); + assertFalse(buffering.addRecord(stream2, message2)); // Buffer still has room - verify(hook, times(0)).call(); - - buffering.addRecord(stream2, message3); + assertTrue(buffering.addRecord(stream2, message3)); // Buffer limit reach, flushing all messages so far before adding the new incoming one - verify(hook, times(1)).call(); verify(recordWriter, times(1)).accept(stream1, List.of(message1.getRecord())); verify(recordWriter, times(1)).accept(stream2, List.of(message2.getRecord())); @@ -54,7 +51,6 @@ public void testBuffering() throws Exception { // force flush to terminate test buffering.flushAll(); - verify(hook, times(2)).call(); verify(recordWriter, times(1)).accept(stream2, List.of(message3.getRecord(), message4.getRecord())); } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java index 397d09e97dadb..2de320114ebed 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java @@ -4,7 +4,9 @@ package io.airbyte.integrations.destination.record_buffer; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.times; @@ -12,7 +14,6 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; @@ -37,9 +38,9 @@ public class SerializedBufferingStrategyTest { private static final long MAX_PER_STREAM_BUFFER_SIZE_BYTES = 21L; private final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); + @SuppressWarnings("unchecked") private final CheckedBiConsumer perStreamFlushHook = mock(CheckedBiConsumer.class); - private final VoidCallable flushAllHook = mock(VoidCallable.class); private final SerializableBuffer recordWriter1 = mock(SerializableBuffer.class); private final SerializableBuffer recordWriter2 = mock(SerializableBuffer.class); @@ -73,34 +74,30 @@ public void testPerStreamThresholdFlush() throws Exception { final AirbyteMessage message3 = generateMessage(stream2); final AirbyteMessage message4 = generateMessage(stream2); final AirbyteMessage message5 = generateMessage(stream2); - buffering.registerFlushAllEventHook(flushAllHook); when(recordWriter1.getByteCount()).thenReturn(10L); // one record in recordWriter1 - buffering.addRecord(stream1, message1); + assertFalse(buffering.addRecord(stream1, message1)); when(recordWriter2.getByteCount()).thenReturn(10L); // one record in recordWriter2 - buffering.addRecord(stream2, message2); + assertFalse(buffering.addRecord(stream2, message2)); // Total and per stream Buffers still have room - verify(flushAllHook, times(0)).call(); verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(0)).accept(stream2, recordWriter2); when(recordWriter2.getByteCount()).thenReturn(20L); // second record in recordWriter2 - buffering.addRecord(stream2, message3); + assertFalse(buffering.addRecord(stream2, message3)); 
when(recordWriter2.getByteCount()).thenReturn(30L); // third record in recordWriter2 - buffering.addRecord(stream2, message4); + assertFalse(buffering.addRecord(stream2, message4)); // The buffer limit is now reached for stream2, flushing that single stream only - verify(flushAllHook, times(0)).call(); verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2); when(recordWriter2.getByteCount()).thenReturn(10L); // back to one record in recordWriter2 - buffering.addRecord(stream2, message5); + assertFalse(buffering.addRecord(stream2, message5)); // force flush to terminate test buffering.flushAll(); - verify(flushAllHook, times(1)).call(); verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(2)).accept(stream2, recordWriter2); } @@ -119,31 +116,27 @@ public void testTotalStreamThresholdFlush() throws Exception { final AirbyteMessage message4 = generateMessage(stream1); final AirbyteMessage message5 = generateMessage(stream2); final AirbyteMessage message6 = generateMessage(stream3); - buffering.registerFlushAllEventHook(flushAllHook); - buffering.addRecord(stream1, message1); - buffering.addRecord(stream2, message2); + assertFalse(buffering.addRecord(stream1, message1)); + assertFalse(buffering.addRecord(stream2, message2)); // Total and per stream Buffers still have room - verify(flushAllHook, times(0)).call(); verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(0)).accept(stream2, recordWriter2); verify(perStreamFlushHook, times(0)).accept(stream3, recordWriter3); - buffering.addRecord(stream3, message3); + assertFalse(buffering.addRecord(stream3, message3)); when(recordWriter1.getByteCount()).thenReturn(20L); // second record in recordWriter1 - buffering.addRecord(stream1, message4); + assertFalse(buffering.addRecord(stream1, message4)); when(recordWriter2.getByteCount()).thenReturn(20L); 
// second record in recordWriter2 - buffering.addRecord(stream2, message5); + assertTrue(buffering.addRecord(stream2, message5)); // Buffer limit reached for total streams, flushing all streams - verify(flushAllHook, times(1)).call(); verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2); verify(perStreamFlushHook, times(1)).accept(stream3, recordWriter3); - buffering.addRecord(stream3, message6); + assertFalse(buffering.addRecord(stream3, message6)); // force flush to terminate test buffering.flushAll(); - verify(flushAllHook, times(2)).call(); verify(perStreamFlushHook, times(1)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2); verify(perStreamFlushHook, times(2)).accept(stream3, recordWriter3); @@ -162,29 +155,25 @@ public void testConcurrentStreamThresholdFlush() throws Exception { final AirbyteMessage message3 = generateMessage(stream3); final AirbyteMessage message4 = generateMessage(stream4); final AirbyteMessage message5 = generateMessage(stream1); - buffering.registerFlushAllEventHook(flushAllHook); - buffering.addRecord(stream1, message1); - buffering.addRecord(stream2, message2); - buffering.addRecord(stream3, message3); + assertFalse(buffering.addRecord(stream1, message1)); + assertFalse(buffering.addRecord(stream2, message2)); + assertFalse(buffering.addRecord(stream3, message3)); // Total and per stream Buffers still have room - verify(flushAllHook, times(0)).call(); verify(perStreamFlushHook, times(0)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(0)).accept(stream2, recordWriter2); verify(perStreamFlushHook, times(0)).accept(stream3, recordWriter3); - buffering.addRecord(stream4, message4); + assertTrue(buffering.addRecord(stream4, message4)); // Buffer limit reached for concurrent streams, flushing all streams - verify(flushAllHook, times(1)).call(); verify(perStreamFlushHook, 
times(1)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2); verify(perStreamFlushHook, times(1)).accept(stream3, recordWriter3); verify(perStreamFlushHook, times(1)).accept(stream4, recordWriter4); - buffering.addRecord(stream1, message5); + assertFalse(buffering.addRecord(stream1, message5)); // force flush to terminate test buffering.flushAll(); - verify(flushAllHook, times(2)).call(); verify(perStreamFlushHook, times(2)).accept(stream1, recordWriter1); verify(perStreamFlushHook, times(1)).accept(stream2, recordWriter2); verify(perStreamFlushHook, times(1)).accept(stream3, recordWriter3);