Skip to content

Commit 2854159

Browse files
committed
some more fix
Signed-off-by: Wenqi Mou <[email protected]>
1 parent f9fa7b8 commit 2854159

File tree

6 files changed

+86
-17
lines changed

6 files changed

+86
-17
lines changed

br/pkg/registry/registration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ func (r *Registry) checkForTableConflicts(
476476
zap.Uint64("upstream_cluster_id", regInfo.UpstreamClusterID),
477477
zap.Bool("with_sys_table", regInfo.WithSysTable),
478478
zap.String("cmd", regInfo.Cmd))
479-
return errors.Annotatef(berrors.ErrInvalidArgument,
479+
return errors.Annotatef(berrors.ErrTablesAlreadyExisted,
480480
"table %s.%s cannot be restored by current task with ID %d "+
481481
"because it is already being restored by task (time range: %d->%d, cmd: %s)",
482482
dbName, tableName, restoreID, regInfo.StartTS, regInfo.RestoredTS, regInfo.Cmd)

br/pkg/restore/misc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,6 @@ func GetTSWithRetry(ctx context.Context, pdClient pd.Client) (uint64, error) {
162162
}
163163

164164
// IsBRInternalDB checks whether it's a db used internally by BR.
165-
func IsBRInternalDB(dbLowerName string) bool {
166-
return registry.IsRestoreRegistryDB(dbLowerName)
165+
func IsBRInternalDB(dbName string) bool {
166+
return registry.IsRestoreRegistryDB(dbName)
167167
}

br/pkg/task/restore.go

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -835,19 +835,29 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
835835
restoreError = runSnapshotRestore(c, mgr, g, cmdName, &snapshotRestoreConfig)
836836
}
837837
if restoreError != nil {
838-
// if err happens at register phase no restoreID will be generated and default is 0
839-
if cfg.UseCheckpoint && cfg.RestoreID != 0 {
838+
// if err happens at register phase no restoreID will be generated and default is 0.
839+
if cfg.RestoreID == 0 {
840+
return errors.Trace(restoreError)
841+
}
842+
// if checkpoint is not persisted, let's just unregister the task since we don't need it
843+
if hasCheckpointPersisted(c, cfg) {
844+
log.Info("pausing restore task from registry",
845+
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
840846
if err := restoreRegistry.PauseTask(c, cfg.RestoreID); err != nil {
841-
log.Error("failed to pause restore task after restore failed", zap.Error(err),
847+
log.Error("failed to pause restore task from registry",
842848
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
843849
}
844850
} else {
845-
// if no checkpoint enabled, will not retry using checkpoint so just unregister
851+
log.Info("unregistering restore task from registry",
852+
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
846853
if err := restoreRegistry.Unregister(c, cfg.RestoreID); err != nil {
847854
log.Error("failed to unregister restore task from registry",
848855
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
849856
}
857+
// clean up checkpoint just in case there are some empty checkpoint db/table created but no data
858+
cleanUpCheckpoints(c, cfg, cmdName)
850859
}
860+
851861
return errors.Trace(restoreError)
852862
}
853863

@@ -857,36 +867,65 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
857867
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
858868
}
859869

860-
// Clear the checkpoint data
870+
// Clear the checkpoint data if needed
871+
cleanUpCheckpoints(c, cfg, cmdName)
872+
return nil
873+
}
874+
875+
func cleanUpCheckpoints(ctx context.Context, cfg *RestoreConfig, cmdName string) {
861876
if cfg.UseCheckpoint {
862877
if IsStreamRestore(cmdName) {
863878
log.Info("start to remove checkpoint data for PITR restore")
864-
err = cfg.logCheckpointMetaManager.RemoveCheckpointData(c)
879+
err := cfg.logCheckpointMetaManager.RemoveCheckpointData(ctx)
865880
if err != nil {
866881
log.Warn("failed to remove checkpoint data for log restore", zap.Error(err))
867882
}
868-
err = cfg.sstCheckpointMetaManager.RemoveCheckpointData(c)
883+
err = cfg.sstCheckpointMetaManager.RemoveCheckpointData(ctx)
869884
if err != nil {
870885
log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err))
871886
}
872887
// Skip removing snapshot checkpoint data if this is a pure log restore
873888
// (i.e. restoring only from log backup without a base snapshot backup),
874889
// since snapshotCheckpointMetaManager would be nil in that case
875890
if cfg.snapshotCheckpointMetaManager != nil {
876-
err = cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(c)
891+
err = cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(ctx)
877892
if err != nil {
878893
log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
879894
}
880895
}
881896
} else {
882-
err = cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(c)
897+
err := cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(ctx)
883898
if err != nil {
884899
log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
885900
}
886901
}
887-
log.Info("all the checkpoint data is removed.")
902+
log.Info("all the checkpoint data removed.")
903+
} else {
904+
log.Info("checkpoint not enabled, skip to remove checkpoint data")
888905
}
889-
return nil
906+
}
907+
908+
// hasCheckpointPersisted checks if there are any checkpoint data persisted in storage or tables
909+
func hasCheckpointPersisted(ctx context.Context, cfg *RestoreConfig) bool {
910+
if cfg.snapshotCheckpointMetaManager != nil {
911+
exists, err := cfg.snapshotCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
912+
if err == nil && exists {
913+
return true
914+
}
915+
}
916+
if cfg.logCheckpointMetaManager != nil {
917+
exists, err := cfg.logCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
918+
if err == nil && exists {
919+
return true
920+
}
921+
}
922+
if cfg.sstCheckpointMetaManager != nil {
923+
exists, err := cfg.sstCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
924+
if err == nil && exists {
925+
return true
926+
}
927+
}
928+
return false
890929
}
891930

892931
type SnapshotRestoreConfig struct {

br/tests/br_pitr/run.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ fi
113113

114114
# PITR restore
115115
echo "run pitr"
116-
run_sql "DROP DATABASE __TiDB_BR_Temporary_Log_Restore_Checkpoint;"
117-
run_sql "DROP DATABASE __TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint;"
116+
latest_log_db=$(run_sql "select table_schema from information_schema.tables where table_schema like '__TiDB_BR_Temporary_Log_Restore_Checkpoint%' order by table_schema desc limit 1;" | tail -n 1 | awk '{print $2}')
117+
latest_sst_db=$(run_sql "select table_schema from information_schema.tables where table_schema like '__TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint%' order by table_schema desc limit 1;" | tail -n 1 | awk '{print $2}')
118+
run_sql "DROP DATABASE IF EXISTS \`$latest_log_db\`;"
119+
run_sql "DROP DATABASE IF EXISTS \`$latest_sst_db\`;"
118120
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1 || ( cat $res_file && exit 1 )
119121

120122
check_result

br/tests/br_restore_checkpoint/run.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ if [ $restore_fail -ne 1 ]; then
8080
fi
8181

8282
# check the snapshot restore has checkpoint data
83-
run_sql 'select count(*) from '"__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"'.`cpt_data`;'
83+
latest_db=$(run_sql "select table_schema from information_schema.tables where table_schema like '__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint%' order by table_schema desc limit 1;" | tail -n 1 | awk '{print $2}')
84+
run_sql "select count(*) from \`$latest_db\`.\`cpt_data\`;"
8485
check_contains "count(*): 1"
8586

8687
# PITR with checkpoint but failed in the log restore datakv stage

tests/realtikvtest/brietest/brie_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,21 @@ func TestCancel(t *testing.T) {
132132
}
133133
}
134134

135+
// cleanupRegistry drops the registry tables and database
136+
func cleanupRegistry(tk *testkit.TestKit) {
137+
registryDB := "__TiDB_BR_Temporary_Restore_Registration_DB"
138+
registryTable := "restore_registry"
139+
tk.MustExec(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", registryDB, registryTable))
140+
tk.MustExec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", registryDB))
141+
}
142+
135143
func TestExistedTables(t *testing.T) {
136144
tk := initTestKit(t)
145+
// Register cleanup function
146+
t.Cleanup(func() {
147+
cleanupRegistry(tk)
148+
})
149+
137150
tmp := makeTempDirForBackup(t)
138151
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
139152
executor.ResetGlobalBRIEQueueForTest()
@@ -212,6 +225,11 @@ func TestExistedTables(t *testing.T) {
212225
// full backup * -> incremental backup * -> restore full backup * -> restore incremental backup *
213226
func TestExistedTablesOfIncremental(t *testing.T) {
214227
tk := initTestKit(t)
228+
// Register cleanup function
229+
t.Cleanup(func() {
230+
cleanupRegistry(tk)
231+
})
232+
215233
tmp := makeTempDirForBackup(t)
216234
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
217235
executor.ResetGlobalBRIEQueueForTest()
@@ -260,6 +278,10 @@ func TestExistedTablesOfIncremental(t *testing.T) {
260278
// full backup * -> incremental backup * -> restore full backup `test` -> restore incremental backup `test`
261279
func TestExistedTablesOfIncremental_1(t *testing.T) {
262280
tk := initTestKit(t)
281+
t.Cleanup(func() {
282+
cleanupRegistry(tk)
283+
})
284+
263285
tmp := makeTempDirForBackup(t)
264286
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
265287
executor.ResetGlobalBRIEQueueForTest()
@@ -308,6 +330,11 @@ func TestExistedTablesOfIncremental_1(t *testing.T) {
308330
// full backup `test` -> incremental backup `test` -> restore full backup * -> restore incremental backup *
309331
func TestExistedTablesOfIncremental_2(t *testing.T) {
310332
tk := initTestKit(t)
333+
// Register cleanup function
334+
t.Cleanup(func() {
335+
cleanupRegistry(tk)
336+
})
337+
311338
tmp := makeTempDirForBackup(t)
312339
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
313340
executor.ResetGlobalBRIEQueueForTest()

0 commit comments

Comments
 (0)