Use consistent hashing in KeyShared distribution #6791
Conversation
@merlimat I'm working on fixing ordered dispatching when consumers are added and removed. Issue #6554 tracks it, and we have discussed it in the Slack channel https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1585579082105200. I want to introduce a concept named "fenced range"; the main idea is to avoid blocking the whole dispatching task when consumers change (especially when a consumer is added). The general approach is that when a new consumer is added, the split range becomes fenced, and every split range has a buffer (it's more like …). I need to rethink how to leverage this idea with the consistent hashing approach. If you have any ideas, please ping me, thanks. The PR looks good.
@codelipenghui Thanks for pointing to this. I have a couple of rough ideas. Let me think it through a bit more and I'll chime in on that issue.
@merlimat Sorry, I forgot one thing. Currently, we have a `KeySharedPolicy` and `KeySharedMode` at the client side so that the user can choose a different selector. I think we can add a `CONSISTENT_HASHING` mode to `KeySharedMode`, so that you can add a new selector implementation. Any thoughts?
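For readers following along, here is a rough sketch of what that client-side proposal would look like. Note that this is purely hypothetical: the existing `KeySharedMode` enum only defines `AUTO_SPLIT` and `STICKY`, and the `CONSISTENT_HASHING` constant shown here was only being proposed in this comment, not an existing API.

```java
// Hypothetical sketch of the proposal above -- KeySharedMode in the client API
// currently defines only AUTO_SPLIT and STICKY; CONSISTENT_HASHING is illustration only.
public enum KeySharedMode {
    AUTO_SPLIT,         // existing: broker splits hash ranges as consumers join/leave
    STICKY,             // existing: client pins explicit hash ranges per consumer
    CONSISTENT_HASHING  // proposed: broker places consumers on a consistent-hash ring
}
```

As the rest of the thread shows, the discussion ultimately moved toward a broker-side configuration switch rather than a new client-side mode.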
@codelipenghui AUTO_SPLIT and CONSISTENT_HASHING are essentially 2 different implementations of the same stated behavior. In my view, we should just give the user one good default option, and that option should work across all corner cases. Do you see any particular downside to using consistent hashing compared to the auto-split approach?
OK, makes sense to me. If users want a more even distribution, they can increase the number of points. I'm worried that too many consumers (e.g. 5000) may result in too many nodes in the TreeMap. I'm not sure if this will require more CPU resources, related to #6647 (comment); I will test it later. Or, can we add a configuration in broker.conf, so that we don't need to introduce more options on the client side? I've met some users who already use the Key_Shared subscription in production. I think we'd better give users a transition period: deprecate the current implementation, change the default value in broker.conf, and finally remove the old code.
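A quick back-of-envelope on the TreeMap-size concern above (these numbers are assumptions taken from the comment, not a benchmark from the PR): with many consumers the ring grows as consumers times points, but lookups stay logarithmic.

```java
// Back-of-envelope sketch for the TreeMap-size concern: ring entries = consumers x points,
// and a ceilingEntry() lookup costs roughly log2(ring entries) comparisons.
public class RingSizeEstimate {
    public static void main(String[] args) {
        int consumers = 5_000;         // worst case mentioned in the comment above
        int pointsPerConsumer = 100;   // the proposed default
        int ringEntries = consumers * pointsPerConsumer;
        double lookupComparisons = Math.ceil(Math.log(ringEntries) / Math.log(2));
        System.out.printf("ring entries: %d, ~%.0f comparisons per lookup%n",
                ringEntries, lookupComparisons);
        // Prints: ring entries: 500000, ~19 comparisons per lookup
    }
}
```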
Regarding many consumers: sure, the default of 100 points could be overkill, since we'd be fine with 1 point per consumer. It would be easy, though, to dynamically change the number of points in the ring based on the number of consumers. The only part we need to be careful about during this operation is the same as described in #6554: maintaining the ordering after the hash ranges get readjusted. I'd say to start with the current simple approach, and make it dynamic if it becomes a pain point.
I have a 2nd PR ready (based on top of this one) to address that and other issues on the dispatcher code.
The code for the consumer selection is pretty much self-contained and a fairly basic implementation of consistent hashing. For what it's worth, this new implementation is also already running in production without issues. That doesn't remove the fact that the current implementation can easily lead to a very imbalanced traffic distribution. In fact, the very first user we got to use the key-shared feature was bitten by this. That should be regarded as a bug in any case.
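To make the discussion concrete, here is a minimal sketch of the kind of consistent-hashing selector being described: a ring stored in a TreeMap, several points per consumer, and keys routed to the first point at or after their hash. The class name and hash function are my own; the actual PR code may differ.

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal consistent-hashing selector sketch (not the PR's actual implementation).
public class ConsistentHashRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final int pointsPerConsumer;

    public ConsistentHashRing(int pointsPerConsumer) {
        this.pointsPerConsumer = pointsPerConsumer;
    }

    public void addConsumer(String consumer) {
        // Place several points per consumer on the ring to smooth out the distribution.
        for (int i = 0; i < pointsPerConsumer; i++) {
            ring.put(hash(consumer + "-" + i), consumer);
        }
    }

    public void removeConsumer(String consumer) {
        // Only this consumer's points disappear; every other key keeps its assignment.
        ring.values().removeIf(c -> c.equals(consumer));
    }

    public String select(String key) {
        // Route the key to the first point at or after its hash, wrapping around the ring.
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(key));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    private static int hash(String s) {
        // Placeholder hash for the sketch; the real code would use a stronger hash (e.g. Murmur3).
        return s.hashCode() & Integer.MAX_VALUE;
    }
}
```

The key property, as argued above, is that adding or removing one consumer only remaps the keys owned by that consumer's points, instead of reshuffling ranges based on the order of joins and leaves.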
This is a great change, although I would suggest adding it as a separate implementation instead of removing the old implementation completely. This can be a broker-side configuration (and potentially be added to the namespace policy). Since the current algorithm has already been used in many production environments, we should be careful about removing an existing implementation. Providing a way to opt in to a different algorithm is the right approach, just like what we did for other algorithms such as bundle split, load shedding, etc.
The current implementation is severely broken, and this PR is fixing the bug. I don't see an easy way to fix it based on the range-splitting design.
I see there is a problem in the current algorithm when scaling down the number of consumers, but I don't think it's broken. In most production deployments, people usually don't scale down the number of consumers; they usually scale up. So the extreme case you described in the description probably wasn't really noticed. This is the caveat of the current algorithm, and we, or other people in the community, can find ways to improve it. If you have a better algorithm to resolve the key distribution and the skew problem during split-and-merge, that's great and the community is excited about it. Then let's introduce that algorithm. This allows people to slowly migrate to the new algorithm and fall back as needed.
The Key_Shared subscription has 2 stated goals: (1) distribute the traffic across the available consumers, and (2) preserve per-key ordering.
The current implementation cannot guarantee either of the 2: the hash distribution can become heavily skewed when consumers are removed (see the example in the description), and ordering can be broken when the set of consumers changes (#6554).
The first user I saw using this feature, for a very simple use case, kept going into this state after each rolling restart.
That's what I said: there is no easy way to do it in that mode, and that's the reason why the textbook solution for this kind of problem is to use consistent hashing. This is not something I came up with. So, if the question is "do you have a better algorithm to implement the goals of Key_Shared subscription", the answer is: yes, consistent hashing. Instead, if the question is "do you have a better algorithm to resolve the key distribution and the skew problem during split-and-merge?", my answer is no, and: why is that a requirement? We just need one solution that fulfills the 2 goals (traffic distribution and ordering). Going back to the original question on having a configuration switch: we can do that if there are concerns about this implementation, though I think that, at least by default, the behavior should not have "known" blind spots.
Yes. I think the question is to have a configuration switch that allows people to deprecate the current implementation and opt in to the new implementation.
@merlimat I'm planning the 2.6.0 release, and it would be great if we could include the consistent hashing distribution for Key_Shared subscriptions.
@sijie @codelipenghui Added a configuration switch for consistent hashing, turned off by default. PTAL.
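For context, the broker-side switch described here would look roughly like the following in broker.conf. The property names and defaults below are my best recollection of this change and should be treated as assumptions; verify against the broker.conf shipped with the release.

```properties
# Sketch of the broker.conf switch (property names assumed, verify against the release).
# Enable the consistent-hashing selector for Key_Shared subscriptions (off by default).
subscriptionKeySharedUseConsistentHashing=false
# Number of points on the hash ring per consumer; more points give a more even distribution.
subscriptionKeySharedConsistentHashingReplicaPoints=100
```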
/pulsarbot run-failure-checks
@sijie PTAL again
ping @sijie Please take another look at this PR.
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide the hash space across the available consumers. This is based on splitting the currently assigned hash ranges when a new consumer joins or leaves.

There are a few problems with the current approach:

1. When adding a new consumer, the biggest range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double the size of the other 2 consumers' ranges, and consequently it will receive twice the traffic. This is not terrible, but not ideal either.
2. When removing consumers, the range for the removed consumer is always assigned to the next consumer. The resulting hash distribution really depends on the sequence in which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed, with situations where 1 consumer gets >90% of the traffic.

This is an example of removing consumers in sequence, with the size of their respective hash ranges attached:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```

As you can see, `c1` will take most of the traffic. Most likely it will not be able to process all the messages, and the backlog will build up.

### Modifications

* No functional difference from the user's perspective.
* Use a consistent hashing mechanism to assign keys to consumers. This ensures an even distribution without the degradation in the corner cases.
* The number of points in the ring is configurable, default=100.
* Refactored the current unit tests. The tests currently duplicate the logic of the implementation and check that a message is placed in the bucket of one consumer. Of course it works, since it's the same code executed on both sides. Instead, the tests should focus on the contract of the feature: messages should arrive in order, and there should be a "decent" sharing of load across consumers (a sketch of such a contract-focused check follows below).
* @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance, as I've also explained in apache#6647 (comment). I'm happy to discuss more about it.
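As a rough illustration of that contract-focused testing idea, not the PR's actual test code, the sketch below exercises the hypothetical `ConsistentHashRing` class shown earlier in this thread and checks only the contract: a key always maps to the same consumer (the prerequisite for per-key ordering), and the load spread is reported rather than derived from the implementation's own logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical contract-style check against the ConsistentHashRing sketch from earlier.
public class SelectorContractCheck {
    public static void main(String[] args) {
        ConsistentHashRing ring = new ConsistentHashRing(100);
        for (int c = 0; c < 10; c++) {
            ring.addConsumer("consumer-" + c);
        }

        Map<String, Integer> perConsumer = new HashMap<>();
        for (int k = 0; k < 100_000; k++) {
            String key = "key-" + k;
            String selected = ring.select(key);
            // Contract 1: the same key must always go to the same consumer,
            // which is what preserves per-key ordering.
            if (!selected.equals(ring.select(key))) {
                throw new AssertionError("Key " + key + " moved between consumers");
            }
            perConsumer.merge(selected, 1, Integer::sum);
        }

        // Contract 2: "decent" load sharing. A real test would assert bounds here using
        // the implementation's actual hash function; this sketch just reports the spread.
        int max = perConsumer.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        int min = perConsumer.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        System.out.println("per-consumer counts: " + perConsumer + ", max/min = " + max + "/" + min);
    }
}
```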