Skip to content

Commit 2bfc6a9

Browse files
authored
Merge pull request #158 from xujianhai666/feat-broadcast
add broadcast cluster
2 parents 8194fe8 + a6d900f commit 2bfc6a9

File tree

3 files changed

+208
-0
lines changed

3 files changed

+208
-0
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"github.com/apache/dubbo-go/cluster"
22+
"github.com/apache/dubbo-go/common/extension"
23+
"github.com/apache/dubbo-go/protocol"
24+
)
25+
26+
type broadcastCluster struct{}
27+
28+
const broadcast = "broadcast"
29+
30+
func init() {
31+
extension.SetCluster(broadcast, NewBroadcastCluster)
32+
}
33+
34+
func NewBroadcastCluster() cluster.Cluster {
35+
return &broadcastCluster{}
36+
}
37+
38+
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
39+
return newBroadcastClusterInvoker(directory)
40+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"github.com/apache/dubbo-go/cluster"
22+
"github.com/apache/dubbo-go/common/logger"
23+
"github.com/apache/dubbo-go/protocol"
24+
)
25+
26+
type broadcastClusterInvoker struct {
27+
baseClusterInvoker
28+
}
29+
30+
func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
31+
return &broadcastClusterInvoker{
32+
baseClusterInvoker: newBaseClusterInvoker(directory),
33+
}
34+
}
35+
36+
func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
37+
invokers := invoker.directory.List(invocation)
38+
err := invoker.checkInvokers(invokers, invocation)
39+
if err != nil {
40+
return &protocol.RPCResult{Err: err}
41+
}
42+
err = invoker.checkWhetherDestroyed()
43+
if err != nil {
44+
return &protocol.RPCResult{Err: err}
45+
}
46+
47+
var result protocol.Result
48+
for _, ivk := range invokers {
49+
result = ivk.Invoke(invocation)
50+
if result.Error() != nil {
51+
logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk)
52+
err = result.Error()
53+
}
54+
}
55+
if err != nil {
56+
return &protocol.RPCResult{Err: err}
57+
}
58+
return result
59+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"context"
22+
"errors"
23+
"testing"
24+
)
25+
26+
import (
27+
"github.com/golang/mock/gomock"
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
import (
32+
"github.com/apache/dubbo-go/cluster/directory"
33+
"github.com/apache/dubbo-go/cluster/loadbalance"
34+
"github.com/apache/dubbo-go/common"
35+
"github.com/apache/dubbo-go/common/extension"
36+
"github.com/apache/dubbo-go/protocol"
37+
"github.com/apache/dubbo-go/protocol/invocation"
38+
"github.com/apache/dubbo-go/protocol/mock"
39+
)
40+
41+
var (
42+
broadcastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
43+
)
44+
45+
func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
46+
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
47+
48+
invokers := []protocol.Invoker{}
49+
for i, ivk := range mockInvokers {
50+
invokers = append(invokers, ivk)
51+
if i == 0 {
52+
ivk.EXPECT().GetUrl().Return(broadcastUrl)
53+
}
54+
}
55+
staticDir := directory.NewStaticDirectory(invokers)
56+
57+
broadcastCluster := NewBroadcastCluster()
58+
clusterInvoker := broadcastCluster.Join(staticDir)
59+
return clusterInvoker
60+
}
61+
62+
func Test_BroadcastInvokeSuccess(t *testing.T) {
63+
ctrl := gomock.NewController(t)
64+
defer ctrl.Finish()
65+
66+
invokers := make([]*mock.MockInvoker, 0)
67+
68+
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
69+
for i := 0; i < 3; i++ {
70+
invoker := mock.NewMockInvoker(ctrl)
71+
invokers = append(invokers, invoker)
72+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
73+
}
74+
75+
clusterInvoker := registerBroadcast(t, invokers...)
76+
77+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
78+
assert.Equal(t, mockResult, result)
79+
}
80+
81+
func Test_BroadcastInvokeFailed(t *testing.T) {
82+
ctrl := gomock.NewController(t)
83+
defer ctrl.Finish()
84+
85+
invokers := make([]*mock.MockInvoker, 0)
86+
87+
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
88+
mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")}
89+
for i := 0; i < 10; i++ {
90+
invoker := mock.NewMockInvoker(ctrl)
91+
invokers = append(invokers, invoker)
92+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
93+
}
94+
{
95+
invoker := mock.NewMockInvoker(ctrl)
96+
invokers = append(invokers, invoker)
97+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
98+
}
99+
for i := 0; i < 10; i++ {
100+
invoker := mock.NewMockInvoker(ctrl)
101+
invokers = append(invokers, invoker)
102+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
103+
}
104+
105+
clusterInvoker := registerBroadcast(t, invokers...)
106+
107+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
108+
assert.Equal(t, mockFailedResult.Err, result.Error())
109+
}

0 commit comments

Comments
 (0)