Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuration of Cassandra's host selection policy #4069

Merged
merged 3 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [ENHANCEMENT] Distributor: added per-distributor limits: max number of inflight requests (`-distributor.instance-limits.max-inflight-push-requests`) and max ingestion rate in samples/sec (`-distributor.instance-limits.max-ingestion-rate`). If not set, these two are unlimited. Also added metrics to expose current values (`cortex_distributor_inflight_push_requests`, `cortex_distributor_ingestion_rate_samples_per_second`) as well as limits (`cortex_distributor_instance_limits` with various `limit` label values). #4071
* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
* [ENHANCEMENT] Allow configuration of Cassandra's host selection policy. #4069
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3020,6 +3020,11 @@ cassandra:
# CLI flag: -cassandra.host-verification
[host_verification: <boolean> | default = true]

# Policy for selecting Cassandra host. Supported values are: round-robin,
# token-aware.
# CLI flag: -cassandra.host-selection-policy
[host_selection_policy: <string> | default = "round-robin"]

# Path to certificate file to verify the peer.
# CLI flag: -cassandra.ca-path
[CA_path: <string> | default = ""]
Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Config struct {
DisableInitialHostLookup bool `yaml:"disable_initial_host_lookup"`
SSL bool `yaml:"SSL"`
HostVerification bool `yaml:"host_verification"`
HostSelectionPolicy string `yaml:"host_selection_policy"`
CAPath string `yaml:"CA_path"`
CertPath string `yaml:"tls_cert_path"`
KeyPath string `yaml:"tls_key_path"`
Expand All @@ -53,6 +54,11 @@ type Config struct {
TableOptions string `yaml:"table_options"`
}

const (
HostPolicyRoundRobin = "round-robin"
HostPolicyTokenAware = "token-aware"
)

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Addresses, "cassandra.addresses", "", "Comma-separated hostnames or IPs of Cassandra instances.")
Expand All @@ -63,6 +69,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.DisableInitialHostLookup, "cassandra.disable-initial-host-lookup", false, "Instruct the cassandra driver to not attempt to get host info from the system.peers table.")
f.BoolVar(&cfg.SSL, "cassandra.ssl", false, "Use SSL when connecting to cassandra instances.")
f.BoolVar(&cfg.HostVerification, "cassandra.host-verification", true, "Require SSL certificate validation.")
f.StringVar(&cfg.HostSelectionPolicy, "cassandra.host-selection-policy", HostPolicyRoundRobin, "Policy for selecting Cassandra host. Supported values are: round-robin, token-aware.")
f.StringVar(&cfg.CAPath, "cassandra.ca-path", "", "Path to certificate file to verify the peer.")
f.StringVar(&cfg.CertPath, "cassandra.tls-cert-path", "", "Path to certificate file used by TLS.")
f.StringVar(&cfg.KeyPath, "cassandra.tls-key-path", "", "Path to private key file used by TLS.")
Expand Down Expand Up @@ -180,6 +187,15 @@ func (cfg *Config) setClusterConfig(cluster *gocql.ClusterConfig) error {
}
}
}

if cfg.HostSelectionPolicy == HostPolicyRoundRobin {
cluster.PoolConfig.HostSelectionPolicy = gocql.RoundRobinHostPolicy()
} else if cfg.HostSelectionPolicy == HostPolicyTokenAware {
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
} else {
return errors.New("Unknown host selection policy")
}

if cfg.Auth {
password := cfg.Password.Value
if cfg.PasswordFile != "" {
Expand Down