Skip to content

Commit e9fac05

Browse files
akstronyurishkuro
andauthored
Create Cassandra db schema on session initialization (#5922)
Create Schema (if not present) on Session Initialization Once a session is established with cassandra db, the added code parses the template file containing queries for creating schema and create queries out of it. Post which it executes those queries to create the required types and tables. ## Which problem is this PR solving? Resolves #5797 ## Description of the changes - The PR includes the following changes: - 1. Embedding template files into binary - 2. Creation of database schema in initialization steps once session to database is established. ## How was this change tested? - Schema rendering is being tested with unit test. - bash scripts/cassandra-integration-test.sh -s 4 v004 v2 ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Alok Kumar Singh <[email protected]> Signed-off-by: Alok Kumar Singh <[email protected]> Signed-off-by: Yuri Shkuro <[email protected]> Signed-off-by: Yuri Shkuro <[email protected]> Co-authored-by: Yuri Shkuro <[email protected]> Co-authored-by: Yuri Shkuro <[email protected]>
1 parent 2e5ed05 commit e9fac05

File tree

13 files changed

+662
-90
lines changed

13 files changed

+662
-90
lines changed

.github/workflows/ci-e2e-cassandra.yml

+9-2
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,19 @@ jobs:
2222
fail-fast: false
2323
matrix:
2424
jaeger-version: [v1, v2]
25+
create-schema: [manual, auto]
2526
version:
2627
- distribution: cassandra
2728
major: 4.x
2829
schema: v004
2930
- distribution: cassandra
3031
major: 5.x
3132
schema: v004
32-
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
33+
exclude:
34+
# Exclude v1 as create schema on fly is available for v2 only
35+
- jaeger-version: v1
36+
create-schema: auto
37+
name: ${{ matrix.version.distribution }}-${{ matrix.version.major }} ${{ matrix.jaeger-version }} schema=${{ matrix.create-schema }}
3338
steps:
3439
- name: Harden Runner
3540
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
@@ -45,9 +50,11 @@ jobs:
4550
- name: Run cassandra integration tests
4651
id: test-execution
4752
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}
53+
env:
54+
SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }}
4855

4956
- name: Upload coverage to codecov
5057
uses: ./.github/actions/upload-codecov
5158
with:
5259
files: cover.out
53-
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}
60+
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}-${{ matrix.create-schema }}

cmd/jaeger/config-cassandra.yaml

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ extensions:
3434
cassandra:
3535
schema:
3636
keyspace: "jaeger_v1_dc1"
37+
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
3738
connection:
3839
auth:
3940
basic:
@@ -44,7 +45,8 @@ extensions:
4445
another_storage:
4546
cassandra:
4647
schema:
47-
keyspace: "jaeger_v1_dc1"
48+
keyspace: "jaeger_v1_dc1_archive"
49+
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
4850
connection:
4951
auth:
5052
basic:

pkg/cassandra/config/config.go

+63-23
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ package config
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"time"
1112

1213
"github.com/asaskevich/govalidator"
1314
"github.com/gocql/gocql"
1415
"go.opentelemetry.io/collector/config/configtls"
15-
16-
"github.com/jaegertracing/jaeger/pkg/cassandra"
17-
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
1816
)
1917

2018
// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
@@ -58,6 +56,19 @@ type Schema struct {
5856
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
5957
// that do not support SnappyCompression.
6058
DisableCompression bool `mapstructure:"disable_compression"`
59+
// CreateSchema tells if the schema ahould be created during session initialization based on the configs provided
60+
CreateSchema bool `mapstructure:"create" valid:"optional"`
61+
// Datacenter is the name for network topology
62+
Datacenter string `mapstructure:"datacenter" valid:"optional"`
63+
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
64+
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"`
65+
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
66+
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"`
67+
// Replication factor for the db
68+
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
69+
// CompactionWindow is the size of the window for TimeWindowCompactionStrategy.
70+
// All SSTables within that window are grouped together into one SSTable.
71+
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"`
6172
}
6273

6374
type Query struct {
@@ -86,7 +97,13 @@ type BasicAuthenticator struct {
8697
func DefaultConfiguration() Configuration {
8798
return Configuration{
8899
Schema: Schema{
89-
Keyspace: "jaeger_v1_test",
100+
CreateSchema: false,
101+
Keyspace: "jaeger_dc1",
102+
Datacenter: "dc1",
103+
TraceTTL: 2 * 24 * time.Hour,
104+
DependenciesTTL: 2 * 24 * time.Hour,
105+
ReplicationFactor: 1,
106+
CompactionWindow: time.Minute,
90107
},
91108
Connection: Connection{
92109
Servers: []string{"127.0.0.1"},
@@ -106,6 +123,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
106123
if c.Schema.Keyspace == "" {
107124
c.Schema.Keyspace = source.Schema.Keyspace
108125
}
126+
127+
if c.Schema.Datacenter == "" {
128+
c.Schema.Datacenter = source.Schema.Datacenter
129+
}
130+
131+
if c.Schema.TraceTTL == 0 {
132+
c.Schema.TraceTTL = source.Schema.TraceTTL
133+
}
134+
135+
if c.Schema.DependenciesTTL == 0 {
136+
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
137+
}
138+
139+
if c.Schema.ReplicationFactor == 0 {
140+
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
141+
}
142+
143+
if c.Schema.CompactionWindow == 0 {
144+
c.Schema.CompactionWindow = source.Schema.CompactionWindow
145+
}
146+
109147
if c.Connection.ConnectionsPerHost == 0 {
110148
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
111149
}
@@ -129,24 +167,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
129167
}
130168
}
131169

132-
// SessionBuilder creates new cassandra.Session
133-
type SessionBuilder interface {
134-
NewSession() (cassandra.Session, error)
135-
}
136-
137-
// NewSession creates a new Cassandra session
138-
func (c *Configuration) NewSession() (cassandra.Session, error) {
139-
cluster, err := c.NewCluster()
140-
if err != nil {
141-
return nil, err
142-
}
143-
session, err := cluster.CreateSession()
144-
if err != nil {
145-
return nil, err
146-
}
147-
return gocqlw.WrapCQLSession(session), nil
148-
}
149-
150170
// NewCluster creates a new gocql cluster from the configuration
151171
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
152172
cluster := gocql.NewCluster(c.Connection.Servers...)
@@ -210,7 +230,27 @@ func (c *Configuration) String() string {
210230
return fmt.Sprintf("%+v", *c)
211231
}
212232

233+
func isValidTTL(duration time.Duration) bool {
234+
return duration == 0 || duration >= time.Second
235+
}
236+
213237
func (c *Configuration) Validate() error {
214238
_, err := govalidator.ValidateStruct(c)
215-
return err
239+
if err != nil {
240+
return err
241+
}
242+
243+
if !isValidTTL(c.Schema.TraceTTL) {
244+
return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second")
245+
}
246+
247+
if !isValidTTL(c.Schema.DependenciesTTL) {
248+
return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second")
249+
}
250+
251+
if c.Schema.CompactionWindow < time.Minute {
252+
return errors.New("compaction_window should at least be 1 minute")
253+
}
254+
255+
return nil
216256
}

pkg/cassandra/config/config_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package config
55

66
import (
77
"testing"
8+
"time"
89

910
"github.com/gocql/gocql"
1011
"github.com/stretchr/testify/assert"
@@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
4344
Connection: Connection{
4445
Servers: []string{"localhost:9200"},
4546
},
47+
Schema: Schema{
48+
CompactionWindow: time.Minute,
49+
},
4650
}
4751

4852
err := cfg.Validate()
@@ -94,3 +98,23 @@ func TestToString(t *testing.T) {
9498
s := cfg.String()
9599
assert.Contains(t, s, "Keyspace:test")
96100
}
101+
102+
func TestConfigSchemaValidation(t *testing.T) {
103+
cfg := DefaultConfiguration()
104+
err := cfg.Validate()
105+
require.NoError(t, err)
106+
107+
cfg.Schema.TraceTTL = time.Millisecond
108+
err = cfg.Validate()
109+
require.Error(t, err)
110+
111+
cfg.Schema.TraceTTL = time.Second
112+
cfg.Schema.CompactionWindow = time.Minute - 1
113+
err = cfg.Validate()
114+
require.Error(t, err)
115+
116+
cfg.Schema.CompactionWindow = time.Minute
117+
cfg.Schema.DependenciesTTL = time.Second - 1
118+
err = cfg.Validate()
119+
require.Error(t, err)
120+
}

plugin/storage/cassandra/factory.go

+56-9
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ import (
1717

1818
"github.com/jaegertracing/jaeger/pkg/cassandra"
1919
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
20+
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
2021
"github.com/jaegertracing/jaeger/pkg/distributedlock"
2122
"github.com/jaegertracing/jaeger/pkg/hostname"
2223
"github.com/jaegertracing/jaeger/pkg/metrics"
2324
"github.com/jaegertracing/jaeger/plugin"
2425
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
2526
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
2627
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
28+
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
2729
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
2830
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
2931
"github.com/jaegertracing/jaeger/storage"
@@ -55,17 +57,22 @@ type Factory struct {
5557
logger *zap.Logger
5658
tracer trace.TracerProvider
5759

58-
primaryConfig config.SessionBuilder
60+
primaryConfig config.Configuration
61+
archiveConfig *config.Configuration
62+
5963
primarySession cassandra.Session
60-
archiveConfig config.SessionBuilder
6164
archiveSession cassandra.Session
65+
66+
// tests can override this
67+
sessionBuilderFn func(*config.Configuration) (cassandra.Session, error)
6268
}
6369

6470
// NewFactory creates a new Factory.
6571
func NewFactory() *Factory {
6672
return &Factory{
67-
tracer: otel.GetTracerProvider(),
68-
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
73+
tracer: otel.GetTracerProvider(),
74+
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
75+
sessionBuilderFn: NewSession,
6976
}
7077
}
7178

@@ -126,9 +133,7 @@ func (f *Factory) configureFromOptions(o *Options) {
126133
o.others = make(map[string]*NamespaceConfig)
127134
}
128135
f.primaryConfig = o.GetPrimary()
129-
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
130-
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
131-
}
136+
f.archiveConfig = f.Options.Get(archiveStorageConfig)
132137
}
133138

134139
// Initialize implements storage.Factory
@@ -137,14 +142,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
137142
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
138143
f.logger = logger
139144

140-
primarySession, err := f.primaryConfig.NewSession()
145+
primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
141146
if err != nil {
142147
return err
143148
}
144149
f.primarySession = primarySession
145150

146151
if f.archiveConfig != nil {
147-
archiveSession, err := f.archiveConfig.NewSession()
152+
archiveSession, err := f.sessionBuilderFn(f.archiveConfig)
148153
if err != nil {
149154
return err
150155
}
@@ -155,6 +160,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
155160
return nil
156161
}
157162

163+
// createSession creates session from a configuration
164+
func createSession(c *config.Configuration) (cassandra.Session, error) {
165+
cluster, err := c.NewCluster()
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
session, err := cluster.CreateSession()
171+
if err != nil {
172+
return nil, err
173+
}
174+
175+
return gocqlw.WrapCQLSession(session), nil
176+
}
177+
178+
// newSessionPrerequisites creates tables and types before creating a session
179+
func newSessionPrerequisites(c *config.Configuration) error {
180+
if !c.Schema.CreateSchema {
181+
return nil
182+
}
183+
184+
cfg := *c // clone because we need to connect without specifying a keyspace
185+
cfg.Schema.Keyspace = ""
186+
187+
session, err := createSession(&cfg)
188+
if err != nil {
189+
return err
190+
}
191+
192+
sc := schema.NewSchemaCreator(session, c.Schema)
193+
return sc.CreateSchemaIfNotPresent()
194+
}
195+
196+
// NewSession creates a new Cassandra session
197+
func NewSession(c *config.Configuration) (cassandra.Session, error) {
198+
if err := newSessionPrerequisites(c); err != nil {
199+
return nil, err
200+
}
201+
202+
return createSession(c)
203+
}
204+
158205
// CreateSpanReader implements storage.Factory
159206
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
160207
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))

0 commit comments

Comments
 (0)