Skip to content

tso: enhance timestamp persistency with strong leader consistency (#9171) #9311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: release-7.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ error = '''
reset user timestamp failed, %s
'''

<<<<<<< HEAD
["PD:tso:ErrSetLocalTSOConfig"]
error = '''
set local tso config failed, %s
Expand All @@ -859,6 +860,11 @@ set local tso config failed, %s
["PD:tso:ErrSyncMaxTS"]
error = '''
sync max ts failed, %s
=======
["PD:tso:ErrSaveTimestamp"]
error = '''
save timestamp failed, %s
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
'''

["PD:tso:ErrUpdateTimestamp"]
Expand Down
37 changes: 34 additions & 3 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Leadership struct {
client *clientv3.Client
// leaderKey and leaderValue are key-value pair in etcd
leaderKey string
leaderValue string
leaderValue atomic.Value // Stored as string

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
Expand Down Expand Up @@ -114,6 +114,31 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

<<<<<<< HEAD
=======
// GetLeaderValue is used to get the leader value saved in etcd.
func (ls *Leadership) GetLeaderValue() string {
if ls == nil {
return ""
}
leaderValue := ls.leaderValue.Load()
if leaderValue == nil {
return ""
}
return leaderValue.(string)
}

// SetPrimaryWatch sets the primary watch flag.
func (ls *Leadership) SetPrimaryWatch(val bool) {
ls.primaryWatch.Store(val)
}

// IsPrimary gets the primary watch flag.
func (ls *Leadership) IsPrimary() bool {
return ls.primaryWatch.Load()
}

>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
// Need to make sure `AddCampaignTimes` is called before this function.
func (ls *Leadership) GetCampaignTimesNum() int {
Expand Down Expand Up @@ -150,7 +175,7 @@ func (ls *Leadership) AddCampaignTimes() {

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.leaderValue = leaderData
ls.leaderValue.Store(leaderData)
// Create a new lease to campaign
newLease := &lease{
Purpose: ls.purpose,
Expand Down Expand Up @@ -216,7 +241,7 @@ func (ls *Leadership) LeaderTxn(cs ...clientv3.Cmp) clientv3.Txn {
}

func (ls *Leadership) leaderCmp() clientv3.Cmp {
return clientv3.Compare(clientv3.Value(ls.leaderKey), "=", ls.leaderValue)
return clientv3.Compare(clientv3.Value(ls.leaderKey), "=", ls.GetLeaderValue())
}

// DeleteLeaderKey deletes the corresponding leader from etcd by the leaderPath as the key.
Expand Down Expand Up @@ -393,5 +418,11 @@ func (ls *Leadership) Reset() {
ls.keepAliveCancelFunc()
}
ls.keepAliveCancelFuncLock.Unlock()
<<<<<<< HEAD
ls.getLease().Close()
=======
ls.GetLease().Close()
ls.SetPrimaryWatch(false)
ls.leaderValue.Store("")
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
}
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrUpdateTimestamp = errors.Normalize("update timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrUpdateTimestamp"))
ErrSaveTimestamp = errors.Normalize("save timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrSaveTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
Expand Down
13 changes: 13 additions & 0 deletions pkg/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
package errs

import (
<<<<<<< HEAD

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / tso-function-test

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (10)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (8)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / statics

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (1)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (4)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (13)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (3)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (2)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (9)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (7)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (12)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (11)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (5)

missing import path

Check failure on line 18 in pkg/errs/errs.go

View workflow job for this annotation

GitHub Actions / chunks (6)

missing import path
"github.com/pingcap/errors"
=======
"strings"

>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand All @@ -34,3 +39,11 @@
}
return zap.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}
}

// IsLeaderChanged returns true if the error is due to leader changed.
func IsLeaderChanged(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), NotLeaderErr)
}
46 changes: 46 additions & 0 deletions pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@

"github.com/pingcap/errors"
"github.com/pingcap/log"
<<<<<<< HEAD

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / tso-function-test

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (10)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (8)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / statics

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (1)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (4)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (13)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (3)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (2)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (9)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (7)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (12)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (11)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (5)

missing import path

Check failure on line 24 in pkg/storage/endpoint/tso.go

View workflow job for this annotation

GitHub Actions / chunks (6)

missing import path
=======

"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
Expand All @@ -29,9 +35,15 @@

// TSOStorage is the interface for timestamp storage.
type TSOStorage interface {
<<<<<<< HEAD
LoadTimestamp(prefix string) (time.Time, error)
SaveTimestamp(key string, ts time.Time) error
DeleteTimestamp(key string) error
=======
LoadTimestamp(groupID uint32) (time.Time, error)
SaveTimestamp(groupID uint32, ts time.Time, leadership *election.Leadership) error
DeleteTimestamp(groupID uint32) error
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
}

var _ TSOStorage = (*StorageEndpoint)(nil)
Expand Down Expand Up @@ -67,10 +79,40 @@
return maxTSWindow, nil
}

<<<<<<< HEAD
// SaveTimestamp saves the timestamp to the storage.
func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
value, err := txn.Load(key)
=======
// SaveTimestamp saves the timestamp to the storage. The leadership is used to check if the current server is leader
// before saving the timestamp to ensure a strong consistency for persistence of the TSO timestamp window.
func (se *StorageEndpoint) SaveTimestamp(groupID uint32, ts time.Time, leadership *election.Leadership) error {
logFilds := []zap.Field{
zap.Uint32("group-id", groupID),
zap.Time("ts", ts),
zap.String("leader-key", leadership.GetLeaderKey()),
zap.String("expected-leader-value", leadership.GetLeaderValue()),
}
log.Info("saving timestamp to the storage", logFilds...)
// The PD leadership or TSO primary will always be granted first before the TSO timestamp window is saved.
// So we here check whether the leader value is filled to see if the requirement is met.
if len(leadership.GetLeaderValue()) == 0 {
return errors.Errorf("%s due to leadership has not been granted yet", errs.NotLeaderErr)
}
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
// Ensure the current server is leader by reading and comparing the leader value.
leaderValue, err := txn.Load(leadership.GetLeaderKey())
if err != nil {
return err
}
if expected := leadership.GetLeaderValue(); leaderValue != expected {
log.Error("leader value does not match", append(logFilds, zap.String("current-leader-value", leaderValue))...)
return errors.Errorf("%s due to leader value does not match, current: %s, expected: %s", errs.NotLeaderErr, leaderValue, expected)
}

value, err := txn.Load(keypath.TimestampPath(groupID))
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
if err != nil {
return err
}
Expand All @@ -79,7 +121,11 @@
if value != "" {
previousTS, err = typeutil.ParseTimestamp([]byte(value))
if err != nil {
<<<<<<< HEAD
log.Error("parse timestamp failed", zap.String("key", key), zap.String("value", value), zap.Error(err))
=======
log.Error("parse timestamp failed", append(logFilds, zap.String("value", value), zap.Error(err))...)
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
return err
}
}
Expand Down
86 changes: 84 additions & 2 deletions pkg/storage/storage_tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,44 @@
"time"

"github.com/stretchr/testify/require"
<<<<<<< HEAD

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / tso-function-test

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (10)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (8)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / statics

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (1)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (4)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (13)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (3)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (2)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (9)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (7)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (12)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (11)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (5)

missing import path

Check failure on line 24 in pkg/storage/storage_tso_test.go

View workflow job for this annotation

GitHub Actions / chunks (6)

missing import path
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
)

=======

"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/etcdutil"
)

const (
testGroupID = uint32(1)
testLeaderKey = "test-leader-key"
testLeaderValue = "test-leader-value"
)

func prepare(t *testing.T) (storage Storage, clean func(), leadership *election.Leadership) {
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
storage = NewStorageWithEtcdBackend(client)
leadership = election.NewLeadership(client, testLeaderKey, "storage_tso_test")
err := leadership.Campaign(60, testLeaderValue)
require.NoError(t, err)
return storage, clean, leadership
}

>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
func TestSaveLoadTimestamp(t *testing.T) {
re := require.New(t)
storage, clean := newTestStorage(t)
storage, clean, leadership := prepare(t)
defer clean()
expectedTS := time.Now().Round(0)
<<<<<<< HEAD
err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS)
=======
err := storage.SaveTimestamp(testGroupID, expectedTS, leadership)
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
re.NoError(err)
ts, err := storage.LoadTimestamp("")
re.NoError(err)
Expand Down Expand Up @@ -67,23 +95,77 @@

func TestTimestampTxn(t *testing.T) {
re := require.New(t)
storage, clean := newTestStorage(t)
storage, clean, leadership := prepare(t)
defer clean()
globalTS1 := time.Now().Round(0)
<<<<<<< HEAD
err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1)
re.NoError(err)

globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS2)
=======
err := storage.SaveTimestamp(testGroupID, globalTS1, leadership)
re.NoError(err)

globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
err = storage.SaveTimestamp(testGroupID, globalTS2, leadership)
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
re.Error(err)

ts, err := storage.LoadTimestamp("")
re.NoError(err)
re.Equal(globalTS1, ts)
}

<<<<<<< HEAD
func newTestStorage(t *testing.T) (Storage, func()) {
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
return NewStorageWithEtcdBackend(client, rootPath), clean
=======
func TestSaveTimestampWithLeaderCheck(t *testing.T) {
re := require.New(t)
storage, clean, leadership := prepare(t)
defer clean()

// testLeaderKey -> testLeaderValue
globalTS := time.Now().Round(0)
err := storage.SaveTimestamp(testGroupID, globalTS, leadership)
re.NoError(err)
ts, err := storage.LoadTimestamp(testGroupID)
re.NoError(err)
re.Equal(globalTS, ts)

err = storage.SaveTimestamp(testGroupID, globalTS.Add(time.Second), &election.Leadership{})
re.True(errs.IsLeaderChanged(err))
ts, err = storage.LoadTimestamp(testGroupID)
re.NoError(err)
re.Equal(globalTS, ts)

// testLeaderKey -> ""
storage.Save(leadership.GetLeaderKey(), "")
err = storage.SaveTimestamp(testGroupID, globalTS.Add(time.Second), leadership)
re.True(errs.IsLeaderChanged(err))
ts, err = storage.LoadTimestamp(testGroupID)
re.NoError(err)
re.Equal(globalTS, ts)

// testLeaderKey -> non-existent
storage.Remove(leadership.GetLeaderKey())
err = storage.SaveTimestamp(testGroupID, globalTS.Add(time.Second), leadership)
re.True(errs.IsLeaderChanged(err))
ts, err = storage.LoadTimestamp(testGroupID)
re.NoError(err)
re.Equal(globalTS, ts)

// testLeaderKey -> testLeaderValue
storage.Save(leadership.GetLeaderKey(), testLeaderValue)
globalTS = globalTS.Add(time.Second)
err = storage.SaveTimestamp(testGroupID, globalTS, leadership)
re.NoError(err)
ts, err = storage.LoadTimestamp(testGroupID)
re.NoError(err)
re.Equal(globalTS, ts)
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
}
Loading
Loading