Commit c0aea84

planner: update logging for SyncWaitStatsLoad and SubLoadWorker functions (#59978)
ref #59939
1 parent ad7df79 commit c0aea84

File tree

10 files changed: +70 -53 lines changed


pkg/planner/core/rule_collect_plan_stats.go

Lines changed: 2 additions & 2 deletions
@@ -279,11 +279,11 @@ func SyncWaitStatsLoad(plan base.LogicalPlan) error {
    if err != nil {
        stmtCtx.IsSyncStatsFailed = true
        if vardef.StatsLoadPseudoTimeout.Load() {
-           logutil.BgLogger().Warn("SyncWaitStatsLoad failed", zap.Error(err))
+           logutil.ErrVerboseLogger().Warn("SyncWaitStatsLoad failed", zap.Error(err))
            stmtCtx.AppendWarning(err)
            return nil
        }
-       logutil.BgLogger().Error("SyncWaitStatsLoad failed", zap.Error(err))
+       logutil.ErrVerboseLogger().Error("SyncWaitStatsLoad failed", zap.Error(err))
        return err
    }
    return nil

pkg/statistics/handle/autoanalyze/priorityqueue/job.go

Lines changed: 5 additions & 5 deletions
@@ -117,7 +117,7 @@ func isValidToAnalyze(
    lastFailedAnalysisDuration, err :=
        GetLastFailedAnalysisDuration(sctx, schema, table, partitionNames...)
    if err != nil {
-       logutil.SingletonStatsSamplerLogger().Warn(
+       logutil.StatsSampleLogger().Warn(
            "Fail to get last failed analysis duration",
            zap.String("schema", schema),
            zap.String("table", table),
@@ -130,7 +130,7 @@ func isValidToAnalyze(
    averageAnalysisDuration, err :=
        GetAverageAnalysisDuration(sctx, schema, table, partitionNames...)
    if err != nil {
-       logutil.SingletonStatsSamplerLogger().Warn(
+       logutil.StatsSampleLogger().Warn(
            "Fail to get average analysis duration",
            zap.String("schema", schema),
            zap.String("table", table),
@@ -143,7 +143,7 @@ func isValidToAnalyze(
    // Last analysis just failed, we should not analyze it again.
    if lastFailedAnalysisDuration == justFailed {
        // The last analysis failed, we should not analyze it again.
-       logutil.SingletonStatsSamplerLogger().Info(
+       logutil.StatsSampleLogger().Info(
            "Skip analysis because the last analysis just failed",
            zap.String("schema", schema),
            zap.String("table", table),
@@ -156,7 +156,7 @@ func isValidToAnalyze(
    // Skip this table to avoid too much failed analysis.
    onlyFailedAnalysis := lastFailedAnalysisDuration != NoRecord && averageAnalysisDuration == NoRecord
    if onlyFailedAnalysis && lastFailedAnalysisDuration < defaultFailedAnalysisWaitTime {
-       logutil.SingletonStatsSamplerLogger().Info(
+       logutil.StatsSampleLogger().Info(
            fmt.Sprintf("Skip analysis because the last failed analysis duration is less than %v", defaultFailedAnalysisWaitTime),
            zap.String("schema", schema),
            zap.String("table", table),
@@ -170,7 +170,7 @@ func isValidToAnalyze(
    meetSkipCondition := lastFailedAnalysisDuration != NoRecord &&
        lastFailedAnalysisDuration < 2*averageAnalysisDuration
    if meetSkipCondition {
-       logutil.SingletonStatsSamplerLogger().Info(
+       logutil.StatsSampleLogger().Info(
            "Skip analysis because the last failed analysis duration is less than 2 times the average analysis duration",
            zap.String("schema", schema),
            zap.String("table", table),

pkg/statistics/handle/autoanalyze/priorityqueue/queue.go

Lines changed: 1 addition & 1 deletion
@@ -758,7 +758,7 @@ func (pq *AnalysisPriorityQueue) pushWithoutLock(job AnalysisJob) error {
    // To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
    weight := pq.calculator.CalculateWeight(job)
    if weight <= 0 {
-       statslogutil.SingletonStatsSamplerLogger().Warn(
+       statslogutil.StatsSampleLogger().Warn(
            "Table gets a negative weight",
            zap.Float64("weight", weight),
            zap.Stringer("job", job),

pkg/statistics/handle/autoanalyze/refresher/refresher.go

Lines changed: 3 additions & 3 deletions
@@ -129,7 +129,7 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
    currentRunningJobs := r.worker.GetRunningJobs()
    remainConcurrency := maxConcurrency - len(currentRunningJobs)
    if remainConcurrency <= 0 {
-       statslogutil.SingletonStatsSamplerLogger().Info("No concurrency available")
+       statslogutil.StatsSampleLogger().Info("No concurrency available")
        return false
    }

@@ -151,7 +151,7 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
            continue
        }
        if valid, failReason := job.ValidateAndPrepare(sctx); !valid {
-           statslogutil.SingletonStatsSamplerLogger().Info(
+           statslogutil.StatsSampleLogger().Info(
                "Table not ready for analysis",
                zap.String("reason", failReason),
                zap.Stringer("job", job),
@@ -190,7 +190,7 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
        return true
    }

-   statslogutil.SingletonStatsSamplerLogger().Info("No tables to analyze")
+   statslogutil.StatsSampleLogger().Info("No tables to analyze")
    return false
}

pkg/statistics/handle/globalstats/global_stats.go

Lines changed: 2 additions & 2 deletions
@@ -103,7 +103,7 @@ func MergePartitionStats2GlobalStats(
    histIDs []int64,
) (globalStats *GlobalStats, err error) {
    if sc.GetSessionVars().EnableAsyncMergeGlobalStats {
-       statslogutil.SingletonStatsSamplerLogger().Info("use async merge global stats",
+       statslogutil.StatsSampleLogger().Info("use async merge global stats",
            zap.Int64("tableID", globalTableInfo.ID),
            zap.String("table", globalTableInfo.Name.L),
        )
@@ -117,7 +117,7 @@ func MergePartitionStats2GlobalStats(
        }
        return worker.Result(), nil
    }
-   statslogutil.SingletonStatsSamplerLogger().Info("use blocking merge global stats",
+   statslogutil.StatsSampleLogger().Info("use blocking merge global stats",
        zap.Int64("tableID", globalTableInfo.ID),
        zap.String("table", globalTableInfo.Name.L),
    )

pkg/statistics/handle/logutil/logutil.go

Lines changed: 12 additions & 2 deletions
@@ -29,11 +29,21 @@ func StatsLogger() *zap.Logger {

var (
    sampleLoggerFactory = logutil.SampleLoggerFactory(5*time.Minute, 1, zap.String(logutil.LogFieldCategory, "stats"))
+   // sampleErrVerboseLoggerFactory creates a logger for error messages with a higher
+   // sampling rate (once per 10 minutes) since error logs tend to be more verbose.
+   sampleErrVerboseLoggerFactory = logutil.SampleErrVerboseLoggerFactory(10*time.Minute, 1, zap.String(logutil.LogFieldCategory, "stats"))
)

-// SingletonStatsSamplerLogger with category "stats" is used to log statistic related messages.
+// StatsSampleLogger with category "stats" is used to log statistic related messages.
 // It is used to sample the log to avoid too many logs.
 // Do not use it to log the message that is not related to statistics.
-func SingletonStatsSamplerLogger() *zap.Logger {
+func StatsSampleLogger() *zap.Logger {
    return sampleLoggerFactory()
}
+
+// StatsErrVerboseSampleLogger with category "stats" is used to log statistics-related messages with verbose error details.
+// It is used to sample the log to avoid too many logs.
+// Do not use it to log the message that is not related to statistics.
+func StatsErrVerboseSampleLogger() *zap.Logger {
+   return sampleErrVerboseLoggerFactory()
+}
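
The two sampled loggers above back most of the call-site changes in this commit. Below is a minimal hypothetical call site, not part of the commit (logAnalyzeResult is an invented name), sketching how the renamed StatsSampleLogger and the new StatsErrVerboseSampleLogger are meant to be chosen under the factories shown above: routine messages go through the 5-minute sampler, failures whose error chains benefit from verbose output go through the 10-minute error-verbose sampler.

package example

import (
    statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
    "go.uber.org/zap"
)

// logAnalyzeResult is a hypothetical helper, not part of this commit.
func logAnalyzeResult(table string, err error) {
    if err != nil {
        // Error path: sampled at most once per 10-minute window per message,
        // with verbose error details attached.
        statslogutil.StatsErrVerboseSampleLogger().Warn("analyze failed",
            zap.String("table", table),
            zap.Error(err),
        )
        return
    }
    // Routine path: sampled at most once per 5-minute window per message.
    statslogutil.StatsSampleLogger().Info("analyze finished", zap.String("table", table))
}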

pkg/statistics/handle/syncload/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -17,12 +17,12 @@ go_library(
        "//pkg/sessionctx/vardef",
        "//pkg/sessionctx/variable",
        "//pkg/statistics",
+       "//pkg/statistics/handle/logutil",
        "//pkg/statistics/handle/storage",
        "//pkg/statistics/handle/types",
        "//pkg/types",
        "//pkg/util",
        "//pkg/util/intest",
-       "//pkg/util/logutil",
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_failpoint//:failpoint",
        "@org_golang_x_sync//singleflight",

pkg/statistics/handle/syncload/stats_syncload.go

Lines changed: 21 additions & 33 deletions
@@ -34,12 +34,12 @@ import (
    "github.com/pingcap/tidb/pkg/sessionctx/vardef"
    "github.com/pingcap/tidb/pkg/sessionctx/variable"
    "github.com/pingcap/tidb/pkg/statistics"
+   statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
    "github.com/pingcap/tidb/pkg/statistics/handle/storage"
    statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
    "github.com/pingcap/tidb/pkg/types"
    "github.com/pingcap/tidb/pkg/util"
    "github.com/pingcap/tidb/pkg/util/intest"
-   "github.com/pingcap/tidb/pkg/util/logutil"
    "go.uber.org/zap"
    "golang.org/x/sync/singleflight"
)
@@ -165,7 +165,7 @@ func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
    var errorMsgs []string
    defer func() {
        if len(errorMsgs) > 0 {
-           logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
+           statslogutil.StatsLogger().Warn("SyncWaitStatsLoad meets error",
                zap.Strings("errors", errorMsgs))
        }
        sc.StatsLoad.NeededItems = nil
@@ -249,7 +249,7 @@ var errExit = errors.New("Stop loading since domain is closed")
func (s *statsSyncLoad) SubLoadWorker(exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
    defer func() {
        exitWg.Done()
-       logutil.BgLogger().Info("SubLoadWorker exited.")
+       statslogutil.StatsLogger().Info("SubLoadWorker: exited.")
    }()
    // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
    var lastTask *statstypes.NeededItemTask
@@ -259,8 +259,21 @@ func (s *statsSyncLoad) SubLoadWorker(exit chan struct{}, exitWg *util.WaitGroup
        if err != nil {
            switch err {
            case errExit:
+               statslogutil.StatsLogger().Info("SubLoadWorker: exits now because the domain is closed.")
                return
            default:
+               const msg = "SubLoadWorker: failed to handle one task"
+               if task != nil {
+                   statslogutil.StatsErrVerboseSampleLogger().Warn(msg,
+                       zap.Error(err),
+                       zap.String("task", task.Item.Key()),
+                       zap.Int("retry", task.Retry),
+                   )
+               } else {
+                   statslogutil.StatsErrVerboseSampleLogger().Warn(msg,
+                       zap.Error(err),
+                   )
+               }
                // To avoid the thundering herd effect
                // thundering herd effect: Everyone tries to retry a large number of requests simultaneously when a problem occurs.
                r := rand.Intn(500)
@@ -279,15 +292,15 @@ func (s *statsSyncLoad) HandleOneTask(lastTask *statstypes.NeededItemTask, exit
    defer func() {
        // recover for each task, worker keeps working
        if r := recover(); r != nil {
-           logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack"))
+           statslogutil.StatsLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack"))
            err = errors.Errorf("stats loading panicked: %v", r)
        }
    }()
    if lastTask == nil {
        task, err = s.drainColTask(exit)
        if err != nil {
            if err != errExit {
-               logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
+               statslogutil.StatsLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
            }
            return task, err
        }
@@ -323,7 +336,7 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
    defer func() {
        // recover for each task, worker keeps working
        if r := recover(); r != nil {
-           logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
+           statslogutil.StatsLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
            err = errors.Errorf("stats loading panicked: %v", r)
        }
        if err == nil { // only recycle when no error
@@ -337,7 +350,7 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
    var skipTypes map[string]struct{}
    val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(vardef.TiDBAnalyzeSkipColumnTypes)
    if err != nil {
-       logutil.BgLogger().Warn("failed to get global variable", zap.Error(err))
+       statslogutil.StatsLogger().Warn("failed to get global variable", zap.Error(err))
    } else {
        skipTypes = variable.ParseAnalyzeSkipColumnTypes(val)
    }
@@ -439,7 +452,7 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
        return nil, err
    }
    if hg == nil {
-       logutil.BgLogger().Warn("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID),
+       statslogutil.StatsLogger().Warn("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID),
            zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex),
        )
        return nil, errGetHistMeta
@@ -563,31 +576,6 @@ func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *statstypes.NeededItemTask,
    }
}

-// writeToChanWithTimeout writes a task to a channel and blocks until timeout.
-func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask, timeout time.Duration) error {
-   timer := time.NewTimer(timeout)
-   defer timer.Stop()
-   select {
-   case taskCh <- task:
-   case <-timer.C:
-       return errors.New("Channel is full and timeout writing to channel")
-   }
-   return nil
-}
-
-// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
-func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
-   defer func() {
-       if r := recover(); r != nil {
-           logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
-       }
-   }()
-   select {
-   case resultCh <- rs:
-   default:
-   }
-}
-
 // updateCachedItem updates the column/index hist to global statsCache.
 func (s *statsSyncLoad) updateCachedItem(tblInfo *model.TableInfo, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
    s.mutexForStatsCache.Lock()

pkg/statistics/handle/usage/session_stats_collect.go

Lines changed: 4 additions & 4 deletions
@@ -101,7 +101,7 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
        s.SessionTableDelta().Merge(deltaMap)
    }()
    if time.Since(start) > tooSlowThreshold {
-       statslogutil.SingletonStatsSamplerLogger().Warn("Sweeping session list is too slow",
+       statslogutil.StatsSampleLogger().Warn("Sweeping session list is too slow",
            zap.Int("tableCount", len(deltaMap)),
            zap.Duration("duration", time.Since(start)))
    }
@@ -138,7 +138,7 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
        batchUpdates = append(batchUpdates, storage.NewDeltaUpdate(id, item, false))
    }
    if time.Since(batchStart) > tooSlowThreshold {
-       statslogutil.SingletonStatsSamplerLogger().Warn("Collecting batch updates is too slow",
+       statslogutil.StatsSampleLogger().Warn("Collecting batch updates is too slow",
            zap.Int("tableCount", len(batchUpdates)),
            zap.Duration("duration", time.Since(batchStart)))
    }
@@ -173,7 +173,7 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
    }

    if time.Since(batchStart) > tooSlowThreshold {
-       statslogutil.SingletonStatsSamplerLogger().Warn("Dumping batch updates is too slow",
+       statslogutil.StatsSampleLogger().Warn("Dumping batch updates is too slow",
            zap.Int("tableCount", len(batchUpdates)),
            zap.Duration("duration", time.Since(batchStart)))
    }
@@ -196,7 +196,7 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
    s.statsHandle.RecordHistoricalStatsMeta(statsVersion, "flush stats", false, unlockedTableIDs...)
    // Log a warning if recording historical stats meta takes too long, as it can be slow for large table counts
    if time.Since(startRecordHistoricalStatsMeta) > time.Minute*15 {
-       statslogutil.SingletonStatsSamplerLogger().Warn("Recording historical stats meta is too slow",
+       statslogutil.StatsSampleLogger().Warn("Recording historical stats meta is too slow",
            zap.Int("tableCount", len(batchUpdates)),
            zap.Duration("duration", time.Since(startRecordHistoricalStatsMeta)))
    }

pkg/util/logutil/log.go

Lines changed: 19 additions & 0 deletions
@@ -445,3 +445,22 @@ func SampleLoggerFactory(tick time.Duration, first int, fields ...zap.Field) fun
        return logger
    }
}
+
+// SampleErrVerboseLoggerFactory returns a factory function that creates a sample logger with error verbose logging.
+// It works similarly to SampleLoggerFactory but ensures that error details are always logged,
+// regardless of the logging configuration.
+func SampleErrVerboseLoggerFactory(tick time.Duration, first int, fields ...zap.Field) func() *zap.Logger {
+   var (
+       once   sync.Once
+       logger *zap.Logger
+   )
+   return func() *zap.Logger {
+       once.Do(func() {
+           sampleCore := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
+               return zapcore.NewSamplerWithOptions(core, tick, first, 0)
+           })
+           logger = ErrVerboseLogger().With(fields...).With(zap.String("sampled", "")).WithOptions(sampleCore)
+       })
+       return logger
+   }
+}
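
For reference, here is a minimal self-contained sketch (plain zap, independent of TiDB's logutil wiring) of the sampling construction that SampleErrVerboseLoggerFactory applies above: zap.WrapCore swaps in zapcore.NewSamplerWithOptions, which, per tick interval and per identical message, emits the first `first` entries and drops the rest because `thereafter` is 0.

package main

import (
    "time"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

func main() {
    base, _ := zap.NewProduction()
    defer func() { _ = base.Sync() }()

    // Wrap the core so that within each 10-minute window only the first
    // entry for a given message is written; later duplicates are dropped.
    sampled := base.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
        return zapcore.NewSamplerWithOptions(core, 10*time.Minute, 1, 0)
    }))

    for i := 0; i < 100; i++ {
        sampled.Warn("noisy warning") // only one of these reaches the sink
    }
}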
