Skip to content

Commit 270848a

Browse files
committed
Some more fixes
Signed-off-by: Wenqi Mou <[email protected]>
1 parent cf8b70c commit 270848a

File tree

4 files changed: 35 additions and 30 deletions.

br/pkg/registry/registration.go

Lines changed: 19 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -148,6 +148,11 @@ type RegistrationInfo struct {
148148
Cmd string
149149
}
150150

151+
type RegistrationInfoWithID struct {
152+
RegistrationInfo
153+
restoreID uint64
154+
}
155+
151156
// Registry manages registrations of restore tasks
152157
type Registry struct {
153158
se glue.Session
@@ -368,13 +373,13 @@ func (r *Registry) PauseTask(ctx context.Context, restoreID uint64) error {
368373
}
369374

370375
// GetRegistrationsByMaxID returns all registrations with IDs smaller than maxID
371-
func (r *Registry) GetRegistrationsByMaxID(ctx context.Context, maxID uint64) ([]RegistrationInfo, error) {
376+
func (r *Registry) GetRegistrationsByMaxID(ctx context.Context, maxID uint64) ([]RegistrationInfoWithID, error) {
372377
if err := r.createTableIfNotExist(ctx); err != nil {
373378
return nil, errors.Trace(err)
374379
}
375380

376381
selectSQL := fmt.Sprintf(selectRegistrationsByMaxIDSQLTemplate, RegistrationDBName, RegistrationTableName)
377-
registrations := make([]RegistrationInfo, 0)
382+
registrations := make([]RegistrationInfoWithID, 0)
378383

379384
execCtx := r.se.GetSessionCtx().GetRestrictedSQLExecutor()
380385
rows, _, errSQL := execCtx.ExecRestrictedSQL(
@@ -408,7 +413,12 @@ func (r *Registry) GetRegistrationsByMaxID(ctx context.Context, maxID uint64) ([
408413
Cmd: cmd,
409414
}
410415

411-
registrations = append(registrations, info)
416+
infoWithID := RegistrationInfoWithID{
417+
info,
418+
row.GetUint64(0),
419+
}
420+
421+
registrations = append(registrations, infoWithID)
412422
}
413423

414424
return registrations, nil
@@ -460,16 +470,17 @@ func (r *Registry) CheckTablesWithRegisteredTasks(
460470
func (r *Registry) checkForTableConflicts(
461471
tracker *utils.PiTRIdTracker,
462472
tables []*metautil.Table,
463-
regInfo RegistrationInfo,
473+
regInfo RegistrationInfoWithID,
464474
f filter.Filter,
465-
restoreID uint64,
475+
curRestoreID uint64,
466476
) error {
467477
// function to handle conflict when found
468478
handleConflict := func(dbName, tableName string) error {
469479
log.Warn("table already covered by another restore task",
480+
zap.Uint64("existing_restore_id", regInfo.restoreID),
481+
zap.Uint64("current_restore_id", curRestoreID),
470482
zap.String("database", dbName),
471483
zap.String("table", tableName),
472-
zap.Uint64("current_restore_id", restoreID),
473484
zap.Strings("filter_strings", regInfo.FilterStrings),
474485
zap.Uint64("start_ts", regInfo.StartTS),
475486
zap.Uint64("restored_ts", regInfo.RestoredTS),
@@ -478,8 +489,8 @@ func (r *Registry) checkForTableConflicts(
478489
zap.String("cmd", regInfo.Cmd))
479490
return errors.Annotatef(berrors.ErrTablesAlreadyExisted,
480491
"table %s.%s cannot be restored by current task with ID %d "+
481-
"because it is already being restored by task (time range: %d->%d, cmd: %s)",
482-
dbName, tableName, restoreID, regInfo.StartTS, regInfo.RestoredTS, regInfo.Cmd)
492+
"because it is already being restored by task (restoreId: %d, time range: %d->%d, cmd: %s)",
493+
dbName, tableName, curRestoreID, regInfo.restoreID, regInfo.StartTS, regInfo.RestoredTS, regInfo.Cmd)
483494
}
484495

485496
// Use PiTRTableTracker if available for PiTR task

br/pkg/task/restore.go

Lines changed: 13 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -842,20 +842,18 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
842842
// if checkpoint is not persisted, let's just unregister the task since we don't need it
843843
if hasCheckpointPersisted(c, cfg) {
844844
log.Info("pausing restore task from registry",
845-
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
845+
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(restoreError))
846846
if err := restoreRegistry.PauseTask(c, cfg.RestoreID); err != nil {
847847
log.Error("failed to pause restore task from registry",
848848
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
849849
}
850850
} else {
851851
log.Info("unregistering restore task from registry",
852-
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
852+
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(restoreError))
853853
if err := restoreRegistry.Unregister(c, cfg.RestoreID); err != nil {
854854
log.Error("failed to unregister restore task from registry",
855855
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
856856
}
857-
// clean up checkpoint just in case there are some empty checkpoint db/table created but no data
858-
cleanUpCheckpoints(c, cfg, cmdName)
859857
}
860858

861859
return errors.Trace(restoreError)
@@ -874,32 +872,29 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
874872

875873
func cleanUpCheckpoints(ctx context.Context, cfg *RestoreConfig, cmdName string) {
876874
if cfg.UseCheckpoint {
877-
if IsStreamRestore(cmdName) {
878-
log.Info("start to remove checkpoint data for PITR restore")
875+
log.Info("start to remove checkpoint data for PITR restore")
876+
if cfg.logCheckpointMetaManager != nil {
879877
err := cfg.logCheckpointMetaManager.RemoveCheckpointData(ctx)
880878
if err != nil {
881879
log.Warn("failed to remove checkpoint data for log restore", zap.Error(err))
882880
}
883-
err = cfg.sstCheckpointMetaManager.RemoveCheckpointData(ctx)
881+
}
882+
if cfg.sstCheckpointMetaManager != nil {
883+
err := cfg.sstCheckpointMetaManager.RemoveCheckpointData(ctx)
884884
if err != nil {
885885
log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err))
886886
}
887-
// Skip removing snapshot checkpoint data if this is a pure log restore
888-
// (i.e. restoring only from log backup without a base snapshot backup),
889-
// since snapshotCheckpointMetaManager would be nil in that case
890-
if cfg.snapshotCheckpointMetaManager != nil {
891-
err = cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(ctx)
892-
if err != nil {
893-
log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
894-
}
895-
}
896-
} else {
887+
}
888+
// Skip removing snapshot checkpoint data if this is a pure log restore
889+
// (i.e. restoring only from log backup without a base snapshot backup),
890+
// since snapshotCheckpointMetaManager would be nil in that case
891+
if cfg.snapshotCheckpointMetaManager != nil {
897892
err := cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(ctx)
898893
if err != nil {
899894
log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
900895
}
901896
}
902-
log.Info("all the checkpoint data removed.")
897+
log.Info("all checkpoint data removed.")
903898
} else {
904899
log.Info("checkpoint not enabled, skip to remove checkpoint data")
905900
}

br/tests/br_pitr/run.sh

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -117,6 +117,8 @@ latest_log_db=$(run_sql "select table_schema from information_schema.tables wher
117117
latest_sst_db=$(run_sql "select table_schema from information_schema.tables where table_schema like '__TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint%' order by table_schema desc limit 1;" | tail -n 1 | awk '{print $2}')
118118
run_sql "DROP DATABASE IF EXISTS \`$latest_log_db\`;"
119119
run_sql "DROP DATABASE IF EXISTS \`$latest_sst_db\`;"
120+
run_sql "DROP DATABASE IF EXISTS __TiDB_BR_Temporary_Restore_Registration_DB;"
121+
120122
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1 || ( cat $res_file && exit 1 )
121123

122124
check_result

tests/realtikvtest/brietest/brie_test.go

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -135,8 +135,6 @@ func TestCancel(t *testing.T) {
135135
// cleanupRegistry drops the registry tables and database
136136
func cleanupRegistry(tk *testkit.TestKit) {
137137
registryDB := "__TiDB_BR_Temporary_Restore_Registration_DB"
138-
registryTable := "restore_registry"
139-
tk.MustExec(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", registryDB, registryTable))
140138
tk.MustExec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", registryDB))
141139
}
142140

@@ -207,8 +205,7 @@ func TestExistedTables(t *testing.T) {
207205
require.NoError(t, err)
208206

209207
_, err = session.ResultSetToStringSlice(context.Background(), tk.Session(), res)
210-
// due to previous restore didn't succeed with checkpoint enabled
211-
require.ErrorContains(t, err, "it is already being restored by task")
208+
require.ErrorContains(t, err, "table already exists")
212209
}()
213210
select {
214211
case <-time.After(20 * time.Second):

0 commit comments

Comments
 (0)