@@ -14,6 +14,7 @@ import (
14
14
15
15
"github.com/cockroachdb/errors/oserror"
16
16
"github.com/cockroachdb/pebble/v2/internal/base"
17
+ "github.com/cockroachdb/pebble/v2/internal/invariants"
17
18
"github.com/cockroachdb/pebble/v2/objstorage"
18
19
"github.com/cockroachdb/pebble/v2/vfs"
19
20
"github.com/cockroachdb/pebble/v2/wal"
@@ -30,10 +31,9 @@ type DeleteCleaner = base.DeleteCleaner
30
31
type ArchiveCleaner = base.ArchiveCleaner
31
32
32
33
type cleanupManager struct {
33
- opts * Options
34
- objProvider objstorage.Provider
35
- onTableDeleteFn func (fileSize uint64 , isLocal bool )
36
- deletePacer * deletionPacer
34
+ opts * Options
35
+ objProvider objstorage.Provider
36
+ deletePacer * deletionPacer
37
37
38
38
// jobsCh is used as the cleanup job queue.
39
39
jobsCh chan * cleanupJob
@@ -44,6 +44,7 @@ type cleanupManager struct {
44
44
sync.Mutex
45
45
// totalJobs is the total number of enqueued jobs (completed or in progress).
46
46
totalJobs int
47
+ completedStats obsoleteTableStats
47
48
completedJobs int
48
49
completedJobsCond sync.Cond
49
50
jobsQueueWarningIssued bool
@@ -73,22 +74,19 @@ type obsoleteFile struct {
73
74
type cleanupJob struct {
74
75
jobID JobID
75
76
obsoleteFiles []obsoleteFile
77
+ tableStats obsoleteTableStats
76
78
}
77
79
78
80
// openCleanupManager creates a cleanupManager and starts its background goroutine.
79
81
// The cleanupManager must be Close()d.
80
82
func openCleanupManager (
81
- opts * Options ,
82
- objProvider objstorage.Provider ,
83
- onTableDeleteFn func (fileSize uint64 , isLocal bool ),
84
- getDeletePacerInfo func () deletionPacerInfo ,
83
+ opts * Options , objProvider objstorage.Provider , getDeletePacerInfo func () deletionPacerInfo ,
85
84
) * cleanupManager {
86
85
cm := & cleanupManager {
87
- opts : opts ,
88
- objProvider : objProvider ,
89
- onTableDeleteFn : onTableDeleteFn ,
90
- deletePacer : newDeletionPacer (time .Now (), int64 (opts .TargetByteDeletionRate ), getDeletePacerInfo ),
91
- jobsCh : make (chan * cleanupJob , jobsQueueDepth ),
86
+ opts : opts ,
87
+ objProvider : objProvider ,
88
+ deletePacer : newDeletionPacer (time .Now (), int64 (opts .TargetByteDeletionRate ), getDeletePacerInfo ),
89
+ jobsCh : make (chan * cleanupJob , jobsQueueDepth ),
92
90
}
93
91
cm .mu .completedJobsCond .L = & cm .mu .Mutex
94
92
cm .waitGroup .Add (1 )
@@ -102,6 +100,14 @@ func openCleanupManager(
102
100
return cm
103
101
}
104
102
103
+ // CompletedStats returns the stats summarizing tables deleted. The returned
104
+ // stats increase monotonically over the lifetime of the DB.
105
+ func (cm * cleanupManager ) CompletedStats () obsoleteTableStats {
106
+ cm .mu .Lock ()
107
+ defer cm .mu .Unlock ()
108
+ return cm .mu .completedStats
109
+ }
110
+
105
111
// Close stops the background goroutine, waiting until all queued jobs are completed.
106
112
// Delete pacing is disabled for the remaining jobs.
107
113
func (cm * cleanupManager ) Close () {
@@ -110,10 +116,13 @@ func (cm *cleanupManager) Close() {
110
116
}
111
117
112
118
// EnqueueJob adds a cleanup job to the manager's queue.
113
- func (cm * cleanupManager ) EnqueueJob (jobID JobID , obsoleteFiles []obsoleteFile ) {
119
+ func (cm * cleanupManager ) EnqueueJob (
120
+ jobID JobID , obsoleteFiles []obsoleteFile , tableStats obsoleteTableStats ,
121
+ ) {
114
122
job := & cleanupJob {
115
123
jobID : jobID ,
116
124
obsoleteFiles : obsoleteFiles ,
125
+ tableStats : tableStats ,
117
126
}
118
127
119
128
// Report deleted bytes to the pacer, which can use this data to potentially
@@ -165,7 +174,6 @@ func (cm *cleanupManager) mainLoop() {
165
174
switch of .fileType {
166
175
case fileTypeTable :
167
176
cm .maybePace (& tb , of .fileType , of .nonLogFile .fileNum , of .nonLogFile .fileSize )
168
- cm .onTableDeleteFn (of .nonLogFile .fileSize , of .nonLogFile .isLocal )
169
177
cm .deleteObsoleteObject (fileTypeTable , job .jobID , of .nonLogFile .fileNum )
170
178
case fileTypeLog :
171
179
cm .deleteObsoleteFile (of .logFile .FS , fileTypeLog , job .jobID , of .logFile .Path ,
@@ -177,6 +185,7 @@ func (cm *cleanupManager) mainLoop() {
177
185
}
178
186
}
179
187
cm .mu .Lock ()
188
+ cm .mu .completedStats .Add (job .tableStats )
180
189
cm .mu .completedJobs ++
181
190
cm .mu .completedJobsCond .Broadcast ()
182
191
cm .maybeLogLocked ()
@@ -322,17 +331,6 @@ func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
322
331
return pacerInfo
323
332
}
324
333
325
- // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
326
- func (d * DB ) onObsoleteTableDelete (fileSize uint64 , isLocal bool ) {
327
- d .mu .Lock ()
328
- d .mu .versions .metrics .Table .ObsoleteCount --
329
- d .mu .versions .metrics .Table .ObsoleteSize -= fileSize
330
- if isLocal {
331
- d .mu .versions .metrics .Table .Local .ObsoleteSize -= fileSize
332
- }
333
- d .mu .Unlock ()
334
- }
335
-
336
334
// scanObsoleteFiles scans the filesystem for files that are no longer needed
337
335
// and adds those to the internal lists of obsolete files. Note that the files
338
336
// are not actually deleted by this method. A subsequent call to
@@ -439,7 +437,7 @@ func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlusha
439
437
//
440
438
// d.mu must be held when calling this method.
441
439
func (d * DB ) disableFileDeletions () {
442
- d .mu .disableFileDeletions ++
440
+ d .mu .fileDeletions . disableCount ++
443
441
d .mu .Unlock ()
444
442
defer d .mu .Lock ()
445
443
d .cleanupManager .Wait ()
@@ -450,11 +448,11 @@ func (d *DB) disableFileDeletions() {
450
448
//
451
449
// d.mu must be held when calling this method.
452
450
func (d * DB ) enableFileDeletions () {
453
- if d .mu .disableFileDeletions <= 0 {
451
+ if d .mu .fileDeletions . disableCount <= 0 {
454
452
panic ("pebble: file deletion disablement invariant violated" )
455
453
}
456
- d .mu .disableFileDeletions --
457
- if d .mu .disableFileDeletions > 0 {
454
+ d .mu .fileDeletions . disableCount --
455
+ if d .mu .fileDeletions . disableCount > 0 {
458
456
return
459
457
}
460
458
d .deleteObsoleteFiles (d .newJobIDLocked ())
@@ -469,7 +467,7 @@ type fileInfo = base.FileInfo
469
467
// Does nothing if file deletions are disabled (see disableFileDeletions). A
470
468
// cleanup job will be scheduled when file deletions are re-enabled.
471
469
func (d * DB ) deleteObsoleteFiles (jobID JobID ) {
472
- if d .mu .disableFileDeletions > 0 {
470
+ if d .mu .fileDeletions . disableCount > 0 {
473
471
return
474
472
}
475
473
_ , noRecycle := d .opts .Cleaner .(base.NeedsFileContents )
@@ -507,6 +505,14 @@ func (d *DB) deleteObsoleteFiles(jobID JobID) {
507
505
obsoleteOptions := d .mu .versions .obsoleteOptions
508
506
d .mu .versions .obsoleteOptions = nil
509
507
508
+ // Compute the stats for the tables being queued for deletion and add them
509
+ // to the running total. These stats will be used during DB.Metrics() to
510
+ // calculate the count and size of pending obsolete tables by diffing these
511
+ // stats and the stats reported by the cleanup manager.
512
+ tableStats := calculateObsoleteTableStats (obsoleteTables )
513
+ d .mu .fileDeletions .queuedStats .Add (tableStats )
514
+ d .mu .versions .updateObsoleteTableMetricsLocked ()
515
+
510
516
// Release d.mu while preparing the cleanup job and possibly waiting.
511
517
// Note the unusual order: Unlock and then Lock.
512
518
d .mu .Unlock ()
@@ -560,7 +566,7 @@ func (d *DB) deleteObsoleteFiles(jobID JobID) {
560
566
}
561
567
}
562
568
if len (filesToDelete ) > 0 {
563
- d .cleanupManager .EnqueueJob (jobID , filesToDelete )
569
+ d .cleanupManager .EnqueueJob (jobID , filesToDelete , tableStats )
564
570
}
565
571
if d .opts .private .testingAlwaysWaitForCleanup {
566
572
d .cleanupManager .Wait ()
@@ -579,6 +585,49 @@ func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
579
585
}
580
586
}
581
587
588
+ func calculateObsoleteTableStats (objects []tableInfo ) obsoleteTableStats {
589
+ var stats obsoleteTableStats
590
+ for _ , o := range objects {
591
+ if o .isLocal {
592
+ stats .local .count ++
593
+ stats .local .size += o .FileSize
594
+ }
595
+ stats .total .count ++
596
+ stats .total .size += o .FileSize
597
+ }
598
+ return stats
599
+ }
600
+
601
+ type obsoleteTableStats struct {
602
+ local countAndSize
603
+ total countAndSize
604
+ }
605
+
606
+ func (s * obsoleteTableStats ) Add (other obsoleteTableStats ) {
607
+ s .local .Add (other .local )
608
+ s .total .Add (other .total )
609
+ }
610
+
611
+ func (s * obsoleteTableStats ) Sub (other obsoleteTableStats ) {
612
+ s .local .Sub (other .local )
613
+ s .total .Sub (other .total )
614
+ }
615
+
616
+ type countAndSize struct {
617
+ count uint64
618
+ size uint64
619
+ }
620
+
621
+ func (c * countAndSize ) Add (other countAndSize ) {
622
+ c .count += other .count
623
+ c .size += other .size
624
+ }
625
+
626
+ func (c * countAndSize ) Sub (other countAndSize ) {
627
+ c .count = invariants .SafeSub (c .count , other .count )
628
+ c .size = invariants .SafeSub (c .size , other .size )
629
+ }
630
+
582
631
func merge (a , b []fileInfo ) []fileInfo {
583
632
if len (b ) == 0 {
584
633
return a
0 commit comments