Improve performance of acknowledge(int index) / override createRecordList implementation #3764
Comments
Notably you also rejected this PR which changed the implementation of |
I would add that, IMO, it would be preferable for listeners if the records batch were not a linked list (at least not by default), because the only benefit of a linked list is faster mutation, and mutating the batch is not something listeners should ever be doing.
We are in the major If you can prove (or explain) that
I also believe that such a list from the listener container should indeed be immutable, but that (I guess) is not related to the problem.
It does preserve an order -- I think proving it would be time consuming for me to do right now, but it's certainly written about (eg. https://www.linkedin.com/pulse/dynamic-duo-java-lists-deep-dive-arraylist-vector-ml-concepts-com). You can also look at the implementation in your JDK of choice and should see that order is preserved.
Yes your suggestion works; however, for the sake of efficiency, I would strongly urge you to re-consider my suggestion. Happy to explain the reason in a follow-up comment if you prefer.
Is that just for |
I'm not sure what you mean.
No, Spring for Apache Kafka |
I was referring to your suggestion of how

    public List<ConsumerRecord<K, V>> createRecordList(ConsumerRecords<K, V> records) {
        List<ConsumerRecord<K, V>> recordsList = new ArrayList<>();
        records.forEach(recordsList::add);
        return recordsList;
    }

If you initialise an
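
    // Needs java.lang.reflect.Array, java.util.Iterator and java.util.List.
    @SuppressWarnings("unchecked") // generic array creation requires an unchecked cast
    private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
        Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
        // Size the array up front from the batch's record count.
        ConsumerRecord<K, V>[] recordsArray = (ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());
        int index = 0;
        while (iterator.hasNext()) {
            recordsArray[index] = iterator.next();
            index += 1;
        }
        // List.of returns an immutable list.
        return List.of(recordsArray);
    }

Edit: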
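For comparison, here is a minimal sketch of the pre-sized `ArrayList` route discussed in this thread. It is not the library's code: `RecordListSketch` and `toArrayList` are hypothetical names, and a plain `Iterable<T>` plus an explicit `count` stand in for `ConsumerRecords<K, V>` and `records.count()` so that the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the container's createRecordList; not Spring Kafka code.
final class RecordListSketch {

    private RecordListSketch() {
    }

    // Copies the elements of `source` into an ArrayList whose capacity is set
    // up front from `count`, so the backing array never has to grow.
    static <T> List<T> toArrayList(Iterable<T> source, int count) {
        List<T> copy = new ArrayList<>(count);
        source.forEach(copy::add); // iteration order is preserved
        return copy;
    }

    public static void main(String[] args) {
        List<String> batch = toArrayList(List.of("r0", "r1", "r2"), 3);
        batch.remove(1); // the result stays mutable, unlike List.of(...)
        System.out.println(batch);
    }
}
```

This keeps `get(i)` constant-time and avoids the unchecked cast, at the cost of handing listeners a mutable list.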
Ok great. In that case what do you think about making a 4.0 breaking change such that listeners receive an immutable list; but additionally provide a 3.x release that uses an array list? Or, if that's too breaking for 3.x, then offer the ability to override the (happy to raise a PR if it helps) |
4.0.x is definitely a good time for breaking changes if those changes improve performance, so we are definitely open to PRs. We should not break anything in 3.0.x, though, but we can consider alternative approaches if it is critical for your use case. |
Why cannot we use I think we can just bite a bullet and use an |
// deleted
That is good news :) |
The difference between these two approaches ( The drawback is that you need to do an unchecked cast, otherwise the compiler will complain about `ConsumerRecord<K, V>[] recordsArray = (ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());`, but I think it's worth it.
@sobychacko what about in 3.x? And more importantly would you consider changing the underlying |
@janeklb as long as we don't see any regressions while testing, changing from one implementation to another like that may not be a breaking change, so we can do that, I guess.
As per spring-projects#3764 Signed-off-by: Janek Lasocki-Biczysko <[email protected]>
…records
Fixes: #3764
Issue link: #3764
Acknowledging an index in a batch has quadratic time `N(N+1)/2` ~ `N^2`.
Batch consumers operate on a `LinkedList` of records. If the consumer uses `MANUAL_IMMEDIATE` ack mode, and the listener invokes `acknowledgement.acknowledge(index)` where index is relatively big (e.g. when processing batches of `100k`), performance takes a hit because of the linear lookup `records.get(i)` in a loop.
Signed-off-by: Janek Lasocki-Biczysko <[email protected]>
[[email protected]: improve commit message]
**Auto-cherry-pick to `3.3.x`**
Signed-off-by: Artem Bilan <[email protected]>
…records
Fixes: #3764
Issue link: #3764
Acknowledging an index in a batch has quadratic time `N(N+1)/2` ~ `N^2`.
Batch consumers operate on a `LinkedList` of records. If the consumer uses `MANUAL_IMMEDIATE` ack mode, and the listener invokes `acknowledgement.acknowledge(index)` where index is relatively big (e.g. when processing batches of `100k`), performance takes a hit because of the linear lookup `records.get(i)` in a loop.
Signed-off-by: Janek Lasocki-Biczysko <[email protected]>
[[email protected]: improve commit message]
Signed-off-by: Artem Bilan <[email protected]>
(cherry picked from commit 53149d4)
Expected Behavior
Acknowledging an index in a batch should ideally have constant time, but be no worse than linear.
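As an illustration of the "no worse than linear" bound, the sketch below walks the batch once with an iterator instead of calling `get(i)` for each position. This is only one possible approach and not necessarily how the issue was resolved in the library; `LinearAckSketch` and `forEachUpTo` are hypothetical names.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; not part of Spring for Apache Kafka.
final class LinearAckSketch {

    private LinearAckSketch() {
    }

    // Applies `action` to the elements at positions 0..index (inclusive)
    // in a single pass, which is linear even for a LinkedList.
    static <T> void forEachUpTo(List<T> records, int index, Consumer<? super T> action) {
        if (index < 0 || index >= records.size()) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        Iterator<T> iterator = records.iterator();
        for (int i = 0; i <= index; i++) {
            action.accept(iterator.next());
        }
    }

    public static void main(String[] args) {
        List<Integer> batch = new LinkedList<>(List.of(10, 11, 12, 13));
        forEachUpTo(batch, 2, System.out::println);
    }
}
```

With this shape, the cost depends only on `index`, whichever `List` implementation backs the batch.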
Current Behavior
Acknowledging an index in a batch has quadratic time `N(N+1)/2` ~ `N^2`.
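The `N(N+1)/2` figure can be checked with a small cost model. The sketch below assumes that `get(i)` walks `i + 1` nodes from the head of the list; `IndexedLoopCost` and its methods are hypothetical names used only for this illustration. In practice `java.util.LinkedList` starts from whichever end is nearer, so the real constant is smaller, but the growth is still quadratic.

```java
// Hypothetical cost model for an indexed loop over a linked list.
final class IndexedLoopCost {

    private IndexedLoopCost() {
    }

    // Total nodes visited by `for (i = 0; i < n; i++) list.get(i)`,
    // assuming each get(i) walks i + 1 nodes from the head.
    static long nodeVisits(int n) {
        long visits = 0;
        for (int i = 0; i < n; i++) {
            visits += i + 1;
        }
        return visits;
    }

    // Closed form of the same sum: N(N+1)/2.
    static long closedForm(int n) {
        return (long) n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println(nodeVisits(n) + " node visits for a batch of " + n);
    }
}
```

Under this model, a batch of 100k records costs about five billion node visits, compared with 100k for a single pass.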
Context
Batch consumers operate on a `LinkedList` of records. If the consumer uses `MANUAL_IMMEDIATE` ack mode, and the listener invokes `acknowledgement.acknowledge(index)` where `index` is relatively big (e.g. when processing batches of 100k), performance takes a hit because of the linear lookup `records.get(i)` in a loop here.

Maybe the most robust solution would be to offer clients the ability to customise how the record list is created? e.g.

This way we could pass in other implementations to replace `createRecordList`, e.g.
I also tried to directly replace the existing `createRecordList` with the above implementation; however, it caused one test to fail (`SubBatchPerPartitionTests > withFilter()`) because `List.of` produces an immutable list. `List.of` could be replaced with `Arrays.asList` (which yields a fixed-size list backed by the array, not a `java.util.ArrayList`), but then filtering would be less efficient (I presume, but haven't verified).
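The mutability difference between the three candidate list types can be checked with plain JDK code. The sketch below is only an illustration: `ListMutabilitySketch` and `supportsRemoval` are hypothetical names, and whether the container's filtering path actually relies on element removal is not confirmed here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical probe for structural mutability; not Spring Kafka code.
final class ListMutabilitySketch {

    private ListMutabilitySketch() {
    }

    // Returns true if the list allows removing its first element.
    static boolean supportsRemoval(List<String> list) {
        try {
            list.remove(0);
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] records = {"r0", "r1", "r2"};
        // List.of: immutable, removal is rejected.
        System.out.println(supportsRemoval(List.of(records)));
        // Arrays.asList: fixed-size view over the array, removal is rejected too.
        System.out.println(supportsRemoval(Arrays.asList(records)));
        // java.util.ArrayList: fully mutable copy, removal works.
        System.out.println(supportsRemoval(new ArrayList<>(Arrays.asList(records))));
    }
}
```

Of the three, only a real `java.util.ArrayList` supports both constant-time `get(i)` and removal.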