br: enable parallel restore #58724

Open · wants to merge 8 commits into base: master
26 changes: 13 additions & 13 deletions br/pkg/checkpoint/checkpoint_test.go
@@ -61,20 +61,20 @@ func TestCheckpointMetaForRestoreOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
defer logMetaManager.Close()
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
}

func TestCheckpointMetaForRestoreOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer logMetaManager.Close()
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
@@ -137,7 +137,7 @@ func testCheckpointMetaForRestore(
require.NoError(t, err)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
taskInfo, err := checkpoint.GetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
require.NoError(t, err)
require.Equal(t, uint64(123), taskInfo.Metadata.UpstreamClusterID)
require.Equal(t, uint64(222), taskInfo.Metadata.RestoredTS)
@@ -302,15 +302,15 @@ func TestCheckpointRestoreRunnerOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
testCheckpointRestoreRunner(t, snapshotMetaManager)
}

func TestCheckpointRestoreRunnerOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
testCheckpointRestoreRunner(t, snapshotMetaManager)
@@ -411,15 +411,15 @@ func TestCheckpointRunnerRetryOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
testCheckpointRunnerRetry(t, snapshotMetaManager)
}

func TestCheckpointRunnerRetryOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
testCheckpointRunnerRetry(t, snapshotMetaManager)
@@ -478,15 +478,15 @@ func TestCheckpointRunnerNoRetryOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
}

func TestCheckpointRunnerNoRetryOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
@@ -533,15 +533,15 @@ func TestCheckpointLogRestoreRunnerOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
defer logMetaManager.Close()
testCheckpointLogRestoreRunner(t, logMetaManager)
}

func TestCheckpointLogRestoreRunnerOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer logMetaManager.Close()
testCheckpointLogRestoreRunner(t, logMetaManager)
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/log_restore.go
@@ -191,7 +191,7 @@ func (t *TaskInfoForLogRestore) IdMapSaved() bool {
return t.Progress == InLogRestoreAndIdMapPersisted
}

func TryToGetCheckpointTaskInfo(
func GetCheckpointTaskInfo(
ctx context.Context,
snapshotManager SnapshotMetaManagerT,
logManager LogMetaManagerT,
14 changes: 9 additions & 5 deletions br/pkg/checkpoint/manager.go
@@ -92,6 +92,7 @@ func NewLogTableMetaManager(
g glue.Glue,
dom *domain.Domain,
dbName string,
restoreID uint64,
) (LogMetaManagerT, error) {
se, err := g.CreateSession(dom.Store())
if err != nil {
@@ -107,14 +108,15 @@ func NewLogTableMetaManager(
se: se,
runnerSe: runnerSe,
dom: dom,
dbName: dbName,
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
}, nil
}

func NewSnapshotTableMetaManager(
g glue.Glue,
dom *domain.Domain,
dbName string,
restoreID uint64,
) (SnapshotMetaManagerT, error) {
se, err := g.CreateSession(dom.Store())
if err != nil {
@@ -130,7 +132,7 @@ func NewSnapshotTableMetaManager(
se: se,
runnerSe: runnerSe,
dom: dom,
dbName: dbName,
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
}, nil
}

@@ -147,7 +149,7 @@ func (manager *TableMetaManager[K, SV, LV, M]) Close() {
}
}

// load the whole checkpoint range data and retrieve the metadata of restored ranges
// LoadCheckpointData loads the whole checkpoint range data, retrieves the metadata of restored ranges,
// and returns the total time cost in the past executions
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData(
ctx context.Context,
@@ -287,14 +289,15 @@ func NewSnapshotStorageMetaManager(
cipher *backuppb.CipherInfo,
clusterID uint64,
prefix string,
restoreID uint64,
) SnapshotMetaManagerT {
return &StorageMetaManager[
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
]{
storage: storage,
cipher: cipher,
clusterID: fmt.Sprintf("%d", clusterID),
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
}
}

@@ -303,14 +306,15 @@ func NewLogStorageMetaManager(
cipher *backuppb.CipherInfo,
clusterID uint64,
prefix string,
restoreID uint64,
) LogMetaManagerT {
return &StorageMetaManager[
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
]{
storage: storage,
cipher: cipher,
clusterID: fmt.Sprintf("%d", clusterID),
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
}
}

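Net effect of threading restoreID through these constructors: table-based managers now get one checkpoint database per restore, and storage-based managers get one task path per restore. A minimal sketch of the resulting names, assuming an illustrative database-name constant, cluster ID, and restore ID (the real constants live in the checkpoint package):

package main

import "fmt"

func main() {
	// Illustrative stand-ins; the real values come from the checkpoint package and the restore task.
	dbName := "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint" // hypothetical value of SnapshotRestoreCheckpointDatabaseName
	clusterID, prefix, restoreID := uint64(7), "snapshot", uint64(2)

	// Table-based manager: one checkpoint database per restore.
	fmt.Printf("%s_%d\n", dbName, restoreID) // __TiDB_BR_Temporary_Snapshot_Restore_Checkpoint_2

	// Storage-based manager: one task path per restore under the cluster ID.
	fmt.Printf("%d/%s_%d\n", clusterID, prefix, restoreID) // 7/snapshot_2
}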
8 changes: 5 additions & 3 deletions br/pkg/checkpoint/storage.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/google/uuid"
@@ -90,9 +91,10 @@ const (

// IsCheckpointDB checks whether the dbname is checkpoint database.
func IsCheckpointDB(dbname ast.CIStr) bool {
return dbname.O == LogRestoreCheckpointDatabaseName ||
dbname.O == SnapshotRestoreCheckpointDatabaseName ||
dbname.O == CustomSSTRestoreCheckpointDatabaseName
// Check if the database name starts with any of the checkpoint database name prefixes
return strings.HasPrefix(dbname.O, LogRestoreCheckpointDatabaseName) ||
strings.HasPrefix(dbname.O, SnapshotRestoreCheckpointDatabaseName) ||
strings.HasPrefix(dbname.O, CustomSSTRestoreCheckpointDatabaseName)
}

const CheckpointIdMapBlockSize int = 524288
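Since checkpoint databases can now carry a _&lt;restoreID&gt; suffix, IsCheckpointDB matches by prefix rather than by exact name. A rough standalone illustration of that check, with a hypothetical prefix constant standing in for the real checkpoint package constants:

package main

import (
	"fmt"
	"strings"
)

// Hypothetical stand-in for checkpoint.SnapshotRestoreCheckpointDatabaseName.
const snapshotCheckpointDBPrefix = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"

// isCheckpointDB mirrors the prefix-matching idea: both "<prefix>" and
// "<prefix>_<restoreID>" are treated as checkpoint databases.
func isCheckpointDB(name string) bool {
	return strings.HasPrefix(name, snapshotCheckpointDBPrefix)
}

func main() {
	fmt.Println(isCheckpointDB(snapshotCheckpointDBPrefix + "_3")) // true
	fmt.Println(isCheckpointDB("sbtest"))                          // false
}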
24 changes: 24 additions & 0 deletions br/pkg/registry/BUILD.bazel
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "registry",
srcs = [
"heartbeat.go",
"registration.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/registry",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/metautil",
"//br/pkg/utils",
"//pkg/domain",
"//pkg/kv",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)
114 changes: 114 additions & 0 deletions br/pkg/registry/heartbeat.go
@@ -0,0 +1,114 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const (
UpdateHeartbeatSQLTemplate = `
UPDATE %s.%s
SET last_heartbeat = %%?
WHERE id = %%?`

// defaultHeartbeatIntervalSeconds is the default interval in seconds between heartbeat updates
defaultHeartbeatIntervalSeconds = 60
)

// UpdateHeartbeat updates the last_heartbeat timestamp for a task
func (r *Registry) UpdateHeartbeat(ctx context.Context, restoreID uint64) error {
currentTime := uint64(time.Now().Unix())
updateSQL := fmt.Sprintf(UpdateHeartbeatSQLTemplate, RegistrationDBName, RegistrationTableName)

if err := r.se.ExecuteInternal(ctx, updateSQL, currentTime, restoreID); err != nil {
return errors.Annotatef(err, "failed to update heartbeat for task %d", restoreID)
}

log.Debug("updated task heartbeat",
zap.Uint64("restore_id", restoreID),
zap.Uint64("timestamp", currentTime))

return nil
}

// HeartbeatManager handles periodic heartbeat updates for a restore task.
// It only refreshes the task's own heartbeat and never removes stalled tasks; the heartbeat exists to give
// users some insight into the task's status.
type HeartbeatManager struct {
registry *Registry
restoreID uint64
interval time.Duration
stopCh chan struct{}
doneCh chan struct{}
}

// NewHeartbeatManager creates a new heartbeat manager for the given restore task
func NewHeartbeatManager(registry *Registry, restoreID uint64) *HeartbeatManager {
return &HeartbeatManager{
registry: registry,
restoreID: restoreID,
interval: time.Duration(defaultHeartbeatIntervalSeconds) * time.Second,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}

// Start begins the heartbeat background process
func (m *HeartbeatManager) Start(ctx context.Context) {
go func() {
defer close(m.doneCh)

ticker := time.NewTicker(m.interval)
defer ticker.Stop()

// send an initial heartbeat
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
log.Warn("failed to send initial heartbeat",
zap.Uint64("restore_id", m.restoreID),
zap.Error(err))
}

for {
select {
case <-ticker.C:
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
log.Warn("failed to update heartbeat",
zap.Uint64("restore_id", m.restoreID),
zap.Error(err))
}
case <-m.stopCh:
return
case <-ctx.Done():
log.Warn("heartbeat manager context done",
zap.Uint64("restore_id", m.restoreID),
zap.Error(ctx.Err()))
return
}
}
}()
}

// Stop ends the heartbeat background process
func (m *HeartbeatManager) Stop() {
close(m.stopCh)
<-m.doneCh // Wait for goroutine to exit
}