From 17379674eb8add90097e4ac2c2bb989f75a105e2 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Thu, 22 May 2025 14:49:29 -0400 Subject: [PATCH 1/2] br: add restoreid to idmap table Signed-off-by: Wenqi Mou --- br/pkg/restore/log_client/client.go | 1 + br/pkg/restore/log_client/export_test.go | 1 + br/pkg/restore/log_client/id_map.go | 9 ++-- .../snap_client/systable_restore_test.go | 2 +- pkg/session/bootstrap.go | 46 +++++++------------ 5 files changed, 25 insertions(+), 34 deletions(-) diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 85309940850fa..ef3e2348743c6 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -201,6 +201,7 @@ type LogClient struct { currentTS uint64 upstreamClusterID uint64 + restoreID uint64 // the query to insert rows into table `gc_delete_range`, lack of ts. deleteRangeQuery []*stream.PreDelRangeQuery diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go index 7bf61555a5694..8b4e63a954fcf 100644 --- a/br/pkg/restore/log_client/export_test.go +++ b/br/pkg/restore/log_client/export_test.go @@ -99,6 +99,7 @@ func TEST_NewLogClient(clusterID, startTS, restoreTS, upstreamClusterID uint64, dom: dom, unsafeSession: se, upstreamClusterID: upstreamClusterID, + restoreID: 0, LogFileManager: &LogFileManager{ startTS: startTS, restoreTS: restoreTS, diff --git a/br/pkg/restore/log_client/id_map.go b/br/pkg/restore/log_client/id_map.go index cf481e75b2e60..03afa8b692b77 100644 --- a/br/pkg/restore/log_client/id_map.go +++ b/br/pkg/restore/log_client/id_map.go @@ -107,14 +107,14 @@ func (rc *LogClient) saveIDMap2Table(ctx context.Context, dbMaps []*backuppb.Pit return errors.Trace(err) } // clean the dirty id map at first - err = rc.unsafeSession.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID) + err = rc.unsafeSession.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? and restore_id = %?;", rc.restoreTS, rc.upstreamClusterID, rc.restoreID) if err != nil { return errors.Trace(err) } - replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);" + replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restore_id, restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?, %?);" for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 { endIdx := min(startIdx+PITRIdMapBlockSize, len(data)) - err := rc.unsafeSession.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx]) + err := rc.unsafeSession.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreID, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx]) if err != nil { return errors.Trace(err) } @@ -173,12 +173,13 @@ func (rc *LogClient) loadSchemasMapFromTable( ctx context.Context, restoredTS uint64, ) ([]*backuppb.PitrDBMap, error) { - getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;" + getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restore_id = %? and restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;" execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor() rows, _, errSQL := execCtx.ExecRestrictedSQL( kv.WithInternalSourceType(ctx, kv.InternalTxnBR), nil, getPitrIDMapSQL, + rc.restoreID, restoredTS, rc.upstreamClusterID, ) diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go index ddea0825e561f..7a0faaf46902e 100644 --- a/br/pkg/restore/snap_client/systable_restore_test.go +++ b/br/pkg/restore/snap_client/systable_restore_test.go @@ -117,5 +117,5 @@ func TestCheckSysTableCompatibility(t *testing.T) { // The above variables are in the file br/pkg/restore/systable_restore.go func TestMonitorTheSystemTableIncremental(t *testing.T) { - require.Equal(t, int64(247), session.CurrentBootstrapVersion) + require.Equal(t, int64(248), session.CurrentBootstrapVersion) } diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 43d9259f52978..a215128ca388c 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" - "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" infoschemacontext "github.com/pingcap/tidb/pkg/infoschema/context" @@ -62,7 +61,6 @@ import ( "github.com/pingcap/tidb/pkg/util/sqlescape" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tidb/pkg/util/timeutil" - "go.etcd.io/etcd/client/v3/concurrency" "go.uber.org/zap" ) @@ -722,12 +720,13 @@ const ( // CreatePITRIDMap is a table that records the id map from upstream to downstream for PITR. CreatePITRIDMap = `CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map ( + restore_id BIGINT NOT NULL, restored_ts BIGINT NOT NULL, upstream_cluster_id BIGINT NOT NULL, segment_id BIGINT NOT NULL, id_map BLOB(524288) NOT NULL, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id));` + PRIMARY KEY (restore_id, restored_ts, upstream_cluster_id, segment_id));` // DropMySQLIndexUsageTable removes the table `mysql.schema_index_usage` DropMySQLIndexUsageTable = "DROP TABLE IF EXISTS mysql.schema_index_usage" @@ -1284,11 +1283,15 @@ const ( // version 247 // Add last_stats_histograms_version to mysql.stats_meta. version247 = 247 + + // version 248 + // Update mysql.tidb_pitr_id_map to add restore_id as a primary key field + version248 = 248 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version247 +var currentBootstrapVersion int64 = version248 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it. var internalSQLTimeout = owner.ManagerSessionTTL + 15 @@ -1471,6 +1474,7 @@ var ( upgradeToVer245, upgradeToVer246, upgradeToVer247, + upgradeToVer248, } ) @@ -1626,31 +1630,6 @@ func upgrade(s sessiontypes.Session) { } } -// checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB. -func checkOwnerVersion(ctx context.Context, dom *domain.Domain) (bool, error) { - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - logutil.BgLogger().Info("Waiting for the DDL owner to be elected in the cluster") - for { - select { - case <-ctx.Done(): - return false, ctx.Err() - case <-ticker.C: - ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx) - if err == concurrency.ErrElectionNoLeader { - continue - } - info, err := infosync.GetAllServerInfo(ctx) - if err != nil { - return false, err - } - if s, ok := info[ownerID]; ok { - return s.Version == mysql.ServerVersion, nil - } - } - } -} - // upgradeToVer2 updates to version 2. func upgradeToVer2(s sessiontypes.Session, ver int64) { if ver >= version2 { @@ -3477,6 +3456,15 @@ func upgradeToVer247(s sessiontypes.Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_stats_histograms_version bigint unsigned DEFAULT NULL", infoschema.ErrColumnExists) } +func upgradeToVer248(s sessiontypes.Session, ver int64) { + if ver >= version248 { + return + } + doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map ADD COLUMN restore_id BIGINT NOT NULL", infoschema.ErrColumnExists) + doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map DROP PRIMARY KEY") + doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map ADD PRIMARY KEY(restore_id, restored_ts, upstream_cluster_id, segment_id)") +} + // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist. func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) From e96b674b9cc0999376374dfe0695c0d44788e711 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Thu, 22 May 2025 15:40:48 -0400 Subject: [PATCH 2/2] update bazel Signed-off-by: Wenqi Mou --- pkg/session/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 239119a5ce83e..5f00aae5df43b 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -133,7 +133,6 @@ go_library( "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//:client", - "@io_etcd_go_etcd_client_v3//concurrency", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore",