diff --git a/CHANGELOG.md b/CHANGELOG.md index 97eaf0728ba..824c46567b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ - `-experimental.tsdb.stripe-size` changed to `-experimental.blocks-storage.tsdb.stripe-size` - `-experimental.tsdb.wal-compression-enabled` changed to `-experimental.blocks-storage.tsdb.wal-compression-enabled` - `-experimental.tsdb.flush-blocks-on-shutdown` changed to `-experimental.blocks-storage.tsdb.flush-blocks-on-shutdown` +* [CHANGE] Flags `-bigtable.grpc-use-gzip-compression`, `-ingester.client.grpc-use-gzip-compression`, `-querier.frontend-client.grpc-use-gzip-compression` are now deprecated. #2940 * [CHANGE] Limit errors reported by ingester during query-time now return HTTP status code 422. #2941 * [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783 * [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783 @@ -88,6 +89,7 @@ * [ENHANCEMENT] Flusher: Added `-flusher.exit-after-flush` option (defaults to true) to control whether Cortex should stop completely after Flusher has finished its work. #2877 * [ENHANCEMENT] Added metrics `cortex_config_hash` and `cortex_runtime_config_hash` to expose hash of the currently active config file. #2874 * [ENHANCEMENT] Logger: added JSON logging support, configured via the `-log.format=json` CLI flag or its respective YAML config option. #2386 +* [ENHANCEMENT] Added new flags `-bigtable.grpc-compression`, `-ingester.client.grpc-compression`, `-querier.frontend-client.grpc-compression` to configure compression used by gRPC. Valid values are `gzip`, `snappy`, or empty string (no compression, default). #2940 * [ENHANCEMENT] Clarify limitations of the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints. #2953 * [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796 * [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 8a8f68aa58a..313e186b3c9 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1774,10 +1774,16 @@ bigtable: # CLI flag: -bigtable.grpc-max-send-msg-size [max_send_msg_size: | default = 16777216] - # Use compression when sending messages. + # Deprecated: Use gzip compression when sending messages. If true, + # overrides grpc-compression flag. # CLI flag: -bigtable.grpc-use-gzip-compression [use_gzip_compression: | default = false] + # Use compression when sending messages. Supported values are: 'gzip', + # 'snappy' and '' (disable compression) + # CLI flag: -bigtable.grpc-compression + [grpc_compression: | default = ""] + # Rate limit for gRPC client; 0 means disabled. # CLI flag: -bigtable.grpc-client-rate-limit [rate_limit: | default = 0] @@ -2265,10 +2271,16 @@ grpc_client_config: # CLI flag: -ingester.client.grpc-max-send-msg-size [max_send_msg_size: | default = 16777216] - # Use compression when sending messages. + # Deprecated: Use gzip compression when sending messages. If true, overrides + # grpc-compression flag. # CLI flag: -ingester.client.grpc-use-gzip-compression [use_gzip_compression: | default = false] + # Use compression when sending messages. Supported values are: 'gzip', + # 'snappy' and '' (disable compression) + # CLI flag: -ingester.client.grpc-compression + [grpc_compression: | default = ""] + # Rate limit for gRPC client; 0 means disabled. # CLI flag: -ingester.client.grpc-client-rate-limit [rate_limit: | default = 0] @@ -2338,10 +2350,16 @@ grpc_client_config: # CLI flag: -querier.frontend-client.grpc-max-send-msg-size [max_send_msg_size: | default = 16777216] - # Use compression when sending messages. + # Deprecated: Use gzip compression when sending messages. If true, overrides + # grpc-compression flag. # CLI flag: -querier.frontend-client.grpc-use-gzip-compression [use_gzip_compression: | default = false] + # Use compression when sending messages. Supported values are: 'gzip', + # 'snappy' and '' (disable compression) + # CLI flag: -querier.frontend-client.grpc-compression + [grpc_compression: | default = ""] + # Rate limit for gRPC client; 0 means disabled. # CLI flag: -querier.frontend-client.grpc-client-rate-limit [rate_limit: | default = 0] diff --git a/pkg/chunk/gcp/bigtable_index_client.go b/pkg/chunk/gcp/bigtable_index_client.go index f5822bdc5a7..434bb40c754 100644 --- a/pkg/chunk/gcp/bigtable_index_client.go +++ b/pkg/chunk/gcp/bigtable_index_client.go @@ -11,9 +11,9 @@ import ( "time" "cloud.google.com/go/bigtable" + "github.com/go-kit/kit/log" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "github.com/pkg/errors" "github.com/cortexproject/cortex/pkg/chunk" @@ -54,6 +54,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f) } +func (cfg *Config) Validate(log log.Logger) error { + return cfg.GRPCClientConfig.Validate(log) +} + // storageClientColumnKey implements chunk.storageClient for GCP. type storageClientColumnKey struct { cfg Config diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index de4cc6cc415..d63b1d958b1 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -105,6 +105,9 @@ func (cfg *Config) Validate() error { if err := cfg.CassandraStorageConfig.Validate(); err != nil { return errors.Wrap(err, "invalid Cassandra Storage config") } + if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil { + return errors.Wrap(err, "invalid GCP Storage Storage config") + } if err := cfg.Swift.Validate(); err != nil { return errors.Wrap(err, "invalid Swift Storage config") } diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 21e19d86fb0..b7403bce775 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -178,11 +178,17 @@ func (c *Config) Validate(log log.Logger) error { if err := c.Querier.Validate(); err != nil { return errors.Wrap(err, "invalid querier config") } + if err := c.IngesterClient.Validate(log); err != nil { + return errors.Wrap(err, "invalid ingester_client config") + } + if err := c.Worker.Validate(log); err != nil { + return errors.Wrap(err, "invalid frontend_worker config") + } if err := c.QueryRange.Validate(log); err != nil { - return errors.Wrap(err, "invalid queryrange config") + return errors.Wrap(err, "invalid query_range config") } if err := c.TableManager.Validate(); err != nil { - return errors.Wrap(err, "invalid tablemanager config") + return errors.Wrap(err, "invalid table_manager config") } return nil } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 8c5d3b54d30..7ae169785fc 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -3,10 +3,10 @@ package client import ( "flag" + "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" - _ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered "google.golang.org/grpc/health/grpc_health_v1" "github.com/cortexproject/cortex/pkg/util/grpcclient" @@ -62,3 +62,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) } + +func (cfg *Config) Validate(log log.Logger) error { + return cfg.GRPCClientConfig.Validate(log) +} diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 7721b0eb054..c7d897916c2 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -40,6 +40,10 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) } +func (cfg *WorkerConfig) Validate(log log.Logger) error { + return cfg.GRPCClientConfig.Validate(log) +} + // Worker is the counter-part to the frontend, actually processing requests. type worker struct { cfg WorkerConfig diff --git a/pkg/util/grpc/encoding/snappy/snappy.go b/pkg/util/grpc/encoding/snappy/snappy.go new file mode 100644 index 00000000000..fe01b4ca351 --- /dev/null +++ b/pkg/util/grpc/encoding/snappy/snappy.go @@ -0,0 +1,87 @@ +package snappy + +import ( + "io" + "sync" + + "github.com/golang/snappy" + "google.golang.org/grpc/encoding" +) + +// Name is the name registered for the snappy compressor. +const Name = "snappy" + +func init() { + encoding.RegisterCompressor(newCompressor()) +} + +type compressor struct { + writersPool sync.Pool + readersPool sync.Pool +} + +func newCompressor() *compressor { + c := &compressor{} + c.readersPool = sync.Pool{ + New: func() interface{} { + return snappy.NewReader(nil) + }, + } + c.writersPool = sync.Pool{ + New: func() interface{} { + return snappy.NewBufferedWriter(nil) + }, + } + return c +} + +func (c *compressor) Name() string { + return Name +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + wr := c.writersPool.Get().(*snappy.Writer) + wr.Reset(w) + return writeCloser{wr, &c.writersPool}, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + dr := c.readersPool.Get().(*snappy.Reader) + dr.Reset(r) + return reader{dr, &c.readersPool}, nil +} + +type writeCloser struct { + writer *snappy.Writer + pool *sync.Pool +} + +func (w writeCloser) Write(p []byte) (n int, err error) { + return w.writer.Write(p) +} + +func (w writeCloser) Close() error { + defer func() { + w.writer.Reset(nil) + w.pool.Put(w.writer) + }() + + if w.writer != nil { + return w.writer.Close() + } + return nil +} + +type reader struct { + reader *snappy.Reader + pool *sync.Pool +} + +func (r reader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + if err == io.EOF { + r.reader.Reset(nil) + r.pool.Put(r.reader) + } + return n, err +} diff --git a/pkg/util/grpc/encoding/snappy/snappy_test.go b/pkg/util/grpc/encoding/snappy/snappy_test.go new file mode 100644 index 00000000000..315136f9065 --- /dev/null +++ b/pkg/util/grpc/encoding/snappy/snappy_test.go @@ -0,0 +1,71 @@ +package snappy + +import ( + "bytes" + "io" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSnappy(t *testing.T) { + c := newCompressor() + assert.Equal(t, "snappy", c.Name()) + + tests := []struct { + test string + input string + }{ + {"empty", ""}, + {"short", "hello world"}, + {"long", strings.Repeat("123456789", 1024)}, + } + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + var buf bytes.Buffer + // Compress + w, err := c.Compress(&buf) + require.NoError(t, err) + n, err := w.Write([]byte(test.input)) + require.NoError(t, err) + assert.Len(t, test.input, n) + err = w.Close() + require.NoError(t, err) + // Decompress + r, err := c.Decompress(&buf) + require.NoError(t, err) + out, err := ioutil.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, test.input, string(out)) + }) + } +} + +func BenchmarkSnappyCompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + b.ResetTimer() + for i := 0; i < b.N; i++ { + w, _ := c.Compress(ioutil.Discard) + _, _ = w.Write(data) + _ = w.Close() + } +} + +func BenchmarkSnappyDecompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + var buf bytes.Buffer + w, _ := c.Compress(&buf) + _, _ = w.Write(data) + reader := bytes.NewReader(buf.Bytes()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r, _ := c.Decompress(reader) + _, _ = ioutil.ReadAll(r) + _, _ = reader.Seek(0, io.SeekStart) + } +} diff --git a/pkg/util/grpcclient/grpcclient.go b/pkg/util/grpcclient/grpcclient.go index f2ef20e9bde..8a73616946b 100644 --- a/pkg/util/grpcclient/grpcclient.go +++ b/pkg/util/grpcclient/grpcclient.go @@ -3,10 +3,16 @@ package grpcclient import ( "flag" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/pkg/errors" "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/grpc/encoding/snappy" "github.com/cortexproject/cortex/pkg/util/tls" ) @@ -14,7 +20,8 @@ import ( type Config struct { MaxRecvMsgSize int `yaml:"max_recv_msg_size"` MaxSendMsgSize int `yaml:"max_send_msg_size"` - UseGzipCompression bool `yaml:"use_gzip_compression"` + UseGzipCompression bool `yaml:"use_gzip_compression"` // TODO: Remove this deprecated option in v1.6.0. + GRPCCompression string `yaml:"grpc_compression"` RateLimit float64 `yaml:"rate_limit"` RateLimitBurst int `yaml:"rate_limit_burst"` @@ -31,7 +38,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") - f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.") + f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Deprecated: Use gzip compression when sending messages. If true, overrides grpc-compression flag.") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") @@ -39,13 +47,31 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.BackoffConfig.RegisterFlags(prefix, f) } +func (cfg *Config) Validate(log log.Logger) error { + if cfg.UseGzipCompression { + flagext.DeprecatedFlagsUsed.Inc() + level.Warn(log).Log("msg", "running with DEPRECATED option use_gzip_compression, use grpc_compression instead.") + } + switch cfg.GRPCCompression { + case gzip.Name, snappy.Name, "": + // valid + default: + return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) + } + return nil +} + // CallOptions returns the config in terms of CallOptions. func (cfg *Config) CallOptions() []grpc.CallOption { var opts []grpc.CallOption opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)) opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize)) + compression := cfg.GRPCCompression if cfg.UseGzipCompression { - opts = append(opts, grpc.UseCompressor("gzip")) + compression = gzip.Name + } + if compression != "" { + opts = append(opts, grpc.UseCompressor(compression)) } return opts } @@ -79,6 +105,10 @@ func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet cfg.TLS.RegisterFlagsWithPrefix(prefix, f) } +func (cfg *ConfigWithTLS) Validate(log log.Logger) error { + return cfg.GRPC.Validate(log) +} + // DialOption returns the config as a grpc.DialOptions func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { opts, err := cfg.TLS.GetGRPCDialOptions()