-
Notifications
You must be signed in to change notification settings - Fork 816
Support Snappy compression for gRPC #2940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
270a06d
5df98c9
835a6d1
5dd90d3
c775a23
6cae329
87bcb12
0f7a209
97bd88d
0978fec
2f15674
47d77a0
6b4b2bd
fee4259
88d34a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -105,6 +105,9 @@ func (cfg *Config) Validate() error { | |||||
if err := cfg.CassandraStorageConfig.Validate(); err != nil { | ||||||
return errors.Wrap(err, "invalid Cassandra Storage config") | ||||||
} | ||||||
if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil { | ||||||
return errors.Wrap(err, "invalid GCP Storage Storage config") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
if err := cfg.Swift.Validate(); err != nil { | ||||||
return errors.Wrap(err, "invalid Swift Storage config") | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package snappy | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ( | ||
"io" | ||
"sync" | ||
|
||
"github.com/golang/snappy" | ||
"google.golang.org/grpc/encoding" | ||
) | ||
|
||
// Name is the name registered for the snappy compressor. | ||
const Name = "snappy" | ||
|
||
func init() { | ||
encoding.RegisterCompressor(newCompressor()) | ||
} | ||
|
||
type compressor struct { | ||
pracucci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
writersPool sync.Pool | ||
Wing924 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
readersPool sync.Pool | ||
} | ||
|
||
func newCompressor() *compressor { | ||
c := &compressor{} | ||
c.readersPool = sync.Pool{ | ||
New: func() interface{} { | ||
return &reader{ | ||
pool: &c.readersPool, | ||
Reader: snappy.NewReader(nil), | ||
} | ||
}, | ||
} | ||
c.writersPool = sync.Pool{ | ||
New: func() interface{} { | ||
return &writeCloser{ | ||
pool: &c.writersPool, | ||
Writer: snappy.NewBufferedWriter(nil), | ||
} | ||
}, | ||
} | ||
return c | ||
} | ||
|
||
func (c *compressor) Name() string { | ||
return Name | ||
} | ||
|
||
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { | ||
wr := c.writersPool.Get().(*writeCloser) | ||
wr.Reset(w) | ||
return wr, nil | ||
} | ||
|
||
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { | ||
dr := c.readersPool.Get().(*reader) | ||
dr.Reset(r) | ||
return dr, nil | ||
} | ||
|
||
type writeCloser struct { | ||
*snappy.Writer | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pool *sync.Pool | ||
} | ||
|
||
func (w *writeCloser) Close() error { | ||
defer w.pool.Put(w) | ||
return w.Writer.Close() | ||
} | ||
|
||
type reader struct { | ||
*snappy.Reader | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: These anonymous embeds now expose |
||
pool *sync.Pool | ||
} | ||
|
||
func (r *reader) Read(p []byte) (n int, err error) { | ||
n, err = r.Reader.Read(p) | ||
if err == io.EOF { | ||
r.pool.Put(r) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: should we reset |
||
} | ||
return n, err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package snappy | ||
|
||
import ( | ||
"bytes" | ||
"io" | ||
"io/ioutil" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSnappy(t *testing.T) { | ||
c := newCompressor() | ||
assert.Equal(t, "snappy", c.Name()) | ||
|
||
tests := []struct { | ||
test string | ||
input string | ||
}{ | ||
{"empty", ""}, | ||
{"short", "hello world"}, | ||
{"long", strings.Repeat("123456789", 1024)}, | ||
} | ||
for _, test := range tests { | ||
t.Run(test.test, func(t *testing.T) { | ||
var buf bytes.Buffer | ||
// Compress | ||
w, err := c.Compress(&buf) | ||
require.NoError(t, err) | ||
n, err := w.Write([]byte(test.input)) | ||
require.NoError(t, err) | ||
assert.Len(t, test.input, n) | ||
err = w.Close() | ||
require.NoError(t, err) | ||
// Decompress | ||
r, err := c.Decompress(&buf) | ||
require.NoError(t, err) | ||
out, err := ioutil.ReadAll(r) | ||
require.NoError(t, err) | ||
assert.Equal(t, test.input, string(out)) | ||
}) | ||
} | ||
} | ||
|
||
func BenchmarkSnappyCompress(b *testing.B) { | ||
data := []byte(strings.Repeat("123456789", 1024)) | ||
c := newCompressor() | ||
w, _ := c.Compress(ioutil.Discard) | ||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
_, _ = w.Write(data) | ||
_ = w.Close() | ||
} | ||
} | ||
|
||
func BenchmarkSnappyDecompress(b *testing.B) { | ||
data := []byte(strings.Repeat("123456789", 1024)) | ||
c := newCompressor() | ||
var buf bytes.Buffer | ||
w, _ := c.Compress(&buf) | ||
_, _ = w.Write(data) | ||
reader := bytes.NewReader(buf.Bytes()) | ||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
r, _ := c.Decompress(reader) | ||
_, _ = ioutil.ReadAll(r) | ||
_, _ = reader.Seek(0, io.SeekStart) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,10 +3,16 @@ package grpcclient | |||||
import ( | ||||||
"flag" | ||||||
|
||||||
"github.com/go-kit/kit/log" | ||||||
"github.com/go-kit/kit/log/level" | ||||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||||
"github.com/pkg/errors" | ||||||
"google.golang.org/grpc" | ||||||
"google.golang.org/grpc/encoding/gzip" | ||||||
|
||||||
"github.com/cortexproject/cortex/pkg/util" | ||||||
"github.com/cortexproject/cortex/pkg/util/flagext" | ||||||
"github.com/cortexproject/cortex/pkg/util/grpc/encoding/snappy" | ||||||
"github.com/cortexproject/cortex/pkg/util/tls" | ||||||
) | ||||||
|
||||||
|
@@ -15,6 +21,7 @@ type Config struct { | |||||
MaxRecvMsgSize int `yaml:"max_recv_msg_size"` | ||||||
MaxSendMsgSize int `yaml:"max_send_msg_size"` | ||||||
UseGzipCompression bool `yaml:"use_gzip_compression"` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
GRPCCompression string `yaml:"grpc_compression"` | ||||||
RateLimit float64 `yaml:"rate_limit"` | ||||||
RateLimitBurst int `yaml:"rate_limit_burst"` | ||||||
|
||||||
|
@@ -31,21 +38,40 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { | |||||
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | ||||||
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") | ||||||
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") | ||||||
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.") | ||||||
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Deprecated: Use gzip compression when sending messages. If true, overrides grpc-compression flag.") | ||||||
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") | ||||||
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") | ||||||
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") | ||||||
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") | ||||||
|
||||||
cfg.BackoffConfig.RegisterFlags(prefix, f) | ||||||
} | ||||||
|
||||||
func (cfg *Config) Validate(log log.Logger) error { | ||||||
if cfg.UseGzipCompression { | ||||||
flagext.DeprecatedFlagsUsed.Inc() | ||||||
level.Warn(log).Log("msg", "running with DEPRECATED option use_gzip_compression, use grpc_compression instead.") | ||||||
} | ||||||
switch cfg.GRPCCompression { | ||||||
case gzip.Name, snappy.Name, "": | ||||||
// valid | ||||||
default: | ||||||
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
// CallOptions returns the config in terms of CallOptions. | ||||||
func (cfg *Config) CallOptions() []grpc.CallOption { | ||||||
var opts []grpc.CallOption | ||||||
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)) | ||||||
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize)) | ||||||
compression := cfg.GRPCCompression | ||||||
if cfg.UseGzipCompression { | ||||||
opts = append(opts, grpc.UseCompressor("gzip")) | ||||||
compression = "gzip" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
if compression != "" { | ||||||
opts = append(opts, grpc.UseCompressor(compression)) | ||||||
} | ||||||
return opts | ||||||
} | ||||||
|
@@ -79,6 +105,10 @@ func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet | |||||
cfg.TLS.RegisterFlagsWithPrefix(prefix, f) | ||||||
} | ||||||
|
||||||
func (cfg *ConfigWithTLS) Validate(log log.Logger) error { | ||||||
return cfg.GRPC.Validate(log) | ||||||
} | ||||||
|
||||||
// DialOption returns the config as a grpc.DialOptions | ||||||
func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { | ||||||
opts, err := cfg.TLS.GetGRPCDialOptions() | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect this to be
grpc_compression
. I'm not sure why the CI linter passed. Could you runmake doc
again, please?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as you may know, CI don't run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I haven't noticed! Can't understand why the CI is not running on this specific PR 👀