Skip to content

br: compatibility of log backup and log restore #61238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
Expand All @@ -28,6 +29,7 @@ go_library(
"//pkg/parser/ast",
"//pkg/util",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand All @@ -50,19 +52,22 @@ go_test(
name = "restore_test",
timeout = "short",
srcs = [
"export_test.go",
"import_mode_switcher_test.go",
"misc_test.go",
"restorer_test.go",
],
embed = [":restore"],
flaky = True,
shard_count = 13,
shard_count = 17,
deps = [
":restore",
"//br/pkg/conn",
"//br/pkg/mock",
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//pkg/kv",
"//pkg/parser/ast",
Expand Down
21 changes: 21 additions & 0 deletions br/pkg/restore/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package restore

var (
LogRestoreTableIDMarkerFilePrefix = logRestoreTableIDMarkerFilePrefix
ParseLogRestoreTableIDsMarkerFileName = parseLogRestoreTableIDsMarkerFileName
UnmarshalLogRestoreTableIDsMarkerFile = unmarshalLogRestoreTableIDsMarkerFile
)
185 changes: 185 additions & 0 deletions br/pkg/restore/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@
package restore

import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"fmt"
"path"
"strconv"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
Expand All @@ -34,6 +42,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// deprecated parameter
Expand All @@ -44,6 +53,182 @@ const (
CoarseGrained Granularity = "coarse-grained"
)

const logRestoreTableIDMarkerFilePrefix = "v1/log_restore_tables_blocklists"

type LogRestoreTableIDsMarkerFile struct {
// RestoreCommitTs records the timestamp after PITR restore done. Only the later PITR restore from the log backup of the cluster,
// whose BackupTS is not less than it, can ignore the restore table IDs blocklist recorded in the file.
RestoreCommitTs uint64 `protobuf:"varint,1,opt,name=restore_commit_ts,proto3"`
// SnapshotBackupTs records the BackupTS of the PITR restore. Any PITR restore from the log backup of the cluster, whose restoredTS
// is less than it, can ignore the restore table IDs blocklist recorded in the file.
SnapshotBackupTs uint64 `protobuf:"varint,2,opt,name=snapshot_backup_ts,proto3"`
// TableIDs records the table IDs blocklist of the cluster running the log backup task.
TableIds []int64 `protobuf:"varint,3,rep,packed,name=table_ids,proto3"`
// Checksum records the checksum of other fields.
Checksum []byte `protobuf:"bytes,4,opt,name=checksum,proto3"`
}

func (m *LogRestoreTableIDsMarkerFile) Reset() { *m = LogRestoreTableIDsMarkerFile{} }
func (m *LogRestoreTableIDsMarkerFile) String() string { return proto.CompactTextString(m) }
func (m *LogRestoreTableIDsMarkerFile) ProtoMessage() {}

func (m *LogRestoreTableIDsMarkerFile) filename() string {
return fmt.Sprintf("%s/R%016X_S%016X.meta", logRestoreTableIDMarkerFilePrefix, m.RestoreCommitTs, m.SnapshotBackupTs)
}

func parseLogRestoreTableIDsMarkerFileName(filename string) (restoreCommitTs, snapshotBackupTs uint64, parsed bool) {
filename = path.Base(filename)
if !strings.HasSuffix(filename, ".meta") {
return 0, 0, false
}
if filename[0] != 'R' {
return 0, 0, false
}
ts, err := strconv.ParseUint(filename[1:17], 16, 64)
if err != nil {
log.Warn("failed to parse log restore table IDs blocklist file name", zap.String("filename", filename), zap.Error(err))
return 0, 0, false
}
restoreCommitTs = ts
if filename[17] != '_' || filename[18] != 'S' {
return 0, 0, false
}
ts, err = strconv.ParseUint(filename[19:35], 16, 64)
if err != nil {
log.Warn("failed to parse log restore table IDs blocklist file name", zap.String("filename", filename), zap.Error(err))
return 0, 0, false
}
snapshotBackupTs = ts
return restoreCommitTs, snapshotBackupTs, true
}

func (markerFile *LogRestoreTableIDsMarkerFile) checksumLogRestoreTableIDsMarkerFile() []byte {
hasher := sha256.New()
hasher.Write(binary.LittleEndian.AppendUint64(nil, markerFile.RestoreCommitTs))
hasher.Write(binary.LittleEndian.AppendUint64(nil, markerFile.SnapshotBackupTs))
for _, tableId := range markerFile.TableIds {
hasher.Write(binary.LittleEndian.AppendUint64(nil, uint64(tableId)))
}
return hasher.Sum(nil)
}

func (markerFile *LogRestoreTableIDsMarkerFile) setChecksumLogRestoreTableIDsMarkerFile() {
markerFile.Checksum = markerFile.checksumLogRestoreTableIDsMarkerFile()
}

// MarshalLogRestoreTableIDsMarkerFile generates an markerfile and marshals it. It returns its filename and the marshaled data.
func MarshalLogRestoreTableIDsMarkerFile(restoreCommitTs, snapshotBackupTs uint64, tableIds []int64) (string, []byte, error) {
markerFile := &LogRestoreTableIDsMarkerFile{
RestoreCommitTs: restoreCommitTs,
SnapshotBackupTs: snapshotBackupTs,
TableIds: tableIds,
}
markerFile.setChecksumLogRestoreTableIDsMarkerFile()
filename := markerFile.filename()
data, err := proto.Marshal(markerFile)
if err != nil {
return "", nil, errors.Trace(err)
}
return filename, data, nil
}

// unmarshalLogRestoreTableIDsMarkerFile unmarshals the given markerfile.
func unmarshalLogRestoreTableIDsMarkerFile(data []byte) (restoreCommitTs, snapshotBackupTs uint64, tableIds []int64, err error) {
markerFile := &LogRestoreTableIDsMarkerFile{}
if err = proto.Unmarshal(data, markerFile); err != nil {
return 0, 0, nil, errors.Trace(err)
}
if !bytes.Equal(markerFile.checksumLogRestoreTableIDsMarkerFile(), markerFile.Checksum) {
return 0, 0, nil, errors.Errorf(
"checksum mismatch (calculated checksum is %s but the recorded checksum is %s), the log restore table IDs blocklist file may be corrupted",
base64.StdEncoding.EncodeToString(markerFile.checksumLogRestoreTableIDsMarkerFile()),
base64.StdEncoding.EncodeToString(markerFile.Checksum),
)
}
return markerFile.RestoreCommitTs, markerFile.SnapshotBackupTs, markerFile.TableIds, nil
}

func fastWalkLogRestoreTableIDsMarkerFile(
ctx context.Context,
s storage.ExternalStorage,
filterOutFn func(restoreCommitTs, snapshotBackupTs uint64) bool,
executionFn func(ctx context.Context, filename string, restoreCommitTs uint64, tableIds []int64) error,
) error {
filenames := make([]string, 0)
if err := s.WalkDir(ctx, &storage.WalkOption{SubDir: logRestoreTableIDMarkerFilePrefix}, func(path string, _ int64) error {
restoreCommitTs, snapshotBackupTs, parsed := parseLogRestoreTableIDsMarkerFileName(path)
if parsed {
if filterOutFn(restoreCommitTs, snapshotBackupTs) {
return nil
}
}
filenames = append(filenames, path)
return nil
}); err != nil {
return errors.Trace(err)
}
workerpool := tidbutil.NewWorkerPool(8, "walk dir log restore table IDs blocklist files")
eg, ectx := errgroup.WithContext(ctx)
for _, filename := range filenames {
if ectx.Err() != nil {
break
}
workerpool.ApplyOnErrorGroup(eg, func() error {
data, err := s.ReadFile(ectx, filename)
if err != nil {
return errors.Trace(err)
}
restoreCommitTs, snapshotBackupTs, tableIds, err := unmarshalLogRestoreTableIDsMarkerFile(data)
if err != nil {
return errors.Trace(err)
}
if filterOutFn(restoreCommitTs, snapshotBackupTs) {
return nil
}
err = executionFn(ectx, filename, restoreCommitTs, tableIds)
return errors.Trace(err)
})
}
return errors.Trace(eg.Wait())
}

// CheckTableTrackerContainsTableIDsFromMarkerFiles checks whether pitr id tracker contains the filtered table IDs from blocklist file.
func CheckTableTrackerContainsTableIDsFromBlocklistFiles(
ctx context.Context,
s storage.ExternalStorage,
tracker *utils.PiTRIdTracker,
startTs, restoredTs uint64,
) error {
err := fastWalkLogRestoreTableIDsMarkerFile(ctx, s, func(restoreCommitTs, snapshotBackupTs uint64) bool {
return startTs >= restoreCommitTs || restoredTs <= snapshotBackupTs
}, func(_ context.Context, _ string, restoreCommitTs uint64, tableIds []int64) error {
for _, tableId := range tableIds {
if tracker.ContainsTableId(tableId) || tracker.ContainsPartitionId(tableId) {
return errors.Errorf(
"cannot restore the table(Id=%d) because it is restored(at %d) before snapshot backup(at %d). "+
"Please respecify the filter that does not contain the table or replace with a newer snapshot backup.",
tableId, restoreCommitTs, startTs)
}
}
return nil
})
return errors.Trace(err)
}

// TruncateLogRestoreTableIDsMarkerFiles truncates the blocklist files whose restore commit ts is not larger than truncate until ts.
func TruncateLogRestoreTableIDsBlocklistFiles(
ctx context.Context,
s storage.ExternalStorage,
untilTs uint64,
) error {
err := fastWalkLogRestoreTableIDsMarkerFile(ctx, s, func(restoreCommitTs, snapshotBackupTs uint64) bool {
return untilTs < restoreCommitTs
}, func(ctx context.Context, filename string, _ uint64, _ []int64) error {
return s.DeleteFile(ctx, filename)
})
return errors.Trace(err)
}

type UniqueTableName struct {
DB string
Table string
Expand Down
Loading