Skip to content

br: add restoreid to idmap table #61278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ type LogClient struct {
currentTS uint64

upstreamClusterID uint64
restoreID uint64

// the query to insert rows into table `gc_delete_range`, lack of ts.
deleteRangeQuery []*stream.PreDelRangeQuery
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/log_client/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TEST_NewLogClient(clusterID, startTS, restoreTS, upstreamClusterID uint64,
dom: dom,
unsafeSession: se,
upstreamClusterID: upstreamClusterID,
restoreID: 0,
LogFileManager: &LogFileManager{
startTS: startTS,
restoreTS: restoreTS,
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/restore/log_client/id_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func (rc *LogClient) saveIDMap2Table(ctx context.Context, dbMaps []*backuppb.Pit
return errors.Trace(err)
}
// clean the dirty id map at first
err = rc.unsafeSession.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID)
err = rc.unsafeSession.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? and restore_id = %?;", rc.restoreTS, rc.upstreamClusterID, rc.restoreID)
if err != nil {
return errors.Trace(err)
}
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);"
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restore_id, restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?, %?);"
for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 {
endIdx := min(startIdx+PITRIdMapBlockSize, len(data))
err := rc.unsafeSession.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
err := rc.unsafeSession.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreID, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -173,12 +173,13 @@ func (rc *LogClient) loadSchemasMapFromTable(
ctx context.Context,
restoredTS uint64,
) ([]*backuppb.PitrDBMap, error) {
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restore_id = %? and restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()
rows, _, errSQL := execCtx.ExecRestrictedSQL(
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
nil,
getPitrIDMapSQL,
rc.restoreID,
restoredTS,
rc.upstreamClusterID,
)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {

// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(247), session.CurrentBootstrapVersion)
require.Equal(t, int64(248), session.CurrentBootstrapVersion)
}
46 changes: 17 additions & 29 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
infoschemacontext "github.com/pingcap/tidb/pkg/infoschema/context"
Expand Down Expand Up @@ -62,7 +61,6 @@ import (
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/timeutil"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -722,12 +720,13 @@ const (

// CreatePITRIDMap is a table that records the id map from upstream to downstream for PITR.
CreatePITRIDMap = `CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map (
restore_id BIGINT NOT NULL,
restored_ts BIGINT NOT NULL,
upstream_cluster_id BIGINT NOT NULL,
segment_id BIGINT NOT NULL,
id_map BLOB(524288) NOT NULL,
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id));`
PRIMARY KEY (restore_id, restored_ts, upstream_cluster_id, segment_id));`

// DropMySQLIndexUsageTable removes the table `mysql.schema_index_usage`
DropMySQLIndexUsageTable = "DROP TABLE IF EXISTS mysql.schema_index_usage"
Expand Down Expand Up @@ -1284,11 +1283,15 @@ const (
// version 247
// Add last_stats_histograms_version to mysql.stats_meta.
version247 = 247

// version 248
// Update mysql.tidb_pitr_id_map to add restore_id as a primary key field
version248 = 248
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version247
var currentBootstrapVersion int64 = version248

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1471,6 +1474,7 @@ var (
upgradeToVer245,
upgradeToVer246,
upgradeToVer247,
upgradeToVer248,
}
)

Expand Down Expand Up @@ -1626,31 +1630,6 @@ func upgrade(s sessiontypes.Session) {
}
}

// checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB.
func checkOwnerVersion(ctx context.Context, dom *domain.Domain) (bool, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused method so removed

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
logutil.BgLogger().Info("Waiting for the DDL owner to be elected in the cluster")
for {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-ticker.C:
ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx)
if err == concurrency.ErrElectionNoLeader {
continue
}
info, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return false, err
}
if s, ok := info[ownerID]; ok {
return s.Version == mysql.ServerVersion, nil
}
}
}
}

// upgradeToVer2 updates to version 2.
func upgradeToVer2(s sessiontypes.Session, ver int64) {
if ver >= version2 {
Expand Down Expand Up @@ -3477,6 +3456,15 @@ func upgradeToVer247(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_stats_histograms_version bigint unsigned DEFAULT NULL", infoschema.ErrColumnExists)
}

func upgradeToVer248(s sessiontypes.Session, ver int64) {
if ver >= version248 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map ADD COLUMN restore_id BIGINT NOT NULL", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map DROP PRIMARY KEY")
doReentrantDDL(s, "ALTER TABLE mysql.tidb_pitr_id_map ADD PRIMARY KEY(restore_id, restored_ts, upstream_cluster_id, segment_id)")
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down