diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index 02a3021569160..79deddff325dd 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -193,11 +193,11 @@ func NewLocalMPPCoordinator(ctx context.Context, sctx sessionctx.Context, is inf } func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment, allTiFlashZoneInfo map[string]string) error { - dagReq, err := builder.ConstructDAGReq(c.sessionCtx, []base.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash) + dagReq, err := builder.ConstructDAGReq(c.sessionCtx, []base.PhysicalPlan{pf.Sink}, kv.TiFlash) if err != nil { return errors.Trace(err) } - for i := range pf.ExchangeSender.Schema().Columns { + for i := range pf.Sink.Schema().Columns { dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i)) } if !pf.IsRoot { @@ -207,7 +207,7 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment, all } zoneHelper := taskZoneInfoHelper{} zoneHelper.init(allTiFlashZoneInfo) - for _, mppTask := range pf.ExchangeSender.Tasks { + for _, mppTask := range pf.Sink.GetSelfTasks() { if mppTask.PartitionTableIDs != nil { err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs) } else if !mppTask.TiFlashStaticPrune { @@ -237,9 +237,9 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment, all logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID), zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()), - zap.String("plan", plannercore.ToString(pf.ExchangeSender)), + zap.String("plan", plannercore.ToString(pf.Sink)), zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()), - 
zap.String("exchange-compression-mode", pf.ExchangeSender.CompressionMode.Name()), + zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()), zap.Uint64("GatherID", c.gatherID), zap.String("resource_group", rgName), ) diff --git a/pkg/planner/core/fragment.go b/pkg/planner/core/fragment.go index fad20178b0759..b557ab20f1b9b 100644 --- a/pkg/planner/core/fragment.go +++ b/pkg/planner/core/fragment.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/logutil" @@ -52,7 +53,7 @@ type Fragment struct { CTEReaders []*PhysicalCTE // The receivers for CTE storage/producer. // following fields are filled after scheduling. - ExchangeSender *PhysicalExchangeSender // data exporter + Sink MPPSink // data exporter IsRoot bool @@ -79,8 +80,8 @@ func (f *Fragment) MemoryUsage() (sum int64) { if f.TableScan != nil { sum += f.TableScan.MemoryUsage() } - if f.ExchangeSender != nil { - sum += f.ExchangeSender.MemoryUsage() + if f.Sink != nil { + sum += f.Sink.MemoryUsage() } for _, receiver := range f.ExchangeReceivers { @@ -89,6 +90,17 @@ func (f *Fragment) MemoryUsage() (sum int64) { return } +// MPPSink is the operator to send data to its parent fragment. +// e.g. ExchangeSender, etc. 
+type MPPSink interface {
+	base.PhysicalPlan
+	GetCompressionMode() vardef.ExchangeCompressionMode
+	GetSelfTasks() []*kv.MPPTask
+	SetSelfTasks(tasks []*kv.MPPTask)
+	SetTargetTasks(tasks []*kv.MPPTask)
+	AppendTargetTasks(tasks []*kv.MPPTask)
+	AppendTargetCTEReaderTasks(tasks []*kv.MPPTask)
+}
 type tasksAndFrags struct {
 	tasks []*kv.MPPTask
 	frags []*Fragment
@@ -164,7 +176,7 @@ func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragm
 		return nil, errors.Trace(err)
 	}
 	for _, frag := range frags {
-		frag.ExchangeSender.TargetTasks = []*kv.MPPTask{tidbTask}
+		frag.Sink.SetTargetTasks([]*kv.MPPTask{tidbTask})
 		frag.IsRoot = true
 	}
 	return e.frags, nil
@@ -340,7 +352,7 @@ func (e *mppTaskGenerator) buildFragments(s *PhysicalExchangeSender) ([]*Fragmen
 	}
 	fragments := make([]*Fragment, 0, len(forest))
 	for _, s := range forest {
-		f := &Fragment{ExchangeSender: s}
+		f := &Fragment{Sink: s}
 		err = f.init(s)
 		if err != nil {
 			return nil, errors.Trace(err)
@@ -417,14 +429,14 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
 	}
 	for _, r := range f.ExchangeReceivers {
 		for _, frag := range r.frags {
-			frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...)
+			frag.Sink.AppendTargetTasks(tasks)
 		}
 	}
 	for _, cteR := range f.CTEReaders {
 		e.addReaderTasksForCTEStorage(cteR.CTE.IDForStorage, tasks...)
 	}
-	f.ExchangeSender.Tasks = tasks
-	f.flipCTEReader(f.ExchangeSender)
+	f.Sink.SetSelfTasks(tasks)
+	f.flipCTEReader(f.Sink)
 	return tasks, nil
 }
@@ -487,7 +499,7 @@ func (e *mppTaskGenerator) generateTasksForCTEReader(cteReader *PhysicalCTE) (er
 func (e *mppTaskGenerator) addReaderTasksForCTEStorage(storageID int, tasks ...*kv.MPPTask) {
 	group := e.CTEGroups[storageID]
 	for _, frag := range group.StorageFragments {
-		frag.ExchangeSender.TargetCTEReaderTasks = append(frag.ExchangeSender.TargetCTEReaderTasks, tasks)
+		frag.Sink.AppendTargetCTEReaderTasks(tasks)
 	}
 }
diff --git a/pkg/planner/core/physical_plans.go b/pkg/planner/core/physical_plans.go
index c9381160570bc..d2606a9c27459 100644
--- a/pkg/planner/core/physical_plans.go
+++ b/pkg/planner/core/physical_plans.go
@@ -1969,6 +1969,36 @@ func (p *PhysicalExchangeSender) MemoryUsage() (sum int64) {
 	return
 }
+
+// GetCompressionMode returns the compression mode of this exchange sender.
+func (p *PhysicalExchangeSender) GetCompressionMode() vardef.ExchangeCompressionMode {
+	return p.CompressionMode
+}
+
+// GetSelfTasks returns mpp tasks for current PhysicalExchangeSender.
+func (p *PhysicalExchangeSender) GetSelfTasks() []*kv.MPPTask {
+	return p.Tasks
+}
+
+// SetSelfTasks sets mpp tasks for current PhysicalExchangeSender.
+func (p *PhysicalExchangeSender) SetSelfTasks(tasks []*kv.MPPTask) {
+	p.Tasks = tasks
+}
+
+// SetTargetTasks sets mpp tasks for current PhysicalExchangeSender.
+func (p *PhysicalExchangeSender) SetTargetTasks(tasks []*kv.MPPTask) {
+	p.TargetTasks = tasks
+}
+
+// AppendTargetTasks appends mpp tasks for current PhysicalExchangeSender.
+func (p *PhysicalExchangeSender) AppendTargetTasks(tasks []*kv.MPPTask) {
+	p.TargetTasks = append(p.TargetTasks, tasks...)
+}
+
+// AppendTargetCTEReaderTasks appends one CTE reader's tasks as a separate group, keeping the per-reader grouping in TargetCTEReaderTasks.
+func (p *PhysicalExchangeSender) AppendTargetCTEReaderTasks(tasks []*kv.MPPTask) {
+	p.TargetCTEReaderTasks = append(p.TargetCTEReaderTasks, tasks)
+}
 
 // Clone implements op.PhysicalPlan interface.
 func (p *PhysicalMergeJoin) Clone(newCtx base.PlanContext) (base.PhysicalPlan, error) {
 	cloned := new(PhysicalMergeJoin)