Skip to content

Commit c9e2fcf

Browse files
rleungxdisksing
authored andcommitted
statistics: fix the hot region API cannot work without hot scheduler (tikv#4424)
* refacter store load in hot region scheduler ref tikv#3879 Signed-off-by: Ryan Leung <[email protected]> * resolve conflicts Signed-off-by: Ryan Leung <[email protected]> * fix the hot region API cannot work when hot scheduler is disabled Signed-off-by: Ryan Leung <[email protected]> * resolve conflicts Signed-off-by: Ryan Leung <[email protected]> * clean up Signed-off-by: Ryan Leung <[email protected]> * make drop region atomically Signed-off-by: Ryan Leung <[email protected]> * address the comment Signed-off-by: Ryan Leung <[email protected]>
1 parent 2e993a1 commit c9e2fcf

12 files changed

+391
-373
lines changed

server/cluster/cluster.go

+6-21
Original file line numberDiff line numberDiff line change
@@ -852,11 +852,7 @@ func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
852852

853853
// DropCacheRegion removes a region from the cache.
854854
func (c *RaftCluster) DropCacheRegion(id uint64) {
855-
c.RLock()
856-
defer c.RUnlock()
857-
if region := c.GetRegion(id); region != nil {
858-
c.core.RemoveRegion(region)
859-
}
855+
c.core.RemoveRegionIfExist(id)
860856
}
861857

862858
// GetCacheCluster gets the cached cluster.
@@ -883,10 +879,7 @@ func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {
883879

884880
// IsRegionHot checks if a region is in hot state.
885881
func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
886-
c.RLock()
887-
hotStat := c.hotStat
888-
c.RUnlock()
889-
return hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
882+
return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
890883
}
891884

892885
// GetAdjacentRegions returns regions' information that are adjacent with the specific region ID.
@@ -1456,8 +1449,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {
14561449

14571450
// GetStoresLoads returns load stats of all stores.
14581451
func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
1459-
c.RLock()
1460-
defer c.RUnlock()
14611452
return c.hotStat.GetStoresLoads()
14621453
}
14631454

@@ -1508,23 +1499,17 @@ func (c *RaftCluster) GetRegionLabeler() *labeler.RegionLabeler {
15081499

15091500
// GetHotWriteRegions gets hot write regions' info.
15101501
func (c *RaftCluster) GetHotWriteRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos {
1511-
c.RLock()
1512-
co := c.coordinator
1513-
c.RUnlock()
1514-
hotWriteRegions := co.getHotWriteRegions()
1515-
if len(storeIDs) > 0 {
1502+
hotWriteRegions := c.coordinator.getHotRegionsByType(statistics.Write)
1503+
if len(storeIDs) > 0 && hotWriteRegions != nil {
15161504
hotWriteRegions = getHotRegionsByStoreIDs(hotWriteRegions, storeIDs...)
15171505
}
15181506
return hotWriteRegions
15191507
}
15201508

15211509
// GetHotReadRegions gets hot read regions' info.
15221510
func (c *RaftCluster) GetHotReadRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos {
1523-
c.RLock()
1524-
co := c.coordinator
1525-
c.RUnlock()
1526-
hotReadRegions := co.getHotReadRegions()
1527-
if len(storeIDs) > 0 {
1511+
hotReadRegions := c.coordinator.getHotRegionsByType(statistics.Read)
1512+
if len(storeIDs) > 0 && hotReadRegions != nil {
15281513
hotReadRegions = getHotRegionsByStoreIDs(hotReadRegions, storeIDs...)
15291514
}
15301515
return hotReadRegions

server/cluster/coordinator.go

+23-45
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/tikv/pd/server/schedule/checker"
3636
"github.com/tikv/pd/server/schedule/hbstream"
3737
"github.com/tikv/pd/server/schedule/operator"
38-
"github.com/tikv/pd/server/schedulers"
3938
"github.com/tikv/pd/server/statistics"
4039
"github.com/tikv/pd/server/storage"
4140
"go.uber.org/zap"
@@ -470,35 +469,18 @@ func (c *coordinator) stop() {
470469
c.cancel()
471470
}
472471

473-
// Hack to retrieve info from scheduler.
474-
// TODO: remove it.
475-
type hasHotStatus interface {
476-
GetHotStatus(statistics.RWType) *statistics.StoreHotPeersInfos
477-
GetPendingInfluence() map[uint64]*statistics.Influence
478-
}
479-
480-
func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos {
481-
c.RLock()
482-
defer c.RUnlock()
483-
s, ok := c.schedulers[schedulers.HotRegionName]
484-
if !ok {
485-
return nil
486-
}
487-
if h, ok := s.Scheduler.(hasHotStatus); ok {
488-
return h.GetHotStatus(statistics.Write)
489-
}
490-
return nil
491-
}
492-
493-
func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos {
494-
c.RLock()
495-
defer c.RUnlock()
496-
s, ok := c.schedulers[schedulers.HotRegionName]
497-
if !ok {
498-
return nil
499-
}
500-
if h, ok := s.Scheduler.(hasHotStatus); ok {
501-
return h.GetHotStatus(statistics.Read)
472+
func (c *coordinator) getHotRegionsByType(typ statistics.RWType) *statistics.StoreHotPeersInfos {
473+
isTraceFlow := c.cluster.GetOpts().IsTraceRegionFlow()
474+
storeLoads := c.cluster.GetStoresLoads()
475+
stores := c.cluster.GetStores()
476+
switch typ {
477+
case statistics.Write:
478+
regionStats := c.cluster.RegionWriteStats()
479+
return statistics.GetHotStatus(stores, storeLoads, regionStats, statistics.Write, isTraceFlow)
480+
case statistics.Read:
481+
regionStats := c.cluster.RegionReadStats()
482+
return statistics.GetHotStatus(stores, storeLoads, regionStats, statistics.Read, isTraceFlow)
483+
default:
502484
}
503485
return nil
504486
}
@@ -542,36 +524,32 @@ func (c *coordinator) resetSchedulerMetrics() {
542524
}
543525

544526
func (c *coordinator) collectHotSpotMetrics() {
545-
c.RLock()
546-
// Collects hot write region metrics.
547-
s, ok := c.schedulers[schedulers.HotRegionName]
548-
if !ok {
549-
c.RUnlock()
550-
return
551-
}
552-
c.RUnlock()
553527
stores := c.cluster.GetStores()
554528
// Collects hot write region metrics.
555-
collectHotMetrics(s, stores, statistics.Write)
529+
collectHotMetrics(c.cluster, stores, statistics.Write)
556530
// Collects hot read region metrics.
557-
collectHotMetrics(s, stores, statistics.Read)
531+
collectHotMetrics(c.cluster, stores, statistics.Read)
558532
// Collects pending influence.
559-
collectPendingInfluence(s, stores)
533+
collectPendingInfluence(stores)
560534
}
561535

562-
func collectHotMetrics(s *scheduleController, stores []*core.StoreInfo, typ statistics.RWType) {
563-
status := s.Scheduler.(hasHotStatus).GetHotStatus(typ)
536+
func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ statistics.RWType) {
564537
var (
565538
kind string
566539
byteTyp, keyTyp, queryTyp statistics.RegionStatKind
540+
regionStats map[uint64][]*statistics.HotPeerStat
567541
)
568542

569543
switch typ {
570544
case statistics.Read:
545+
regionStats = cluster.RegionReadStats()
571546
kind, byteTyp, keyTyp, queryTyp = statistics.Read.String(), statistics.RegionReadBytes, statistics.RegionReadKeys, statistics.RegionReadQuery
572547
case statistics.Write:
548+
regionStats = cluster.RegionWriteStats()
573549
kind, byteTyp, keyTyp, queryTyp = statistics.Write.String(), statistics.RegionWriteBytes, statistics.RegionWriteKeys, statistics.RegionWriteQuery
574550
}
551+
status := statistics.GetHotStatus(stores, cluster.GetStoresLoads(), regionStats, typ, cluster.GetOpts().IsTraceRegionFlow())
552+
575553
for _, s := range stores {
576554
storeAddress := s.GetAddress()
577555
storeID := s.GetID()
@@ -604,8 +582,8 @@ func collectHotMetrics(s *scheduleController, stores []*core.StoreInfo, typ stat
604582
}
605583
}
606584

607-
func collectPendingInfluence(s *scheduleController, stores []*core.StoreInfo) {
608-
pendings := s.Scheduler.(hasHotStatus).GetPendingInfluence()
585+
func collectPendingInfluence(stores []*core.StoreInfo) {
586+
pendings := statistics.GetPendingInfluence(stores)
609587
for _, s := range stores {
610588
storeAddress := s.GetAddress()
611589
storeID := s.GetID()

server/core/basic_cluster.go

+9
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,15 @@ func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
397397
return bc.PutRegion(region)
398398
}
399399

400+
// RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists.
401+
func (bc *BasicCluster) RemoveRegionIfExist(id uint64) {
402+
bc.Lock()
403+
defer bc.Unlock()
404+
if region := bc.Regions.GetRegion(id); region != nil {
405+
bc.Regions.RemoveRegion(region)
406+
}
407+
}
408+
400409
// RemoveRegion removes RegionInfo from regionTree and regionMap.
401410
func (bc *BasicCluster) RemoveRegion(region *RegionInfo) {
402411
bc.Lock()

server/schedulers/grant_hot_region.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -274,21 +274,21 @@ func (s *grantHotRegionScheduler) Schedule(cluster schedule.Cluster) []*operator
274274
}
275275

276276
func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
277-
storeInfos := statistics.SummaryStoreInfos(cluster)
277+
storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
278278
storesLoads := cluster.GetStoresLoads()
279279
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
280280

281281
var stLoadInfos map[uint64]*statistics.StoreLoadDetail
282282
switch typ {
283283
case statistics.Read:
284-
stLoadInfos = summaryStoresLoad(
284+
stLoadInfos = statistics.SummaryStoresLoad(
285285
storeInfos,
286286
storesLoads,
287287
cluster.RegionReadStats(),
288288
isTraceRegionFlow,
289289
statistics.Read, core.RegionKind)
290290
case statistics.Write:
291-
stLoadInfos = summaryStoresLoad(
291+
stLoadInfos = statistics.SummaryStoresLoad(
292292
storeInfos,
293293
storesLoads,
294294
cluster.RegionWriteStats(),

server/schedulers/hot_region.go

+5-41
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
174174
// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
175175
// each store
176176
func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
177-
h.stInfos = statistics.SummaryStoreInfos(cluster)
177+
h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores())
178178
h.summaryPendingInfluence()
179179
storesLoads := cluster.GetStoresLoads()
180180
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
@@ -183,13 +183,13 @@ func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule
183183
case statistics.Read:
184184
// update read statistics
185185
regionRead := cluster.RegionReadStats()
186-
h.stLoadInfos[readLeader] = summaryStoresLoad(
186+
h.stLoadInfos[readLeader] = statistics.SummaryStoresLoad(
187187
h.stInfos,
188188
storesLoads,
189189
regionRead,
190190
isTraceRegionFlow,
191191
statistics.Read, core.LeaderKind)
192-
h.stLoadInfos[readPeer] = summaryStoresLoad(
192+
h.stLoadInfos[readPeer] = statistics.SummaryStoresLoad(
193193
h.stInfos,
194194
storesLoads,
195195
regionRead,
@@ -198,13 +198,13 @@ func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule
198198
case statistics.Write:
199199
// update write statistics
200200
regionWrite := cluster.RegionWriteStats()
201-
h.stLoadInfos[writeLeader] = summaryStoresLoad(
201+
h.stLoadInfos[writeLeader] = statistics.SummaryStoresLoad(
202202
h.stInfos,
203203
storesLoads,
204204
regionWrite,
205205
isTraceRegionFlow,
206206
statistics.Write, core.LeaderKind)
207-
h.stLoadInfos[writePeer] = summaryStoresLoad(
207+
h.stLoadInfos[writePeer] = statistics.SummaryStoresLoad(
208208
h.stInfos,
209209
storesLoads,
210210
regionWrite,
@@ -1127,42 +1127,6 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic
11271127
return op, infl
11281128
}
11291129

1130-
func (h *hotScheduler) GetHotStatus(typ statistics.RWType) *statistics.StoreHotPeersInfos {
1131-
h.RLock()
1132-
defer h.RUnlock()
1133-
var leaderTyp, peerTyp resourceType
1134-
switch typ {
1135-
case statistics.Read:
1136-
leaderTyp, peerTyp = readLeader, readPeer
1137-
case statistics.Write:
1138-
leaderTyp, peerTyp = writeLeader, writePeer
1139-
}
1140-
asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[leaderTyp]))
1141-
asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[peerTyp]))
1142-
for id, detail := range h.stLoadInfos[leaderTyp] {
1143-
asLeader[id] = detail.ToHotPeersStat()
1144-
}
1145-
for id, detail := range h.stLoadInfos[peerTyp] {
1146-
asPeer[id] = detail.ToHotPeersStat()
1147-
}
1148-
return &statistics.StoreHotPeersInfos{
1149-
AsLeader: asLeader,
1150-
AsPeer: asPeer,
1151-
}
1152-
}
1153-
1154-
func (h *hotScheduler) GetPendingInfluence() map[uint64]*statistics.Influence {
1155-
h.RLock()
1156-
defer h.RUnlock()
1157-
ret := make(map[uint64]*statistics.Influence, len(h.stInfos))
1158-
for id, info := range h.stInfos {
1159-
if info.PendingSum != nil {
1160-
ret[id] = info.PendingSum
1161-
}
1162-
}
1163-
return ret
1164-
}
1165-
11661130
// calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1]
11671131
func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
11681132
status := op.CheckAndGetStatus()

server/schedulers/metrics.go

-9
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,6 @@ var schedulerStatus = prometheus.NewGaugeVec(
3232
Help: "Inner status of the scheduler.",
3333
}, []string{"type", "name"})
3434

35-
var hotPeerSummary = prometheus.NewGaugeVec(
36-
prometheus.GaugeOpts{
37-
Namespace: "pd",
38-
Subsystem: "scheduler",
39-
Name: "hot_peers_summary",
40-
Help: "Hot peers summary for each store",
41-
}, []string{"type", "store"})
42-
4335
var opInfluenceStatus = prometheus.NewGaugeVec(
4436
prometheus.GaugeOpts{
4537
Namespace: "pd",
@@ -123,7 +115,6 @@ var hotPendingStatus = prometheus.NewGaugeVec(
123115
func init() {
124116
prometheus.MustRegister(schedulerCounter)
125117
prometheus.MustRegister(schedulerStatus)
126-
prometheus.MustRegister(hotPeerSummary)
127118
prometheus.MustRegister(balanceLeaderCounter)
128119
prometheus.MustRegister(balanceRegionCounter)
129120
prometheus.MustRegister(hotSchedulerResultCounter)

server/schedulers/shuffle_hot_region.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -134,21 +134,21 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster schedule.Cluster) []*operat
134134
}
135135

136136
func (s *shuffleHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
137-
storeInfos := statistics.SummaryStoreInfos(cluster)
137+
storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
138138
storesLoads := cluster.GetStoresLoads()
139139
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
140140

141141
switch typ {
142142
case statistics.Read:
143-
s.stLoadInfos[readLeader] = summaryStoresLoad(
143+
s.stLoadInfos[readLeader] = statistics.SummaryStoresLoad(
144144
storeInfos,
145145
storesLoads,
146146
cluster.RegionReadStats(),
147147
isTraceRegionFlow,
148148
statistics.Read, core.LeaderKind)
149149
return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
150150
case statistics.Write:
151-
s.stLoadInfos[writeLeader] = summaryStoresLoad(
151+
s.stLoadInfos[writeLeader] = statistics.SummaryStoresLoad(
152152
storeInfos,
153153
storesLoads,
154154
cluster.RegionWriteStats(),

0 commit comments

Comments
 (0)