Metrics for enqueue/flush reason, and dequeue/flush outcome. #2818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 7 commits · Jul 28, 2020
CHANGELOG.md (1 addition, 1 deletion)
@@ -7,7 +7,7 @@
* `cortex_bucket_stores_gate_queries_concurrent_max`
* `cortex_bucket_stores_gate_queries_in_flight`
* `cortex_bucket_stores_gate_duration_seconds`
* [CHANGE] Metric `cortex_ingester_flush_reasons` has been renamed to `cortex_ingester_series_flushed_total`, and is now incremented during flush, not when series is enqueued for flushing. #2802
* [CHANGE] Metric `cortex_ingester_flush_reasons` has been renamed to `cortex_ingester_flushing_enqueued_series_total`, and new metric `cortex_ingester_flushing_dequeued_series_total` with `outcome` label (superset of reason) has been added. #2802, #2818
* [CHANGE] Experimental Delete Series: Metric `cortex_purger_oldest_pending_delete_request_age_seconds` would track age of delete requests since they are over their cancellation period instead of their creation time. #2806
* [CHANGE] Experimental TSDB: the store-gateway service is required in a Cortex cluster running with the experimental blocks storage. Removed the `-experimental.tsdb.store-gateway-enabled` CLI flag and `store_gateway_enabled` YAML config option. The store-gateway is now always enabled when the storage engine is `tsdb`. #2822
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
pkg/ingester/flush.go (25 additions, 10 deletions)
@@ -106,6 +106,12 @@ const (
reasonIdle
reasonStale
reasonSpreadFlush
// Following are flush outcomes
noUser
noSeries
noChunks
flushError
maxFlushReason // Used for testing String() method. Should be last.
)

func (f flushReason) String() string {
@@ -124,6 +130,14 @@ func (f flushReason) String() string {
return "Stale"
case reasonSpreadFlush:
return "Spread"
case noUser:
return "NoUser"
case noSeries:
return "NoSeries"
case noChunks:
return "NoChunksToFlush"
case flushError:
return "FlushError"
default:
panic("unrecognised flushReason")
}
@@ -146,6 +160,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo

flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
if i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) {
i.metrics.seriesEnqueuedForFlush.WithLabelValues(flush.String()).Inc()
util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric, "nlabels", len(series.metric), "queue", flushQueueIndex)
}
Contributor:
Small note: should we have a metric for the 'else' case here, which would mean the series is already queued?
I can't see that it's tremendously interesting, but if it were way out of line with expectations, that could be interesting.

Contributor (author):
It could signal that chunks are not flushed fast enough, but one can also see that from queue length.

}
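
For illustration, here is a minimal, self-contained sketch of the 'already queued' counter discussed in the review thread above. The counter name `cortex_ingester_flushing_already_enqueued_series_total` and the stand-in for the `Enqueue` result are assumptions made for this example; neither is part of this PR.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// The enqueue counter as registered in this PR (pkg/ingester/metrics.go).
	enqueued := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ingester_flushing_enqueued_series_total",
		Help: "Total number of series enqueued for flushing, with reasons.",
	}, []string{"reason"})

	// Hypothetical counter for the 'else' case: the series was already queued.
	alreadyQueued := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ingester_flushing_already_enqueued_series_total", // assumed name
		Help: "Series that were already in the flush queue when swept again.",
	}, []string{"reason"})

	// enqueueOK stands in for flushQueues[idx].Enqueue(op), which returns
	// false when an op for the same series is already in the queue.
	enqueueOK := false
	reason := "Spread" // one of the existing flushReason strings
	if enqueueOK {
		enqueued.WithLabelValues(reason).Inc()
	} else {
		// A sustained rate here could mean chunks are not flushed fast enough,
		// though the same signal is also visible from the flush queue length.
		alreadyQueued.WithLabelValues(reason).Inc()
	}

	fmt.Println(testutil.ToFloat64(alreadyQueued.WithLabelValues(reason))) // 1
}
```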
@@ -217,7 +232,8 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
outcome, err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
i.metrics.seriesDequeuedOutcome.WithLabelValues(outcome.String()).Inc()
if err != nil {
level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
}
@@ -231,7 +247,8 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.Fingerprint, immediate bool) error {
// Returns flush outcome (either original reason, if series was flushed, noFlush if it doesn't need flushing anymore, or one of the errors)
func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.Fingerprint, immediate bool) (flushReason, error) {
i.metrics.flushSeriesInProgress.Inc()
defer i.metrics.flushSeriesInProgress.Dec()

@@ -241,19 +258,19 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.

userState, ok := i.userStates.get(userID)
if !ok {
return nil
return noUser, nil
}

series, ok := userState.fpToSeries.get(fp)
if !ok {
return nil
return noSeries, nil
}

userState.fpLocker.Lock(fp)
reason := i.shouldFlushSeries(series, fp, immediate)
if reason == noFlush {
userState.fpLocker.Unlock(fp)
return nil
return noFlush, nil
}

// shouldFlushSeries() has told us we have at least one chunk.
@@ -302,11 +319,9 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
}

if len(chunks) == 0 {
return nil
return noChunks, nil
}

i.metrics.flushedSeries.WithLabelValues(reason.String()).Inc()

// flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs.
ctx, cancel := context.WithTimeout(context.Background(), i.cfg.FlushOpTimeout)
defer cancel() // releases resources if slowOperation completes before timeout elapses
@@ -318,7 +333,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric, "nlabels", len(series.metric), "queue", flushQueueIndex)
err := i.flushChunks(ctx, userID, fp, series.metric, chunks)
if err != nil {
return err
return flushError, err
}

userState.fpLocker.Lock(fp)
@@ -329,7 +344,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
chunks[i].LastUpdate = model.Now()
}
userState.fpLocker.Unlock(fp)
return nil
return reason, err
}

// must be called under fpLocker lock
pkg/ingester/flush_test.go (6 additions, 0 deletions)
@@ -169,3 +169,9 @@ func newSampleGenerator(t *testing.T, initTime time.Time, step time.Duration) <-

return ts
}

func TestFlushReasonString(t *testing.T) {
for fr := flushReason(0); fr < maxFlushReason; fr++ {
require.True(t, len(fr.String()) > 0)
}
}
pkg/ingester/metrics.go (9 additions, 4 deletions)
@@ -53,7 +53,8 @@ type ingesterMetrics struct {
chunkSize prometheus.Histogram
chunkAge prometheus.Histogram
memoryChunks prometheus.Gauge
flushedSeries *prometheus.CounterVec
seriesEnqueuedForFlush *prometheus.CounterVec
seriesDequeuedOutcome *prometheus.CounterVec
droppedChunks prometheus.Counter
oldestUnflushedChunkTimestamp prometheus.Gauge
}
@@ -192,10 +193,14 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
Name: "cortex_ingester_memory_chunks",
Help: "The total number of chunks in memory.",
}),
flushedSeries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_series_flushed_total",
Help: "Total number of flushed series, with reasons.",
seriesEnqueuedForFlush: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_flushing_enqueued_series_total",
Help: "Total number of series enqueued for flushing, with reasons.",
}, []string{"reason"}),
seriesDequeuedOutcome: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_flushing_dequeued_series_total",
Help: "Total number of series dequeued for flushing, with outcome (superset of enqueue reasons)",
}, []string{"outcome"}),
droppedChunks: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_dropped_chunks_total",
Help: "Total number of chunks dropped from flushing because they have too few samples.",