Skip to content

Commit cac7292

Browse files
authored
br: add restoreid to idmap table (#61278)
close #61277
1 parent 9b7cd6a commit cac7292

File tree

6 files changed

+27
-35
lines changed

6 files changed

+27
-35
lines changed

br/pkg/restore/log_client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ type LogClient struct {
201201
currentTS uint64
202202

203203
upstreamClusterID uint64
204+
restoreID uint64
204205

205206
// the query to insert rows into table `gc_delete_range`, lack of ts.
206207
deleteRangeQuery []*stream.PreDelRangeQuery

br/pkg/restore/log_client/export_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func TEST_NewLogClient(clusterID, startTS, restoreTS, upstreamClusterID uint64,
9999
dom: dom,
100100
unsafeSession: se,
101101
upstreamClusterID: upstreamClusterID,
102+
restoreID: 0,
102103
LogFileManager: &LogFileManager{
103104
startTS: startTS,
104105
restoreTS: restoreTS,

br/pkg/restore/log_client/id_map.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ func (rc *LogClient) saveIDMap2Table(ctx context.Context, dbMaps []*backuppb.Pit
107107
return errors.Trace(err)
108108
}
109109
// clean the dirty id map at first
110-
err = rc.unsafeSession.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID)
110+
err = rc.unsafeSession.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? and restore_id = %?;", rc.restoreTS, rc.upstreamClusterID, rc.restoreID)
111111
if err != nil {
112112
return errors.Trace(err)
113113
}
114-
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);"
114+
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restore_id, restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?, %?);"
115115
for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 {
116116
endIdx := min(startIdx+PITRIdMapBlockSize, len(data))
117-
err := rc.unsafeSession.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
117+
err := rc.unsafeSession.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreID, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
118118
if err != nil {
119119
return errors.Trace(err)
120120
}
@@ -173,12 +173,13 @@ func (rc *LogClient) loadSchemasMapFromTable(
173173
ctx context.Context,
174174
restoredTS uint64,
175175
) ([]*backuppb.PitrDBMap, error) {
176-
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
176+
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restore_id = %? and restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
177177
execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()
178178
rows, _, errSQL := execCtx.ExecRestrictedSQL(
179179
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
180180
nil,
181181
getPitrIDMapSQL,
182+
rc.restoreID,
182183
restoredTS,
183184
rc.upstreamClusterID,
184185
)

br/pkg/restore/snap_client/systable_restore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func TestCheckSysTableCompatibility(t *testing.T) {
121121
//
122122
// The above variables are in the file br/pkg/restore/systable_restore.go
123123
func TestMonitorTheSystemTableIncremental(t *testing.T) {
124-
require.Equal(t, int64(247), session.CurrentBootstrapVersion)
124+
require.Equal(t, int64(248), session.CurrentBootstrapVersion)
125125
}
126126

127127
func TestIsStatsTemporaryTable(t *testing.T) {

pkg/session/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ go_library(
133133
"@com_github_tikv_client_go_v2//txnkv/transaction",
134134
"@com_github_tikv_client_go_v2//util",
135135
"@io_etcd_go_etcd_client_v3//:client",
136-
"@io_etcd_go_etcd_client_v3//concurrency",
137136
"@org_uber_go_atomic//:atomic",
138137
"@org_uber_go_zap//:zap",
139138
"@org_uber_go_zap//zapcore",

pkg/session/bootstrap.go

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"github.com/pingcap/tidb/pkg/bindinfo"
3535
"github.com/pingcap/tidb/pkg/config"
3636
"github.com/pingcap/tidb/pkg/domain"
37-
"github.com/pingcap/tidb/pkg/domain/infosync"
3837
"github.com/pingcap/tidb/pkg/expression"
3938
"github.com/pingcap/tidb/pkg/infoschema"
4039
infoschemacontext "github.com/pingcap/tidb/pkg/infoschema/context"
@@ -62,7 +61,6 @@ import (
6261
"github.com/pingcap/tidb/pkg/util/sqlescape"
6362
"github.com/pingcap/tidb/pkg/util/sqlexec"
6463
"github.com/pingcap/tidb/pkg/util/timeutil"
65-
"go.etcd.io/etcd/client/v3/concurrency"
6664
"go.uber.org/zap"
6765
)
6866

@@ -721,13 +719,16 @@ const (
721719
KEY (status));`
722720

723721
// CreatePITRIDMap is a table that records the id map from upstream to downstream for PITR.
722+
// set restore id default to 0 to make it compatible for old BR tool to restore to a new TiDB, such case should be
723+
// rare though.
724724
CreatePITRIDMap = `CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map (
725+
restore_id BIGINT NOT NULL DEFAULT 0,
725726
restored_ts BIGINT NOT NULL,
726727
upstream_cluster_id BIGINT NOT NULL,
727728
segment_id BIGINT NOT NULL,
728729
id_map BLOB(524288) NOT NULL,
729730
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
730-
PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id));`
731+
PRIMARY KEY (restore_id, restored_ts, upstream_cluster_id, segment_id));`
731732

732733
// DropMySQLIndexUsageTable removes the table `mysql.schema_index_usage`
733734
DropMySQLIndexUsageTable = "DROP TABLE IF EXISTS mysql.schema_index_usage"
@@ -1284,11 +1285,15 @@ const (
12841285
// version 247
12851286
// Add last_stats_histograms_version to mysql.stats_meta.
12861287
version247 = 247
1288+
1289+
// version 248
1290+
// Update mysql.tidb_pitr_id_map to add restore_id as a primary key field
1291+
version248 = 248
12871292
)
12881293

12891294
// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
12901295
// please make sure this is the largest version
1291-
var currentBootstrapVersion int64 = version247
1296+
var currentBootstrapVersion int64 = version248
12921297

12931298
// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
12941299
var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -1471,6 +1476,7 @@ var (
14711476
upgradeToVer245,
14721477
upgradeToVer246,
14731478
upgradeToVer247,
1479+
upgradeToVer248,
14741480
}
14751481
)
14761482

@@ -1626,31 +1632,6 @@ func upgrade(s sessiontypes.Session) {
16261632
}
16271633
}
16281634

1629-
// checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB.
1630-
func checkOwnerVersion(ctx context.Context, dom *domain.Domain) (bool, error) {
1631-
ticker := time.NewTicker(100 * time.Millisecond)
1632-
defer ticker.Stop()
1633-
logutil.BgLogger().Info("Waiting for the DDL owner to be elected in the cluster")
1634-
for {
1635-
select {
1636-
case <-ctx.Done():
1637-
return false, ctx.Err()
1638-
case <-ticker.C:
1639-
ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx)
1640-
if err == concurrency.ErrElectionNoLeader {
1641-
continue
1642-
}
1643-
info, err := infosync.GetAllServerInfo(ctx)
1644-
if err != nil {
1645-
return false, err
1646-
}
1647-
if s, ok := info[ownerID]; ok {
1648-
return s.Version == mysql.ServerVersion, nil
1649-
}
1650-
}
1651-
}
1652-
}
1653-
16541635
// upgradeToVer2 updates to version 2.
16551636
func upgradeToVer2(s sessiontypes.Session, ver int64) {
16561637
if ver >= version2 {
@@ -3477,6 +3458,15 @@ func upgradeToVer247(s sessiontypes.Session, ver int64) {
34773458
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_stats_histograms_version bigint unsigned DEFAULT NULL", infoschema.ErrColumnExists)
34783459
}
34793460

3461+
func upgradeToVer248(s sessiontypes.Session, ver int64) {
3462+
if ver >= version248 {
3463+
return
3464+
}
3465+
doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map ADD COLUMN restore_id BIGINT NOT NULL DEFAULT 0", infoschema.ErrColumnExists)
3466+
doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map DROP PRIMARY KEY")
3467+
doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map ADD PRIMARY KEY(restore_id, restored_ts, upstream_cluster_id, segment_id)")
3468+
}
3469+
34803470
// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
34813471
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
34823472
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)

0 commit comments

Comments
 (0)