Skip to content

Support Snappy compression for gRPC #2940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 31, 2020
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@
- `-experimental.tsdb.stripe-size` changed to `-experimental.blocks-storage.tsdb.stripe-size`
- `-experimental.tsdb.wal-compression-enabled` changed to `-experimental.blocks-storage.tsdb.wal-compression-enabled`
- `-experimental.tsdb.flush-blocks-on-shutdown` changed to `-experimental.blocks-storage.tsdb.flush-blocks-on-shutdown`
* [ENHANCEMENT/CHANGE] Added new flags `-*.grpc-compression` to configure compression used by gRPC. Valid values are "gzip", "snappy", "" (no compression, default). Previous flags (`-*.use-gzip-compression`, enumerate) are now deprecated. #2940
* `-bigtable.grpc-compression`
* `-ingester.client.grpc-compression`
* `-querier.frontend-client.grpc-compression`
* Deprecated: `-bigtable.grpc-use-gzip-compression`
* Deprecated: `-ingester.client.grpc-use-gzip-compression`
* Deprecated: `-querier.frontend-client.grpc-use-gzip-compression`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* [ENHANCEMENT/CHANGE] Added new flags `-*.grpc-compression` to configure compression used by gRPC. Valid values are "gzip", "snappy", "" (no compression, default). Previous flags (`-*.use-gzip-compression`, enumerate) are now deprecated. #2940
* `-bigtable.grpc-compression`
* `-ingester.client.grpc-compression`
* `-querier.frontend-client.grpc-compression`
* Deprecated: `-bigtable.grpc-use-gzip-compression`
* Deprecated: `-ingester.client.grpc-use-gzip-compression`
* Deprecated: `-querier.frontend-client.grpc-use-gzip-compression`
* [ENHANCEMENT] Added new flags `-bigtable.grpc-compression`, `-ingester.client.grpc-compression`, `-querier.frontend-client.grpc-compression` to configure compression used by gRPC. Valid values are `gzip`, `snappy`, or empty string (no compression, default). #2940
* [CHANGE] Flags `-bigtable.grpc-use-gzip-compression`, `-ingester.client.grpc-use-gzip-compression`, `-querier.frontend-client.grpc-use-gzip-compression` are now deprecated. #2940

(Note that they should be moved to their respective sections)

* [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783
* [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783
* [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783
Expand Down
21 changes: 18 additions & 3 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1774,10 +1774,15 @@ bigtable:
# CLI flag: -bigtable.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages.
# Deprecated: Use gzip compression when sending messages.
# CLI flag: -bigtable.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -bigtable.grpc-compression
[compression: <string> | default = ""]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this to be grpc_compression. I'm not sure why the CI linter passed. Could you run make doc again, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as you may know, CI don't run.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I haven't noticed! Can't understand why the CI is not running on this specific PR 👀


# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -bigtable.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
Expand Down Expand Up @@ -2265,10 +2270,15 @@ grpc_client_config:
# CLI flag: -ingester.client.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages.
# Deprecated: Use gzip compression when sending messages.
# CLI flag: -ingester.client.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -ingester.client.grpc-compression
[compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -ingester.client.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
Expand Down Expand Up @@ -2338,10 +2348,15 @@ grpc_client_config:
# CLI flag: -querier.frontend-client.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages.
# Deprecated: Use gzip compression when sending messages.
# CLI flag: -querier.frontend-client.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -querier.frontend-client.grpc-compression
[compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -querier.frontend-client.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
Expand Down
6 changes: 5 additions & 1 deletion pkg/chunk/gcp/bigtable_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"cloud.google.com/go/bigtable"
"github.com/go-kit/kit/log"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -54,6 +54,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f)
}

func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// storageClientColumnKey implements chunk.storageClient for GCP.
type storageClientColumnKey struct {
cfg Config
Expand Down
3 changes: 3 additions & 0 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func (cfg *Config) Validate() error {
if err := cfg.CassandraStorageConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid Cassandra Storage config")
}
if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil {
return errors.Wrap(err, "invalid GCP Storage Storage config")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return errors.Wrap(err, "invalid GCP Storage Storage config")
return errors.Wrap(err, "invalid GCP Storage config")

}
if err := cfg.Swift.Validate(); err != nil {
return errors.Wrap(err, "invalid Swift Storage config")
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,17 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if err := c.IngesterClient.Validate(log); err != nil {
return errors.Wrap(err, "invalid ingester_client config")
}
if err := c.Worker.Validate(log); err != nil {
return errors.Wrap(err, "invalid frontend_worker config")
}
if err := c.QueryRange.Validate(log); err != nil {
return errors.Wrap(err, "invalid queryrange config")
return errors.Wrap(err, "invalid query_range config")
}
if err := c.TableManager.Validate(); err != nil {
return errors.Wrap(err, "invalid tablemanager config")
return errors.Wrap(err, "invalid table_manager config")
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package client
import (
"flag"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
"google.golang.org/grpc/health/grpc_health_v1"

_ "github.com/cortexproject/cortex/pkg/util/grpc/encoding/snappy" // get snappy compressor registered
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

Expand Down Expand Up @@ -62,3 +64,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
}

func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}
4 changes: 4 additions & 0 deletions pkg/querier/frontend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
}

func (cfg *WorkerConfig) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// Worker is the counter-part to the frontend, actually processing requests.
type worker struct {
cfg WorkerConfig
Expand Down
81 changes: 81 additions & 0 deletions pkg/util/grpc/encoding/snappy/snappy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package snappy

import (
"io"
"sync"

"github.com/golang/snappy"
"google.golang.org/grpc/encoding"
)

// Name is the name registered for the snappy compressor.
const Name = "snappy"

func init() {
encoding.RegisterCompressor(newCompressor())
}

type compressor struct {
writersPool sync.Pool
readersPool sync.Pool
}

func newCompressor() *compressor {
c := &compressor{}
c.readersPool = sync.Pool{
New: func() interface{} {
return &reader{
compressor: c,
Reader: snappy.NewReader(nil),
}
},
}
c.writersPool = sync.Pool{
New: func() interface{} {
return &writeCloser{
compressor: c,
Writer: snappy.NewBufferedWriter(nil),
}
},
}
return c
}

func (c *compressor) Name() string {
return Name
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
wr := c.writersPool.Get().(*writeCloser)
wr.Reset(w)
return wr, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
dr := c.readersPool.Get().(*reader)
dr.Reset(r)
return dr, nil
}

type writeCloser struct {
*compressor
*snappy.Writer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to avoid nameless embeds. They expose more methods than intended (writeCloser now has methods like Name and Compress), and may inadvertently implement methods that were not meant to be implemented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment applies to reader.

}

func (w *writeCloser) Close() error {
defer w.writersPool.Put(w)
return w.Writer.Close()
}

type reader struct {
*compressor
*snappy.Reader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: These anonymous embeds now expose snappy.Reader methods. It doesn't even bring anything, as Read is fully reimplemented by reader. In writeCloser it saves reimplementing Write method, although I would still prefer to be explicit. Not a blocker for merging.

}

func (r *reader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
if err == io.EOF {
r.readersPool.Put(r)
}
return n, err
}
36 changes: 34 additions & 2 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package grpcclient

import (
"flag"
"fmt"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/tls"
)

Expand All @@ -15,11 +20,14 @@ type Config struct {
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
UseGzipCompression bool `yaml:"use_gzip_compression"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
UseGzipCompression bool `yaml:"use_gzip_compression"`
UseGzipCompression bool `yaml:"use_gzip_compression"` // TODO: Remove this deprecated option in v1.6.0.

Compression string `yaml:"compression"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLI flags and YAML config option names should match.

Suggested change
Compression string `yaml:"compression"`
Compression string `yaml:"grpc_compression"`

RateLimit float64 `yaml:"rate_limit"`
RateLimitBurst int `yaml:"rate_limit_burst"`

BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
BackoffConfig util.BackoffConfig `yaml:"backoff_config"`

prefix string `yaml:"-"`
}

// RegisterFlags registers flags.
Expand All @@ -29,23 +37,43 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.prefix = prefix
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.")
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Deprecated: Use gzip compression when sending messages.")
f.StringVar(&cfg.Compression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")

cfg.BackoffConfig.RegisterFlags(prefix, f)
}

func (cfg *Config) Validate(log log.Logger) error {
if cfg.UseGzipCompression {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(log).Log("msg", fmt.Sprintf("running with DEPRECATED flag -%s.grpc-use-gzip-compression, use -%s.grpc-use-compression instead.", cfg.prefix, cfg.prefix))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be ... use -%s.grpc-compression now.

}
switch cfg.Compression {
case "gzip", "snappy", "":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use snappy.Name instead of the hardcoded "snappy" given we have the constant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this made me go back and look why I didn't use gzip.Name, and it turns out it didn't exist at the time.

// valid
default:
return errors.Errorf("unsupported compression type: %s", cfg.Compression)
}
return nil
}

// CallOptions returns the config in terms of CallOptions.
func (cfg *Config) CallOptions() []grpc.CallOption {
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize))
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize))
compression := cfg.Compression
if cfg.UseGzipCompression {
opts = append(opts, grpc.UseCompressor("gzip"))
compression = "gzip"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
compression = "gzip"
compression = gzip.Name

}
if compression != "" {
opts = append(opts, grpc.UseCompressor(compression))
}
return opts
}
Expand Down Expand Up @@ -79,6 +107,10 @@ func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *ConfigWithTLS) Validate(log log.Logger) error {
return cfg.GRPC.Validate(log)
}

// DialOption returns the config as a grpc.DialOptions
func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
opts, err := cfg.TLS.GetGRPCDialOptions()
Expand Down