ddl: support refresh meta ddl #60837

Merged 20 commits on May 15, 2025
Changes from 12 commits
28 changes: 28 additions & 0 deletions pkg/ddl/executor.go
@@ -134,6 +134,8 @@ type Executor interface {
AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResourceGroupStmt) error
DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGroupStmt) error
FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
// RefreshMeta can only be called by BR during the log restore phase.
RefreshMeta(ctx sessionctx.Context, args *model.RefreshMetaArgs) error

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
@@ -7058,3 +7060,29 @@ func NewDDLReorgMeta(ctx sessionctx.Context) *model.DDLReorgMeta {
Version: model.CurrentReorgMetaVersion,
}
}

// RefreshMeta is an internal DDL job. In some cases, BR log restore exchanges
// table partitions in meta KV directly, so the table info in meta KV becomes
// inconsistent with the info schema, and a later AlterTableMode call from BR
// on the new table fails. RefreshMeta reloads the schema diff to update the
// info schema for the given schema ID and table ID.
func (e *executor) RefreshMeta(sctx sessionctx.Context, args *model.RefreshMetaArgs) error {
is := e.infoCache.GetLatest()
_, ok := is.SchemaByID(args.SchemaID)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(fmt.Sprintf("SchemaID: %v", args.SchemaID))
}

job := &model.Job{
Version: model.JobVersion2,
SchemaID: args.SchemaID,
TableID: args.TableID,
Type: model.ActionRefreshMeta,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}
sctx.SetValue(sessionctx.QueryString, "skip")
err := e.doDDLJob2(sctx, job, args)
return errors.Trace(err)
}
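The doc comment on RefreshMeta describes a cache that has fallen behind the persisted metadata. A minimal toy model of that situation follows. It is only a sketch: `tableMeta`, `metaKV`, `infoSchemaCache`, and `refreshMeta` are hypothetical stand-ins invented for illustration and are not TiDB types or APIs, and dropping the stale same-name entry is an assumption of the sketch rather than a statement about how the real schema diff is applied.

```go
package main

import (
	"errors"
	"fmt"
)

// tableMeta is a hypothetical stand-in for a table's metadata record.
type tableMeta struct {
	ID   int64
	Name string
}

// metaKV is a hypothetical stand-in for table metadata persisted in the KV store.
type metaKV map[int64]tableMeta

// infoSchemaCache is a hypothetical stand-in for the in-memory info schema.
type infoSchemaCache struct {
	version int64
	byID    map[int64]tableMeta
}

var errTableNotExists = errors.New("table doesn't exist")

// newInfoSchemaCache builds a cache at version 1 holding the given tables.
func newInfoSchemaCache(tables ...tableMeta) *infoSchemaCache {
	c := &infoSchemaCache{version: 1, byID: make(map[int64]tableMeta)}
	for _, tbl := range tables {
		c.byID[tbl.ID] = tbl
	}
	return c
}

// tableByID looks a table up in the cache only; it never consults the KV store.
func (c *infoSchemaCache) tableByID(id int64) (tableMeta, error) {
	tbl, ok := c.byID[id]
	if !ok {
		return tableMeta{}, errTableNotExists
	}
	return tbl, nil
}

// refreshMeta reloads one table from the KV store into the cache, drops any
// cached entry that has the same name under a different ID (an assumption of
// this sketch), and bumps the version.
func (c *infoSchemaCache) refreshMeta(kv metaKV, tableID int64) error {
	tbl, ok := kv[tableID]
	if !ok {
		return errTableNotExists
	}
	for id, cached := range c.byID {
		if cached.Name == tbl.Name && id != tableID {
			delete(c.byID, id)
		}
	}
	c.byID[tableID] = tbl
	c.version++
	return nil
}

func main() {
	// The cache still knows "nt" under its old ID, while the KV store was
	// rewritten directly so that "nt" now lives under ID 200.
	cache := newInfoSchemaCache(tableMeta{ID: 100, Name: "nt"})
	kv := metaKV{200: {ID: 200, Name: "nt"}}

	_, err := cache.tableByID(200)
	fmt.Println("before refresh:", err)

	if err := cache.refreshMeta(kv, 200); err != nil {
		fmt.Println("refresh failed:", err)
		return
	}
	tbl, err := cache.tableByID(200)
	fmt.Println("after refresh:", tbl.Name, err, "version", cache.version)
}
```

In this model, any operation that resolves the table through the cache fails until the refresh runs, which mirrors the AlterTableMode failure the comment describes.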
2 changes: 2 additions & 0 deletions pkg/ddl/job_worker.go
@@ -1046,6 +1046,8 @@ func (w *worker) runOneJobStep(
ver, err = onDropCheckConstraint(jobCtx, job)
case model.ActionAlterCheckConstraint:
ver, err = w.onAlterCheckConstraint(jobCtx, job)
case model.ActionRefreshMeta:
ver, err = onRefreshMeta(jobCtx, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
5 changes: 5 additions & 0 deletions pkg/ddl/schematracker/checker.go
@@ -404,6 +404,11 @@ func (d *Checker) AlterTableMode(ctx sessionctx.Context, args *model.AlterTableM
return d.realExecutor.AlterTableMode(ctx, args)
}

// RefreshMeta implements the DDL interface.
func (d *Checker) RefreshMeta(ctx sessionctx.Context, args *model.RefreshMetaArgs) error {
return d.realExecutor.RefreshMeta(ctx, args)
}

// CleanupTableLock implements the DDL interface.
func (d *Checker) CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error {
return d.realExecutor.CleanupTableLock(ctx, tables)
5 changes: 5 additions & 0 deletions pkg/ddl/schematracker/dm_tracker.go
@@ -1197,3 +1197,8 @@ func (d *SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema
func (*SchemaTracker) CreatePlacementPolicyWithInfo(_ sessionctx.Context, _ *model.PolicyInfo, _ ddl.OnExist) error {
return nil
}

// RefreshMeta implements the DDL interface; it is a no-op in DM's case.
func (*SchemaTracker) RefreshMeta(_ sessionctx.Context, _ *model.RefreshMetaArgs) error {
return nil
}
22 changes: 22 additions & 0 deletions pkg/ddl/table.go
@@ -1699,3 +1699,25 @@ func onAlterNoCacheTable(jobCtx *jobContext, job *model.Job) (ver int64, err err
}
return ver, err
}

func onRefreshMeta(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
Contributor @Tristan1900 (May 12, 2025):

Could we add a check here or in RefreshMeta to verify that the info schema doesn't already have a tableInfo with the same table ID or name as the job specifies but without restoreMode? That way the refresh can't override anything unexpected.

Contributor Author:

Maybe we can skip the refresh when the tableInfo exists in the info schema and its table mode isn't restoreMode?

_, err = model.GetRefreshMetaArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
Contributor:

If the underlying table is removed by BR, can we update the info schema to reflect that as well? Right now this assumes that the underlying TiKV also contains the specified table's meta KV.

Contributor Author:

Re "if underlying table is removed by BR": table validation is now skipped in the DDL.

// update schema version
ver, err = updateSchemaVersion(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}

var tbInfo *model.TableInfo
metaMut := jobCtx.metaMut
tbInfo, err = GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, err
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
return ver, nil
}
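The guard proposed in the review thread on onRefreshMeta (skip the refresh when the info schema already holds a matching table that is not in restore mode) could look roughly like the sketch below. This is not code from the PR: `tableMode`, `cachedTable`, and `shouldSkipRefresh` are hypothetical names, and the match on "same ID or same name" follows the reviewer's wording rather than any confirmed implementation.

```go
package main

import "fmt"

// tableMode is a hypothetical stand-in for the table mode stored on a table.
type tableMode int

const (
	modeNormal tableMode = iota
	modeImport
	modeRestore
)

// cachedTable is a hypothetical view of a table as seen by the info schema.
type cachedTable struct {
	ID   int64
	Name string
	Mode tableMode
}

// shouldSkipRefresh reports whether the cache already holds a table with the
// same ID or name that is NOT in restore mode. Refreshing in that case could
// overwrite a table that the restore does not own.
func shouldSkipRefresh(cached []cachedTable, tableID int64, tableName string) bool {
	for _, tbl := range cached {
		sameTable := tbl.ID == tableID || tbl.Name == tableName
		if sameTable && tbl.Mode != modeRestore {
			return true
		}
	}
	return false
}

func main() {
	cached := []cachedTable{
		{ID: 1, Name: "orders", Mode: modeNormal},
		{ID: 2, Name: "staging", Mode: modeRestore},
	}
	fmt.Println(shouldSkipRefresh(cached, 1, "orders"))  // user-owned table
	fmt.Println(shouldSkipRefresh(cached, 2, "staging")) // table under restore
	fmt.Println(shouldSkipRefresh(cached, 3, "new_tbl")) // unknown to the cache
}
```

Whether a name-only match should block the refresh is a design choice the thread leaves open; the sketch takes the stricter reading.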
4 changes: 4 additions & 0 deletions pkg/ddl/table_mode.go
@@ -15,6 +15,7 @@
package ddl

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
)
@@ -47,6 +48,9 @@ func onAlterTableMode(jobCtx *jobContext, job *model.Job) (ver int64, err error)
}
// update table info and schema version
ver, err = updateVersionAndTableInfo(jobCtx, job, tbInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
default:
job.State = model.JobStateCancelled
67 changes: 67 additions & 0 deletions pkg/ddl/table_mode_test.go
@@ -16,6 +16,7 @@ package ddl_test

import (
"context"
"strings"
"sync"
"testing"

@@ -24,9 +25,12 @@ import (
"github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
@@ -300,3 +304,66 @@ func TestTableModeConcurrent(t *testing.T) {
require.NotNil(t, failedErr3)
checkErrorCode(t, failedErr3, errno.ErrInvalidTableModeSet)
}

// TestTableModeWithRefreshMeta tests that, after table meta is updated directly
// by a transaction (replacing the table ID with a partition ID), TableMode can be
// modified once RefreshMeta has run.
func TestTableModeWithRefreshMeta(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)
de := domain.DDLExecutor()
tk := testkit.NewTestKit(t, store)
sctx := testkit.NewTestKit(t, store).Session()

tk.MustExec("use test")
tk.MustExec("create table nt(id int, c1 int)")
tk.MustExec("create table pt(id int, c1 int) partition by range (c1) (partition p10 values less than (10))")
tk.MustExec("insert into nt values(3, 3), (4, 4), (5, 5)")
tk.MustExec("insert into pt values(1, 1), (2, 2)")

dbInfo, ok := domain.InfoSchema().SchemaByName(ast.NewCIStr("test"))
require.True(t, ok)
require.NotNil(t, dbInfo)
ntInfo, ptInfo := getClonedTableInfoFromDomain(t, "test", "nt", domain), getClonedTableInfoFromDomain(t, "test", "pt", domain)
// change non-partition table ID to partition ID
partID := ptInfo.Partition.Definitions[0].ID
recreateTableWithPartitionID(t, &store, dbInfo.ID, ntInfo, ptInfo, "p10")
ntInfo = testutil.GetTableInfoByTxn(t, store, dbInfo.ID, ntInfo.ID)
require.Equal(t, partID, ntInfo.ID)
// setting the table mode fails before refresh meta
err := testutil.SetTableMode(sctx, t, store, de, dbInfo, ntInfo, model.TableModeImport)
require.ErrorContains(t, err, "doesn't exist")
testutil.RefreshMeta(sctx, t, de, dbInfo.ID, ntInfo.ID)
// setting the table mode succeeds after refresh meta
err = testutil.SetTableMode(sctx, t, store, de, dbInfo, ntInfo, model.TableModeImport)
require.NoError(t, err)
tk.MustGetErrCode("select * from nt", errno.ErrProtectedTableMode)
err = testutil.SetTableMode(sctx, t, store, de, dbInfo, ntInfo, model.TableModeNormal)
require.NoError(t, err)
tk.MustExec("select * from nt")
}

// recreateTableWithPartitionID changes the table ID to the partition ID and recreates the table.
func recreateTableWithPartitionID(t *testing.T, store *kv.Storage, dbID int64, ntInfo, ptInfo *model.TableInfo, partName string) {
_, partDef, err := getPartitionDef(ptInfo, partName)
require.NoError(t, err)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, *store, true, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
err := m.DropTableOrView(dbID, ntInfo.ID)
require.NoError(t, err)
ntInfo.ID = partDef.ID
err = m.CreateTableOrView(dbID, ntInfo)
require.NoError(t, err)
return nil
})
require.NoError(t, err)
}

func getPartitionDef(tblInfo *model.TableInfo, partName string) (index int, def *model.PartitionDefinition, _ error) {
defs := tblInfo.Partition.Definitions
for i := 0; i < len(defs); i++ {
if strings.EqualFold(defs[i].Name.L, strings.ToLower(partName)) {
return i, &(defs[i]), nil
}
}
return index, nil, table.ErrUnknownPartition.GenWithStackByArgs(partName, tblInfo.Name.O)
}
30 changes: 30 additions & 0 deletions pkg/ddl/table_test.go
@@ -24,6 +24,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
@@ -836,3 +837,32 @@ func TestIssue59238(t *testing.T) {
tk.MustExec("alter table t exchange partition p1 with table t1")
require.True(t, tk.MustQuery("select distinct create_time from information_schema.partitions where table_name = 't'").Equal(testkit.Rows(rs)))
}

func TestRefreshMetaBasic(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)
de := domain.DDLExecutor()
tk := testkit.NewTestKit(t, store)
sctx := testkit.NewTestKit(t, store).Session()

// get t1 table info
tk.MustExec("use test")
tk.MustExec("create table t1(id int)")
dbInfo, ok := domain.InfoSchema().SchemaByName(ast.NewCIStr("test"))
require.True(t, ok)
t1TableInfo := getClonedTableInfoFromDomain(t, "test", "t1", domain)
// update t1 table name to t2 by txn
t1TableInfo.Name = ast.NewCIStr("t2")
updateTableMeta(t, store, dbInfo.ID, t1TableInfo)
t2TableInfo := testutil.GetTableInfoByTxn(t, store, dbInfo.ID, t1TableInfo.ID)
require.Equal(t, t1TableInfo, t2TableInfo)
// validate that infoschema doesn't contain t2 table info
_, err := domain.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
require.ErrorContains(t, err, "Table 'test.t2' doesn't exist")
// refresh meta, then validate that infoschema contains table t2 and the schema version increases by 1
oldSchemaVer := getSchemaVer(t, sctx)
testutil.RefreshMeta(sctx, t, de, dbInfo.ID, t1TableInfo.ID)
newSchemaVer := getSchemaVer(t, sctx)
require.Equal(t, oldSchemaVer+1, newSchemaVer)
_, err = domain.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
require.NoError(t, err)
}
33 changes: 33 additions & 0 deletions pkg/ddl/testutil/testutil.go
@@ -187,3 +187,36 @@ func SetTableMode(

return err
}

// GetTableInfoByTxn gets table info by transaction.
func GetTableInfoByTxn(t *testing.T, store kv.Storage, dbID int64, tableID int64) *model.TableInfo {
var (
tableInfo *model.TableInfo
err error
)
err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
_, err = m.GetDatabase(dbID)
require.NoError(t, err)
tableInfo, err = m.GetTable(dbID, tableID)
require.NoError(t, err)
require.NotNil(t, tableInfo)
return nil
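})
// check the error returned by the transaction itself, not only the inner calls
require.NoError(t, err)
return tableInfo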
}

// RefreshMeta runs a RefreshMeta DDL job for the given schema ID and table ID and requires it to succeed.
func RefreshMeta(
ctx sessionctx.Context,
t *testing.T,
de ddl.Executor,
dbID, tableID int64,
) {
args := &model.RefreshMetaArgs{
SchemaID: dbID,
TableID: tableID,
}
err := de.RefreshMeta(ctx, args)
require.NoError(t, err)
}
1 change: 1 addition & 0 deletions pkg/meta/model/bdr.go
@@ -104,6 +104,7 @@ var BDRActionMap = map[DDLBDRType][]ActionType{
ActionAddColumnarIndex,
ActionModifyEngineAttribute,
ActionAlterTableMode,
ActionRefreshMeta,
},
UnmanagementDDL: {
ActionCreatePlacementPolicy,
2 changes: 2 additions & 0 deletions pkg/meta/model/job.go
@@ -114,6 +114,7 @@ const (
ActionAddColumnarIndex ActionType = 73
ActionModifyEngineAttribute ActionType = 74
ActionAlterTableMode ActionType = 75
ActionRefreshMeta ActionType = 76
)

// ActionMap is the map of DDL ActionType to string.
@@ -188,6 +189,7 @@ var ActionMap = map[ActionType]string{
ActionAddColumnarIndex: "add columnar index",
ActionModifyEngineAttribute: "modify engine attribute",
ActionAlterTableMode: "alter table mode",
ActionRefreshMeta: "refresh meta",

// `ActionAlterTableAlterPartition` is removed and will never be used.
// Just left a tombstone here for compatibility.
19 changes: 19 additions & 0 deletions pkg/meta/model/job_args.go
@@ -1785,3 +1785,22 @@ func GetFinishedModifyColumnArgs(job *Job) (*ModifyColumnArgs, error) {
}
return getOrDecodeArgsV2[*ModifyColumnArgs](job)
}

// RefreshMetaArgs is the argument for RefreshMeta.
type RefreshMetaArgs struct {
SchemaID int64 `json:"schema_id,omitempty"`
TableID int64 `json:"table_id,omitempty"`
Contributor:

Will it be faster if we can specify a batch of table IDs?

Contributor Author:

Yes, but AlterTableMode() currently supports only a single table ID. How about extending both interfaces to batch as a later improvement?

Contributor:

Got it.

}

func (a *RefreshMetaArgs) getArgsV1(*Job) []any {
return []any{a}
}

func (a *RefreshMetaArgs) decodeV1(job *Job) error {
return errors.Trace(job.decodeArgs(a))
}

// GetRefreshMetaArgs gets the refresh meta arguments.
func GetRefreshMetaArgs(job *Job) (*RefreshMetaArgs, error) {
return getOrDecodeArgs[*RefreshMetaArgs](&RefreshMetaArgs{}, job)
}
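The batching idea raised in the review of RefreshMetaArgs was deferred, so nothing like it exists in this PR. Purely as an illustration of what a batched argument might look like, here is a sketch in which `batchRefreshMetaArgs`, its `table_ids` JSON key, and the encode/decode helpers are all hypothetical; it uses only the standard `encoding/json` package and none of TiDB's job argument machinery.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchRefreshMetaArgs is a hypothetical batched variant of RefreshMetaArgs.
// It is NOT part of this PR; the field names and JSON keys are assumptions.
type batchRefreshMetaArgs struct {
	SchemaID int64   `json:"schema_id,omitempty"`
	TableIDs []int64 `json:"table_ids,omitempty"`
}

// encodeArgs serializes the arguments to JSON.
func encodeArgs(a *batchRefreshMetaArgs) ([]byte, error) {
	return json.Marshal(a)
}

// decodeArgs parses JSON produced by encodeArgs back into arguments.
func decodeArgs(data []byte) (*batchRefreshMetaArgs, error) {
	a := &batchRefreshMetaArgs{}
	if err := json.Unmarshal(data, a); err != nil {
		return nil, err
	}
	return a, nil
}

func main() {
	in := &batchRefreshMetaArgs{SchemaID: 1, TableIDs: []int64{10, 11, 12}}
	data, err := encodeArgs(in)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	out, err := decodeArgs(data)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(string(data), out.SchemaID, out.TableIDs)
}
```

A single job carrying several table IDs would need one schema version bump instead of one per table, which is presumably the speedup the reviewer had in mind.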