Skip to content

Add Cassandra table options flag #2575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ Please make sure to review renamed metrics, and update your dashboards and alert
* [FEATURE] TLS config options added to the Server. #2535
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.query-concurrency` flag. #2562
* [FEATURE] Add `-cassandra.table-options` flag to customize table options of Cassandra when creating the index or chunk table. #2575
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,14 @@ cassandra:
# CLI flag: -cassandra.convict-hosts-on-failure
[convict_hosts_on_failure: <boolean> | default = true]

# Table options used to create index or chunk tables. This value is used as
# plain text in the table `WITH` like this, "CREATE TABLE
# <generated_by_cortex> (...) WITH <cassandra.table-options>". For details,
# see https://cortexmetrics.io/docs/production/cassandra. (Default = "": use
# default table options of your Cassandra)
# CLI flag: -cassandra.table-options
[table_options: <string> | default = ""]

boltdb:
# Location of BoltDB index files.
# CLI flag: -boltdb.dir
Expand Down
38 changes: 38 additions & 0 deletions docs/production/storage-cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,44 @@ docker run -d --name=cortex -v $(pwd)/single-process-config.yaml:/etc/single-pro
```
In case you prefer to run the master version, please follow this [documentation](../getting-started/getting-started-chunks.md) on how to build Cortex from source.

### Configure the index and chunk table options

In order to create index and chunk tables on Cassandra, Cortex will use the default table options of your Cassandra.
If you want to configure the table options, use the `storage.cassandra.table_options` property or `cassandra.table-options` flag.
This configuration property is just `string` type and this value used as plain text on `WITH` option of table creation query.
It is recommended to enclose the value of `table_options` in double-quotes because you should enclose strings of table options in quotes on Cassandra.

For example, suppose the name of index(or chunk) table is 'test_table'.
Details about column definitions of the table are omitted.
If no table options configured, then Cortex will generate the query to create a table without a `WITH` clause to use default table options:

```
CREATE TABLE IF NOT EXISTS cortex.test_table (...)
```

If table options configured with `table_options` as below:

```
storage:
cassandra:
addresses: 127.0.0.1
keyspace: cortex
table_options: "gc_grace_seocnds = 86400
AND comments = 'this is a test table'
AND COMPACT STORAGE
AND caching = { 'keys': 'ALL', 'rows_per_partition': 1024 }"
```

Then Cortex will generate the query to create a table with a `WITH` clause as below:

```
CREATE TABLE IF NOT EXISTS cortex.test_table (...) WITH gc_grace_seocnds = 86400 AND comments = 'this is a test table' AND COMPACT STORAGE AND caching = { 'keys': 'ALL', 'rows_per_partition': 1024 }
```

Available settings of the table options on Cassandra depend on Cassandra version or storage which is compatible.
For details about table options, see the official document of storage you are using.

**WARNING**: Make sure there are no incorrect options and mistakes. Misconfigured table options may cause a failure in creating a table by Table Manager at runtime and seriously affect your Cortex.

## Configure Prometheus to send series to Cortex

Expand Down
2 changes: 2 additions & 0 deletions pkg/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
QueryConcurrency int `yaml:"query_concurrency"`
NumConnections int `yaml:"num_connections"`
ConvictHosts bool `yaml:"convict_hosts_on_failure"`
TableOptions string `yaml:"table_options"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -75,6 +76,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
f.IntVar(&cfg.NumConnections, "cassandra.num-connections", 2, "Number of TCP connections per host.")
f.BoolVar(&cfg.ConvictHosts, "cassandra.convict-hosts-on-failure", true, "Convict hosts of being down on failure.")
f.StringVar(&cfg.TableOptions, "cassandra.table-options", "", "Table options used to create index or chunk tables. This value is used as plain text in the table `WITH` like this, \"CREATE TABLE <generated_by_cortex> (...) WITH <cassandra.table-options>\". For details, see https://cortexmetrics.io/docs/production/cassandra. (Default = \"\": use default table options of your Cassandra)")
}

func (cfg *Config) Validate() error {
Expand Down
23 changes: 16 additions & 7 deletions pkg/chunk/cassandra/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,8 @@ func (c *tableClient) ListTables(ctx context.Context) ([]string, error) {
}

func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error {
err := c.session.Query(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
)`, desc.Name)).WithContext(ctx).Exec()
query := c.getCreateTableQuery(&desc)
err := c.session.Query(query).WithContext(ctx).Exec()
return errors.WithStack(err)
}

Expand All @@ -69,3 +64,17 @@ func (c *tableClient) UpdateTable(ctx context.Context, current, expected chunk.T
func (c *tableClient) Stop() {
c.session.Close()
}

func (c *tableClient) getCreateTableQuery(desc *chunk.TableDesc) (query string) {
query = fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
)`, desc.Name)
if c.cfg.TableOptions != "" {
query = fmt.Sprintf("%s WITH %s", query, c.cfg.TableOptions)
}
return
}
48 changes: 48 additions & 0 deletions pkg/chunk/cassandra/table_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cassandra

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestTableClient_getCreateTableQuery_default(t *testing.T) {
client := &tableClient{
cfg: Config{},
}
desc, _, _ := client.DescribeTable(context.Background(), "test_table")
query := client.getCreateTableQuery(&desc)
assert.Equal(
t,
`
CREATE TABLE IF NOT EXISTS test_table (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
)`,
query,
)
}

func TestTableClient_getCreateTableQuery_withOptions(t *testing.T) {
client := &tableClient{
cfg: Config{
TableOptions: "CLUSTERING ORDER BY (range DESC) AND compaction = { 'class' : 'LeveledCompactionStrategy' }",
},
}
desc, _, _ := client.DescribeTable(context.Background(), "test_table")
query := client.getCreateTableQuery(&desc)
assert.Equal(
t,
`
CREATE TABLE IF NOT EXISTS test_table (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
) WITH CLUSTERING ORDER BY (range DESC) AND compaction = { 'class' : 'LeveledCompactionStrategy' }`,
query,
)
}