Skip to content

Support Snappy compression for gRPC #2940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 31, 2020
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- `-experimental.tsdb.stripe-size` changed to `-experimental.blocks-storage.tsdb.stripe-size`
- `-experimental.tsdb.wal-compression-enabled` changed to `-experimental.blocks-storage.tsdb.wal-compression-enabled`
- `-experimental.tsdb.flush-blocks-on-shutdown` changed to `-experimental.blocks-storage.tsdb.flush-blocks-on-shutdown`
* [CHANGE] Flags `-bigtable.grpc-use-gzip-compression`, `-ingester.client.grpc-use-gzip-compression`, `-querier.frontend-client.grpc-use-gzip-compression` are now deprecated. #2940
* [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783
* [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783
* [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783
Expand Down Expand Up @@ -87,6 +88,7 @@
* [ENHANCEMENT] Flusher: Added `-flusher.exit-after-flush` option (defaults to true) to control whether Cortex should stop completely after Flusher has finished its work. #2877
* [ENHANCEMENT] Added metrics `cortex_config_hash` and `cortex_runtime_config_hash` to expose hash of the currently active config file. #2874
* [ENHANCEMENT] Logger: added JSON logging support, configured via the `-log.format=json` CLI flag or its respective YAML config option. #2386
* [ENHANCEMENT] Added new flags `-bigtable.grpc-compression`, `-ingester.client.grpc-compression`, `-querier.frontend-client.grpc-compression` to configure compression used by gRPC. Valid values are `gzip`, `snappy`, or empty string (no compression, default). #2940
* [ENHANCEMENT] Clarify limitations of the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints. #2953
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
Expand Down
24 changes: 21 additions & 3 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1774,10 +1774,16 @@ bigtable:
# CLI flag: -bigtable.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages.
# Deprecated: Use gzip compression when sending messages. If true,
# overrides grpc-compression flag.
# CLI flag: -bigtable.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -bigtable.grpc-compression
[grpc_compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -bigtable.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
Expand Down Expand Up @@ -2265,10 +2271,16 @@ grpc_client_config:
# CLI flag: -ingester.client.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages.
# Deprecated: Use gzip compression when sending messages. If true, overrides
# grpc-compression flag.
# CLI flag: -ingester.client.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -ingester.client.grpc-compression
[grpc_compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -ingester.client.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
Expand Down Expand Up @@ -2338,10 +2350,16 @@ grpc_client_config:
# CLI flag: -querier.frontend-client.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages.
# Deprecated: Use gzip compression when sending messages. If true, overrides
# grpc-compression flag.
# CLI flag: -querier.frontend-client.grpc-use-gzip-compression
[use_gzip_compression: <boolean> | default = false]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy' and '' (disable compression)
# CLI flag: -querier.frontend-client.grpc-compression
[grpc_compression: <string> | default = ""]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -querier.frontend-client.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
Expand Down
6 changes: 5 additions & 1 deletion pkg/chunk/gcp/bigtable_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"cloud.google.com/go/bigtable"
"github.com/go-kit/kit/log"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -54,6 +54,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f)
}

func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// storageClientColumnKey implements chunk.storageClient for GCP.
type storageClientColumnKey struct {
cfg Config
Expand Down
3 changes: 3 additions & 0 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func (cfg *Config) Validate() error {
if err := cfg.CassandraStorageConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid Cassandra Storage config")
}
if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil {
return errors.Wrap(err, "invalid GCP Storage Storage config")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return errors.Wrap(err, "invalid GCP Storage Storage config")
return errors.Wrap(err, "invalid GCP Storage config")

}
if err := cfg.Swift.Validate(); err != nil {
return errors.Wrap(err, "invalid Swift Storage config")
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,17 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if err := c.IngesterClient.Validate(log); err != nil {
return errors.Wrap(err, "invalid ingester_client config")
}
if err := c.Worker.Validate(log); err != nil {
return errors.Wrap(err, "invalid frontend_worker config")
}
if err := c.QueryRange.Validate(log); err != nil {
return errors.Wrap(err, "invalid queryrange config")
return errors.Wrap(err, "invalid query_range config")
}
if err := c.TableManager.Validate(); err != nil {
return errors.Wrap(err, "invalid tablemanager config")
return errors.Wrap(err, "invalid table_manager config")
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package client
import (
"flag"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/util/grpcclient"
Expand Down Expand Up @@ -62,3 +62,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
}

func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}
4 changes: 4 additions & 0 deletions pkg/querier/frontend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
}

func (cfg *WorkerConfig) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
}

// Worker is the counter-part to the frontend, actually processing requests.
type worker struct {
cfg WorkerConfig
Expand Down
83 changes: 83 additions & 0 deletions pkg/util/grpc/encoding/snappy/snappy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package snappy

import (
"io"
"sync"

"github.com/golang/snappy"
"google.golang.org/grpc/encoding"
)

// Name is the name registered for the snappy compressor.
const Name = "snappy"

func init() {
encoding.RegisterCompressor(newCompressor())
}

type compressor struct {
writersPool sync.Pool
readersPool sync.Pool
}

func newCompressor() *compressor {
c := &compressor{}
c.readersPool = sync.Pool{
New: func() interface{} {
return snappy.NewReader(nil)
},
}
c.writersPool = sync.Pool{
New: func() interface{} {
return snappy.NewBufferedWriter(nil)
},
}
return c
}

func (c *compressor) Name() string {
return Name
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
wr := c.writersPool.Get().(*snappy.Writer)
wr.Reset(w)
return writeCloser{wr, &c.writersPool}, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
return reader{dr, &c.readersPool}, nil
}

type writeCloser struct {
*snappy.Writer
pool *sync.Pool
}

func (w writeCloser) Close() error {
defer func() {
w.Writer.Reset(nil)
w.pool.Put(w.Writer)
}()

if w.Writer != nil {
return w.Writer.Close()
}
return nil
}

type reader struct {
reader *snappy.Reader
pool *sync.Pool
}

func (r reader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
if err == io.EOF {
r.reader.Reset(nil)
r.pool.Put(r.reader)
}
return n, err
}
71 changes: 71 additions & 0 deletions pkg/util/grpc/encoding/snappy/snappy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package snappy

import (
"bytes"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSnappy(t *testing.T) {
c := newCompressor()
assert.Equal(t, "snappy", c.Name())

tests := []struct {
test string
input string
}{
{"empty", ""},
{"short", "hello world"},
{"long", strings.Repeat("123456789", 1024)},
}
for _, test := range tests {
t.Run(test.test, func(t *testing.T) {
var buf bytes.Buffer
// Compress
w, err := c.Compress(&buf)
require.NoError(t, err)
n, err := w.Write([]byte(test.input))
require.NoError(t, err)
assert.Len(t, test.input, n)
err = w.Close()
require.NoError(t, err)
// Decompress
r, err := c.Decompress(&buf)
require.NoError(t, err)
out, err := ioutil.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, test.input, string(out))
})
}
}

func BenchmarkSnappyCompress(b *testing.B) {
data := []byte(strings.Repeat("123456789", 1024))
c := newCompressor()
b.ResetTimer()
for i := 0; i < b.N; i++ {
w, _ := c.Compress(ioutil.Discard)
_, _ = w.Write(data)
_ = w.Close()
}
}

func BenchmarkSnappyDecompress(b *testing.B) {
data := []byte(strings.Repeat("123456789", 1024))
c := newCompressor()
var buf bytes.Buffer
w, _ := c.Compress(&buf)
_, _ = w.Write(data)
reader := bytes.NewReader(buf.Bytes())
b.ResetTimer()
for i := 0; i < b.N; i++ {
r, _ := c.Decompress(reader)
_, _ = ioutil.ReadAll(r)
_, _ = reader.Seek(0, io.SeekStart)
}
}
36 changes: 33 additions & 3 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@ package grpcclient
import (
"flag"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpc/encoding/snappy"
"github.com/cortexproject/cortex/pkg/util/tls"
)

// Config for a gRPC client.
type Config struct {
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
UseGzipCompression bool `yaml:"use_gzip_compression"`
UseGzipCompression bool `yaml:"use_gzip_compression"` // TODO: Remove this deprecated option in v1.6.0.
GRPCCompression string `yaml:"grpc_compression"`
RateLimit float64 `yaml:"rate_limit"`
RateLimitBurst int `yaml:"rate_limit_burst"`

Expand All @@ -31,21 +38,40 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.")
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Deprecated: Use gzip compression when sending messages. If true, overrides grpc-compression flag.")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")

cfg.BackoffConfig.RegisterFlags(prefix, f)
}

func (cfg *Config) Validate(log log.Logger) error {
if cfg.UseGzipCompression {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(log).Log("msg", "running with DEPRECATED option use_gzip_compression, use grpc_compression instead.")
}
switch cfg.GRPCCompression {
case gzip.Name, snappy.Name, "":
// valid
default:
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression)
}
return nil
}

// CallOptions returns the config in terms of CallOptions.
func (cfg *Config) CallOptions() []grpc.CallOption {
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize))
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize))
compression := cfg.GRPCCompression
if cfg.UseGzipCompression {
opts = append(opts, grpc.UseCompressor("gzip"))
compression = gzip.Name
}
if compression != "" {
opts = append(opts, grpc.UseCompressor(compression))
}
return opts
}
Expand Down Expand Up @@ -79,6 +105,10 @@ func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *ConfigWithTLS) Validate(log log.Logger) error {
return cfg.GRPC.Validate(log)
}

// DialOption returns the config as a grpc.DialOptions
func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
opts, err := cfg.TLS.GetGRPCDialOptions()
Expand Down