Skip to content

Commit 9d64c1d

Browse files
committed
address comments
Signed-off-by: Wenqi Mou <[email protected]>
1 parent 0381199 commit 9d64c1d

File tree

12 files changed

+440
-137
lines changed

12 files changed

+440
-137
lines changed

br/pkg/registry/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ go_library(
1212
"//br/pkg/utils",
1313
"//pkg/domain",
1414
"//pkg/kv",
15+
"//pkg/util/sqlexec",
1516
"//pkg/util/table-filter",
16-
"@com_github_google_uuid//:uuid",
1717
"@com_github_pingcap_errors//:errors",
1818
"@com_github_pingcap_log//:log",
1919
"@org_uber_go_zap//:zap",

br/pkg/registry/registration.go

Lines changed: 111 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"fmt"
2020
"strings"
2121

22-
"github.com/google/uuid"
2322
"github.com/pingcap/errors"
2423
"github.com/pingcap/log"
2524
berrors "github.com/pingcap/tidb/br/pkg/errors"
@@ -28,6 +27,7 @@ import (
2827
"github.com/pingcap/tidb/br/pkg/utils"
2928
"github.com/pingcap/tidb/pkg/domain"
3029
"github.com/pingcap/tidb/pkg/kv"
30+
"github.com/pingcap/tidb/pkg/util/sqlexec"
3131
filter "github.com/pingcap/tidb/pkg/util/table-filter"
3232
"go.uber.org/zap"
3333
)
@@ -38,33 +38,31 @@ const (
3838

3939
// createRegistrationTableSQL is the SQL to create the registration table
4040
// we use unique index to prevent race condition that two threads inserting tasks with same parameters.
41-
// we use uuid to verify the task is indeed created by the current thread, since duplicate key errors thrown by
42-
// unique index checking might be silent and not returned to caller.
4341
// we initialize auto increment id to be 1 to make sure default value 0 is not used when insertion failed.
4442
createRegistrationTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s (
4543
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
4644
filter_strings TEXT NOT NULL,
45+
filter_hash VARCHAR(64) NOT NULL,
4746
start_ts BIGINT UNSIGNED NOT NULL,
4847
restored_ts BIGINT UNSIGNED NOT NULL,
4948
upstream_cluster_id BIGINT UNSIGNED,
5049
with_sys_table BOOLEAN NOT NULL DEFAULT TRUE,
5150
status VARCHAR(20) NOT NULL DEFAULT 'running',
5251
cmd TEXT,
53-
uuid VARCHAR(64),
5452
UNIQUE KEY unique_registration_params (
55-
filter_strings(255),
53+
filter_hash,
5654
start_ts,
5755
restored_ts,
5856
upstream_cluster_id,
5957
with_sys_table,
60-
cmd(255)
58+
cmd(256)
6159
)
6260
) AUTO_INCREMENT = 1`
6361

6462
// lookupRegistrationSQLTemplate is the SQL template for looking up a registration by its parameters
6563
lookupRegistrationSQLTemplate = `
66-
SELECT id, uuid FROM %s.%s
67-
WHERE filter_strings = %%?
64+
SELECT id, status FROM %s.%s
65+
WHERE filter_hash = MD5(%%?)
6866
AND start_ts = %%?
6967
AND restored_ts = %%?
7068
AND upstream_cluster_id = %%?
@@ -78,45 +76,28 @@ const (
7876
SET status = %%?
7977
WHERE id = %%? AND status = %%?`
8078

79+
// resumeTaskByIDSQLTemplate is the SQL template for resuming a paused task by its ID
80+
resumeTaskByIDSQLTemplate = `
81+
UPDATE %s.%s
82+
SET status = 'running'
83+
WHERE id = %%?`
84+
8185
// deleteRegistrationSQLTemplate is the SQL template for deleting a registration
8286
deleteRegistrationSQLTemplate = `DELETE FROM %s.%s WHERE id = %%?`
8387

8488
// selectRegistrationsByMaxIDSQLTemplate is the SQL template for selecting registrations by max ID
8589
selectRegistrationsByMaxIDSQLTemplate = `
8690
SELECT
87-
id, filter_strings, start_ts, restored_ts, upstream_cluster_id, with_sys_table, status, cmd
91+
id, filter_strings, start_ts, restored_ts, upstream_cluster_id, with_sys_table, status, cmd, filter_hash
8892
FROM %s.%s
8993
WHERE id < %%?
9094
ORDER BY id ASC`
9195

92-
// resumePausedTaskSQLTemplate is the SQL template for resuming a paused task
93-
resumePausedTaskSQLTemplate = `
94-
UPDATE %s.%s
95-
SET status = 'running', uuid = %%?
96-
WHERE filter_strings = %%?
97-
AND start_ts = %%?
98-
AND restored_ts = %%?
99-
AND upstream_cluster_id = %%?
100-
AND with_sys_table = %%?
101-
AND cmd = %%?
102-
AND status = 'paused'`
103-
104-
// getRunningTaskIDSQLTemplate is the SQL template for getting restore ID of a running task
105-
getRunningTaskIDSQLTemplate = `
106-
SELECT id FROM %s.%s
107-
WHERE filter_strings = %%?
108-
AND start_ts = %%?
109-
AND restored_ts = %%?
110-
AND upstream_cluster_id = %%?
111-
AND with_sys_table = %%?
112-
AND cmd = %%?
113-
AND status = 'running'`
114-
11596
// createNewTaskSQLTemplate is the SQL template for creating a new task
11697
createNewTaskSQLTemplate = `
11798
INSERT INTO %s.%s
118-
(filter_strings, start_ts, restored_ts, upstream_cluster_id, with_sys_table, status, cmd, uuid)
119-
VALUES (%%?, %%?, %%?, %%?, %%?, 'running', %%?, %%?)`
99+
(filter_strings, filter_hash, start_ts, restored_ts, upstream_cluster_id, with_sys_table, status, cmd)
100+
VALUES (%%?, MD5(%%?), %%?, %%?, %%?, %%?, 'running', %%?)`
120101
)
121102

122103
// TaskStatus represents the current state of a restore task
@@ -203,136 +184,152 @@ func (r *Registry) ResumeOrCreateRegistration(ctx context.Context, info Registra
203184
return 0, errors.Trace(err)
204185
}
205186

206-
operationUUID := uuid.New().String()
207-
208187
filterStrings := strings.Join(info.FilterStrings, ",")
209188

210-
log.Info("checking for task to resume",
189+
log.Info("attempting to resume or create registration",
211190
zap.String("filter_strings", filterStrings),
212191
zap.Uint64("start_ts", info.StartTS),
213192
zap.Uint64("restored_ts", info.RestoredTS),
214193
zap.Uint64("upstream_cluster_id", info.UpstreamClusterID),
215194
zap.Bool("with_sys_table", info.WithSysTable),
216-
zap.String("cmd", info.Cmd),
217-
zap.String("uuid", operationUUID))
195+
zap.String("cmd", info.Cmd))
218196

219-
// first try to update if exists and paused
220-
updateSQL := fmt.Sprintf(resumePausedTaskSQLTemplate, RegistrationDBName, RegistrationTableName)
197+
execCtx := r.se.GetSessionCtx().GetRestrictedSQLExecutor()
221198

222-
if err := r.se.ExecuteInternal(ctx, updateSQL,
223-
operationUUID, filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID,
224-
info.WithSysTable, info.Cmd); err != nil {
225-
return 0, errors.Annotatef(err, "failed to update existing registration")
199+
// Set transaction mode to pessimistic
200+
_, _, err := execCtx.ExecRestrictedSQL(
201+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
202+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
203+
"SET @@tidb_txn_mode = 'pessimistic'")
204+
if err != nil {
205+
return 0, errors.Annotate(err, "failed to set pessimistic mode")
226206
}
227207

228-
// check if a task with our parameters and in running state exists
229-
execCtx := r.se.GetSessionCtx().GetRestrictedSQLExecutor()
208+
// Begin transaction
209+
_, _, err = execCtx.ExecRestrictedSQL(
210+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
211+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
212+
"BEGIN")
213+
if err != nil {
214+
return 0, errors.Annotate(err, "failed to begin transaction")
215+
}
216+
217+
defer func() {
218+
_, _, _ = execCtx.ExecRestrictedSQL(
219+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
220+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
221+
"ROLLBACK")
222+
}()
223+
224+
// First look for an existing task with the same parameters
225+
lookupSQL := fmt.Sprintf(lookupRegistrationSQLTemplate, RegistrationDBName, RegistrationTableName)
230226
rows, _, err := execCtx.ExecRestrictedSQL(
231227
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
232-
nil,
233-
fmt.Sprintf(lookupRegistrationSQLTemplate, RegistrationDBName, RegistrationTableName),
228+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
229+
lookupSQL,
234230
filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID, info.WithSysTable, info.Cmd)
235231
if err != nil {
236-
return 0, errors.Annotatef(err, "failed to look up task")
232+
return 0, errors.Annotate(err, "failed to look up existing task")
237233
}
238234

239-
// if we found a task, check if it was resumed by us
235+
// If task found, check its status
240236
if len(rows) > 0 {
241237
taskID := rows[0].GetUint64(0)
242-
foundUUID := rows[0].GetString(1)
238+
status := rows[0].GetString(1)
243239

244240
if taskID == 0 {
245241
return 0, errors.New("invalid task ID: got 0 from lookup")
246242
}
247243

248-
// check if this task has our UUID
249-
if foundUUID == operationUUID {
250-
log.Info("successfully resumed existing registration by this process",
251-
zap.Uint64("restore_id", taskID),
252-
zap.String("uuid", operationUUID),
253-
zap.Strings("filters", info.FilterStrings))
254-
return taskID, nil
255-
}
256-
// task exists but was either created by another process or has been running
257-
// check if it's in running state
258-
runningRows, _, err := execCtx.ExecRestrictedSQL(
259-
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
260-
nil,
261-
fmt.Sprintf(getRunningTaskIDSQLTemplate, RegistrationDBName, RegistrationTableName),
262-
filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID, info.WithSysTable, info.Cmd)
263-
if err != nil {
264-
return 0, errors.Annotatef(err, "failed to check for running task")
265-
}
266-
267-
if len(runningRows) > 0 {
268-
taskID := runningRows[0].GetUint64(0)
244+
// If task exists and is running, return error
245+
if status == string(TaskStatusRunning) {
269246
log.Warn("task already exists and is running",
270247
zap.Uint64("restore_id", taskID))
271248
return 0, errors.Annotatef(berrors.ErrInvalidArgument,
272249
"task with ID %d already exists and is running", taskID)
273250
}
274-
// Task exists but is not running - unexpected state
275-
log.Warn("task exists but is in an unexpected state",
251+
252+
// Strictly check for paused status
253+
if status == string(TaskStatusPaused) {
254+
updateSQL := fmt.Sprintf(resumeTaskByIDSQLTemplate, RegistrationDBName, RegistrationTableName)
255+
256+
_, _, err = execCtx.ExecRestrictedSQL(
257+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
258+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
259+
updateSQL,
260+
taskID)
261+
if err != nil {
262+
return 0, errors.Annotate(err, "failed to resume paused task")
263+
}
264+
265+
_, _, err = execCtx.ExecRestrictedSQL(
266+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
267+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
268+
"COMMIT")
269+
if err != nil {
270+
return 0, errors.Annotate(err, "failed to commit transaction")
271+
}
272+
273+
log.Info("successfully resumed existing registration",
274+
zap.Uint64("restore_id", taskID),
275+
zap.Strings("filters", info.FilterStrings))
276+
return taskID, nil
277+
}
278+
279+
// Task exists but is not running or paused - this is an unexpected state
280+
log.Warn("task exists but in unexpected state",
276281
zap.Uint64("restore_id", taskID),
277-
zap.String("uuid", foundUUID))
278-
return 0, errors.New("task exists but is in an unexpected state")
282+
zap.String("status", status))
283+
return 0, errors.Annotatef(berrors.ErrInvalidArgument,
284+
"task with ID %d exists but is in unexpected state: %s", taskID, status)
279285
}
280286

281-
// no existing task found, create a new one
287+
// No existing task found, create a new one
282288
insertSQL := fmt.Sprintf(createNewTaskSQLTemplate, RegistrationDBName, RegistrationTableName)
283-
284-
log.Info("attempting to create new registration",
285-
zap.String("filter_strings", filterStrings),
286-
zap.Uint64("start_ts", info.StartTS),
287-
zap.Uint64("restored_ts", info.RestoredTS),
288-
zap.Uint64("upstream_cluster_id", info.UpstreamClusterID),
289-
zap.Bool("with_sys_table", info.WithSysTable),
290-
zap.String("cmd", info.Cmd),
291-
zap.String("uuid", operationUUID))
292-
293-
if err := r.se.ExecuteInternal(ctx, insertSQL,
294-
filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID, info.WithSysTable,
295-
info.Cmd, operationUUID); err != nil {
296-
return 0, errors.Annotatef(err, "failed to create new registration")
289+
_, _, err = execCtx.ExecRestrictedSQL(
290+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
291+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
292+
insertSQL,
293+
filterStrings, filterStrings, info.StartTS, info.RestoredTS,
294+
info.UpstreamClusterID, info.WithSysTable, info.Cmd)
295+
if err != nil {
296+
return 0, errors.Annotate(err, "failed to create new registration")
297297
}
298298

299-
// check if a row with our parameters exists
299+
// Get the ID of the task we just created
300+
lookupSQL = fmt.Sprintf(`
301+
SELECT LAST_INSERT_ID()`)
302+
300303
rows, _, err = execCtx.ExecRestrictedSQL(
301304
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
302-
nil,
303-
fmt.Sprintf(lookupRegistrationSQLTemplate, RegistrationDBName, RegistrationTableName),
304-
filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID, info.WithSysTable, info.Cmd)
305+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
306+
lookupSQL)
305307
if err != nil {
306-
return 0, errors.Annotatef(err, "failed to look up inserted task")
308+
return 0, errors.Annotate(err, "failed to get ID of newly created task")
307309
}
308310

309311
if len(rows) == 0 {
310-
// this is really unexpected - record doesn't exist at all
311-
log.Error("failed to find task after insertion - record doesn't exist")
312-
return 0, errors.New("failed to find task after insertion")
312+
return 0, errors.New("failed to get LAST_INSERT_ID()")
313313
}
314314

315315
taskID := rows[0].GetUint64(0)
316-
foundUUID := rows[0].GetString(1)
317316

318317
if taskID == 0 {
319-
return 0, errors.New("invalid task ID: got 0 from lookup")
318+
return 0, errors.New("invalid task ID: got 0 from LAST_INSERT_ID()")
320319
}
321320

322-
// check if this task was created by us or another process
323-
if foundUUID != operationUUID {
324-
log.Info("task was created by another process",
325-
zap.Uint64("restore_id", taskID),
326-
zap.String("our_uuid", operationUUID),
327-
zap.String("found_uuid", foundUUID),
328-
zap.Strings("filters", info.FilterStrings))
329-
} else {
330-
log.Info("successfully created new registration by this process",
331-
zap.Uint64("restore_id", taskID),
332-
zap.String("uuid", operationUUID),
333-
zap.Strings("filters", info.FilterStrings))
321+
_, _, err = execCtx.ExecRestrictedSQL(
322+
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
323+
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
324+
"COMMIT")
325+
if err != nil {
326+
return 0, errors.Annotate(err, "failed to commit transaction")
334327
}
335328

329+
log.Info("successfully created new registration",
330+
zap.Uint64("restore_id", taskID),
331+
zap.Strings("filters", info.FilterStrings))
332+
336333
return taskID, nil
337334
}
338335

br/pkg/restore/internal/prealloc_db/db_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,12 @@ func cloneTableInfos(
234234
var ids *prealloctableid.PreallocIDs
235235
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
236236
tableInfos = make([]*metautil.Table, 0, len(originTableInfos))
237+
238+
// Use standard transaction with pessimistic mode enabled
237239
err := kv.RunInNewTxn(ctx, dom.Store(), true, func(_ context.Context, txn kv.Transaction) error {
240+
// Enable pessimistic mode
241+
txn.SetOption(kv.Pessimistic, true)
242+
238243
allocater := meta.NewMutator(txn)
239244
id, e := allocater.GetGlobalID()
240245
if e != nil {

br/pkg/restore/internal/prealloc_table_id/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ go_library(
88
deps = [
99
"//br/pkg/metautil",
1010
"//pkg/meta/model",
11+
"@com_github_pingcap_log//:log",
1112
"@com_github_pkg_errors//:errors",
13+
"@org_uber_go_zap//:zap",
1214
],
1315
)
1416

0 commit comments

Comments
 (0)