-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Refactor state management out of BufferStrategy #13669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
92af841
0da650b
3231e0a
6da5a73
a8e0387
a74026b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,6 @@ | |
|
||
package io.airbyte.integrations.destination.record_buffer; | ||
|
||
import io.airbyte.commons.concurrency.VoidCallable; | ||
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
|
||
|
@@ -22,8 +21,13 @@ public interface BufferingStrategy extends AutoCloseable { | |
|
||
/** | ||
* Add a new message to the buffer while consuming streams | ||
* | ||
* @param stream - stream associated with record | ||
* @param message - message to buffer | ||
* @return true if this record causes the buffer to flush, otherwise false. | ||
* @throws Exception throw on failure | ||
*/ | ||
void addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception; | ||
boolean addRecord(AirbyteStreamNameNamespacePair stream, AirbyteMessage message) throws Exception; | ||
|
||
/** | ||
* Flush buffered messages in a writer from a particular stream | ||
|
@@ -40,12 +44,4 @@ public interface BufferingStrategy extends AutoCloseable { | |
*/ | ||
void clear() throws Exception; | ||
|
||
/** | ||
* When all buffers are being flushed, we can signal some parent function of this event for further | ||
* processing. | ||
* | ||
* This installs a hook to be triggered when that happens. | ||
*/ | ||
void registerFlushAllEventHook(VoidCallable onFlushAllEventHook); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the callback pattern that doesn't seem idiomatic. The fact that the event hook is set after the instantiation of the object is a code smell. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, the "flush all event" is indeed a code smell to be looked at; thank you for investigating this. The buffering strategy refactor was already a pretty large PR on handling AirbyteMessage differently, and handling the state messages in a good manner was not necessarily the primary priority there... So the hook "pattern" was introduced/hacked to avoid diverging too much from the previous behavior of destinations on that matter. |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,6 @@ | |
|
||
package io.airbyte.integrations.destination.record_buffer; | ||
|
||
import io.airbyte.commons.concurrency.VoidCallable; | ||
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.integrations.base.sentry.AirbyteSentry; | ||
import io.airbyte.integrations.destination.buffered_stream_consumer.CheckAndRemoveRecordWriter; | ||
|
@@ -39,7 +38,6 @@ public class InMemoryRecordBufferingStrategy implements BufferingStrategy { | |
private final RecordSizeEstimator recordSizeEstimator; | ||
private final long maxQueueSizeInBytes; | ||
private long bufferSizeInBytes; | ||
private VoidCallable onFlushAllEventHook; | ||
|
||
public InMemoryRecordBufferingStrategy(final RecordWriter<AirbyteRecordMessage> recordWriter, | ||
final long maxQueueSizeInBytes) { | ||
|
@@ -55,20 +53,24 @@ public InMemoryRecordBufferingStrategy(final RecordWriter<AirbyteRecordMessage> | |
this.maxQueueSizeInBytes = maxQueueSizeInBytes; | ||
this.bufferSizeInBytes = 0; | ||
this.recordSizeEstimator = new RecordSizeEstimator(); | ||
this.onFlushAllEventHook = null; | ||
} | ||
|
||
@Override | ||
public void addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception { | ||
public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception { | ||
boolean didFlush = false; | ||
|
||
final long messageSizeInBytes = recordSizeEstimator.getEstimatedByteSize(message.getRecord()); | ||
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { | ||
flushAll(); | ||
didFlush = true; | ||
bufferSizeInBytes = 0; | ||
} | ||
|
||
final List<AirbyteRecordMessage> bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>()); | ||
bufferedRecords.add(message.getRecord()); | ||
bufferSizeInBytes += messageSizeInBytes; | ||
|
||
return didFlush; | ||
} | ||
|
||
@Override | ||
|
@@ -91,22 +93,13 @@ public void flushAll() throws Exception { | |
}, Map.of("bufferSizeInBytes", bufferSizeInBytes)); | ||
close(); | ||
clear(); | ||
|
||
if (onFlushAllEventHook != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The fact that all buffers need to replicate this behavior is a smell. |
||
onFlushAllEventHook.call(); | ||
} | ||
} | ||
|
||
@Override | ||
public void clear() { | ||
streamBuffer = new HashMap<>(); | ||
} | ||
|
||
@Override | ||
public void registerFlushAllEventHook(final VoidCallable onFlushAllEventHook) { | ||
this.onFlushAllEventHook = onFlushAllEventHook; | ||
} | ||
|
||
@Override | ||
public void close() throws Exception {} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.