@@ -17,6 +17,7 @@ import (
17
17
"github.com/go-kit/log"
18
18
"github.com/go-kit/log/level"
19
19
"github.com/oklog/ulid"
20
+ "github.com/opentracing/opentracing-go"
20
21
"github.com/pkg/errors"
21
22
"github.com/prometheus/client_golang/prometheus"
22
23
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -31,6 +32,7 @@ import (
31
32
"github.com/thanos-io/thanos/pkg/extprom"
32
33
"github.com/thanos-io/thanos/pkg/objstore"
33
34
"github.com/thanos-io/thanos/pkg/runutil"
35
+ "github.com/thanos-io/thanos/pkg/tracing"
34
36
)
35
37
36
38
type ResolutionLevel int64
@@ -764,7 +766,11 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
764
766
return false , ulid.ULID {}, errors .Wrap (err , "create compaction group dir" )
765
767
}
766
768
767
- shouldRerun , compID , err := cg .compact (ctx , subDir , planner , comp )
769
+ var err error
770
+ tracing .DoInSpanWithErr (ctx , "compaction_group" , func (ctx context.Context ) error {
771
+ shouldRerun , compID , err = cg .compact (ctx , subDir , planner , comp )
772
+ return err
773
+ }, opentracing.Tags {"group.key" : cg .Key ()})
768
774
if err != nil {
769
775
cg .compactionFailures .Inc ()
770
776
return false , ulid.ULID {}, err
@@ -980,7 +986,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
980
986
overlappingBlocks = true
981
987
}
982
988
983
- toCompact , err := planner .Plan (ctx , cg .metasByMinTime )
989
+ var toCompact []* metadata.Meta
990
+ tracing .DoInSpanWithErr (ctx , "compaction_planning" , func (ctx context.Context ) error {
991
+ toCompact , err = planner .Plan (ctx , cg .metasByMinTime )
992
+ return err
993
+ })
984
994
if err != nil {
985
995
return false , ulid.ULID {}, errors .Wrap (err , "plan compaction" )
986
996
}
@@ -1008,12 +1018,20 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
1008
1018
uniqueSources [s ] = struct {}{}
1009
1019
}
1010
1020
1011
- if err := block .Download (ctx , cg .logger , cg .bkt , meta .ULID , bdir ); err != nil {
1021
+ tracing .DoInSpanWithErr (ctx , "compaction_block_download" , func (ctx context.Context ) error {
1022
+ err = block .Download (ctx , cg .logger , cg .bkt , meta .ULID , bdir )
1023
+ return err
1024
+ }, opentracing.Tags {"block.id" : meta .ULID })
1025
+ if err != nil {
1012
1026
return false , ulid.ULID {}, retry (errors .Wrapf (err , "download block %s" , meta .ULID ))
1013
1027
}
1014
1028
1015
1029
// Ensure all input blocks are valid.
1016
- stats , err := block .GatherIndexHealthStats (cg .logger , filepath .Join (bdir , block .IndexFilename ), meta .MinTime , meta .MaxTime )
1030
+ var stats block.HealthStats
1031
+ tracing .DoInSpanWithErr (ctx , "compaction_block_health_stats" , func (ctx context.Context ) error {
1032
+ stats , err = block .GatherIndexHealthStats (cg .logger , filepath .Join (bdir , block .IndexFilename ), meta .MinTime , meta .MaxTime )
1033
+ return err
1034
+ }, opentracing.Tags {"block.id" : meta .ULID })
1017
1035
if err != nil {
1018
1036
return false , ulid.ULID {}, errors .Wrapf (err , "gather index issues for block %s" , bdir )
1019
1037
}
@@ -1039,7 +1057,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
1039
1057
level .Info (cg .logger ).Log ("msg" , "downloaded and verified blocks; compacting blocks" , "plan" , fmt .Sprintf ("%v" , toCompactDirs ), "duration" , time .Since (begin ), "duration_ms" , time .Since (begin ).Milliseconds ())
1040
1058
1041
1059
begin = time .Now ()
1042
- compID , err = comp .Compact (dir , toCompactDirs , nil )
1060
+ tracing .DoInSpanWithErr (ctx , "compaction" , func (ctx context.Context ) error {
1061
+ compID , err = comp .Compact (dir , toCompactDirs , nil )
1062
+ return err
1063
+ })
1043
1064
if err != nil {
1044
1065
return false , ulid.ULID {}, halt (errors .Wrapf (err , "compact blocks %v" , toCompactDirs ))
1045
1066
}
@@ -1081,7 +1102,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
1081
1102
}
1082
1103
1083
1104
// Ensure the output block is valid.
1084
- if err := block .VerifyIndex (cg .logger , index , newMeta .MinTime , newMeta .MaxTime ); ! cg .acceptMalformedIndex && err != nil {
1105
+ tracing .DoInSpanWithErr (ctx , "compaction_verify_index" , func (ctx context.Context ) error {
1106
+ err = block .VerifyIndex (cg .logger , index , newMeta .MinTime , newMeta .MaxTime )
1107
+ return err
1108
+ })
1109
+ if ! cg .acceptMalformedIndex && err != nil {
1085
1110
return false , ulid.ULID {}, halt (errors .Wrapf (err , "invalid result block %s" , bdir ))
1086
1111
}
1087
1112
@@ -1095,7 +1120,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
1095
1120
1096
1121
begin = time .Now ()
1097
1122
1098
- if err := block .Upload (ctx , cg .logger , cg .bkt , bdir , cg .hashFunc ); err != nil {
1123
+ tracing .DoInSpanWithErr (ctx , "compaction_block_upload" , func (ctx context.Context ) error {
1124
+ err = block .Upload (ctx , cg .logger , cg .bkt , bdir , cg .hashFunc )
1125
+ return err
1126
+ })
1127
+ if err != nil {
1099
1128
return false , ulid.ULID {}, retry (errors .Wrapf (err , "upload of %s failed" , compID ))
1100
1129
}
1101
1130
level .Info (cg .logger ).Log ("msg" , "uploaded block" , "result_block" , compID , "duration" , time .Since (begin ), "duration_ms" , time .Since (begin ).Milliseconds ())
@@ -1104,7 +1133,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
1104
1133
// into the next planning cycle.
1105
1134
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
1106
1135
for _ , meta := range toCompact {
1107
- if err := cg .deleteBlock (meta .ULID , filepath .Join (dir , meta .ULID .String ())); err != nil {
1136
+ tracing .DoInSpanWithErr (ctx , "compaction_block_delete" , func (ctx context.Context ) error {
1137
+ err = cg .deleteBlock (meta .ULID , filepath .Join (dir , meta .ULID .String ()))
1138
+ return err
1139
+ }, opentracing.Tags {"block.id" : meta .ULID })
1140
+ if err != nil {
1108
1141
return false , ulid.ULID {}, retry (errors .Wrapf (err , "mark old block for deletion from bucket" ))
1109
1142
}
1110
1143
cg .groupGarbageCollectedBlocks .Inc ()
0 commit comments