Skip to content

Use consistent hashing in KeyShared distribution #6791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 1, 2020

Conversation

merlimat
Copy link
Contributor

Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves.

There are few problems with the current approach:

  1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either.

  2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}

As you can see, c1 will take most of the traffic.

Most likely it will not be able to process all the messages and the backlog builds up.

Modifications

  • No functional difference from user perspective
  • Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
  • Number of points in the ring is configurable, default=100.
  • Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  • @codelipenghui I've removed the selectByIndex(). In my opinion there's absolutely no difference in efficiency/performance as I've also explained on Improve Key_Shared subscription message dispatching performance. #6647 (comment). I'm happy to discuss more about it.

@merlimat merlimat added the type/bug The PR fixed a bug or issue reported a bug label Apr 22, 2020
@merlimat merlimat added this to the 2.6.0 milestone Apr 22, 2020
@merlimat merlimat self-assigned this Apr 22, 2020
@codelipenghui
Copy link
Contributor

@merlimat I'm working on fixing ordering dispatching for consumers adds and removes. This is issue #6554 for it and we have discussed in the slack channel https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1585579082105200

I want to introduce a concept named fenced range, the main idea is for do not block the whole dispatching task when consumers changed(especially add consumer).

The general approach is when a new consumer added, the split range becomes fenced. And every split range has a buffer(it's more like messagesToRedeliver in the dispatcher). The messages that dispatch to this range will add the buffer and do not dispatch to the consumer. When the owner consumer of the split range takes all messages of its receiver queue, the fenced range become un-fenced, so that we can redeliver these messages. Every fenced range has a fence timeout, this can be controlled by users.

I need to rethink how to leverage this idea on the consistent hashing approach. If you have any idea, please ping me, thanks.

The PR looks good.

@merlimat
Copy link
Contributor Author

I need to rethink how to leverage this idea on the consistent hashing approach. If you have any idea, please ping me, thanks.

@codelipenghui Thanks for pointing to this. I have a couple of rough ideas. Let me think it through it a bit more and I'll chime in on that issue.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat Sorry, I forgot one thing. Currently, we have a KeySharedPolicy and KeySharedMode at the client-side so that the user can choose a different selector. I think we can add a CONSISTENT_HASHING in the KeySharedMode. So that you can add a new selector implementation. Any thought?

@merlimat
Copy link
Contributor Author

@codelipenghui AUTO_SPLIT and CONSISTENT_HASHING are essentially 2 different implementations for the same stated behavior. In my view, we should just give the user one good default option. That option should work across all corner cases.

Do you see any particular downside in using consistent hashing compared to the autosplit approach?

@codelipenghui
Copy link
Contributor

AUTO_SPLIT and CONSISTENT_HASHING are essentially 2 different implementations for the same stated behavior. In my view, we should just give the user one good default option. That option should work across all corner cases.

Ok, make sense to me.

If users want to more evenly distributed, they will increase the points. I'm worried about too many consumers(e.g. 5000 consumers) may result in too many nodes in the TreeMap. I'm not sure if this will require more CPU resources, related to #6647 (comment). I will test it later.

Or, can we add a configuration in the broker.conf? so that we don't need to introduce more options at the client-side. I met some users who already use Key_Shared subscription in production. I think we'd better give the user a transition time and we can deprecate the current implementation, change the default value in the broker.conf and finally delete the source code.

@merlimat
Copy link
Contributor Author

If users want to more evenly distributed, they will increase the points. I'm worried about too many consumers(e.g. 5000 consumers) may result in too many nodes in the TreeMap.

Regarding many consumers, sure the default of 100 points could be overkill, since we'd be fine with 1 point per consumer. It would be easy though to dynamically change the number of points in the ring based on the number of consumers. The only part that we need to be careful during this operation is the same as described in #6554, maintaining the ordering after the hash ranges get readjusted.

I'd say to start with current simple approach, and make it dynamic if it becomes a pain point.

I'm not sure if this will require more CPU resources, related to #6647 (comment). I will test it later.

I have a 2nd PR ready (based on top of this one) to address that and other issues on the dispatcher code.

Or, can we add a configuration in the broker.conf? so that we don't need to introduce more options at the client-side. I met some users who already use Key_Shared subscription in production. I think we'd better give the user a transition time and we can deprecate the current implementation, change the default value in the broker.conf and finally delete the source code.

The code for the consumer selection is pretty much self-contained and a pretty basic implementation of consistent hashing.

For what is worth, this new implementation is also already running in production without issues.

That also doesn't remove the fact that the current implementation can easily lead to very imbalanced traffic distribution. In fact, the very first user we got to use key-shared feature got bit by this. That should be regarded as a bug in any case.

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great change. Although I would suggest adding it as a separate implementation instead of removing the old implementation completely. This can be a broker side configuration (and potentially be added to namespace policy). Since the current algorithm has been used in many production environments already, we should be concerned about removing an existing implementation. Providing a way to opt-in a different algorithm should be the right approach to go - just like what we did for other algorithms like bundle split, load shedder and etc.

@merlimat
Copy link
Contributor Author

Since the current algorithm has been used in many production environments already, we should be concerned about removing an existing implementation. Providing a way to opt-in a different algorithm should be the right approach to go - just like what we did for other algorithms like bundle split, load shedder and etc.

The current implementation is severely broken. This PR is fixing the bug. I don't see an easy way to fix it based on the splitting range design.

@sijie
Copy link
Member

sijie commented Apr 23, 2020

The current implementation is severely broken. This PR is fixing the bug. I don't see an easy way to fix it based on the splitting range design.

I see there is a problem in the current algorithm when scaling down the number of consumers. But I don't think its broken. In most of the production, people usually don't scale down the number of consumers. They usually scale up the number of consumers. So the extreme case you described in the description probably wasn't really noticed. This is the caveat of the current algorithm. We can find ways to improve it or other people in the community can find a way to improve it.

If you have a better algorithm to resolve the key distribution and the screw problem during split-and-merge, that's great and the community is excited about it. Then let's introduce that algorithm. This allows people to slowly migrate to a new algorithm and fall back as needed.

@merlimat
Copy link
Contributor Author

I see there is a problem in the current algorithm when scaling down the number of consumers. But I don't think its broken.

The Key Shared subscriptions has 2 stated goals:

  1. Distribute the load evenly across available consumers
  2. Retain ordering

The current implementation cannot guarantee either of the 2:

  1. Apart from the scale down I've seen 1 consumer out of 10 being active after a rolling restart
  2. A simple sequence of "add new consumer" and "remove a different consumer" is breaking the ordering guarantee.

So the extreme case you described in the description probably wasn't really noticed.

The first user I saw using this feature for a very simple use case kept going into this state after each rolling restart.
For sure, bugs are always around the corner, though once identified we should should strive to fix them.

If you have a better algorithm to resolve the key distribution and the screw problem during split-and-merge,

That's what I said: There is no easy way to do it in that mode and that's the reason why the textbook solution for this kind of problems is to use consistent hashing. This is not something I came up with.

So, if you're asking "do you have a better algorithm to implement the goals of KeyShared subscription", the answer is: yes, consistent hashing.

Instead, if the question is "do you have a better algorithm to resolve the key distribution and the screw problem during split-and-merge?", my answer is no and: why is that a requirement? We just need one solution that fulfills the 2 goals (traffic distribution and ordering).

Going back to the original question on having a configuration switch. We can do that if there are concerns on this implementation, though I think that, at least by default, the behavior should not have "known" blind spots.

@sijie
Copy link
Member

sijie commented Apr 27, 2020

the original question on having a configuration switch.

Yes. I think the question is to have a configuration switch to allow people to deprecate the current implementation and opt-in the new implementation.

@codelipenghui
Copy link
Contributor

@merlimat I'm planning for 2.6.0 release and It's great if we can onboard the consistent hashing distribution in KeyShared subscription.

@merlimat merlimat force-pushed the key-shared-master branch from 92703ae to 3724dfc Compare May 27, 2020 23:37
@merlimat
Copy link
Contributor Author

@sijie @codelipenghui Added configuration switch for consistent hashing, by default turned off. PTAL.

@merlimat merlimat requested review from codelipenghui and sijie May 27, 2020 23:41
@merlimat merlimat force-pushed the key-shared-master branch from 3724dfc to bf2833c Compare May 27, 2020 23:54
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@merlimat
Copy link
Contributor Author

@sijie PTAL again

@codelipenghui
Copy link
Contributor

ping @sijie Please help take a look at this PR again.

@sijie sijie merged commit 4bf8268 into apache:master Jun 1, 2020
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 1, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 1, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
@merlimat merlimat deleted the key-shared-master branch June 1, 2020 16:17
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 12, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants