Skip to content

Commit a14f072

Browse files
pstibranypracucci
andauthored
Querier can now notify about its shutdown without providing any authentication in the context. (#4066)
* Allow querier to call `NotifyClientShutdown` gRPC method without org ID. Signed-off-by: Peter Štibraný <[email protected]> * Added unit test Signed-off-by: Peter Štibraný <[email protected]> * CHANGELOG.md Signed-off-by: Peter Štibraný <[email protected]> * Fix imports formatting. Signed-off-by: Peter Štibraný <[email protected]> * Update pkg/cortex/cortex_test.go Signed-off-by: Marco Pracucci <[email protected]> * Fix test registering metrics to global registry. Signed-off-by: Peter Štibraný <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent 7f85a26 commit a14f072

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* [BUGFIX] Compactor: `-compactor.blocks-retention-period` now supports weeks (`w`) and years (`y`). #4027
3939
* [BUGFIX] Querier: returning 422 (instead of 500) when query hits `max_chunks_per_query` limit with block storage, when the limit is hit in the store-gateway. #3937
4040
* [BUGFIX] Ruler: Rule group limit enforcement should now allow the same number of rules in a group as the limit. #3615
41+
* [BUGFIX] Frontend, Query-scheduler: allow querier to notify about shutdown without providing any authentication. #4066
4142

4243
## Blocksconvert
4344

pkg/cortex/cortex.go

+2
Original file line numberDiff line numberDiff line change
@@ -345,8 +345,10 @@ func New(cfg Config) (*Cortex, error) {
345345
"/grpc.health.v1.Health/Check",
346346
"/cortex.Ingester/TransferChunks",
347347
"/frontend.Frontend/Process",
348+
"/frontend.Frontend/NotifyClientShutdown",
348349
"/schedulerpb.SchedulerForFrontend/FrontendLoop",
349350
"/schedulerpb.SchedulerForQuerier/QuerierLoop",
351+
"/schedulerpb.SchedulerForQuerier/NotifyQuerierShutdown",
350352
})
351353

352354
cortex := &Cortex{

pkg/cortex/cortex_test.go

+117
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
package cortex
22

33
import (
4+
"context"
5+
"net"
46
"net/url"
7+
"strconv"
58
"testing"
69

10+
"github.com/prometheus/client_golang/prometheus"
711
"github.com/stretchr/testify/require"
12+
"github.com/weaveworks/common/server"
13+
"go.uber.org/atomic"
14+
"google.golang.org/grpc"
815

916
"github.com/cortexproject/cortex/pkg/chunk/aws"
1017
"github.com/cortexproject/cortex/pkg/chunk/storage"
18+
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
1119
"github.com/cortexproject/cortex/pkg/ingester"
1220
"github.com/cortexproject/cortex/pkg/ring"
1321
"github.com/cortexproject/cortex/pkg/ring/kv"
1422
"github.com/cortexproject/cortex/pkg/ruler"
23+
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
1524
"github.com/cortexproject/cortex/pkg/storage/bucket"
1625
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
1726
"github.com/cortexproject/cortex/pkg/storage/tsdb"
@@ -137,3 +146,111 @@ func TestConfigValidation(t *testing.T) {
137146
})
138147
}
139148
}
149+
150+
func TestGrpcAuthMiddleware(t *testing.T) {
151+
prepareGlobalMetricsRegistry(t)
152+
153+
cfg := Config{
154+
AuthEnabled: true, // We must enable this to enable Auth middleware for gRPC server.
155+
Server: getServerConfig(t),
156+
Target: []string{API}, // Something innocent that doesn't require much config.
157+
}
158+
159+
msch := &mockGrpcServiceHandler{}
160+
ctx := context.Background()
161+
162+
// Setup server, using Cortex config. This includes authentication middleware.
163+
{
164+
c, err := New(cfg)
165+
require.NoError(t, err)
166+
167+
serv, err := c.initServer()
168+
require.NoError(t, err)
169+
170+
schedulerpb.RegisterSchedulerForQuerierServer(c.Server.GRPC, msch)
171+
frontendv1pb.RegisterFrontendServer(c.Server.GRPC, msch)
172+
173+
require.NoError(t, services.StartAndAwaitRunning(ctx, serv))
174+
defer func() {
175+
require.NoError(t, services.StopAndAwaitTerminated(ctx, serv))
176+
}()
177+
}
178+
179+
conn, err := grpc.Dial(net.JoinHostPort(cfg.Server.GRPCListenAddress, strconv.Itoa(cfg.Server.GRPCListenPort)), grpc.WithInsecure())
180+
require.NoError(t, err)
181+
defer func() {
182+
require.NoError(t, conn.Close())
183+
}()
184+
185+
{
186+
// Verify that we can call frontendClient.NotifyClientShutdown without user in the context, and we don't get any error.
187+
require.False(t, msch.clientShutdownCalled.Load())
188+
frontendClient := frontendv1pb.NewFrontendClient(conn)
189+
_, err = frontendClient.NotifyClientShutdown(ctx, &frontendv1pb.NotifyClientShutdownRequest{ClientID: "random-client-id"})
190+
require.NoError(t, err)
191+
require.True(t, msch.clientShutdownCalled.Load())
192+
}
193+
194+
{
195+
// Verify that we can call schedulerClient.NotifyQuerierShutdown without user in the context, and we don't get any error.
196+
require.False(t, msch.querierShutdownCalled.Load())
197+
schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn)
198+
_, err = schedulerClient.NotifyQuerierShutdown(ctx, &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: "random-querier-id"})
199+
require.NoError(t, err)
200+
require.True(t, msch.querierShutdownCalled.Load())
201+
}
202+
}
203+
204+
// Generates server config, with gRPC listening on random port.
205+
func getServerConfig(t *testing.T) server.Config {
206+
listen, err := net.Listen("tcp", "localhost:0")
207+
require.NoError(t, err)
208+
209+
host, port, err := net.SplitHostPort(listen.Addr().String())
210+
require.NoError(t, err)
211+
require.NoError(t, listen.Close())
212+
213+
portNum, err := strconv.Atoi(port)
214+
require.NoError(t, err)
215+
216+
return server.Config{
217+
GRPCListenAddress: host,
218+
GRPCListenPort: portNum,
219+
220+
GPRCServerMaxRecvMsgSize: 1024,
221+
}
222+
}
223+
224+
type mockGrpcServiceHandler struct {
225+
clientShutdownCalled atomic.Bool
226+
querierShutdownCalled atomic.Bool
227+
}
228+
229+
func (m *mockGrpcServiceHandler) NotifyClientShutdown(_ context.Context, _ *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
230+
m.clientShutdownCalled.Store(true)
231+
return &frontendv1pb.NotifyClientShutdownResponse{}, nil
232+
}
233+
234+
func (m *mockGrpcServiceHandler) NotifyQuerierShutdown(_ context.Context, _ *schedulerpb.NotifyQuerierShutdownRequest) (*schedulerpb.NotifyQuerierShutdownResponse, error) {
235+
m.querierShutdownCalled.Store(true)
236+
return &schedulerpb.NotifyQuerierShutdownResponse{}, nil
237+
}
238+
239+
func (m *mockGrpcServiceHandler) Process(_ frontendv1pb.Frontend_ProcessServer) error {
240+
panic("implement me")
241+
}
242+
243+
func (m *mockGrpcServiceHandler) QuerierLoop(_ schedulerpb.SchedulerForQuerier_QuerierLoopServer) error {
244+
panic("implement me")
245+
}
246+
247+
func prepareGlobalMetricsRegistry(t *testing.T) {
248+
oldReg, oldGat := prometheus.DefaultRegisterer, prometheus.DefaultGatherer
249+
250+
reg := prometheus.NewRegistry()
251+
prometheus.DefaultRegisterer, prometheus.DefaultGatherer = reg, reg
252+
253+
t.Cleanup(func() {
254+
prometheus.DefaultRegisterer, prometheus.DefaultGatherer = oldReg, oldGat
255+
})
256+
}

0 commit comments

Comments
 (0)