
Making ingester head compaction jitter Zone aware. #5928


Merged · 6 commits · May 7, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -3,7 +3,7 @@
 ## master / unreleased
 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
-* [ENHANCEMENT] Ingester: Allowing to configure `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919
+* [ENHANCEMENT] Ingester: Allow configuring the `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add jitter on the first head compaction. #5919 #5928
 * [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to ingester client to protect distributor from OOMKilled. #5917
 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920
23 changes: 14 additions & 9 deletions pkg/ingester/ingester.go
@@ -9,6 +9,7 @@ import (
     "net/http"
     "os"
     "path/filepath"
+    "slices"
     "strings"
     "sync"
     "time"
@@ -70,7 +71,6 @@ const (

     // Jitter applied to the idle timeout to prevent compaction in all ingesters concurrently.
     compactionIdleTimeoutJitter = 0.25
-    initialHeadCompactionJitter = 0.5

     instanceIngestionRateTickInterval = time.Second

@@ -2405,20 +2405,25 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
 }

 func (i *Ingester) compactionLoop(ctx context.Context) error {
-    // Apply a jitter on the first head compaction
-    firstHeadCompaction := true
-    ticker := time.NewTicker(util.DurationWithPositiveJitter(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval, initialHeadCompactionJitter))
+    infoFunc := func() (int, int) {
+        if i.cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled {
+            zones := i.lifecycler.Zones()
+            if len(zones) != 0 {
+                return slices.Index(zones, i.lifecycler.Zone), len(zones)
+            }
+        }
+
+        // Let's derive the slot from the hashed instance ID.
+        slot := int(client.HashAdd32(client.HashNew32(), i.lifecycler.ID) % 10)
+        return slot, 10
+    }
+    ticker := util.NewSlottedTicker(infoFunc, i.cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval)
     defer ticker.Stop()

     for ctx.Err() == nil {
         select {
         case <-ticker.C:
             i.compactBlocks(ctx, false, nil)
-            // Reset the ticker to run the configured interval on the first head compaction
-            if firstHeadCompaction {
-                ticker.Reset(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval)
-                firstHeadCompaction = false
-            }

         case req := <-i.TSDBState.forceCompactTrigger:
             i.compactBlocks(ctx, true, req.users)
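To make the scheduling above concrete, here is a minimal standalone sketch of how infoFunc picks a slot: the index of the ingester's zone in the sorted zone list when zone awareness is enabled, otherwise a hash of the instance ID into one of 10 fixed slots. This is a sketch, not code from the PR: FNV-1a stands in for the internal client.HashNew32/HashAdd32 helpers, and the zone and instance names are made up.

package main

import (
    "fmt"
    "hash/fnv"
    "slices"
)

// slotFor mirrors the infoFunc logic: prefer the zone index when zones are
// known, otherwise hash the instance ID into a fixed number of slots.
func slotFor(zone, instanceID string, zones []string) (int, int) {
    if len(zones) != 0 {
        return slices.Index(zones, zone), len(zones)
    }
    h := fnv.New32a() // stand-in for client.HashNew32/HashAdd32
    h.Write([]byte(instanceID))
    return int(h.Sum32() % 10), 10
}

func main() {
    zones := []string{"zone-a", "zone-b", "zone-c"} // kept sorted by the lifecycler
    fmt.Println(slotFor("zone-b", "ingester-1", zones)) // 1 3
    fmt.Println(slotFor("zone-b", "ingester-1", nil))   // hash-based slot out of 10
}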
22 changes: 20 additions & 2 deletions pkg/ring/lifecycler.go
@@ -142,6 +142,7 @@ type Lifecycler struct {
     countersLock          sync.RWMutex
     healthyInstancesCount int
     zonesCount            int
+    zones                 []string

     lifecyclerMetrics *LifecyclerMetrics
     logger            log.Logger
@@ -426,6 +427,15 @@ func (i *Lifecycler) ZonesCount() int {
     return i.zonesCount
 }

+// Zones returns the zones for which there's at least 1 instance registered
+// in the ring.
+func (i *Lifecycler) Zones() []string {
+    i.countersLock.RLock()
+    defer i.countersLock.RUnlock()
+
+    return i.zones
+}
+
 // Join trigger the instance to join the ring, if autoJoinOnStartup is set to false.
 func (i *Lifecycler) Join() {
     select {
@@ -865,13 +875,13 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error

 func (i *Lifecycler) updateCounters(ringDesc *Desc) {
     healthyInstancesCount := 0
-    zones := map[string]struct{}{}
+    zonesMap := map[string]struct{}{}

     if ringDesc != nil {
         lastUpdated := i.KVStore.LastUpdateTime(i.RingKey)

         for _, ingester := range ringDesc.Ingesters {
-            zones[ingester.Zone] = struct{}{}
+            zonesMap[ingester.Zone] = struct{}{}

             // Count the number of healthy instances for Write operation.
             if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, lastUpdated) {
@@ -880,10 +890,18 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
         }
     }

+    zones := make([]string, 0, len(zonesMap))
+    for z := range zonesMap {
+        zones = append(zones, z)
+    }
+
+    slices.Sort(zones)
+
     // Update counters
     i.countersLock.Lock()
     i.healthyInstancesCount = healthyInstancesCount
     i.zonesCount = len(zones)
+    i.zones = zones
     i.countersLock.Unlock()
 }

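The slices.Sort above is what makes the slot assignment deterministic: every ingester builds the zone list from the same ring but iterates the map in its own order, and sorting guarantees they all agree on which zone owns which slot index. A small sketch with illustrative zone names:

package main

import (
    "fmt"
    "slices"
)

func main() {
    // Two ingesters observe the same zones in different map-iteration orders.
    seenByA := []string{"us-east-1c", "us-east-1a", "us-east-1b"}
    seenByB := []string{"us-east-1b", "us-east-1c", "us-east-1a"}

    slices.Sort(seenByA)
    slices.Sort(seenByB)

    // After sorting, both agree that us-east-1b owns slot 1 of 3.
    fmt.Println(slices.Index(seenByA, "us-east-1b")) // 1
    fmt.Println(slices.Index(seenByB, "us-east-1b")) // 1
}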
88 changes: 88 additions & 0 deletions pkg/util/time.go
@@ -1,6 +1,7 @@
 package util

 import (
+    "context"
     "math"
     "math/rand"
     "net/http"
@@ -95,6 +96,19 @@ func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration
     return input + time.Duration(jitter)
 }

+// PositiveJitter returns a random duration drawn uniformly from the
+// [0, input*variancePerc) interval. Note that rand.Int63n panics if the
+// computed variance is not positive, so callers must pass variancePerc > 0.
+func PositiveJitter(input time.Duration, variancePerc float64) time.Duration {
+    // No duration? No jitter.
+    if input == 0 {
+        return 0
+    }
+
+    variance := int64(float64(input) * variancePerc)
+    jitter := rand.Int63n(variance)
+
+    return time.Duration(jitter)
+}

 // NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing
 // zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel.
 func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {

@@ -136,3 +150,77 @@ func FindMinMaxTime(r *http.Request, expr parser.Expr, lookbackDelta time.Duration

     return promql.FindMinMaxTime(es)
 }

+// SlotInfoFunc returns this instance's slot index and the total number of slots.
+type SlotInfoFunc func() (int, int)
+
+// SlottedTicker delivers each tick inside this instance's slot of the
+// interval. For example, with d=60s and 3 slots, slot 1 ticks once somewhere
+// in the [20s, 40s) window of each minute:
+//
+//    |--- slot 0 ---|--- slot 1 ---|--- slot 2 ---|
+//    0s            20s    ^tick   40s            60s
+type SlottedTicker struct {
+    C <-chan time.Time // The channel on which the ticks are delivered.
+
+    done        func()        // Cancels the background goroutine.
+    d           time.Duration // The full interval that is divided into slots.
+    shouldReset bool          // Whether the ticker still needs re-aligning to the full interval.
+    ticker      *time.Ticker
+    sf          SlotInfoFunc
+}

+func NewSlottedTicker(sf SlotInfoFunc, d time.Duration) *SlottedTicker {
+    c := make(chan time.Time, 1)
+    ctx, cancel := context.WithCancel(context.Background())
+    st := &SlottedTicker{
+        C:           c,
+        done:        cancel,
+        d:           d,
+        sf:          sf,
+        shouldReset: true,
+    }
+    slotIndex, totalSlots := sf()
+    st.ticker = time.NewTicker(st.nextInterval())
+    go func() {
+        for ctx.Err() == nil {
+            select {
+            case t := <-st.ticker.C:
+                // If the slot assignment changed, re-align to the new slot
+                // before delivering any tick.
+                if i, s := sf(); i != slotIndex || s != totalSlots {
+                    slotIndex, totalSlots = i, s
+                    st.ticker.Reset(st.nextInterval())
+                    st.shouldReset = true
+                    continue
+                }
+
+                c <- t
+                // The first tick fired at the slot offset; from now on tick
+                // every full interval d.
+                if st.shouldReset {
+                    st.ticker.Reset(st.d)
+                }
+
+                st.shouldReset = false
+            case <-ctx.Done():
+                return
+            }
+        }
+        close(c)
+    }()
+    return st
+}

+// Stop turns off the ticker and terminates the background goroutine.
+func (t *SlottedTicker) Stop() {
+    t.ticker.Stop()
+    t.done()
+}
+
+func (t *SlottedTicker) nextInterval() time.Duration {
+    slotIndex, totalSlots := t.sf()
+
+    // Find when the current interval window started (now, truncated down to a
+    // multiple of d).
+    lastStartTime := time.UnixMilli((time.Now().UnixMilli() / t.d.Milliseconds()) * t.d.Milliseconds())
+    slotSize := t.d / time.Duration(totalSlots)
+    offset := time.Duration((float64(slotIndex) / float64(totalSlots)) * float64(t.d))
+    // Shift the window start to the beginning of this instance's slot.
+    lastStartTime = lastStartTime.Add(offset)
+
+    // Keep adding the ticker duration until we pass time.Now().
+    for lastStartTime.Before(time.Now()) {
+        lastStartTime = lastStartTime.Add(t.d)
+    }
+
+    // Land at a random point inside the slot rather than always at its start.
+    return time.Until(lastStartTime) + PositiveJitter(slotSize, 1)
+}
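A worked example of nextInterval, using made-up numbers rather than anything from the PR: with d = 30m, totalSlots = 3 and slotIndex = 1, interval windows start at every wall-clock multiple of 30m, the slot size is 10m and the offset is 10m, so from 10:12 the next tick is scheduled 28m out (the 10:40 slot start) plus up to one slot of jitter. A minimal sketch replicating the arithmetic with a fixed "now":

package main

import (
    "fmt"
    "time"
)

func main() {
    d := 30 * time.Minute
    slotIndex, totalSlots := 1, 3
    now := time.Date(2024, 5, 7, 10, 12, 0, 0, time.UTC) // pretend time.Now()

    // Truncate "now" down to a multiple of d: 10:00.
    lastStart := time.UnixMilli((now.UnixMilli() / d.Milliseconds()) * d.Milliseconds()).UTC()
    slotSize := d / time.Duration(totalSlots)                                      // 10m
    offset := time.Duration(float64(slotIndex) / float64(totalSlots) * float64(d)) // 10m
    lastStart = lastStart.Add(offset) // 10:10, already in the past

    // Advance whole intervals until we pass "now": 10:40.
    for lastStart.Before(now) {
        lastStart = lastStart.Add(d)
    }

    fmt.Println(lastStart.Sub(now), slotSize) // 28m0s 10m0s; jitter adds up to one slot
}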
78 changes: 78 additions & 0 deletions pkg/util/time_test.go
@@ -11,6 +11,9 @@ import (
     "github.com/prometheus/prometheus/promql/parser"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "go.uber.org/atomic"
+
+    "github.com/cortexproject/cortex/pkg/util/test"
 )

 func TestTimeFromMillis(t *testing.T) {
@@ -188,3 +191,78 @@ func TestFindMinMaxTime(t *testing.T) {
         })
     }
 }

+func TestSlottedTicker(t *testing.T) {
+    testCases := map[string]struct {
+        duration   time.Duration
+        totalSlots int
+        slotNumber int
+    }{
+        "Single slot should spread across the whole interval": {
+            duration:   100 * time.Millisecond,
+            totalSlots: 1,
+            slotNumber: 0,
+        },
+        "Get first slot": {
+            duration:   100 * time.Millisecond,
+            totalSlots: 5,
+            slotNumber: 0,
+        },
+        "Get 4th slot": {
+            duration:   100 * time.Millisecond,
+            totalSlots: 5,
+            slotNumber: 3,
+        },
+        "Get last slot": {
+            duration:   100 * time.Millisecond,
+            totalSlots: 5,
+            slotNumber: 4,
+        },
+    }
+    for name, c := range testCases {
+        tc := c
+        t.Run(name, func(t *testing.T) {
+            infoFunc := func() (int, int) {
+                return tc.slotNumber, tc.totalSlots
+            }
+            ticker := NewSlottedTicker(infoFunc, tc.duration)
+            // Check that the first tick lands inside the expected slot window.
+            tTime := <-ticker.C
+            slotSize := tc.duration.Milliseconds() / int64(tc.totalSlots)
+            slotShiftInMs := tTime.UnixMilli() % tc.duration.Milliseconds()
+            slot := slotShiftInMs / slotSize
+            successCount := 0
+            test.Poll(t, 2*time.Second, true, func() interface{} {
+                if slot == int64(tc.slotNumber) {
+                    successCount++
+                } else {
+                    successCount--
+                }
+
+                return successCount == 50
+            })
+            ticker.Stop()
+        })
+    }

+    t.Run("Change slot size", func(t *testing.T) {
+        totalSlots := atomic.NewInt32(10)
+        d := 100 * time.Millisecond
+        infoFunc := func() (int, int) {
+            return 2, int(totalSlots.Load())
+        }
+
+        ticker := NewSlottedTicker(infoFunc, d)
+
+        // Slot 2 of 10: ticks land in the [20ms, 30ms] window of the interval.
+        test.Poll(t, 2*time.Second, true, func() interface{} {
+            tTime := <-ticker.C
+            slotShiftInMs := tTime.UnixMilli() % d.Milliseconds()
+            return slotShiftInMs >= 20 && slotShiftInMs <= 30
+        })
+        totalSlots.Store(5)
+        // Slot 2 of 5: the window widens and shifts to [40ms, 60ms].
+        test.Poll(t, 2*time.Second, true, func() interface{} {
+            tTime := <-ticker.C
+            slotShiftInMs := tTime.UnixMilli() % d.Milliseconds()
+            return slotShiftInMs >= 40 && slotShiftInMs <= 60
+        })
+    })
+}
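Putting it together, a minimal consumer of the new ticker. This is a sketch: the fixed SlotInfoFunc and the 3s interval are illustrative, not taken from the PR.

package main

import (
    "fmt"
    "time"

    "github.com/cortexproject/cortex/pkg/util"
)

func main() {
    // This instance owns slot 1 of 3, so ticks arrive in the second third
    // of every 3s interval.
    infoFunc := func() (int, int) { return 1, 3 }

    ticker := util.NewSlottedTicker(infoFunc, 3*time.Second)
    defer ticker.Stop()

    for i := 0; i < 3; i++ {
        t := <-ticker.C
        fmt.Println("tick at", t.Format("15:04:05.000"))
    }
}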