Skip to content

Commit 450bfc6

Browse files
alanprotbogdan-st
authored andcommitted
Update GRPC (cortexproject#6808)
* Upadte GRPC + Create unit test Signed-off-by: alanprot <[email protected]> * Implementing custom grpc codec that does not free the buffer when Unmarshalling Signed-off-by: alanprot <[email protected]> * fix http test Signed-off-by: alanprot <[email protected]> * just changing the proto id Signed-off-by: alanprot <[email protected]> * defer even on error Signed-off-by: alanprot <[email protected]> * Implementing the free on the PushStream Signed-off-by: alanprot <[email protected]> * Using gogo proto methods when possible Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent ce33dbd commit 450bfc6

File tree

129 files changed

+8352
-4933
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

129 files changed

+8352
-4933
lines changed

go.mod

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ require (
136136
github.com/docker/go-units v0.5.0 // indirect
137137
github.com/edsrzf/mmap-go v1.2.0 // indirect
138138
github.com/efficientgo/tools/extkingpin v0.0.0-20230505153745-6b7392939a60 // indirect
139-
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
140139
github.com/fatih/color v1.18.0 // indirect
141140
github.com/felixge/httpsnoop v1.0.4 // indirect
142141
github.com/fsnotify/fsnotify v1.9.0 // indirect
@@ -300,9 +299,6 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
300299
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
301300
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497
302301

303-
// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
304-
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
305-
306302
replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
307303

308304
replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.302.1

go.sum

Lines changed: 66 additions & 1101 deletions
Large diffs are not rendered by default.

integration/grpc_server_test.go

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"flag"
9+
"fmt"
10+
"io"
11+
"math/rand"
12+
"net"
13+
"strconv"
14+
"sync"
15+
"testing"
16+
"time"
17+
18+
"github.com/prometheus/client_golang/prometheus"
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
21+
"github.com/weaveworks/common/server"
22+
"google.golang.org/grpc"
23+
"google.golang.org/grpc/metadata"
24+
25+
"github.com/cortexproject/cortex/pkg/cortexpb"
26+
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
27+
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
28+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
29+
)
30+
31+
type mockGprcServer struct {
32+
ingester_client.IngesterServer
33+
}
34+
35+
func (m mockGprcServer) QueryStream(req *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
36+
md, _ := metadata.FromIncomingContext(streamServer.Context())
37+
i, _ := strconv.Atoi(md["i"][0])
38+
return streamServer.Send(createStreamResponse(i))
39+
}
40+
41+
func (m mockGprcServer) PushStream(srv ingester_client.Ingester_PushStreamServer) error {
42+
for {
43+
req, err := srv.Recv()
44+
if err == io.EOF {
45+
return nil
46+
}
47+
ctx := metadata.NewIncomingContext(srv.Context(), metadata.MD{"i": []string{req.TenantID}})
48+
res, err := m.Push(ctx, req.Request)
49+
req.Free()
50+
if err != nil {
51+
return err
52+
}
53+
err = srv.Send(res)
54+
if err != nil {
55+
return err
56+
}
57+
}
58+
}
59+
60+
func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
61+
defer request.Free()
62+
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
63+
md, _ := metadata.FromIncomingContext(ctx)
64+
i, _ := strconv.Atoi(md["i"][0])
65+
expected := createRequest(i)
66+
// Need to do this so the .String method return the same value for MessageWithBufRef
67+
expected.MessageWithBufRef = request.MessageWithBufRef
68+
69+
if expected.String() != request.String() {
70+
return nil, fmt.Errorf("expected %v, got %v", expected, request)
71+
}
72+
return &cortexpb.WriteResponse{}, nil
73+
}
74+
75+
func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
76+
savedRegistry := prometheus.DefaultRegisterer
77+
prometheus.DefaultRegisterer = prometheus.NewRegistry()
78+
defer func() {
79+
prometheus.DefaultRegisterer = savedRegistry
80+
}()
81+
82+
grpcPort, closeGrpcPort, err := getLocalHostPort()
83+
require.NoError(t, err)
84+
httpPort, closeHTTPPort, err := getLocalHostPort()
85+
require.NoError(t, err)
86+
87+
err = closeGrpcPort()
88+
require.NoError(t, err)
89+
err = closeHTTPPort()
90+
require.NoError(t, err)
91+
92+
cfg.HTTPListenPort = httpPort
93+
cfg.GRPCListenPort = grpcPort
94+
95+
serv, err := server.New(cfg)
96+
require.NoError(t, err)
97+
register(serv.GRPC)
98+
99+
go func() {
100+
err := serv.Run()
101+
require.NoError(t, err)
102+
}()
103+
104+
defer serv.Shutdown()
105+
106+
grpcHost := fmt.Sprintf("localhost:%d", grpcPort)
107+
108+
clientConfig := grpcclient.Config{}
109+
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
110+
111+
dialOptions, err := clientConfig.DialOption(nil, nil)
112+
assert.NoError(t, err)
113+
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)
114+
115+
conn, err := grpc.NewClient(grpcHost, dialOptions...)
116+
assert.NoError(t, err)
117+
validate(t, conn)
118+
}
119+
120+
func TestConcurrentGrpcCalls(t *testing.T) {
121+
cfg := server.Config{}
122+
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
123+
124+
tc := map[string]struct {
125+
cfg server.Config
126+
register func(s *grpc.Server)
127+
validate func(t *testing.T, con *grpc.ClientConn)
128+
}{
129+
"distributor": {
130+
cfg: cfg,
131+
register: func(s *grpc.Server) {
132+
d := &mockGprcServer{}
133+
distributorpb.RegisterDistributorServer(s, d)
134+
},
135+
validate: func(t *testing.T, conn *grpc.ClientConn) {
136+
client := distributorpb.NewDistributorClient(conn)
137+
wg := sync.WaitGroup{}
138+
n := 10000
139+
wg.Add(n)
140+
for i := 0; i < n; i++ {
141+
go func(i int) {
142+
defer wg.Done()
143+
ctx := context.Background()
144+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
145+
_, err := client.Push(ctx, createRequest(i))
146+
require.NoError(t, err)
147+
}(i)
148+
}
149+
150+
wg.Wait()
151+
},
152+
},
153+
"distributor push stream": {
154+
cfg: cfg,
155+
register: func(s *grpc.Server) {
156+
d := &mockGprcServer{}
157+
ingester_client.RegisterIngesterServer(s, d)
158+
},
159+
validate: func(t *testing.T, conn *grpc.ClientConn) {
160+
ctx := context.Background()
161+
client := ingester_client.NewIngesterClient(conn)
162+
wg := sync.WaitGroup{}
163+
n := 10000
164+
wg.Add(n)
165+
for i := 0; i < n; i++ {
166+
go func(i int) {
167+
defer wg.Done()
168+
stream, err := client.PushStream(ctx)
169+
require.NoError(t, err)
170+
171+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
172+
err = stream.Send(&cortexpb.StreamWriteRequest{TenantID: strconv.Itoa(i), Request: createRequest(i)})
173+
require.NoError(t, err)
174+
_, err = stream.Recv()
175+
require.NoError(t, err)
176+
//err = stream.Send(&cortexpb.StreamWriteRequest{"i", createRequest(i + 1)})
177+
//require.NoError(t, err)
178+
require.NoError(t, stream.CloseSend())
179+
}(i)
180+
}
181+
182+
wg.Wait()
183+
},
184+
},
185+
"ingester": {
186+
cfg: cfg,
187+
register: func(s *grpc.Server) {
188+
d := &mockGprcServer{}
189+
ingester_client.RegisterIngesterServer(s, d)
190+
},
191+
validate: func(t *testing.T, conn *grpc.ClientConn) {
192+
client := ingester_client.NewIngesterClient(conn)
193+
wg := sync.WaitGroup{}
194+
n := 10000
195+
wg.Add(n)
196+
for i := 0; i < n; i++ {
197+
go func(i int) {
198+
defer wg.Done()
199+
ctx := context.Background()
200+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
201+
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
202+
require.NoError(t, err)
203+
resp, err := s.Recv()
204+
require.NoError(t, err)
205+
expected := createStreamResponse(i)
206+
require.Equal(t, expected.String(), resp.String())
207+
}(i)
208+
}
209+
210+
wg.Wait()
211+
},
212+
},
213+
}
214+
215+
for name, c := range tc {
216+
t.Run(name, func(t *testing.T) {
217+
run(t, c.cfg, c.register, c.validate)
218+
})
219+
}
220+
}
221+
222+
func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
223+
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
224+
{
225+
FromIngesterId: strconv.Itoa(i),
226+
Labels: createLabels(i),
227+
Chunks: []ingester_client.Chunk{
228+
{
229+
StartTimestampMs: int64(i),
230+
EndTimestampMs: int64(i),
231+
Encoding: int32(i),
232+
Data: []byte(strconv.Itoa(i)),
233+
},
234+
},
235+
},
236+
}}
237+
}
238+
239+
func createRequest(i int) *cortexpb.WriteRequest {
240+
labels := createLabels(i)
241+
return &cortexpb.WriteRequest{
242+
Timeseries: []cortexpb.PreallocTimeseries{
243+
{
244+
TimeSeries: &cortexpb.TimeSeries{
245+
Labels: labels,
246+
Samples: []cortexpb.Sample{
247+
{TimestampMs: int64(i), Value: float64(i)},
248+
},
249+
Exemplars: []cortexpb.Exemplar{
250+
{
251+
Labels: labels,
252+
Value: float64(i),
253+
TimestampMs: int64(i),
254+
},
255+
},
256+
},
257+
},
258+
},
259+
}
260+
}
261+
262+
func createLabels(i int) []cortexpb.LabelAdapter {
263+
labels := make([]cortexpb.LabelAdapter, 0, 100)
264+
for j := 0; j < 100; j++ {
265+
labels = append(labels, cortexpb.LabelAdapter{
266+
Name: fmt.Sprintf("test%d_%d", i, j),
267+
Value: fmt.Sprintf("test%d_%d", i, j),
268+
})
269+
}
270+
return labels
271+
}
272+
273+
func getLocalHostPort() (int, func() error, error) {
274+
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
275+
if err != nil {
276+
return 0, nil, err
277+
}
278+
279+
l, err := net.ListenTCP("tcp", addr)
280+
if err != nil {
281+
return 0, nil, err
282+
}
283+
284+
closePort := func() error {
285+
return l.Close()
286+
}
287+
return l.Addr().(*net.TCPAddr).Port, closePort, nil
288+
}

0 commit comments

Comments
 (0)