Skip to content

Commit 7ad0c6c

Browse files
committed
add failfast. resolve #139
1 parent 7f0abe3 commit 7ad0c6c

File tree

9 files changed

+304
-15
lines changed

9 files changed

+304
-15
lines changed

cluster/cluster_impl/base_cluster_invoker.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
import (
2626
"github.com/apache/dubbo-go/cluster"
2727
"github.com/apache/dubbo-go/common"
28+
"github.com/apache/dubbo-go/common/constant"
29+
"github.com/apache/dubbo-go/common/extension"
2830
"github.com/apache/dubbo-go/common/utils"
2931
"github.com/apache/dubbo-go/protocol"
3032
"github.com/apache/dubbo-go/version"
@@ -115,12 +117,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
115117
}
116118

117119
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
118-
if len(invoked) > 0 {
119-
for _, i := range invoked {
120-
if i == selectedInvoker {
121-
return true
122-
}
120+
for _, i := range invoked {
121+
if i == selectedInvoker {
122+
return true
123123
}
124124
}
125125
return false
126126
}
127+
128+
func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
129+
url := invoker.GetUrl()
130+
131+
methodName := invocation.MethodName()
132+
//Get the service loadbalance config
133+
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
134+
135+
//Get the service method loadbalance config if have
136+
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 {
137+
lb = v
138+
}
139+
return extension.GetLoadbalance(lb)
140+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"github.com/apache/dubbo-go/cluster"
22+
"github.com/apache/dubbo-go/common/extension"
23+
"github.com/apache/dubbo-go/protocol"
24+
)
25+
26+
type failfastCluster struct{}
27+
28+
const failfast = "failfast"
29+
30+
func init() {
31+
extension.SetCluster(failfast, NewFailFastCluster)
32+
}
33+
34+
func NewFailFastCluster() cluster.Cluster {
35+
return &failfastCluster{}
36+
}
37+
38+
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
39+
return newFailFastClusterInvoker(directory)
40+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"github.com/apache/dubbo-go/cluster"
22+
"github.com/apache/dubbo-go/protocol"
23+
)
24+
25+
type failfastClusterInvoker struct {
26+
baseClusterInvoker
27+
}
28+
29+
func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
30+
return &failfastClusterInvoker{
31+
baseClusterInvoker: newBaseClusterInvoker(directory),
32+
}
33+
}
34+
35+
func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
36+
invokers := invoker.directory.List(invocation)
37+
err := invoker.checkInvokers(invokers, invocation)
38+
if err != nil {
39+
return &protocol.RPCResult{Err: err}
40+
}
41+
42+
loadbalance := getLoadBalance(invokers[0], invocation)
43+
44+
err = invoker.checkWhetherDestroyed()
45+
if err != nil {
46+
return &protocol.RPCResult{Err: err}
47+
}
48+
49+
ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)
50+
return ivk.Invoke(invocation)
51+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"context"
22+
"testing"
23+
)
24+
25+
import (
26+
"github.com/golang/mock/gomock"
27+
perrors "github.com/pkg/errors"
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
import (
32+
"github.com/apache/dubbo-go/cluster/directory"
33+
"github.com/apache/dubbo-go/cluster/loadbalance"
34+
"github.com/apache/dubbo-go/common"
35+
"github.com/apache/dubbo-go/common/extension"
36+
"github.com/apache/dubbo-go/protocol"
37+
"github.com/apache/dubbo-go/protocol/invocation"
38+
"github.com/apache/dubbo-go/protocol/mock"
39+
)
40+
41+
var (
42+
failfastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
43+
)
44+
45+
// registerFailfast register failfastCluster to cluster extension.
46+
func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
47+
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
48+
failfastCluster := NewFailFastCluster()
49+
50+
invokers := []protocol.Invoker{}
51+
invokers = append(invokers, invoker)
52+
53+
invoker.EXPECT().GetUrl().Return(failfastUrl)
54+
55+
staticDir := directory.NewStaticDirectory(invokers)
56+
clusterInvoker := failfastCluster.Join(staticDir)
57+
return clusterInvoker
58+
}
59+
60+
func Test_FailfastInvokeSuccess(t *testing.T) {
61+
ctrl := gomock.NewController(t)
62+
defer ctrl.Finish()
63+
64+
invoker := mock.NewMockInvoker(ctrl)
65+
clusterInvoker := registerFailfast(t, invoker)
66+
67+
invoker.EXPECT().GetUrl().Return(failfastUrl)
68+
69+
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
70+
71+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
72+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
73+
74+
assert.NoError(t, result.Error())
75+
res := result.Result().(rest)
76+
assert.True(t, res.success)
77+
assert.Equal(t, 0, res.tried)
78+
}
79+
80+
func Test_FailfastInvokeFail(t *testing.T) {
81+
ctrl := gomock.NewController(t)
82+
defer ctrl.Finish()
83+
84+
invoker := mock.NewMockInvoker(ctrl)
85+
clusterInvoker := registerFailfast(t, invoker)
86+
87+
invoker.EXPECT().GetUrl().Return(failfastUrl)
88+
89+
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
90+
91+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
92+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
93+
94+
assert.NotNil(t, result.Error())
95+
assert.Equal(t, "error", result.Error().Error())
96+
assert.Nil(t, result.Result())
97+
}

cluster/cluster_impl/failover_cluster_invoker.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
import (
2525
"github.com/apache/dubbo-go/cluster"
2626
"github.com/apache/dubbo-go/common/constant"
27-
"github.com/apache/dubbo-go/common/extension"
2827
"github.com/apache/dubbo-go/common/utils"
2928
"github.com/apache/dubbo-go/protocol"
3029
"github.com/apache/dubbo-go/version"
@@ -48,17 +47,11 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
4847
if err != nil {
4948
return &protocol.RPCResult{Err: err}
5049
}
51-
url := invokers[0].GetUrl()
5250

53-
methodName := invocation.MethodName()
54-
//Get the service loadbalance config
55-
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
51+
loadbalance := getLoadBalance(invokers[0], invocation)
5652

57-
//Get the service method loadbalance config if have
58-
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
59-
lb = v
60-
}
61-
loadbalance := extension.GetLoadbalance(lb)
53+
methodName := invocation.MethodName()
54+
url := invokers[0].GetUrl()
6255

6356
//get reties
6457
retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ require (
44
github.com/dubbogo/getty v1.2.0
55
github.com/dubbogo/gost v1.1.1
66
github.com/dubbogo/hessian2 v1.2.0
7+
github.com/golang/mock v1.3.1
78
github.com/magiconair/properties v1.8.1
89
github.com/pkg/errors v0.8.1
910
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
88
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
99
github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw=
1010
github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY=
11+
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
12+
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
1113
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
1214
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
1315
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
@@ -30,12 +32,15 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
3032
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
3133
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
3234
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
35+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
3336
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
3437
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
38+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
3539
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
3640
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
3741
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
3842
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
43+
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
3944
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
4045
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
4146
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

protocol/invoker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/apache/dubbo-go/common/logger"
2323
)
2424

25+
//go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker
2526
// Extension - Invoker
2627
type Invoker interface {
2728
common.Node

protocol/mock/mock_invoker.go

Lines changed: 87 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)