Skip to content

Commit be7248a

Browse files
committed
feat(storagenode): migrate metrics to OpenTelemetry histogram wrappers
Replace existing metrics with OpenTelemetry histogram wrappers. Convert several metric types from counters to histograms to capture measurement distributions. Converted from counter to histogram: - sn.append.preparation - sn.sequencer.operation.duration - sn.sequencer.fanout.duration - sn.writer.operation.duration - sn.committer.operation.duration - sn.committer.logs - sn.replicate.client.operation.duration - sn.replicate.duration - sn.replicate.fanout.duration Migrated to OpenTelemetry histogram wrapper (already histogram): - log_rpc.server.duration - log_rpc.server.log_entry.size - log_rpc.server.batch.size - log_rpc.server.log_entries_per_batch This improves observability by enabling distribution analysis for these metrics.
1 parent bc0d663 commit be7248a

File tree

9 files changed

+215
-309
lines changed

9 files changed

+215
-309
lines changed

internal/storagenode/log_server.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ import (
77
"time"
88

99
pbtypes "github.com/gogo/protobuf/types"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/metric"
12+
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
1013
"go.uber.org/multierr"
1114
"golang.org/x/sync/errgroup"
1215
"google.golang.org/grpc/codes"
1316
"google.golang.org/grpc/status"
1417

18+
"github.com/kakao/varlog/internal/stats/opentelemetry"
1519
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
1620
"github.com/kakao/varlog/internal/storagenode/logstream"
1721
"github.com/kakao/varlog/internal/storagenode/telemetry"
@@ -118,6 +122,7 @@ func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq cha
118122
lsid = req.LogStreamID
119123
}
120124

125+
appendTask.TopicID = tpid
121126
appendTask.LogStreamID = lsid
122127
appendTask.RPCStartTime = time.Now()
123128

@@ -160,6 +165,7 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c
160165
return nil
161166
}
162167

168+
tpid := appendTask.TopicID
163169
lsid := appendTask.LogStreamID
164170
res, err = appendTask.WaitForCompletion(ctx)
165171
elapsed := time.Since(appendTask.RPCStartTime)
@@ -188,10 +194,18 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c
188194
}
189195
}
190196
if !lsid.Invalid() {
191-
metrics, ok := ls.sn.metrics.GetLogStreamMetrics(lsid)
192-
if ok {
193-
metrics.LogRPCServerDuration.Record(ctx, telemetry.RPCKindAppend, code, elapsed.Microseconds())
194-
}
197+
ls.sn.metrics.LogRPCServerDuration.Record(ctx, lsid, elapsed.Microseconds(), func() []metric.RecordOption {
198+
return []metric.RecordOption{
199+
metric.WithAttributeSet(attribute.NewSet(
200+
opentelemetry.TopicID(tpid),
201+
opentelemetry.LogStreamID(lsid),
202+
semconv.RPCSystemGRPC,
203+
semconv.RPCService(telemetry.ServiceNames[telemetry.RPCKindAppend]),
204+
semconv.RPCMethod(telemetry.MethodNames[telemetry.RPCKindAppend]),
205+
semconv.RPCGRPCStatusCodeKey.Int64(int64(code)),
206+
)),
207+
}
208+
})
195209
}
196210
if err != nil {
197211
return err

internal/storagenode/logstream/append.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import (
55
"sync"
66
"time"
77

8-
"google.golang.org/grpc/codes"
9-
108
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
11-
"github.com/kakao/varlog/internal/storagenode/telemetry"
129
"github.com/kakao/varlog/pkg/types"
1310
"github.com/kakao/varlog/pkg/verrors"
1411
"github.com/kakao/varlog/proto/snpb"
@@ -28,6 +25,7 @@ type AppendTask struct {
2825
apc appendContext
2926
dataBatchLen int
3027

28+
TopicID types.TopicID
3129
LogStreamID types.LogStreamID
3230
RPCStartTime time.Time
3331
}
@@ -117,9 +115,6 @@ func (at *AppendTask) WaitForCompletion(ctx context.Context) ([]snpb.AppendResul
117115
func appendTaskDeferredFunc(at *AppendTask) {
118116
at.lse.inflight.Add(-1)
119117
at.lse.inflightAppend.Add(-1)
120-
if at.lse.lsm != nil {
121-
at.lse.lsm.AppendDuration.Add(time.Since(at.start).Microseconds())
122-
}
123118
}
124119

125120
type appendContext struct {
@@ -154,13 +149,9 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append
154149
var preparationDuration time.Duration
155150
defer func() {
156151
if lse.lsm != nil {
157-
lse.lsm.AppendLogs.Add(int64(dataBatchLen))
158-
lse.lsm.AppendBytes.Add(totalBytes)
159-
lse.lsm.AppendOperations.Add(1)
160-
lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds())
161-
162-
lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, totalBytes)
163-
lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen))
152+
lse.lsm.AppendPreparationDuration.Record(context.Background(), preparationDuration.Microseconds())
153+
lse.lsm.LogRPCServerBatchSize.Record(context.Background(), totalBytes)
154+
lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), int64(dataBatchLen))
164155
}
165156
}()
166157

@@ -199,8 +190,7 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
199190
logEntrySize := int64(len(dataBatch[i]))
200191
totalBytes += logEntrySize
201192
if lse.lsm != nil {
202-
// TODO: Set the correct status code.
203-
lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, logEntrySize)
193+
lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), logEntrySize)
204194
}
205195
}
206196
awg := newAppendWaitGroup(st.wwg, len(dataBatch))

internal/storagenode/logstream/backup_writer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (bw *backupWriter) writeLoop(ctx context.Context) {
7474
}
7575
}
7676

77-
func (bw *backupWriter) writeLoopInternal(_ context.Context, bwt *backupWriteTask) {
77+
func (bw *backupWriter) writeLoopInternal(ctx context.Context, bwt *backupWriteTask) {
7878
startTime := time.Now()
7979
var err error
8080
wb, oldLLSN, newLLSN := bwt.wb, bwt.oldLLSN, bwt.newLLSN
@@ -88,8 +88,7 @@ func (bw *backupWriter) writeLoopInternal(_ context.Context, bwt *backupWriteTas
8888
if bw.lse.lsm == nil {
8989
return
9090
}
91-
bw.lse.lsm.WriterOperationDuration.Add(time.Since(startTime).Microseconds())
92-
bw.lse.lsm.WriterOperations.Add(1)
91+
bw.lse.lsm.WriterOperationDuration.Record(ctx, time.Since(startTime).Microseconds())
9392
}()
9493

9594
if uncommittedLLSNEnd := bw.lse.lsc.uncommittedLLSNEnd.Load(); uncommittedLLSNEnd != oldLLSN {

internal/storagenode/logstream/committer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,8 @@ func (cm *committer) commitInternal(cc storage.CommitContext) (err error) {
253253
if cm.lse.lsm == nil {
254254
return
255255
}
256-
cm.lse.lsm.CommitterOperationDuration.Add(int64(time.Since(startTime).Microseconds()))
257-
cm.lse.lsm.CommitterOperations.Add(1)
258-
cm.lse.lsm.CommitterLogs.Add(int64(numCommits))
256+
cm.lse.lsm.CommitterOperationDuration.Record(context.Background(), int64(time.Since(startTime).Microseconds()))
257+
cm.lse.lsm.CommitterLogs.Record(context.Background(), int64(numCommits))
259258
}()
260259

261260
iter := cm.commitWaitQ.peekIterator()

internal/storagenode/logstream/executor.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,21 +177,19 @@ func (lse *Executor) Replicate(ctx context.Context, beginLLSN types.LLSN, dataLi
177177
return errors.New("log stream: not backup")
178178
}
179179

180-
var preparationDuration time.Duration
181-
startTime := time.Now()
180+
var startTime, prepEndTime time.Time
182181
dataBytes := int64(0)
183182
batchSize := len(dataList)
184183
defer func() {
185184
if lse.lsm == nil {
186185
return
187186
}
188-
lse.lsm.ReplicateLogs.Add(int64(batchSize))
189-
lse.lsm.ReplicateBytes.Add(dataBytes)
190-
lse.lsm.ReplicateDuration.Add(time.Since(startTime).Microseconds())
191-
lse.lsm.ReplicateOperations.Add(1)
192-
lse.lsm.ReplicatePreparationMicro.Add(preparationDuration.Microseconds())
187+
lse.lsm.ReplicateDuration.Record(ctx, time.Since(startTime).Microseconds())
188+
lse.lsm.ReplicateFanoutDuration.Record(ctx, time.Since(prepEndTime).Microseconds())
193189
}()
194190

191+
startTime = time.Now()
192+
195193
oldLLSN, newLLSN := beginLLSN, beginLLSN+types.LLSN(batchSize)
196194
wb := lse.stg.NewWriteBatch()
197195
for i := 0; i < batchSize; i++ {
@@ -201,7 +199,7 @@ func (lse *Executor) Replicate(ctx context.Context, beginLLSN types.LLSN, dataLi
201199
cwt := newCommitWaitTask(nil, batchSize)
202200
bwt := newBackupWriteTask(wb, oldLLSN, newLLSN)
203201

204-
preparationDuration = time.Since(startTime)
202+
prepEndTime = time.Now()
205203

206204
if err := lse.bw.send(ctx, bwt); err != nil {
207205
lse.logger.Error("could not send backup batch write task", zap.Error(err))
@@ -216,6 +214,7 @@ func (lse *Executor) Replicate(ctx context.Context, beginLLSN types.LLSN, dataLi
216214
cwt.release()
217215
return err
218216
}
217+
219218
return nil
220219
}
221220

internal/storagenode/logstream/replicate_client.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (rc *replicateClient) sendLoop(ctx context.Context) {
127127
}
128128

129129
// sendLoopInternal sends a replicate task to the backup replica.
130-
func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error {
130+
func (rc *replicateClient) sendLoopInternal(ctx context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error {
131131
startTime := time.Now()
132132
req.Data = rt.dataList
133133
req.BeginLLSN = rt.beginLLSN
@@ -136,8 +136,7 @@ func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask
136136
inflight := rc.inflight.Add(-1)
137137
if rc.lse.lsm != nil {
138138
rc.lse.lsm.ReplicateClientInflightOperations.Store(inflight)
139-
rc.lse.lsm.ReplicateClientOperationDuration.Add(time.Since(startTime).Microseconds())
140-
rc.lse.lsm.ReplicateClientOperations.Add(1)
139+
rc.lse.lsm.ReplicateClientOperationDuration.Record(ctx, time.Since(startTime).Microseconds())
141140
}
142141
return err
143142
}

internal/storagenode/logstream/sequencer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,8 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
9494
if sq.lse.lsm == nil {
9595
return
9696
}
97-
sq.lse.lsm.SequencerFanoutDuration.Add(time.Since(operationEndTime).Microseconds())
98-
sq.lse.lsm.SequencerOperationDuration.Add(int64(operationEndTime.Sub(startTime).Microseconds()))
99-
sq.lse.lsm.SequencerOperations.Add(1)
97+
sq.lse.lsm.SequencerFanoutDuration.Record(ctx, time.Since(operationEndTime).Microseconds())
98+
sq.lse.lsm.SequencerOperationDuration.Record(ctx, int64(operationEndTime.Sub(startTime).Microseconds()))
10099
sq.lse.lsm.ReplicateClientInflightOperations.Store(inflight)
101100
}()
102101

internal/storagenode/logstream/writer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (w *writer) writeLoop(ctx context.Context) {
8383
}
8484

8585
// writeLoopInternal stores a batch of writes to the storage and modifies uncommittedLLSNEnd of the log stream which presents the next expected LLSN to be written.
86-
func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
86+
func (w *writer) writeLoopInternal(ctx context.Context, st *sequenceTask) {
8787
startTime := time.Now()
8888
var err error
8989
cnt := len(st.dataBatch)
@@ -98,8 +98,7 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
9898
if w.lse.lsm == nil {
9999
return
100100
}
101-
w.lse.lsm.WriterOperationDuration.Add(time.Since(startTime).Microseconds())
102-
w.lse.lsm.WriterOperations.Add(1)
101+
w.lse.lsm.WriterOperationDuration.Record(ctx, time.Since(startTime).Microseconds())
103102
w.lse.lsm.WriterInflightOperations.Store(inflight)
104103
}()
105104

0 commit comments

Comments
 (0)