Commit fb1732a

Tyler Reid committed
Merge remote-tracking branch 'upstream/master' into query-metering
2 parents: 7e76fbd + fdb1106

11 files changed: +60 / -28 lines changed

CHANGELOG.md (+1)

@@ -2,6 +2,7 @@
 
 ## master / unreleased
 
+* [CHANGE] Enable strict JSON unmarshal for `pkg/util/validation.Limits` struct. The custom `UnmarshalJSON()` will now fail if the input has unknown fields. #4298
 * [CHANGE] Cortex chunks storage has been deprecated and it's now in maintenance mode: all Cortex users are encouraged to migrate to the blocks storage. No new features will be added to the chunks storage. The default Cortex configuration still runs the chunks engine; please check out the [blocks storage doc](https://cortexmetrics.io/docs/blocks-storage/) on how to configure Cortex to run with the blocks storage. #4268
 * [CHANGE] The example Kubernetes manifests (stored at `k8s/`) have been removed due to a lack of proper support and maintenance. #4268
 * [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4125
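
The strict-unmarshal entry above (#4298) means that `json.Unmarshal` into a `validation.Limits` value now rejects payloads carrying unrecognized keys instead of silently ignoring them. A minimal sketch of the caller-visible effect, assuming `ingestion_rate` is a valid JSON tag on the Limits struct (as the existing test suggests); this is an illustration, not code from the commit:

package main

import (
    "encoding/json"
    "fmt"

    "github.com/cortexproject/cortex/pkg/util/validation"
)

func main() {
    // A known field decodes exactly as before.
    var l validation.Limits
    fmt.Println(json.Unmarshal([]byte(`{"ingestion_rate": 0.5}`), &l)) // <nil>

    // An unknown field now makes the custom UnmarshalJSON fail, because the
    // decoder it builds internally calls DisallowUnknownFields().
    l = validation.Limits{}
    fmt.Println(json.Unmarshal([]byte(`{"unknown_fields": 100}`), &l)) // e.g. json: unknown field "unknown_fields"
}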

pkg/api/api.go (+6 -1)

@@ -351,11 +351,16 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) {
     a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, "GET", "POST")
 }
 
+type Distributor interface {
+    querier.Distributor
+    UserStatsHandler(w http.ResponseWriter, r *http.Request)
+}
+
 // RegisterQueryable registers the the default routes associated with the querier
 // module.
 func (a *API) RegisterQueryable(
     queryable storage.SampleAndChunkQueryable,
-    distributor *distributor.Distributor,
+    distributor Distributor,
 ) {
     // these routes are always registered to the default server
     a.RegisterRoute("/api/v1/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
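
The new `Distributor` interface decouples `RegisterQueryable` (and `NewQuerierHandler` in the next file) from the concrete `*distributor.Distributor`. A minimal sketch of what that enables; the wrapper type, its name, and its stub behaviour are assumptions for illustration, not part of this commit:

package metering

import (
    "net/http"

    "github.com/cortexproject/cortex/pkg/api"
    "github.com/cortexproject/cortex/pkg/querier"
)

// meteringDistributor is a hypothetical wrapper: embedding querier.Distributor
// and adding UserStatsHandler is all it takes to satisfy api.Distributor, so a
// type like this can now be handed to API.RegisterQueryable where previously
// only the concrete *distributor.Distributor was accepted.
type meteringDistributor struct {
    querier.Distributor
}

func (m *meteringDistributor) UserStatsHandler(w http.ResponseWriter, r *http.Request) {
    // A real wrapper would delegate to the wrapped distributor and could record
    // per-tenant query-metering data here; this stub only satisfies the interface.
    w.WriteHeader(http.StatusOK)
}

// Compile-time check that the wrapper implements the new interface.
var _ api.Distributor = (*meteringDistributor)(nil)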

pkg/api/handlers.go (+1 -2)

@@ -23,7 +23,6 @@ import (
     "github.com/weaveworks/common/middleware"
 
     "github.com/cortexproject/cortex/pkg/chunk/purger"
-    "github.com/cortexproject/cortex/pkg/distributor"
     "github.com/cortexproject/cortex/pkg/querier"
     "github.com/cortexproject/cortex/pkg/querier/stats"
     "github.com/cortexproject/cortex/pkg/util"
@@ -160,7 +159,7 @@ func NewQuerierHandler(
     queryable storage.SampleAndChunkQueryable,
     exemplarQueryable storage.ExemplarQueryable,
     engine *promql.Engine,
-    distributor *distributor.Distributor,
+    distributor Distributor,
     tombstonesLoader *purger.TombstonesLoader,
     reg prometheus.Registerer,
     logger log.Logger,

pkg/compactor/compactor.go (+2 -2)

@@ -408,9 +408,9 @@ func (c *Compactor) starting(ctx context.Context) error {
 
         level.Info(c.logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
         if err := ring.WaitRingStability(ctx, c.ring, RingOp, minWaiting, maxWaiting); err != nil {
-            level.Warn(c.logger).Log("msg", "compactor is ring topology is not stable after the max waiting time, proceeding anyway")
+            level.Warn(c.logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
         } else {
-            level.Info(c.logger).Log("msg", "compactor is ring topology is stable")
+            level.Info(c.logger).Log("msg", "compactor ring topology is stable")
         }
     }
 }

pkg/distributor/distributor_test.go (+16 -6)

@@ -1012,12 +1012,15 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
     flagext.DefaultValues(limits)
 
     // Prepare distributors.
+    // Use replication factor of 2 to always read all the chunks from both ingesters,
+    // this guarantees us to always read the same chunks and have a stable test.
     ds, _, r, _ := prepare(t, prepConfig{
-        numIngesters:     3,
-        happyIngesters:   3,
-        numDistributors:  1,
-        shardByAllLabels: true,
-        limits:           limits,
+        numIngesters:      2,
+        happyIngesters:    2,
+        numDistributors:   1,
+        shardByAllLabels:  true,
+        limits:            limits,
+        replicationFactor: 2,
     })
     defer stopAll(ds, r)
 
@@ -1893,6 +1896,7 @@ type prepConfig struct {
     skipLabelNameValidation bool
     maxInflightRequests     int
    maxIngestionRate         float64
+    replicationFactor       int
 }
 
 func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring, []*prometheus.Registry) {
@@ -1935,12 +1939,18 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
     )
     require.NoError(t, err)
 
+    // Use a default replication factor of 3 if there isn't a provided replication factor.
+    rf := cfg.replicationFactor
+    if rf == 0 {
+        rf = 3
+    }
+
     ingestersRing, err := ring.New(ring.Config{
         KVStore: kv.Config{
             Mock: kvStore,
         },
         HeartbeatTimeout:  60 * time.Minute,
-        ReplicationFactor: 3,
+        ReplicationFactor: rf,
     }, ring.IngesterRingKey, ring.IngesterRingKey, nil)
     require.NoError(t, err)
     require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))

pkg/ruler/compat.go (+11 -11)

@@ -27,7 +27,7 @@ type Pusher interface {
     Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
 }
 
-type pusherAppender struct {
+type PusherAppender struct {
     failedWrites prometheus.Counter
     totalWrites  prometheus.Counter
 
@@ -39,7 +39,7 @@ type pusherAppender struct {
     evaluationDelay time.Duration
 }
 
-func (a *pusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
+func (a *PusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
     a.labels = append(a.labels, l)
 
     // Adapt staleness markers for ruler evaluation delay. As the upstream code
@@ -61,11 +61,11 @@ func (a *pusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (
     return 0, nil
 }
 
-func (a *pusherAppender) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) {
+func (a *PusherAppender) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) {
     return 0, errors.New("exemplars are unsupported")
 }
 
-func (a *pusherAppender) Commit() error {
+func (a *PusherAppender) Commit() error {
     a.totalWrites.Inc()
 
     // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
@@ -84,7 +84,7 @@ func (a *pusherAppender) Commit() error {
     return err
 }
 
-func (a *pusherAppender) Rollback() error {
+func (a *PusherAppender) Rollback() error {
     a.labels = nil
     a.samples = nil
     return nil
@@ -112,7 +112,7 @@ func NewPusherAppendable(pusher Pusher, userID string, limits RulesLimits, total
 
 // Appender returns a storage.Appender
 func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
-    return &pusherAppender{
+    return &PusherAppender{
         failedWrites: t.failedWrites,
         totalWrites:  t.totalWrites,
 
@@ -131,9 +131,9 @@ type RulesLimits interface {
     RulerMaxRulesPerRuleGroup(userID string) int
 }
 
-// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
+// EngineQueryFunc returns a new query function using the rules.EngineQueryFunc function
 // and passing an altered timestamp.
-func engineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides RulesLimits, userID string) rules.QueryFunc {
+func EngineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides RulesLimits, userID string) rules.QueryFunc {
     return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
         orig := rules.EngineQueryFunc(engine, q)
         // Delay the evaluation of all rules by a set interval to give a buffer
@@ -143,7 +143,7 @@ func engineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides Rules
     }
 }
 
-func metricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter, queryTime *prometheus.CounterVec, userID string) rules.QueryFunc {
+func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter, queryTime *prometheus.CounterVec, userID string) rules.QueryFunc {
     return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
         queries.Inc()
 
@@ -219,7 +219,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
         return rules.NewManager(&rules.ManagerOptions{
             Appendable:  NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
             Queryable:   q,
-            QueryFunc:   metricsQueryFunc(engineQueryFunc(engine, q, overrides, userID), totalQueries, failedQueries, rulerQuerySeconds, userID),
+            QueryFunc:   MetricsQueryFunc(EngineQueryFunc(engine, q, overrides, userID), totalQueries, failedQueries, rulerQuerySeconds, userID),
             Context:     user.InjectOrgID(ctx, userID),
             ExternalURL: cfg.ExternalURL.URL,
             NotifyFunc:  SendAlerts(notifier, cfg.ExternalURL.URL.String()),
@@ -230,4 +230,4 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
             ResendDelay: cfg.ResendDelay,
         })
     }
-}
+}
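
Exporting `PusherAppender`, `EngineQueryFunc` and `MetricsQueryFunc` lets code outside the `ruler` package assemble the same instrumented query path that `DefaultTenantManagerFactory` wires up. A minimal sketch of such a composition; the helper name, metric names and label layout are assumptions, only the two exported function signatures come from this commit:

package metering

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/prometheus/promql"
    "github.com/prometheus/prometheus/rules"
    "github.com/prometheus/prometheus/storage"

    "github.com/cortexproject/cortex/pkg/ruler"
)

// newInstrumentedQueryFunc mirrors the QueryFunc built in
// DefaultTenantManagerFactory: EngineQueryFunc applies the per-tenant
// evaluation delay, and MetricsQueryFunc wraps it to count total/failed
// queries and accumulate per-user query time.
func newInstrumentedQueryFunc(engine *promql.Engine, q storage.Queryable, overrides ruler.RulesLimits, userID string, reg prometheus.Registerer) rules.QueryFunc {
    // Metric and label names here are illustrative only.
    totalQueries := prometheus.NewCounter(prometheus.CounterOpts{Name: "ruler_queries_total"})
    failedQueries := prometheus.NewCounter(prometheus.CounterOpts{Name: "ruler_queries_failed_total"})
    querySeconds := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "ruler_query_seconds_total"}, []string{"user"})
    reg.MustRegister(totalQueries, failedQueries, querySeconds)

    return ruler.MetricsQueryFunc(
        ruler.EngineQueryFunc(engine, q, overrides, userID),
        totalQueries, failedQueries, querySeconds, userID,
    )
}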

pkg/ruler/compat_test.go (+2 -2)

@@ -232,7 +232,7 @@ func TestMetricsQueryFuncErrors(t *testing.T) {
                 return promql.Vector{}, tc.returnedError
             }
 
-            qf := metricsQueryFunc(mockFunc, queries, failures, queryTime, "user")
+            qf := MetricsQueryFunc(mockFunc, queries, failures, queryTime, "user")
 
             _, err := qf(context.Background(), "test", time.Now())
             require.Equal(t, tc.returnedError, err)
@@ -241,4 +241,4 @@
             require.Equal(t, tc.expectedFailedQueries, int(testutil.ToFloat64(failures)))
         })
     }
-}
+}

pkg/storegateway/gateway.go (+2 -2)

@@ -242,9 +242,9 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {
 
         level.Info(g.logger).Log("msg", "waiting until store-gateway ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
         if err := ring.WaitRingStability(ctx, g.ring, BlocksOwnerSync, minWaiting, maxWaiting); err != nil {
-            level.Warn(g.logger).Log("msg", "store-gateway is ring topology is not stable after the max waiting time, proceeding anyway")
+            level.Warn(g.logger).Log("msg", "store-gateway ring topology is not stable after the max waiting time, proceeding anyway")
         } else {
-            level.Info(g.logger).Log("msg", "store-gateway is ring topology is stable")
+            level.Info(g.logger).Log("msg", "store-gateway ring topology is stable")
         }
     }
 }

pkg/storegateway/gateway_test.go (+4 -1)

@@ -300,7 +300,10 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
     t.Log("random generator seed:", seed)
 
     ctx := context.Background()
-    ringStore := consul.NewInMemoryClient(ring.GetCodec())
+    ringStore := consul.NewInMemoryClientWithConfig(ring.GetCodec(), consul.Config{
+        MaxCasRetries: 20,
+        CasRetryDelay: 500 * time.Millisecond,
+    })
 
     // Create the configured number of gateways.
     var gateways []*StoreGateway

pkg/util/validation/limits.go (+5 -1)

@@ -1,6 +1,7 @@
 package validation
 
 import (
+    "bytes"
     "encoding/json"
     "errors"
     "flag"
@@ -229,7 +230,10 @@ func (l *Limits) UnmarshalJSON(data []byte) error {
     }
 
     type plain Limits
-    return json.Unmarshal(data, (*plain)(l))
+    dec := json.NewDecoder(bytes.NewReader(data))
+    dec.DisallowUnknownFields()
+
+    return dec.Decode((*plain)(l))
 }
 
 func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimitMap) {

pkg/util/validation/limits_test.go (+10)

@@ -3,6 +3,7 @@ package validation
 import (
     "encoding/json"
     "reflect"
+    "strings"
     "testing"
     "time"
 
@@ -165,6 +166,15 @@ func TestLimitsLoadingFromJson(t *testing.T) {
 
     assert.Equal(t, 0.5, l.IngestionRate, "from json")
     assert.Equal(t, 100, l.MaxLabelNameLength, "from defaults")
+
+    // Unmarshal should fail if input contains unknown struct fields and
+    // the decoder flag `json.Decoder.DisallowUnknownFields()` is set
+    inp = `{"unknown_fields": 100}`
+    l = Limits{}
+    dec := json.NewDecoder(strings.NewReader(inp))
+    dec.DisallowUnknownFields()
+    err = dec.Decode(&l)
+    assert.Error(t, err)
 }
 
 func TestLimitsTagsYamlMatchJson(t *testing.T) {
