Support Snappy compression for gRPC #2940

Merged: 15 commits, Jul 31, 2020
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -68,6 +68,13 @@
* [ENHANCEMENT] Flusher: Added `-flusher.exit-after-flush` option (defaults to true) to control whether Cortex should stop completely after Flusher has finished its work. #2877
* [ENHANCEMENT] Added metrics `cortex_config_hash` and `cortex_runtime_config_hash` to expose hash of the currently active config file. #2874
* [ENHANCEMENT] Logger: added JSON logging support, configured via the `-log.format=json` CLI flag or its respective YAML config option. #2386
* [ENHANCEMENT] Support Snappy compression for gRPC. #2940
  * `-bigtable.grpc-use-compression`
  * `-ingester.client.grpc-use-compression`
  * `-querier.frontend-client.grpc-use-compression`
  * Deprecated: `-bigtable.grpc-use-gzip-compression`
  * Deprecated: `-ingester.client.grpc-use-gzip-compression`
  * Deprecated: `-querier.frontend-client.grpc-use-gzip-compression`
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762
21 changes: 18 additions & 3 deletions docs/configuration/config-file-reference.md
@@ -1774,10 +1774,15 @@ bigtable:
# CLI flag: -bigtable.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

- # Use compression when sending messages.
+ # Deprecated: Use gzip compression when sending messages.
# CLI flag: -bigtable.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -bigtable.grpc-use-compression
[use_compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -bigtable.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
@@ -2265,10 +2270,15 @@ grpc_client_config:
# CLI flag: -ingester.client.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

- # Use compression when sending messages.
+ # Deprecated: Use gzip compression when sending messages.
# CLI flag: -ingester.client.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -ingester.client.grpc-use-compression
[use_compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -ingester.client.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
@@ -2338,10 +2348,15 @@ grpc_client_config:
# CLI flag: -querier.frontend-client.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

- # Use compression when sending messages.
+ # Deprecated: Use gzip compression when sending messages.
# CLI flag: -querier.frontend-client.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -querier.frontend-client.grpc-use-compression
[use_compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -querier.frontend-client.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
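
The three flag families documented above come from a single prefix-based registration helper. The following is a minimal sketch of that idea using only the standard `flag` package. The `clientConfig` type and the `parseArgs` helper are hypothetical stand-ins for illustration, not the PR's actual `grpcclient.Config`.

```go
package main

import (
	"flag"
	"fmt"
)

// clientConfig is a hypothetical, trimmed-down stand-in for a gRPC client config.
type clientConfig struct {
	UseGzipCompression bool   // deprecated boolean option
	UseCompression     string // "gzip", "snappy" or "" (compression disabled)
}

// registerFlagsWithPrefix registers both options under the given prefix,
// so one helper can serve "bigtable", "ingester.client", and so on.
func (cfg *clientConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false,
		"Deprecated: Use gzip compression when sending messages.")
	f.StringVar(&cfg.UseCompression, prefix+".grpc-use-compression", "",
		"Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
}

// parseArgs (hypothetical helper) builds a fresh FlagSet, registers the
// flags under prefix, and parses the given arguments.
func parseArgs(prefix string, args []string) (clientConfig, error) {
	var cfg clientConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.registerFlagsWithPrefix(prefix, fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseArgs("ingester.client",
		[]string{"-ingester.client.grpc-use-compression=snappy"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("compression=%q deprecatedGzip=%v\n", cfg.UseCompression, cfg.UseGzipCompression)
}
```

Because the new option is a string rather than a boolean, further compressors could be added later without introducing another flag per algorithm.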
6 changes: 5 additions & 1 deletion pkg/chunk/gcp/bigtable_index_client.go
@@ -11,9 +11,9 @@ import (
"time"

"cloud.google.com/go/bigtable"
"github.com/go-kit/kit/log"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/chunk"
@@ -54,6 +54,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f)
}

func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// storageClientColumnKey implements chunk.storageClient for GCP.
type storageClientColumnKey struct {
cfg Config
3 changes: 3 additions & 0 deletions pkg/chunk/storage/factory.go
@@ -105,6 +105,9 @@ func (cfg *Config) Validate() error {
if err := cfg.CassandraStorageConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid Cassandra Storage config")
}
if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil {
return errors.Wrap(err, "invalid GCP Storage Storage config")
Contributor:

Suggested change
- return errors.Wrap(err, "invalid GCP Storage Storage config")
+ return errors.Wrap(err, "invalid GCP Storage config")

}
if err := cfg.Swift.Validate(); err != nil {
return errors.Wrap(err, "invalid Swift Storage config")
}
10 changes: 8 additions & 2 deletions pkg/cortex/cortex.go
@@ -178,11 +178,17 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if err := c.IngesterClient.Validate(log); err != nil {
return errors.Wrap(err, "invalid ingester_client config")
}
if err := c.Worker.Validate(log); err != nil {
return errors.Wrap(err, "invalid frontend_worker config")
}
if err := c.QueryRange.Validate(log); err != nil {
- return errors.Wrap(err, "invalid queryrange config")
+ return errors.Wrap(err, "invalid query_range config")
}
if err := c.TableManager.Validate(); err != nil {
- return errors.Wrap(err, "invalid tablemanager config")
+ return errors.Wrap(err, "invalid table_manager config")
}
return nil
}
6 changes: 6 additions & 0 deletions pkg/ingester/client/client.go
@@ -3,12 +3,14 @@ package client
import (
"flag"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
"google.golang.org/grpc/health/grpc_health_v1"

_ "github.com/cortexproject/cortex/pkg/util/grpc/encoding/snappy" // get snappy compressor registered
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

@@ -62,3 +64,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
}

func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}
4 changes: 4 additions & 0 deletions pkg/querier/frontend/worker.go
@@ -40,6 +40,10 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
}

func (cfg *WorkerConfig) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// Worker is the counter-part to the frontend, actually processing requests.
type worker struct {
cfg WorkerConfig
75 changes: 75 additions & 0 deletions pkg/util/grpc/encoding/snappy/snappy.go
@@ -0,0 +1,75 @@
package snappy

import (
	"io"
	"sync"

	"github.com/golang/snappy"
	"google.golang.org/grpc/encoding"
)

// Name is the name registered for the snappy compressor.
const Name = "snappy"

func init() {
	encoding.RegisterCompressor(&compressor{})
}

var (
	// writersPool stores writers
	writersPool sync.Pool
	// readersPool stores readers
	readersPool sync.Pool
)

type compressor struct {
}

func (c *compressor) Name() string {
	return Name
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
	wr, inPool := writersPool.Get().(*writeCloser)
	if !inPool {
		return &writeCloser{Writer: snappy.NewBufferedWriter(w)}, nil
	}
	wr.Reset(w)

	return wr, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
	dr, inPool := readersPool.Get().(*reader)
	if !inPool {
		return &reader{Reader: snappy.NewReader(r)}, nil
	}
	dr.Reset(r)

	return dr, nil
}

type writeCloser struct {
	*snappy.Writer
}

func (w *writeCloser) Close() error {
	defer func() {
		writersPool.Put(w)
	}()

	return w.Writer.Close()
}

type reader struct {
	*snappy.Reader
Contributor:

Nit: These anonymous embeds now expose snappy.Reader methods. It doesn't even bring anything, as Read is fully reimplemented by reader. In writeCloser it saves reimplementing Write method, although I would still prefer to be explicit. Not a blocker for merging.

}

func (r *reader) Read(p []byte) (n int, err error) {
	n, err = r.Reader.Read(p)
	if err == io.EOF {
		readersPool.Put(r)
	}

	return n, err
}
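
The pooling pattern in this file (take a writer from a `sync.Pool`, reset it onto the new destination, return it to the pool on `Close`) can be tried in isolation. The sketch below is an illustration, not the PR's code: it swaps in the standard library's `compress/gzip` for `github.com/golang/snappy` so that it runs without external dependencies, and the `pooledWriter`, `compress` and `roundTrip` names are hypothetical. It also holds the writer in a named field rather than an anonymous embed, along the lines of the reviewer's nit above.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// writersPool caches compressing writers between calls.
var writersPool sync.Pool

// pooledWriter wraps a gzip.Writer in a named field, so only Write and
// Close are exposed to callers.
type pooledWriter struct {
	zw *gzip.Writer
}

func (p *pooledWriter) Write(b []byte) (int, error) {
	return p.zw.Write(b)
}

// Close flushes the compressed stream and hands the writer back to the pool.
func (p *pooledWriter) Close() error {
	defer writersPool.Put(p)
	return p.zw.Close()
}

// compress returns a writer that compresses into w, reusing a pooled
// writer when one is available.
func compress(w io.Writer) io.WriteCloser {
	if p, ok := writersPool.Get().(*pooledWriter); ok {
		p.zw.Reset(w) // re-target the recycled writer at the new destination
		return p
	}
	return &pooledWriter{zw: gzip.NewWriter(w)}
}

// roundTrip compresses msg through the pool and decompresses it again.
func roundTrip(msg []byte) ([]byte, error) {
	var buf bytes.Buffer
	wc := compress(&buf)
	if _, err := wc.Write(msg); err != nil {
		return nil, err
	}
	if err := wc.Close(); err != nil {
		return nil, err
	}
	zr, err := gzip.NewReader(&buf)
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	out, err := roundTrip([]byte("hello gRPC"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```

A `sync.Pool` may drop its contents at any time, so the code has to work whether or not `Get` returns a recycled writer; both branches of `compress` produce an equivalent result.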
32 changes: 30 additions & 2 deletions pkg/util/grpcclient/grpcclient.go
@@ -3,10 +3,14 @@ package grpcclient
import (
"flag"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/tls"
)

@@ -15,6 +19,7 @@ type Config struct {
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
UseGzipCompression bool `yaml:"use_gzip_compression"`
Contributor:

Suggested change
- UseGzipCompression bool `yaml:"use_gzip_compression"`
+ UseGzipCompression bool `yaml:"use_gzip_compression"` // TODO: Remove this deprecated option in v1.6.0.

UseCompression string `yaml:"use_compression"`
RateLimit float64 `yaml:"rate_limit"`
RateLimitBurst int `yaml:"rate_limit_burst"`

@@ -31,21 +36,40 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
- f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.")
+ f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Deprecated: Use gzip compression when sending messages.")
f.StringVar(&cfg.UseCompression, prefix+".grpc-use-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")

cfg.BackoffConfig.RegisterFlags(prefix, f)
}

func (cfg *Config) Validate(log log.Logger) error {
if cfg.UseGzipCompression {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(log).Log("msg", "flag *.grpc-use-gzip-compression (or config use_gzip_compression) is deprecated, use *.grpc-use-compression instead.")
Contributor:

Not sure if it's a good idea, but we could save the prefix in RegisterFlagsWithPrefix, and then report full flag name here.

}
switch cfg.UseCompression {
case "gzip", "snappy", "":
Contributor:

I would use snappy.Name instead of the hardcoded "snappy" given we have the constant.

Contributor:

So this made me go back and look why I didn't use gzip.Name, and it turns out it didn't exist at the time.

// valid
default:
return errors.Errorf("unsupported compression type: %s", cfg.UseCompression)
}
return nil
}

// CallOptions returns the config in terms of CallOptions.
func (cfg *Config) CallOptions() []grpc.CallOption {
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize))
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize))
compression := cfg.UseCompression
if cfg.UseGzipCompression {
- opts = append(opts, grpc.UseCompressor("gzip"))
+ compression = "gzip"
Contributor:

Suggested change
- compression = "gzip"
+ compression = gzip.Name

}
if compression != "" {
opts = append(opts, grpc.UseCompressor(compression))
}
return opts
}
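
The interaction between the deprecated boolean and the new string option is easy to miss in the diff: in `CallOptions`, a set `UseGzipCompression` overrides whatever `UseCompression` holds. The sketch below isolates that precedence and the validation switch in plain Go without any gRPC dependency. The `config` type and the `effectiveCompression` method are hypothetical names used only for illustration, and `fmt.Errorf` stands in for the `errors.Errorf` call used in the PR.

```go
package main

import "fmt"

// config is a hypothetical, trimmed-down stand-in for the two
// compression-related fields of the gRPC client config.
type config struct {
	UseGzipCompression bool   // deprecated
	UseCompression     string // "gzip", "snappy" or ""
}

// validate accepts only the documented values of UseCompression.
func (cfg config) validate() error {
	switch cfg.UseCompression {
	case "gzip", "snappy", "":
		return nil
	default:
		return fmt.Errorf("unsupported compression type: %s", cfg.UseCompression)
	}
}

// effectiveCompression mirrors the precedence in CallOptions: the
// deprecated boolean, when set, wins over the string option.
func (cfg config) effectiveCompression() string {
	compression := cfg.UseCompression
	if cfg.UseGzipCompression {
		compression = "gzip"
	}
	return compression
}

func main() {
	cases := []config{
		{},
		{UseCompression: "snappy"},
		{UseGzipCompression: true, UseCompression: "snappy"},
		{UseCompression: "lz4"},
	}
	for _, c := range cases {
		fmt.Printf("%+v -> compressor=%q, validation error=%v\n",
			c, c.effectiveCompression(), c.validate())
	}
}
```

One consequence of this ordering is that a configuration setting both `use_gzip_compression: true` and `use_compression: snappy` ends up using gzip, so the deprecated option should be removed from a config when switching to Snappy.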
@@ -79,6 +103,10 @@ func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *ConfigWithTLS) Validate(log log.Logger) error {
return cfg.GRPC.Validate(log)
}

// DialOption returns the config as a grpc.DialOptions
func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
opts, err := cfg.TLS.GetGRPCDialOptions()