Skip to content

Commit 23ecde6

Browse files
committed
add maglev consistent hashing
Signed-off-by: dongjiang <[email protected]>
1 parent 1d1f32a commit 23ecde6

File tree

7 files changed

+278
-4
lines changed

7 files changed

+278
-4
lines changed

cluster/loadbalance/consistenthashing/loadbalance.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import (
2121
"encoding/json"
2222
"hash/crc32"
2323
"regexp"
24-
)
2524

26-
import (
2725
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
2826
"dubbo.apache.org/dubbo-go/v3/common/constant"
2927
"dubbo.apache.org/dubbo-go/v3/common/extension"

cluster/loadbalance/loadbalance_benchmarks_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@ package loadbalance_test
2020
import (
2121
"fmt"
2222
"testing"
23-
)
2423

25-
import (
2624
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
2725
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/aliasmethod"
2826
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing"
2927
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/iwrr"
3028
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive"
29+
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/maglevconsistenthashing"
3130
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c"
3231
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
3332
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/ringhash"
@@ -85,3 +84,7 @@ func BenchmarkRandomLoadbalance(b *testing.B) {
8584
func BenchmarkAliasMethodLoadbalance(b *testing.B) {
8685
Benchloadbalance(b, extension.GetLoadbalance(constant.LoadBalanceKeyAliasMethod))
8786
}
87+
88+
func BenchmarkMaglevConsistentHashingLoadBalance(b *testing.B) {
89+
Benchloadbalance(b, extension.GetLoadbalance(constant.LoadBalanceKeyMaglevConsistentHashing))
90+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
// Package maglevconsistenthashing implements ConsistentHash load balance strategy.
19+
package maglevconsistenthashing
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package maglevconsistenthashing
19+
20+
import (
21+
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
22+
"dubbo.apache.org/dubbo-go/v3/common/constant"
23+
"dubbo.apache.org/dubbo-go/v3/common/extension"
24+
"dubbo.apache.org/dubbo-go/v3/protocol"
25+
)
26+
27+
func init() {
28+
extension.SetLoadbalance(constant.LoadBalanceKeyMaglevConsistentHashing, newMaglevConsistentHashingLoadBalance)
29+
}
30+
31+
type maglevConsistentHashingLoadBalance struct{}
32+
33+
// newMaglevConsistentHashingLoadBalance returns a loadbalancer using maglev consistent hashing algorithm.
34+
func newMaglevConsistentHashingLoadBalance() loadbalance.LoadBalance {
35+
return &maglevConsistentHashingLoadBalance{}
36+
}
37+
38+
// Select gets invoker based on interleaved weighted round robine load balancing strategy
39+
func (lb *maglevConsistentHashingLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
40+
count := len(invokers)
41+
if count == 0 {
42+
return nil
43+
}
44+
if count == 1 {
45+
return invokers[0]
46+
}
47+
48+
maglev := NewMaglev(invokers)
49+
50+
methodName := invocation.MethodName()
51+
key := invokers[0].GetURL().ServiceKey() + "." + methodName
52+
53+
return maglev.Pick(key)
54+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package maglevconsistenthashing
19+
20+
import (
21+
"fmt"
22+
"testing"
23+
24+
"dubbo.apache.org/dubbo-go/v3/common"
25+
"dubbo.apache.org/dubbo-go/v3/common/constant"
26+
"dubbo.apache.org/dubbo-go/v3/protocol"
27+
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
func TestMaglevConshashLoadBalanceSelect(t *testing.T) {
32+
loadBalance := newMaglevConsistentHashingLoadBalance()
33+
34+
var invokers []protocol.Invoker
35+
36+
url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
37+
constant.LocalHostValue, constant.DefaultPort))
38+
invokers = append(invokers, protocol.NewBaseInvoker(url))
39+
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
40+
assert.True(t, i.GetURL().URLEqual(url))
41+
42+
for i := 1; i < 10; i++ {
43+
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
44+
invokers = append(invokers, protocol.NewBaseInvoker(url))
45+
}
46+
loadBalance.Select(invokers, &invocation.RPCInvocation{})
47+
}
48+
49+
func TestMaglevConshashLoadBalance(t *testing.T) {
50+
loadBalance := newMaglevConsistentHashingLoadBalance()
51+
52+
var invokers []protocol.Invoker
53+
loop := 10
54+
for i := 1; i <= loop; i++ {
55+
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
56+
invokers = append(invokers, protocol.NewBaseInvoker(url))
57+
}
58+
59+
loop = (1 + loop) * loop / 2
60+
selected := make(map[protocol.Invoker]int)
61+
62+
for i := 1; i <= loop; i++ {
63+
invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
64+
selected[invoker]++
65+
}
66+
67+
sum := 0
68+
for _, value := range selected {
69+
sum += value
70+
}
71+
72+
assert.Equal(t, loop, sum)
73+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package maglevconsistenthashing
19+
20+
import (
21+
"hash/fnv"
22+
23+
"dubbo.apache.org/dubbo-go/v3/protocol"
24+
)
25+
26+
type Maglev struct {
27+
invokers []protocol.Invoker // Instance
28+
lookupTable []int
29+
30+
tableSize int // Prime number
31+
}
32+
33+
func NewMaglev(invokers []protocol.Invoker) *Maglev {
34+
// Next invokers Prime number
35+
pn := NextPrime(len(invokers))
36+
m := &Maglev{
37+
lookupTable: make([]int, pn),
38+
invokers: invokers,
39+
tableSize: pn,
40+
}
41+
m.buildLookupTable()
42+
return m
43+
}
44+
45+
// buildLookupTable 构建 Maglev 查找表
46+
func (m *Maglev) buildLookupTable() {
47+
permutations := m.generatePermutations()
48+
next := make([]int, len(m.invokers))
49+
inserted := 0
50+
51+
for i := range m.lookupTable {
52+
m.lookupTable[i] = -1
53+
}
54+
55+
for inserted < m.tableSize {
56+
for i := range m.invokers {
57+
if inserted >= m.tableSize {
58+
break
59+
}
60+
offset := permutations[i][next[i]%m.tableSize]
61+
for m.lookupTable[offset] != -1 {
62+
next[i]++
63+
offset = permutations[i][next[i]%m.tableSize]
64+
}
65+
m.lookupTable[offset] = i
66+
next[i]++
67+
inserted++
68+
}
69+
}
70+
}
71+
72+
// generatePermutations 生成每个节点的排列
73+
func (m *Maglev) generatePermutations() [][]int {
74+
permutations := make([][]int, len(m.invokers))
75+
for i, node := range m.invokers {
76+
permutations[i] = m.calculatePermutation(node)
77+
}
78+
return permutations
79+
}
80+
81+
// calculatePermutation 计算单个节点的排列
82+
func (m *Maglev) calculatePermutation(node protocol.Invoker) []int {
83+
u := node.GetURL()
84+
address := u.Ip + ":" + u.Port
85+
offset := hash(address) % uint64(m.tableSize)
86+
skip := 1 + (hash(address+"skip") % uint64(m.tableSize-1))
87+
permutation := make([]int, m.tableSize)
88+
for i := 0; i < m.tableSize; i++ {
89+
permutation[i] = int((offset + uint64(i)*skip) % uint64(m.tableSize))
90+
}
91+
return permutation
92+
}
93+
94+
func (m *Maglev) Pick(key string) protocol.Invoker {
95+
index := int(hash(key) % uint64(m.tableSize))
96+
return m.invokers[m.lookupTable[index]]
97+
}
98+
99+
// hash 简单的哈希函数示例
100+
func hash(s string) uint64 {
101+
h := fnv.New64a()
102+
h.Write([]byte(s))
103+
return h.Sum64()
104+
}
105+
106+
// isPrime 函数用于判断一个数是否为素数
107+
func isPrime(num int) bool {
108+
if num < 2 {
109+
return false
110+
}
111+
for i := 2; i*i <= num; i++ {
112+
if num%i == 0 {
113+
return false
114+
}
115+
}
116+
return true
117+
}
118+
119+
// NextPrime 函数用于找出比 n 大的第一个素数
120+
func NextPrime(n int) int {
121+
num := n + 1
122+
for !isPrime(num) {
123+
num++
124+
}
125+
return num
126+
}

common/constant/loadbalance.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ const (
2626
LoadXDSRingHash = "xdsringhash"
2727
LoadBalanceKeyInterleavedWeightedRoundRobin = "interleavedweightedroundrobin"
2828
LoadBalanceKeyAliasMethod = "aliasmethod"
29+
LoadBalanceKeyMaglevConsistentHashing = "maglevconsistenthashing"
2930
)

0 commit comments

Comments
 (0)