Skip to content

Commit ff346b5

Browse files
tso: fix tso proxy error propagation and add test (#9268)
ref #9188 Signed-off-by: artem_danilov <[email protected]> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 8a19f46 commit ff346b5

File tree

2 files changed

+215
-3
lines changed

2 files changed

+215
-3
lines changed

pkg/utils/tsoutil/tso_dispatcher.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.uber.org/zap"
2525
"google.golang.org/grpc"
2626

27+
"github.com/pingcap/failpoint"
2728
"github.com/pingcap/kvproto/pkg/pdpb"
2829
"github.com/pingcap/log"
2930

@@ -74,10 +75,10 @@ func (s *TSODispatcher) DispatchRequest(serverCtx context.Context, req Request,
7475
key := req.getForwardedHost()
7576
val, loaded := s.dispatchChs.Load(key)
7677
if !loaded {
77-
val = tsoRequestProxyQueue{requestCh: make(chan Request, maxMergeRequests+1)}
78+
val = &tsoRequestProxyQueue{requestCh: make(chan Request, maxMergeRequests+1)}
7879
val, loaded = s.dispatchChs.LoadOrStore(key, val)
7980
}
80-
tsoQueue := val.(tsoRequestProxyQueue)
81+
tsoQueue := val.(*tsoRequestProxyQueue)
8182
if !loaded {
8283
log.Info("start new tso proxy dispatcher", zap.String("forwarded-host", req.getForwardedHost()))
8384
tsDeadlineCh := make(chan *TSDeadline, 1)
@@ -92,7 +93,7 @@ func (s *TSODispatcher) DispatchRequest(serverCtx context.Context, req Request,
9293
}
9394

9495
func (s *TSODispatcher) dispatch(
95-
tsoQueue tsoRequestProxyQueue,
96+
tsoQueue *tsoRequestProxyQueue,
9697
tsoProtoFactory ProtoFactory,
9798
forwardedHost string,
9899
clientConn *grpc.ClientConn,
@@ -103,6 +104,10 @@ func (s *TSODispatcher) dispatch(
103104
defer s.dispatchChs.Delete(forwardedHost)
104105

105106
forwardStream, cancel, err := tsoProtoFactory.createForwardStream(tsoQueue.ctx, clientConn)
107+
failpoint.Inject("canNotCreateForwardStream", func() {
108+
cancel()
109+
err = errors.New("canNotCreateForwardStream")
110+
})
106111
if err != nil || forwardStream == nil {
107112
log.Error("create tso forwarding stream error",
108113
zap.String("forwarded-host", forwardedHost),
@@ -121,6 +126,10 @@ func (s *TSODispatcher) dispatch(
121126
noProxyRequestsTimer := time.NewTimer(tsoProxyStreamIdleTimeout)
122127
for {
123128
noProxyRequestsTimer.Reset(tsoProxyStreamIdleTimeout)
129+
failpoint.Inject("tsoProxyStreamIdleTimeout", func() {
130+
noProxyRequestsTimer.Reset(0)
131+
<-tsoQueue.requestCh // consume the request so that the select below results in the idle case
132+
})
124133
select {
125134
case first := <-tsoQueue.requestCh:
126135
pendingTSOReqCount := len(tsoQueue.requestCh) + 1

tests/server/tso/tso_proxy_test.go

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tso_test
16+
17+
import (
18+
"context"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/require"
23+
"github.com/stretchr/testify/suite"
24+
25+
"github.com/pingcap/failpoint"
26+
"github.com/pingcap/kvproto/pkg/pdpb"
27+
28+
"github.com/tikv/pd/pkg/tso"
29+
"github.com/tikv/pd/pkg/utils/grpcutil"
30+
"github.com/tikv/pd/pkg/utils/testutil"
31+
"github.com/tikv/pd/tests"
32+
)
33+
34+
type tsoProxyTestSuite struct {
35+
suite.Suite
36+
serverCtx context.Context
37+
serverCancel context.CancelFunc
38+
cluster *tests.TestCluster
39+
leader *tests.TestServer
40+
follower *tests.TestServer
41+
42+
pdClient pdpb.PDClient
43+
defaultReq *pdpb.TsoRequest
44+
proxyClient pdpb.PD_TsoClient
45+
clientCtx context.Context
46+
clientCancel context.CancelFunc
47+
}
48+
49+
func TestTSOProxyTestSuite(t *testing.T) {
50+
suite.Run(t, new(tsoProxyTestSuite))
51+
}
52+
53+
func (s *tsoProxyTestSuite) SetupTest() {
54+
re := s.Require()
55+
56+
var err error
57+
s.serverCtx, s.serverCancel = context.WithCancel(context.Background())
58+
s.cluster, err = tests.NewTestCluster(s.serverCtx, 2)
59+
re.NoError(err)
60+
61+
re.NoError(s.cluster.RunInitialServers())
62+
re.NotEmpty(s.cluster.WaitLeader())
63+
64+
for _, server := range s.cluster.GetServers() {
65+
if server.GetConfig().Name != s.cluster.GetLeader() {
66+
s.follower = server
67+
} else {
68+
s.leader = server
69+
}
70+
}
71+
72+
s.pdClient = testutil.MustNewGrpcClient(re, s.follower.GetAddr())
73+
clusterID := s.leader.GetClusterID()
74+
s.defaultReq = &pdpb.TsoRequest{
75+
Header: testutil.NewRequestHeader(clusterID),
76+
Count: 1,
77+
DcLocation: tso.GlobalDCLocation,
78+
}
79+
80+
s.reCreateProxyClient()
81+
}
82+
83+
func (s *tsoProxyTestSuite) reCreateProxyClient() {
84+
if s.proxyClient != nil {
85+
_ = s.proxyClient.CloseSend()
86+
s.clientCancel()
87+
}
88+
s.proxyClient, s.clientCtx, s.clientCancel = s.createClient()
89+
}
90+
91+
func (s *tsoProxyTestSuite) createClient() (pdpb.PD_TsoClient, context.Context, context.CancelFunc) {
92+
ctx, cancel := context.WithCancel(context.Background())
93+
ctx = grpcutil.BuildForwardContext(ctx, s.leader.GetAddr())
94+
tsoClient, _ := s.pdClient.Tso(ctx)
95+
return tsoClient, ctx, cancel
96+
}
97+
98+
func (s *tsoProxyTestSuite) TearDownTest() {
99+
_ = s.proxyClient.CloseSend()
100+
s.clientCancel()
101+
s.cluster.Destroy()
102+
s.serverCancel()
103+
}
104+
105+
func (s *tsoProxyTestSuite) verifyProxyIsHealthy() {
106+
s.verifyProxyIsHealthyWith(s.proxyClient)
107+
}
108+
109+
func (s *tsoProxyTestSuite) verifyProxyIsHealthyWith(client pdpb.PD_TsoClient) {
110+
re := s.Require()
111+
re.NoError(client.Send(s.defaultReq))
112+
resp, err := client.Recv()
113+
re.NoError(err)
114+
re.Equal(s.defaultReq.GetCount(), resp.GetCount())
115+
timestamp := resp.GetTimestamp()
116+
re.Positive(timestamp.GetPhysical())
117+
re.GreaterOrEqual(uint32(timestamp.GetLogical()), s.defaultReq.GetCount())
118+
}
119+
120+
func (s *tsoProxyTestSuite) assertReceiveError(re *require.Assertions, errStr string) {
121+
re.NoError(s.proxyClient.Send(s.defaultReq))
122+
_, err := s.proxyClient.Recv()
123+
re.Error(err)
124+
re.Contains(err.Error(), errStr)
125+
}
126+
127+
func (s *tsoProxyTestSuite) TestProxyPropagatesLeaderErrorQuickly() {
128+
re := s.Require()
129+
s.verifyProxyIsHealthy()
130+
131+
// change leader
132+
re.NoError(s.cluster.ResignLeader())
133+
134+
start := time.Now()
135+
s.assertReceiveError(re, "pd is not leader of cluster")
136+
137+
// verify fails faster than timeout, otherwise the unavailable time will be too long.
138+
re.Less(time.Since(start), time.Second)
139+
}
140+
141+
func (s *tsoProxyTestSuite) TestProxyClientIsCancelledQuicklyOnServerShutdown() {
142+
re := s.Require()
143+
// open a proxy stream
144+
s.verifyProxyIsHealthy()
145+
146+
s.serverCancel()
147+
148+
start := time.Now()
149+
s.assertReceiveError(re, "Canceled")
150+
151+
// verify fails faster than timeout, otherwise the unavailable time will be too long.
152+
re.Less(time.Since(start), time.Second)
153+
}
154+
155+
func (s *tsoProxyTestSuite) TestProxyCanNotCreateConnectionToLeader() {
156+
re := s.Require()
157+
158+
// set idle timeout to zero
159+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/tsoutil/canNotCreateForwardStream", `return()`))
160+
161+
// send request to trigger failpoint above
162+
s.assertReceiveError(re, "canNotCreateForwardStream")
163+
164+
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/tsoutil/canNotCreateForwardStream"))
165+
166+
// verify stream can be recreated
167+
s.reCreateProxyClient()
168+
s.verifyProxyIsHealthy()
169+
}
170+
171+
func (s *tsoProxyTestSuite) TestClientsContinueToWorkAfterFirstStreamIsClosed() {
172+
s.verifyProxyIsHealthy()
173+
// open second stream
174+
proxyClient, _, cancel := s.createClient()
175+
defer cancel()
176+
defer proxyClient.CloseSend()
177+
178+
// close the first stream
179+
s.proxyClient.CloseSend()
180+
181+
// verify other streams are still working
182+
s.verifyProxyIsHealthyWith(proxyClient)
183+
184+
// restart new stream again
185+
s.reCreateProxyClient()
186+
s.verifyProxyIsHealthy()
187+
}
188+
189+
func (s *tsoProxyTestSuite) TestIdleStreamToLeaderIsClosedAndRecreated() {
190+
re := s.Require()
191+
192+
// set idle timeout to zero
193+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/tsoutil/tsoProxyStreamIdleTimeout", `return()`))
194+
195+
// send request to trigger failpoint above
196+
s.assertReceiveError(re, "TSOProxyStreamIdleTimeout")
197+
198+
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/tsoutil/tsoProxyStreamIdleTimeout"))
199+
s.reCreateProxyClient()
200+
201+
// now sever proxy is closed, let's send one more request to verify reset stream is recreated on demand
202+
s.verifyProxyIsHealthy()
203+
}

0 commit comments

Comments
 (0)