
Improve performance of acknowledge(int index) / override createRecordList implementation #3764

Closed
janeklb opened this issue Feb 24, 2025 · 13 comments · Fixed by #3766

Comments

@janeklb
Contributor

janeklb commented Feb 24, 2025

Expected Behavior

Acknowledging an index in a batch should ideally have constant time, but be no worse than linear.

Current Behavior

Acknowledging an index in a batch has quadratic time N(N+1)/2 ~ N^2

Context

Batch consumers operate on a LinkedList of records. If the consumer uses MANUAL_IMMEDIATE ack mode and the listener invokes acknowledgement.acknowledge(index) where index is relatively big (e.g. when processing batches of 100k), performance takes a hit because of the linear lookup records.get(i) in a loop.
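To make the cost concrete, here is a minimal, self-contained sketch of the access pattern described above: reading elements by index in a loop. It is not spring-kafka code; `IndexedAccessDemo` and its method names are made up for illustration, and plain integers stand in for consumer records. `LinkedList.get(i)` has to walk nodes to reach position i (OpenJDK starts from whichever end is closer, which halves the work but keeps it quadratic overall), while `ArrayList.get(i)` is a direct array read.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class IndexedAccessDemo {

    // Reads elements 0..upTo by index, the same pattern as records.get(i) in a loop.
    static long sumByIndex(List<Integer> list, int upTo) {
        long sum = 0;
        for (int i = 0; i <= upTo; i++) {
            sum += list.get(i);
        }
        return sum;
    }

    // Wall-clock time of one indexed pass; a rough indication, not a benchmark.
    static long timeNanos(List<Integer> list, int upTo) {
        long start = System.nanoTime();
        sumByIndex(list, upTo);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        int n = 100_000; // batch size mentioned in this issue
        List<Integer> linked = new LinkedList<>();
        List<Integer> array = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            linked.add(i);
            array.add(i);
        }
        System.out.println("LinkedList: " + timeNanos(linked, n - 1) / 1_000_000 + " ms");
        System.out.println("ArrayList:  " + timeNanos(array, n - 1) / 1_000_000 + " ms");
    }
}
```

The absolute numbers depend on the machine and JVM warm-up, so treat them as an indication only; the point is that the gap widens as n grows.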

Maybe the most robust solution would be to offer clients the ability to customise how the record list is created? eg.

ConsumerFactory#setRecordListFactory(Function<ConsumerRecords<K, V>, List<ConsumerRecord<K, V>>> factory);

This way we could pass in other implementations to replace createRecordList

eg.

private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
  Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
  ConsumerRecord<K, V>[] recordsArray = (ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());
  int index = 0;
  while (iterator.hasNext()) {
    recordsArray[index] = iterator.next();
    index += 1;
  }
  return List.of(recordsArray);
}

I also tried to directly replace the existing createRecordList with the above implementation; however, it caused one test to fail (SubBatchPerPartitionTests > withFilter()) because List.of produces an immutable list. List.of could be replaced with Arrays.asList (and yield an ArrayList), but then filtering would be less efficient (I presume, but haven't verified).
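A small sketch of the mutability difference mentioned above, for anyone following along. `ListMutabilityDemo` is an illustrative name, and strings stand in for records. One caveat worth knowing: `Arrays.asList` returns a fixed-size list backed by the array (a private class that happens to be called ArrayList, not `java.util.ArrayList`), so element removal is rejected there too. Copying into a real `java.util.ArrayList` is what gives a fully mutable list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListMutabilityDemo {

    // Returns true if the given list allows its first element to be removed.
    static boolean supportsRemoval(List<String> list) {
        try {
            list.remove(0);
            return true;
        }
        catch (UnsupportedOperationException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] array = {"a", "b", "c"};
        // Immutable list: every mutator throws.
        System.out.println("List.of:        " + supportsRemoval(List.of(array)));
        // Fixed-size view over the array: set() works, add()/remove() throw.
        System.out.println("Arrays.asList:  " + supportsRemoval(Arrays.asList(array)));
        // Independent, resizable copy.
        System.out.println("new ArrayList:  " + supportsRemoval(new ArrayList<>(Arrays.asList(array))));
    }
}
```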

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

Notably you also rejected this PR which changed the implementation of createRecordList with an ArrayList, though it's unclear if you were rejecting it on the premise of the sorting issue (which my suggestion above addresses), or on a more general principle of not swapping out the underlying records list implementation.

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

I would add that IMO it would be preferable for listeners if the records batch was not a linked list (at least not by default), because the only benefit of using a linked list is faster mutations, and that is not something that listeners should ever be doing.

@artembilan
Member

We are in the major 4.0 version right now.
So, that is a perfect time to do some breaking changes.

If you can prove (or explain) that ArrayList preserves an order, then we can just do this in the method in question:

public List<ConsumerRecord<K, V>> createRecordList(ConsumerRecords<K, V> records) {
  List<ConsumerRecord<K, V>> recordsList = new ArrayList<>();
  records.forEach(recordsList::add);
  return recordsList;
}

I also believe that such a list from the listener container should indeed be immutable, but that (I guess) is not related to the problem.

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

If you can prove (or explain) that ArrayList preserves an order

It does preserve an order -- I think proving it would be time consuming for me to do right now, but it's certainly written about (eg. https://www.linkedin.com/pulse/dynamic-duo-java-lists-deep-dive-arraylist-vector-ml-concepts-com). You can also look at the implementation in your JDK of choice and should see that order is preserved.

then we can just do this in the method in question:

Yes your suggestion works; however, for the sake of efficiency, I would strongly urge you to re-consider my suggestion. Happy to explain the reason in a follow-up comment if you prefer.

We are in the major 4.0 version right now.

Is that just for spring-kafka and if so will it mean that 4.0 will still be compatible with spring-boot etc. 3.x?

@artembilan
Member

I would strongly urge you to re-consider my suggestion

I'm not sure what you mean.
If ArrayList preserves an order and it is much more efficient than LinkedList, then we don't have any arguments to keep LinkedList and can just move on.

will still be compatible with spring-boot etc. 3.x?

No, Spring for Apache Kafka 4.0 is for Spring Boot 4.0 due this November.

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

I'm not sure what you mean.

I was referring to your suggestion of how ArrayList could actually be implemented:

public List<ConsumerRecord<K, V>> createRecordList(ConsumerRecords<K, V> records) {
  List<ConsumerRecord<K, V>> recordsList = new ArrayList<>();
  records.forEach(recordsList::add);
  return recordsList;
}

If you initialise an ArrayList without specifying an initial capacity and then use add, you will incur the cost of ArrayList resizing itself (under the hood) whenever its internal array reaches capacity. This is fine for small batch sizes of course, but if you are dealing with something big (e.g. we have listeners with batch size = 100k), then you will be resizing / expanding ArrayList's internal array a lot.
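For a rough sense of how often that resizing happens, the sketch below counts reallocations under an explicit assumption: a default capacity of 10 and growth by half of the current capacity, which is what OpenJDK's ArrayList does at the time of writing but is an implementation detail rather than a documented guarantee. `ArrayListGrowthSketch` is an illustrative name, and the code models the growth rather than inspecting a real list.

```java
final class ArrayListGrowthSketch {

    // Counts backing-array reallocations while adding `elements` items one by one.
    // ASSUMPTION: initial capacity 10, then newCapacity = old + old / 2 on each growth.
    static int reallocations(int elements) {
        int capacity = 10;
        int count = 0;
        while (capacity < elements) {
            capacity += capacity >> 1; // grow by 50%
            count++;                   // each growth copies the existing contents
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("reallocations for 100k adds: " + reallocations(100_000));
    }
}
```

Under those assumptions a 100k batch works out to 23 reallocations, each copying everything added so far. Passing the expected size to the constructor avoids all of them.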

Hence the suggestion (copied again below) to manually build an actual array, and then build an array list using that array.

private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
  Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
  ConsumerRecord<K, V>[] recordsArray = (ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());
  int index = 0;
  while (iterator.hasNext()) {
    recordsArray[index] = iterator.next();
    index += 1;
  }
  return List.of(recordsArray);
}

Edit: List.of returns an instance of an immutable list. In order to use an immutable list, the filter mechanism would have to be changed too, so this code isn't fit for purpose atm.

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

No, Spring for Apache Kafka 4.0 is for Spring Boot 4.0 due this November.

Ok great. In that case what do you think about making a 4.0 breaking change such that listeners receive an immutable list; but additionally provide a 3.x release that uses an array list? Or, if that's too breaking for 3.x, then offer the ability to override the createRecordList implementation as I suggested above.

(happy to raise a PR if it helps)

@sobychacko
Contributor

4.0.x is definitely a good time for breaking changes if those changes improve performance, so we are definitely open to PRs. We should not break anything in 3.0.x, though, but we can consider alternative approaches if it is critical for your use case.

@artembilan
Member

to manually build an actual array,

Why cannot we use ConsumerRecords.count() directly for the ArrayList(int initialCapacity) constructor?

I think we can just bite the bullet and use an ArrayList right now, even for 3.x.
I would prefer to keep it mutable for now, just as the LinkedList is.

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

Why cannot we use ConsumerRecords.count() directly for the ArrayList(int initialCapacity) constructor?

Yes, that works too.

(deleted) ... as long as you use List#set(int index, E element), since List#add(E element) will add it to the end of the list (which will initially be at index records.count())

Edit: the deleted part is incorrect -- using add on an ArrayList with an initial capacity is totally fine.
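A short sketch of why add is fine here: the constructor argument sets capacity, not size, so a freshly pre-sized ArrayList is still empty and add appends from index 0. `CapacityVersusSizeDemo` and `copyPresized` are illustrative names; a plain Iterable plus a count stands in for ConsumerRecords and records.count().

```java
import java.util.ArrayList;
import java.util.List;

final class CapacityVersusSizeDemo {

    // Copies `source` into a list whose backing array is allocated once up front.
    static <T> List<T> copyPresized(Iterable<T> source, int count) {
        List<T> target = new ArrayList<>(count); // capacity == count, size == 0
        source.forEach(target::add);             // add() appends at index size()
        return target;
    }

    // set() needs an existing element, so it is rejected on an empty pre-sized list.
    static boolean setFailsOnEmptyPresizedList() {
        List<String> empty = new ArrayList<>(3);
        try {
            empty.set(0, "x");
            return false;
        }
        catch (IndexOutOfBoundsException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(copyPresized(List.of("r0", "r1", "r2"), 3)); // insertion order is kept
        System.out.println(setFailsOnEmptyPresizedList());
    }
}
```

So it is set, not add, that would fail on the pre-sized list, and the iteration order of the source is preserved.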

I think we can just bite the bullet and use an ArrayList right now, even for 3.x.
I would prefer to keep it mutable for now, just as the LinkedList is.

That is good news :)

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

The difference between these two approaches (new ArrayList<>(recordsArray) vs new ArrayList<>(numRecords)) becomes mostly aesthetic -- I personally prefer the former since, IMO, it's more explicit.

The drawback is that you need to do an unchecked cast, otherwise the compiler will complain about

  ConsumerRecord<K, V>[] recordsArray = (ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());

but I think it's worth it.
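For reference, a compilable sketch of the array-first variant, with the unchecked cast confined to one annotated method. Assumptions: `Map.Entry<K, V>` is only a stand-in for ConsumerRecord<K, V> so that the example runs without Kafka on the classpath, and `GenericArraySketch` / `toList` are illustrative names. Since java.util.ArrayList has no constructor that takes an array, the sketch wraps the array with Arrays.asList before copying.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class GenericArraySketch {

    // Fills an exactly-sized array, then copies it into a mutable ArrayList.
    // The cast is unchecked because generic array types are erased at runtime.
    @SuppressWarnings("unchecked")
    static <K, V> List<Map.Entry<K, V>> toList(Iterable<Map.Entry<K, V>> source, int count) {
        Map.Entry<K, V>[] array = (Map.Entry<K, V>[]) Array.newInstance(Map.Entry.class, count);
        int index = 0;
        for (Map.Entry<K, V> entry : source) {
            array[index++] = entry;
        }
        return new ArrayList<>(Arrays.asList(array));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> source = List.of(Map.entry("a", 1), Map.entry("b", 2));
        System.out.println(toList(source, source.size()));
    }
}
```

Note that this variant allocates twice (the array, then the ArrayList's own copy), whereas new ArrayList<>(count) followed by add allocates once and needs no cast.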

@janeklb
Contributor Author

janeklb commented Feb 24, 2025

We should not break anything in 3.0.x

@sobychacko what about in 3.x? And more importantly would you consider changing the underlying List implementation from LinkedList to ArrayList a breaking change?

@sobychacko
Contributor

@janeklb as long as we don't see any regression issues while testing, changing from one impl to another like that may not be a breaking change, so we can do that I guess.

janeklb added a commit to janeklb/spring-kafka that referenced this issue Feb 24, 2025
janeklb added a commit to janeklb/spring-kafka that referenced this issue Feb 24, 2025
@artembilan artembilan added this to the 4.0.0-M1 milestone Feb 24, 2025
artembilan pushed a commit that referenced this issue Feb 24, 2025
…records

Fixes: #3764
Issue link: #3764

Acknowledging an index in a batch has quadratic time `N(N+1)/2` ~ `N^2`

Batch consumers operate on a `LinkedList` of records.
If the consumer uses `MANUAL_IMMEDIATE` ack mode, and the listener invokes `acknowledgement.acknowledge(index)` where index is relatively big (e.g. when processing batches of `100k`), performance takes a hit because of the linear lookup `records.get(i)` in a loop.

Signed-off-by: Janek Lasocki-Biczysko <[email protected]>

[[email protected]: improve commit message]
**Auto-cherry-pick to `3.3.x`**
Signed-off-by: Artem Bilan <[email protected]>
spring-builds pushed a commit that referenced this issue Feb 24, 2025
(cherry picked from commit 53149d4)