-
Notifications
You must be signed in to change notification settings - Fork 816
Support Snappy compression for gRPC #2940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
270a06d
5df98c9
835a6d1
5dd90d3
c775a23
6cae329
87bcb12
0f7a209
97bd88d
0978fec
2f15674
47d77a0
6b4b2bd
fee4259
88d34a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1774,10 +1774,15 @@ bigtable: | |
# CLI flag: -bigtable.grpc-max-send-msg-size | ||
[max_send_msg_size: <int> | default = 16777216] | ||
|
||
# Use compression when sending messages. | ||
# Deprecated: Use gzip compression when sending messages. | ||
# CLI flag: -bigtable.grpc-use-gzip-compression | ||
[use_gzip_compression: <boolean> | default = false] | ||
|
||
# Use compression when sending messages. Supported values are: 'gzip', | ||
# 'snappy' and '' (disable compression) | ||
# CLI flag: -bigtable.grpc-compression | ||
[compression: <string> | default = ""] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect this to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as you may know, CI don't run. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I haven't noticed! Can't understand why the CI is not running on this specific PR 👀 |
||
|
||
# Rate limit for gRPC client; 0 means disabled. | ||
# CLI flag: -bigtable.grpc-client-rate-limit | ||
[rate_limit: <float> | default = 0] | ||
|
@@ -2265,10 +2270,15 @@ grpc_client_config: | |
# CLI flag: -ingester.client.grpc-max-send-msg-size | ||
[max_send_msg_size: <int> | default = 16777216] | ||
|
||
# Use compression when sending messages. | ||
# Deprecated: Use gzip compression when sending messages. | ||
# CLI flag: -ingester.client.grpc-use-gzip-compression | ||
[use_gzip_compression: <boolean> | default = false] | ||
|
||
# Use compression when sending messages. Supported values are: 'gzip', | ||
# 'snappy' and '' (disable compression) | ||
# CLI flag: -ingester.client.grpc-compression | ||
[compression: <string> | default = ""] | ||
|
||
# Rate limit for gRPC client; 0 means disabled. | ||
# CLI flag: -ingester.client.grpc-client-rate-limit | ||
[rate_limit: <float> | default = 0] | ||
|
@@ -2338,10 +2348,15 @@ grpc_client_config: | |
# CLI flag: -querier.frontend-client.grpc-max-send-msg-size | ||
[max_send_msg_size: <int> | default = 16777216] | ||
|
||
# Use compression when sending messages. | ||
# Deprecated: Use gzip compression when sending messages. | ||
# CLI flag: -querier.frontend-client.grpc-use-gzip-compression | ||
[use_gzip_compression: <boolean> | default = false] | ||
|
||
# Use compression when sending messages. Supported values are: 'gzip', | ||
# 'snappy' and '' (disable compression) | ||
# CLI flag: -querier.frontend-client.grpc-compression | ||
[compression: <string> | default = ""] | ||
|
||
# Rate limit for gRPC client; 0 means disabled. | ||
# CLI flag: -querier.frontend-client.grpc-client-rate-limit | ||
[rate_limit: <float> | default = 0] | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -105,6 +105,9 @@ func (cfg *Config) Validate() error { | |||||
if err := cfg.CassandraStorageConfig.Validate(); err != nil { | ||||||
return errors.Wrap(err, "invalid Cassandra Storage config") | ||||||
} | ||||||
if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil { | ||||||
return errors.Wrap(err, "invalid GCP Storage Storage config") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
if err := cfg.Swift.Validate(); err != nil { | ||||||
return errors.Wrap(err, "invalid Swift Storage config") | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package snappy | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ( | ||
"io" | ||
"sync" | ||
|
||
"github.com/golang/snappy" | ||
"google.golang.org/grpc/encoding" | ||
) | ||
|
||
// Name is the name registered for the snappy compressor. | ||
const Name = "snappy" | ||
|
||
func init() { | ||
encoding.RegisterCompressor(newCompressor()) | ||
} | ||
|
||
type compressor struct { | ||
pracucci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
writersPool sync.Pool | ||
Wing924 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
readersPool sync.Pool | ||
} | ||
|
||
func newCompressor() *compressor { | ||
c := &compressor{} | ||
c.readersPool = sync.Pool{ | ||
New: func() interface{} { | ||
return &reader{ | ||
compressor: c, | ||
Reader: snappy.NewReader(nil), | ||
} | ||
}, | ||
} | ||
c.writersPool = sync.Pool{ | ||
New: func() interface{} { | ||
return &writeCloser{ | ||
compressor: c, | ||
Writer: snappy.NewBufferedWriter(nil), | ||
} | ||
}, | ||
} | ||
return c | ||
} | ||
|
||
func (c *compressor) Name() string { | ||
return Name | ||
} | ||
|
||
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { | ||
wr := c.writersPool.Get().(*writeCloser) | ||
wr.Reset(w) | ||
return wr, nil | ||
} | ||
|
||
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { | ||
dr := c.readersPool.Get().(*reader) | ||
dr.Reset(r) | ||
return dr, nil | ||
} | ||
|
||
type writeCloser struct { | ||
*compressor | ||
*snappy.Writer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer to avoid nameless embeds. They expose more methods than intended (writeCloser now has methods like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment applies to
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (w *writeCloser) Close() error { | ||
defer w.writersPool.Put(w) | ||
return w.Writer.Close() | ||
} | ||
|
||
type reader struct { | ||
*compressor | ||
*snappy.Reader | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: These anonymous embeds now expose |
||
} | ||
|
||
func (r *reader) Read(p []byte) (n int, err error) { | ||
n, err = r.Reader.Read(p) | ||
if err == io.EOF { | ||
r.readersPool.Put(r) | ||
} | ||
return n, err | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,11 +2,16 @@ package grpcclient | |||||
|
||||||
import ( | ||||||
"flag" | ||||||
"fmt" | ||||||
|
||||||
"github.com/go-kit/kit/log" | ||||||
"github.com/go-kit/kit/log/level" | ||||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||||
"github.com/pkg/errors" | ||||||
"google.golang.org/grpc" | ||||||
|
||||||
"github.com/cortexproject/cortex/pkg/util" | ||||||
"github.com/cortexproject/cortex/pkg/util/flagext" | ||||||
"github.com/cortexproject/cortex/pkg/util/tls" | ||||||
) | ||||||
|
||||||
|
@@ -15,11 +20,14 @@ type Config struct { | |||||
MaxRecvMsgSize int `yaml:"max_recv_msg_size"` | ||||||
MaxSendMsgSize int `yaml:"max_send_msg_size"` | ||||||
UseGzipCompression bool `yaml:"use_gzip_compression"` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
Compression string `yaml:"compression"` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CLI flags and YAML config option names should match.
Suggested change
|
||||||
RateLimit float64 `yaml:"rate_limit"` | ||||||
RateLimitBurst int `yaml:"rate_limit_burst"` | ||||||
|
||||||
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` | ||||||
BackoffConfig util.BackoffConfig `yaml:"backoff_config"` | ||||||
|
||||||
prefix string `yaml:"-"` | ||||||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
|
||||||
// RegisterFlags registers flags. | ||||||
|
@@ -29,23 +37,43 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { | |||||
|
||||||
// RegisterFlagsWithPrefix registers flags with prefix. | ||||||
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | ||||||
cfg.prefix = prefix | ||||||
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") | ||||||
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") | ||||||
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.") | ||||||
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Deprecated: Use gzip compression when sending messages.") | ||||||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
f.StringVar(&cfg.Compression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") | ||||||
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") | ||||||
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") | ||||||
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") | ||||||
|
||||||
cfg.BackoffConfig.RegisterFlags(prefix, f) | ||||||
} | ||||||
|
||||||
func (cfg *Config) Validate(log log.Logger) error { | ||||||
if cfg.UseGzipCompression { | ||||||
flagext.DeprecatedFlagsUsed.Inc() | ||||||
level.Warn(log).Log("msg", fmt.Sprintf("running with DEPRECATED flag -%s.grpc-use-gzip-compression, use -%s.grpc-use-compression instead.", cfg.prefix, cfg.prefix)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||||||
} | ||||||
switch cfg.Compression { | ||||||
case "gzip", "snappy", "": | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this made me go back and look why I didn't use |
||||||
// valid | ||||||
default: | ||||||
return errors.Errorf("unsupported compression type: %s", cfg.Compression) | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
// CallOptions returns the config in terms of CallOptions. | ||||||
func (cfg *Config) CallOptions() []grpc.CallOption { | ||||||
var opts []grpc.CallOption | ||||||
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)) | ||||||
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize)) | ||||||
compression := cfg.Compression | ||||||
if cfg.UseGzipCompression { | ||||||
opts = append(opts, grpc.UseCompressor("gzip")) | ||||||
compression = "gzip" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
if compression != "" { | ||||||
opts = append(opts, grpc.UseCompressor(compression)) | ||||||
} | ||||||
return opts | ||||||
} | ||||||
|
@@ -79,6 +107,10 @@ func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet | |||||
cfg.TLS.RegisterFlagsWithPrefix(prefix, f) | ||||||
} | ||||||
|
||||||
func (cfg *ConfigWithTLS) Validate(log log.Logger) error { | ||||||
return cfg.GRPC.Validate(log) | ||||||
} | ||||||
|
||||||
// DialOption returns the config as a grpc.DialOptions | ||||||
func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { | ||||||
opts, err := cfg.TLS.GetGRPCDialOptions() | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Note that they should be moved to their respective sections)