From 0f96e9939e2462df00398cb0ff6000cbb386f1e5 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Tue, 21 Jan 2025 00:02:21 +0530 Subject: [PATCH 01/11] first step Signed-off-by: Manik2708 --- cmd/jaeger/config-elasticsearch-ilm.yaml | 76 +++++ cmd/jaeger/config-elasticsearch.yaml | 2 + .../e2e_elasticsearch_ilm_integration.go | 232 ++++++++++++++ .../integration/elasticsearch_test.go | 8 + .../internal/integration/opensearch_test.go | 8 + pkg/es/client.go | 29 ++ pkg/es/config/config.go | 2 + pkg/es/mocks/AliasAddAction.go | 112 +++++++ pkg/es/mocks/AliasRemoveAction.go | 92 ++++++ pkg/es/mocks/Client.go | 170 +++++++++++ pkg/es/mocks/IndicesGetService.go | 92 ++++++ pkg/es/mocks/XPackIlmPutLifecycle.go | 106 +++++++ pkg/es/wrapper/wrapper.go | 128 ++++++++ plugin/storage/es/factory.go | 16 +- plugin/storage/es/ilm/ilm-policy.json | 13 + plugin/storage/es/ilm/ilm.go | 284 ++++++++++++++++++ plugin/storage/es/ilm/ism-policy.json | 18 ++ .../es/mappings/jaeger-dependencies-7.json | 12 +- .../es/mappings/jaeger-dependencies-8.json | 12 +- .../es/mappings/jaeger-sampling-7.json | 16 +- .../es/mappings/jaeger-sampling-8.json | 12 +- .../storage/es/mappings/jaeger-service-7.json | 12 +- .../storage/es/mappings/jaeger-service-8.json | 16 +- plugin/storage/es/mappings/jaeger-span-7.json | 16 +- plugin/storage/es/mappings/jaeger-span-8.json | 18 +- plugin/storage/es/mappings/mapping.go | 4 +- storage_v2/depstore/mocks/Writer.go | 52 ++++ 27 files changed, 1515 insertions(+), 43 deletions(-) create mode 100644 cmd/jaeger/config-elasticsearch-ilm.yaml create mode 100644 cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go create mode 100644 pkg/es/mocks/AliasAddAction.go create mode 100644 pkg/es/mocks/AliasRemoveAction.go create mode 100644 pkg/es/mocks/IndicesGetService.go create mode 100644 pkg/es/mocks/XPackIlmPutLifecycle.go create mode 100644 plugin/storage/es/ilm/ilm-policy.json create mode 100644 plugin/storage/es/ilm/ilm.go create mode 100644 plugin/storage/es/ilm/ism-policy.json create mode 100644 storage_v2/depstore/mocks/Writer.go diff --git a/cmd/jaeger/config-elasticsearch-ilm.yaml b/cmd/jaeger/config-elasticsearch-ilm.yaml new file mode 100644 index 00000000000..ce3126f1426 --- /dev/null +++ b/cmd/jaeger/config-elasticsearch-ilm.yaml @@ -0,0 +1,76 @@ +service: + extensions: [jaeger_storage, jaeger_query, healthcheckv2] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + telemetry: + resource: + service.name: jaeger + metrics: + level: detailed + address: 0.0.0.0:8888 + logs: + level: debug + # TODO Initialize telemetry tracer once OTEL released new feature. + # https://github.com/open-telemetry/opentelemetry-collector/issues/10663 + +extensions: + healthcheckv2: + use_v2: true + http: + + jaeger_query: + storage: + traces: some_storage + traces_archive: another_storage + ui: + config_file: ./cmd/jaeger/config-ui.json + + jaeger_storage: + backends: + some_storage: + elasticsearch: + create_mappings: true + indices: + index_prefix: "jaeger-main" + spans: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + services: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + dependencies: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + sampling: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + use_ilm: true + use_aliases: true + another_storage: + elasticsearch: + indices: + index_prefix: "jaeger-archive" + +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: some_storage diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 6fa4d247e20..bb3de11a31e 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -54,6 +54,8 @@ extensions: rollover_frequency: "day" shards: 5 replicas: 1 + use_ilm: true + use_aliases: true another_storage: elasticsearch: indices: diff --git a/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go b/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go new file mode 100644 index 00000000000..8ec4647b452 --- /dev/null +++ b/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go @@ -0,0 +1,232 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "fmt" + "io" + "net/http" + "strconv" + "testing" + + esV8 "github.com/elastic/go-elasticsearch/v8" + esV8api "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/olivere/elastic" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/plugin/storage/integration" +) + +const ( + queryURL = "http://0.0.0.0:9200" + defaultIlmPolicy = "jaeger-default-ilm-policy" + defaultIsmPolicy = "jaeger-default-ism-policy" + prefix = "jaeger-main-" +) + +type E2EElasticSearchILMIntegration struct { + isOpenSearch bool +} + +func (e *E2EElasticSearchILMIntegration) RunTests(t *testing.T) { + hcl := getESHttpClient(t) + client, err := createESClient(t, hcl) + require.NoError(t, err) + defer e.cleanES(t, client) + version, err := e.getVersion(client) + require.NoError(t, err) + if version < 7 { + t.Skip("Automated Rollover is supported only for 7+ versions") + } + s := &E2EStorageIntegration{ + ConfigFile: "../../config-elasticsearch-ilm.yaml", + StorageIntegration: integration.StorageIntegration{ + CleanUp: purge, + Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), + }, + } + if e.isOpenSearch { + s.e2eInitialize(t, "opensearch") + } else { + s.e2eInitialize(t, "elasticsearch") + } + var v8client *esV8.Client + if version > 7 { + v8client, err = createESV8Client(hcl.Transport) + require.NoError(t, err) + } + t.Run("CheckForILMOrISMPolicy", func(t *testing.T) { + e.checkForIlmPolicyCreation(t, client) + }) + t.Run("CheckForCorrectTemplateSettings", func(t *testing.T) { + e.checkForTemplateSettings(t, client, v8client, version) + }) + t.Run("CheckForIndexAndAliasesExistence", func(t *testing.T) { + e.checkForIndexAndAliasesExistence(t, client) + }) +} + +func (e *E2EElasticSearchILMIntegration) checkForIlmPolicyCreation(t *testing.T, client *elastic.Client) { + if !e.isOpenSearch { + res, err := client.XPackIlmGetLifecycle().Policy(defaultIlmPolicy).Do(context.Background()) + require.NoError(t, err) + if _, ok := res[defaultIlmPolicy]; !ok { + t.Errorf("no policy found for %s", defaultIlmPolicy) + } + } else { + _, err := client.PerformRequest(context.Background(), elastic.PerformRequestOptions{ + Path: "_plugins/_ism/policies/" + defaultIsmPolicy, + Method: http.MethodGet, + }) + require.NoError(t, err) + } +} + +func (e *E2EElasticSearchILMIntegration) checkForTemplateSettings(t *testing.T, client *elastic.Client, v8Client *esV8.Client, version uint) { + templateNames := []string{"jaeger-span", "jaeger-service"} + for _, templateName := range templateNames { + t.Run(templateName, func(t *testing.T) { + if version > 7 { + flatSettings := true + res, err := v8Client.Indices.GetIndexTemplate(func(request *esV8api.IndicesGetIndexTemplateRequest) { + request.FlatSettings = &flatSettings + request.Name = prefix + templateName + request.Pretty = true + }) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + data, err := io.ReadAll(res.Body) + require.NoError(t, err) + dataStr := string(data) + // Parsing the whole template doesn't seems to be worth here as we are only concerned about correct policy application + expectedLifecycleName := fmt.Sprintf("\"index.lifecycle.name\" : \"%s\"", defaultIlmPolicy) + expectedRolloverAlias := fmt.Sprintf("\"index.lifecycle.rollover_alias\" : \"%s-write\"", prefix+templateName) + assert.Contains(t, dataStr, expectedLifecycleName) + assert.Contains(t, dataStr, expectedRolloverAlias) + } else { + tmpl, err := client.IndexGetTemplate().FlatSettings(true).Do(context.Background()) + require.NoError(t, err) + template, ok := tmpl[prefix+templateName] + require.True(t, ok) + aliasName := prefix + templateName + "-" + "write" + if !e.isOpenSearch { + if lifecycleName, ok := template.Settings["index.lifecycle.name"]; ok { + assert.Equal(t, defaultIlmPolicy, lifecycleName) + } else { + t.Errorf("no lifecycle name found for %s", defaultIlmPolicy) + } + if rolloverAlias, ok := template.Settings["index.lifecycle.rollover_alias"]; ok { + assert.Equal(t, rolloverAlias, aliasName) + } else { + t.Errorf("no rollover_alias found for %s", defaultIlmPolicy) + } + } else { + if rolloverAlias, ok := template.Settings["index.plugins.index_state_management.rollover_alias"]; ok { + assert.Equal(t, rolloverAlias, aliasName) + } else { + t.Errorf("no rollover_alias found for %s", defaultIsmPolicy) + } + } + } + }) + } +} + +func (*E2EElasticSearchILMIntegration) checkForIndexAndAliasesExistence(t *testing.T, client *elastic.Client) { + indexes := []string{"jaeger-span", "jaeger-service"} + suffix := "-000001" + for _, indexName := range indexes { + t.Run(indexName, func(t *testing.T) { + index := prefix + indexName + suffix + exists, err := client.IndexExists(index).Do(context.Background()) + require.NoError(t, err) + require.True(t, exists) + aliases, err := client.Aliases().Index(index).Do(context.Background()) + require.NoError(t, err) + readIndexExists := false + writeIndexExists := false + if indexInfo, found := aliases.Indices[index]; found { + for _, alias := range indexInfo.Aliases { + if alias.AliasName == prefix+indexName+"-write" { + writeIndexExists = true + assert.True(t, alias.IsWriteIndex) + } else if alias.AliasName == prefix+indexName+"-read" { + readIndexExists = true + assert.False(t, alias.IsWriteIndex) + } + } + } else { + t.Errorf("no alias found for %s", indexName) + } + assert.True(t, writeIndexExists) + assert.True(t, readIndexExists) + }) + } +} + +func (e *E2EElasticSearchILMIntegration) cleanES(t *testing.T, client *elastic.Client) { + _, err := client.DeleteIndex("*").Do(context.Background()) + require.NoError(t, err) + if !e.isOpenSearch { + _, err = client.XPackIlmDeleteLifecycle().Policy(defaultIlmPolicy).Do(context.Background()) + if err != nil && !elastic.IsNotFound(err) { + assert.Fail(t, "Not able to clean up ILM Policy") + } + } else { + _, err := client.PerformRequest(context.Background(), elastic.PerformRequestOptions{ + Path: "_plugins/_ism/policies/" + defaultIsmPolicy, + Method: http.MethodDelete, + }) + require.NoError(t, err) + } + _, err = client.IndexDeleteTemplate("*").Do(context.Background()) + require.NoError(t, err) +} + +func (e *E2EElasticSearchILMIntegration) getVersion(client *elastic.Client) (uint, error) { + if e.isOpenSearch { + return 7, nil + } + pingResult, _, err := client.Ping(queryURL).Do(context.Background()) + if err != nil { + return 0, err + } + esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0])) + if err != nil { + return 0, err + } + //nolint: gosec + return uint(esVersion), nil +} + +func createESClient(t *testing.T, hcl *http.Client) (*elastic.Client, error) { + cl, err := elastic.NewClient( + elastic.SetURL(queryURL), + elastic.SetSniff(false), + elastic.SetHttpClient(hcl), + ) + require.NoError(t, err) + t.Cleanup(func() { + cl.Stop() + }) + return cl, nil +} + +func createESV8Client(tr http.RoundTripper) (*esV8.Client, error) { + return esV8.NewClient(esV8.Config{ + Addresses: []string{queryURL}, + DiscoverNodesOnStart: false, + Transport: tr, + }) +} + +func getESHttpClient(t *testing.T) *http.Client { + tr := &http.Transport{} + t.Cleanup(func() { + tr.CloseIdleConnections() + }) + return &http.Client{Transport: tr} +} diff --git a/cmd/jaeger/internal/integration/elasticsearch_test.go b/cmd/jaeger/internal/integration/elasticsearch_test.go index 844c4866321..a26b0f7f765 100644 --- a/cmd/jaeger/internal/integration/elasticsearch_test.go +++ b/cmd/jaeger/internal/integration/elasticsearch_test.go @@ -23,3 +23,11 @@ func TestElasticsearchStorage(t *testing.T) { s.e2eInitialize(t, "elasticsearch") s.RunSpanStoreTests(t) } + +func TestElasticsearchRollover(t *testing.T) { + integration.SkipUnlessEnv(t, "elasticsearch") + e := &E2EElasticSearchILMIntegration{ + isOpenSearch: false, + } + e.RunTests(t) +} diff --git a/cmd/jaeger/internal/integration/opensearch_test.go b/cmd/jaeger/internal/integration/opensearch_test.go index f164bfcda26..4176accc1e9 100644 --- a/cmd/jaeger/internal/integration/opensearch_test.go +++ b/cmd/jaeger/internal/integration/opensearch_test.go @@ -22,3 +22,11 @@ func TestOpenSearchStorage(t *testing.T) { s.e2eInitialize(t, "opensearch") s.RunSpanStoreTests(t) } + +func TestOpenSearchRollover(t *testing.T) { + integration.SkipUnlessEnv(t, "opensearch") + e := &E2EElasticSearchILMIntegration{ + isOpenSearch: true, + } + e.RunTests(t) +} diff --git a/pkg/es/client.go b/pkg/es/client.go index f7a5b6e73a0..a13accf28c2 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -15,7 +15,14 @@ import ( type Client interface { IndexExists(index string) IndicesExistsService CreateIndex(index string) IndicesCreateService + GetIndices() IndicesGetService CreateTemplate(id string) TemplateCreateService + CreateAlias(name string) AliasAddAction + DeleteAlias(name string) AliasRemoveAction + CreateIlmPolicy() XPackIlmPutLifecycle + CreateIsmPolicy(ctx context.Context, id string, policy string) (*elastic.Response, error) + IlmPolicyExists(ctx context.Context, id string) (bool, error) + IsmPolicyExists(ctx context.Context, id string) (bool, error) Index() IndexService Search(indices ...string) SearchService MultiSearch() MultiSearchService @@ -70,3 +77,25 @@ type MultiSearchService interface { Index(indices ...string) MultiSearchService Do(ctx context.Context) (*elastic.MultiSearchResult, error) } + +type AliasAddAction interface { + Index(index ...string) AliasAddAction + IsWriteIndex(flag bool) AliasAddAction + Do(ctx context.Context) (*elastic.AliasResult, error) +} + +type AliasRemoveAction interface { + Index(index ...string) AliasRemoveAction + Do(ctx context.Context) (*elastic.AliasResult, error) +} + +type XPackIlmPutLifecycle interface { + BodyString(body string) XPackIlmPutLifecycle + Policy(policy string) XPackIlmPutLifecycle + Do(ctx context.Context) (*elastic.XPackIlmPutLifecycleResponse, error) +} + +type IndicesGetService interface { + Index(indices ...string) IndicesGetService + Do(ctx context.Context) (map[string]*elastic.IndicesGetResponse, error) +} diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d0ae9c6ad3d..3b6556c3308 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -134,6 +134,7 @@ type Configuration struct { // latest adaptive sampling probabilities. AdaptiveSamplingLookback time.Duration `mapstructure:"adaptive_sampling_lookback"` Tags TagsAsFields `mapstructure:"tags_as_fields"` + IsOpenSearch bool `mapstructure:"is_open_search"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` } @@ -280,6 +281,7 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact } // OpenSearch is based on ES 7.x if strings.Contains(pingResult.TagLine, "OpenSearch") { + c.IsOpenSearch = true if pingResult.Version.Number[0] == '1' { logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings") esVersion = 7 diff --git a/pkg/es/mocks/AliasAddAction.go b/pkg/es/mocks/AliasAddAction.go new file mode 100644 index 00000000000..ec9d27aef7d --- /dev/null +++ b/pkg/es/mocks/AliasAddAction.go @@ -0,0 +1,112 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + es "github.com/jaegertracing/jaeger/pkg/es" + elastic "github.com/olivere/elastic" + + mock "github.com/stretchr/testify/mock" +) + +// AliasAddAction is an autogenerated mock type for the AliasAddAction type +type AliasAddAction struct { + mock.Mock +} + +// Do provides a mock function with given fields: ctx +func (_m *AliasAddAction) Do(ctx context.Context) (*elastic.AliasResult, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Do") + } + + var r0 *elastic.AliasResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*elastic.AliasResult, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *elastic.AliasResult); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*elastic.AliasResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Index provides a mock function with given fields: index +func (_m *AliasAddAction) Index(index ...string) es.AliasAddAction { + _va := make([]interface{}, len(index)) + for _i := range index { + _va[_i] = index[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Index") + } + + var r0 es.AliasAddAction + if rf, ok := ret.Get(0).(func(...string) es.AliasAddAction); ok { + r0 = rf(index...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.AliasAddAction) + } + } + + return r0 +} + +// IsWriteIndex provides a mock function with given fields: flag +func (_m *AliasAddAction) IsWriteIndex(flag bool) es.AliasAddAction { + ret := _m.Called(flag) + + if len(ret) == 0 { + panic("no return value specified for IsWriteIndex") + } + + var r0 es.AliasAddAction + if rf, ok := ret.Get(0).(func(bool) es.AliasAddAction); ok { + r0 = rf(flag) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.AliasAddAction) + } + } + + return r0 +} + +// NewAliasAddAction creates a new instance of AliasAddAction. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAliasAddAction(t interface { + mock.TestingT + Cleanup(func()) +}) *AliasAddAction { + mock := &AliasAddAction{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/es/mocks/AliasRemoveAction.go b/pkg/es/mocks/AliasRemoveAction.go new file mode 100644 index 00000000000..226d2340741 --- /dev/null +++ b/pkg/es/mocks/AliasRemoveAction.go @@ -0,0 +1,92 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + es "github.com/jaegertracing/jaeger/pkg/es" + elastic "github.com/olivere/elastic" + + mock "github.com/stretchr/testify/mock" +) + +// AliasRemoveAction is an autogenerated mock type for the AliasRemoveAction type +type AliasRemoveAction struct { + mock.Mock +} + +// Do provides a mock function with given fields: ctx +func (_m *AliasRemoveAction) Do(ctx context.Context) (*elastic.AliasResult, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Do") + } + + var r0 *elastic.AliasResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*elastic.AliasResult, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *elastic.AliasResult); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*elastic.AliasResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Index provides a mock function with given fields: index +func (_m *AliasRemoveAction) Index(index ...string) es.AliasRemoveAction { + _va := make([]interface{}, len(index)) + for _i := range index { + _va[_i] = index[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Index") + } + + var r0 es.AliasRemoveAction + if rf, ok := ret.Get(0).(func(...string) es.AliasRemoveAction); ok { + r0 = rf(index...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.AliasRemoveAction) + } + } + + return r0 +} + +// NewAliasRemoveAction creates a new instance of AliasRemoveAction. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAliasRemoveAction(t interface { + mock.TestingT + Cleanup(func()) +}) *AliasRemoveAction { + mock := &AliasRemoveAction{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/es/mocks/Client.go b/pkg/es/mocks/Client.go index 8adf7f176c2..5a2a5735dc6 100644 --- a/pkg/es/mocks/Client.go +++ b/pkg/es/mocks/Client.go @@ -8,7 +8,11 @@ package mocks import ( + context "context" + es "github.com/jaegertracing/jaeger/pkg/es" + elastic "github.com/olivere/elastic" + mock "github.com/stretchr/testify/mock" ) @@ -35,6 +39,46 @@ func (_m *Client) Close() error { return r0 } +// CreateAlias provides a mock function with given fields: name +func (_m *Client) CreateAlias(name string) es.AliasAddAction { + ret := _m.Called(name) + + if len(ret) == 0 { + panic("no return value specified for CreateAlias") + } + + var r0 es.AliasAddAction + if rf, ok := ret.Get(0).(func(string) es.AliasAddAction); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.AliasAddAction) + } + } + + return r0 +} + +// CreateIlmPolicy provides a mock function with no fields +func (_m *Client) CreateIlmPolicy() es.XPackIlmPutLifecycle { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateIlmPolicy") + } + + var r0 es.XPackIlmPutLifecycle + if rf, ok := ret.Get(0).(func() es.XPackIlmPutLifecycle); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.XPackIlmPutLifecycle) + } + } + + return r0 +} + // CreateIndex provides a mock function with given fields: index func (_m *Client) CreateIndex(index string) es.IndicesCreateService { ret := _m.Called(index) @@ -55,6 +99,36 @@ func (_m *Client) CreateIndex(index string) es.IndicesCreateService { return r0 } +// CreateIsmPolicy provides a mock function with given fields: ctx, id, policy +func (_m *Client) CreateIsmPolicy(ctx context.Context, id string, policy string) (*elastic.Response, error) { + ret := _m.Called(ctx, id, policy) + + if len(ret) == 0 { + panic("no return value specified for CreateIsmPolicy") + } + + var r0 *elastic.Response + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*elastic.Response, error)); ok { + return rf(ctx, id, policy) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) *elastic.Response); ok { + r0 = rf(ctx, id, policy) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*elastic.Response) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, id, policy) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CreateTemplate provides a mock function with given fields: id func (_m *Client) CreateTemplate(id string) es.TemplateCreateService { ret := _m.Called(id) @@ -75,6 +149,26 @@ func (_m *Client) CreateTemplate(id string) es.TemplateCreateService { return r0 } +// DeleteAlias provides a mock function with given fields: name +func (_m *Client) DeleteAlias(name string) es.AliasRemoveAction { + ret := _m.Called(name) + + if len(ret) == 0 { + panic("no return value specified for DeleteAlias") + } + + var r0 es.AliasRemoveAction + if rf, ok := ret.Get(0).(func(string) es.AliasRemoveAction); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.AliasRemoveAction) + } + } + + return r0 +} + // DeleteIndex provides a mock function with given fields: index func (_m *Client) DeleteIndex(index string) es.IndicesDeleteService { ret := _m.Called(index) @@ -95,6 +189,26 @@ func (_m *Client) DeleteIndex(index string) es.IndicesDeleteService { return r0 } +// GetIndices provides a mock function with no fields +func (_m *Client) GetIndices() es.IndicesGetService { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetIndices") + } + + var r0 es.IndicesGetService + if rf, ok := ret.Get(0).(func() es.IndicesGetService); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.IndicesGetService) + } + } + + return r0 +} + // GetVersion provides a mock function with no fields func (_m *Client) GetVersion() uint { ret := _m.Called() @@ -113,6 +227,34 @@ func (_m *Client) GetVersion() uint { return r0 } +// IlmPolicyExists provides a mock function with given fields: ctx, id +func (_m *Client) IlmPolicyExists(ctx context.Context, id string) (bool, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for IlmPolicyExists") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Index provides a mock function with no fields func (_m *Client) Index() es.IndexService { ret := _m.Called() @@ -153,6 +295,34 @@ func (_m *Client) IndexExists(index string) es.IndicesExistsService { return r0 } +// IsmPolicyExists provides a mock function with given fields: ctx, id +func (_m *Client) IsmPolicyExists(ctx context.Context, id string) (bool, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for IsmPolicyExists") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // MultiSearch provides a mock function with no fields func (_m *Client) MultiSearch() es.MultiSearchService { ret := _m.Called() diff --git a/pkg/es/mocks/IndicesGetService.go b/pkg/es/mocks/IndicesGetService.go new file mode 100644 index 00000000000..2745532ae97 --- /dev/null +++ b/pkg/es/mocks/IndicesGetService.go @@ -0,0 +1,92 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + es "github.com/jaegertracing/jaeger/pkg/es" + elastic "github.com/olivere/elastic" + + mock "github.com/stretchr/testify/mock" +) + +// IndicesGetService is an autogenerated mock type for the IndicesGetService type +type IndicesGetService struct { + mock.Mock +} + +// Do provides a mock function with given fields: ctx +func (_m *IndicesGetService) Do(ctx context.Context) (map[string]*elastic.IndicesGetResponse, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Do") + } + + var r0 map[string]*elastic.IndicesGetResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[string]*elastic.IndicesGetResponse, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[string]*elastic.IndicesGetResponse); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]*elastic.IndicesGetResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Index provides a mock function with given fields: indices +func (_m *IndicesGetService) Index(indices ...string) es.IndicesGetService { + _va := make([]interface{}, len(indices)) + for _i := range indices { + _va[_i] = indices[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Index") + } + + var r0 es.IndicesGetService + if rf, ok := ret.Get(0).(func(...string) es.IndicesGetService); ok { + r0 = rf(indices...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.IndicesGetService) + } + } + + return r0 +} + +// NewIndicesGetService creates a new instance of IndicesGetService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIndicesGetService(t interface { + mock.TestingT + Cleanup(func()) +}) *IndicesGetService { + mock := &IndicesGetService{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/es/mocks/XPackIlmPutLifecycle.go b/pkg/es/mocks/XPackIlmPutLifecycle.go new file mode 100644 index 00000000000..e6d3f729892 --- /dev/null +++ b/pkg/es/mocks/XPackIlmPutLifecycle.go @@ -0,0 +1,106 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + es "github.com/jaegertracing/jaeger/pkg/es" + elastic "github.com/olivere/elastic" + + mock "github.com/stretchr/testify/mock" +) + +// XPackIlmPutLifecycle is an autogenerated mock type for the XPackIlmPutLifecycle type +type XPackIlmPutLifecycle struct { + mock.Mock +} + +// BodyString provides a mock function with given fields: body +func (_m *XPackIlmPutLifecycle) BodyString(body string) es.XPackIlmPutLifecycle { + ret := _m.Called(body) + + if len(ret) == 0 { + panic("no return value specified for BodyString") + } + + var r0 es.XPackIlmPutLifecycle + if rf, ok := ret.Get(0).(func(string) es.XPackIlmPutLifecycle); ok { + r0 = rf(body) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.XPackIlmPutLifecycle) + } + } + + return r0 +} + +// Do provides a mock function with given fields: ctx +func (_m *XPackIlmPutLifecycle) Do(ctx context.Context) (*elastic.XPackIlmPutLifecycleResponse, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Do") + } + + var r0 *elastic.XPackIlmPutLifecycleResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*elastic.XPackIlmPutLifecycleResponse, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *elastic.XPackIlmPutLifecycleResponse); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*elastic.XPackIlmPutLifecycleResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Policy provides a mock function with given fields: policy +func (_m *XPackIlmPutLifecycle) Policy(policy string) es.XPackIlmPutLifecycle { + ret := _m.Called(policy) + + if len(ret) == 0 { + panic("no return value specified for Policy") + } + + var r0 es.XPackIlmPutLifecycle + if rf, ok := ret.Get(0).(func(string) es.XPackIlmPutLifecycle); ok { + r0 = rf(policy) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.XPackIlmPutLifecycle) + } + } + + return r0 +} + +// NewXPackIlmPutLifecycle creates a new instance of XPackIlmPutLifecycle. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewXPackIlmPutLifecycle(t interface { + mock.TestingT + Cleanup(func()) +}) *XPackIlmPutLifecycle { + mock := &XPackIlmPutLifecycle{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index e34b8c49590..21c9b4c2170 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -57,6 +57,60 @@ func (c ClientWrapper) DeleteIndex(index string) es.IndicesDeleteService { return WrapESIndicesDeleteService(c.client.DeleteIndex(index)) } +func (c ClientWrapper) GetIndices() es.IndicesGetService { + indicesGetService := elastic.NewIndicesGetService(c.client) + return WrapIndicesGetService(indicesGetService) +} + +func (c ClientWrapper) CreateAlias(alias string) es.AliasAddAction { + aliasAddAction := elastic.NewAliasAddAction(alias) + return WrapAliasAddAction(aliasAddAction, c.client) +} + +func (c ClientWrapper) DeleteAlias(alias string) es.AliasRemoveAction { + aliasRemoveAction := elastic.NewAliasRemoveAction(alias) + return WrapAliasRemoveAction(aliasRemoveAction, c.client) +} + +func (c ClientWrapper) CreateIlmPolicy() es.XPackIlmPutLifecycle { + xPack := elastic.NewXPackIlmPutLifecycleService(c.client) + return WrapXPackIlmPutLifecycle(xPack) +} + +func (c ClientWrapper) CreateIsmPolicy(ctx context.Context, id, policy string) (*elastic.Response, error) { + return c.client.PerformRequest(ctx, elastic.PerformRequestOptions{ + Path: "_plugins/_ism/policies/" + id, + Method: http.MethodPut, + Body: policy, + }) +} + +func (c ClientWrapper) IlmPolicyExists(ctx context.Context, id string) (bool, error) { + ilmGetService := elastic.NewXPackIlmGetLifecycleService(c.client) + _, err := ilmGetService.Policy(id).Do(ctx) + if err != nil { + if elastic.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil +} + +func (c ClientWrapper) IsmPolicyExists(ctx context.Context, id string) (bool, error) { + _, err := c.client.PerformRequest(ctx, elastic.PerformRequestOptions{ + Path: "_opendistro/_ism/policies/" + id, + Method: http.MethodGet, + }) + if err != nil { + if elastic.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil +} + // CreateTemplate calls this function to internal client. func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService { if c.esVersion >= 8 { @@ -294,3 +348,77 @@ func (s MultiSearchServiceWrapper) Index(indices ...string) es.MultiSearchServic func (s MultiSearchServiceWrapper) Do(ctx context.Context) (*elastic.MultiSearchResult, error) { return s.multiSearchService.Do(ctx) } + +type AliasAddActionWrapper struct { + aliasAddAction *elastic.AliasAddAction + client *elastic.Client +} + +func WrapAliasAddAction(aliasAddAction *elastic.AliasAddAction, client *elastic.Client) AliasAddActionWrapper { + return AliasAddActionWrapper{aliasAddAction: aliasAddAction, client: client} +} + +func (a AliasAddActionWrapper) Index(index ...string) es.AliasAddAction { + return WrapAliasAddAction(a.aliasAddAction.Index(index...), a.client) +} + +func (a AliasAddActionWrapper) IsWriteIndex(flag bool) es.AliasAddAction { + return WrapAliasAddAction(a.aliasAddAction.IsWriteIndex(flag), a.client) +} + +func (a AliasAddActionWrapper) Do(ctx context.Context) (*elastic.AliasResult, error) { + return a.client.Alias().Action(a.aliasAddAction).Do(ctx) +} + +type AliasRemoveActionWrapper struct { + aliasRemoveAction *elastic.AliasRemoveAction + client *elastic.Client +} + +func WrapAliasRemoveAction(aliasRemoveAction *elastic.AliasRemoveAction, client *elastic.Client) AliasRemoveActionWrapper { + return AliasRemoveActionWrapper{aliasRemoveAction: aliasRemoveAction, client: client} +} + +func (a AliasRemoveActionWrapper) Index(index ...string) es.AliasRemoveAction { + return WrapAliasRemoveAction(a.aliasRemoveAction.Index(index...), a.client) +} + +func (a AliasRemoveActionWrapper) Do(ctx context.Context) (*elastic.AliasResult, error) { + return a.client.Alias().Action(a.aliasRemoveAction).Do(ctx) +} + +type XPackIlmPutLifecycleWrapper struct { + xPackPutLifecycleWrapper *elastic.XPackIlmPutLifecycleService +} + +func WrapXPackIlmPutLifecycle(xPackIlmPutLifecycleWrapper *elastic.XPackIlmPutLifecycleService) XPackIlmPutLifecycleWrapper { + return XPackIlmPutLifecycleWrapper{xPackPutLifecycleWrapper: xPackIlmPutLifecycleWrapper} +} + +func (x XPackIlmPutLifecycleWrapper) BodyString(body string) es.XPackIlmPutLifecycle { + return WrapXPackIlmPutLifecycle(x.xPackPutLifecycleWrapper.BodyString(body)) +} + +func (x XPackIlmPutLifecycleWrapper) Policy(policy string) es.XPackIlmPutLifecycle { + return WrapXPackIlmPutLifecycle(x.xPackPutLifecycleWrapper.Policy(policy)) +} + +func (x XPackIlmPutLifecycleWrapper) Do(ctx context.Context) (*elastic.XPackIlmPutLifecycleResponse, error) { + return x.xPackPutLifecycleWrapper.Do(ctx) +} + +type IndicesGetServiceWrapper struct { + indicesGetService *elastic.IndicesGetService +} + +func WrapIndicesGetService(indicesGetService *elastic.IndicesGetService) IndicesGetServiceWrapper { + return IndicesGetServiceWrapper{indicesGetService: indicesGetService} +} + +func (i IndicesGetServiceWrapper) Index(indices ...string) es.IndicesGetService { + return WrapIndicesGetService(i.indicesGetService.Index(indices...)) +} + +func (i IndicesGetServiceWrapper) Do(ctx context.Context) (map[string]*elastic.IndicesGetResponse, error) { + return i.indicesGetService.Do(ctx) +} diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index e1c7ae807a4..9f9e4aeb101 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" + "github.com/jaegertracing/jaeger/plugin/storage/es/ilm" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" esSampleStore "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" esSpanStore "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore" @@ -312,6 +313,13 @@ func createSpanWriter( return nil, err } } + if cfg.UseILM { + policyManager := ilm.NewPolicyManager(clientFn(), cfg, ilm.OnServiceAndSpan) + err = policyManager.Init() + if err != nil { + return nil, err + } + } return writer, nil } @@ -337,7 +345,13 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store return nil, fmt.Errorf("failed to create template: %w", err) } } - + if f.primaryConfig.UseILM { + policyManager := ilm.NewPolicyManager(f.getPrimaryClient(), f.primaryConfig, ilm.OnSampling) + err := policyManager.Init() + if err != nil { + return nil, err + } + } return store, nil } diff --git a/plugin/storage/es/ilm/ilm-policy.json b/plugin/storage/es/ilm/ilm-policy.json new file mode 100644 index 00000000000..58d288c7486 --- /dev/null +++ b/plugin/storage/es/ilm/ilm-policy.json @@ -0,0 +1,13 @@ +{ + "policy": { + "phases": { + "hot": { + "actions": { + "rollover": { + "max_age": "1d" + } + } + } + } + } +} \ No newline at end of file diff --git a/plugin/storage/es/ilm/ilm.go b/plugin/storage/es/ilm/ilm.go new file mode 100644 index 00000000000..f2e13127357 --- /dev/null +++ b/plugin/storage/es/ilm/ilm.go @@ -0,0 +1,284 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package ilm + +import ( + "context" + "embed" + "errors" + "fmt" + "strconv" + "time" + + "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/pkg/es/filter" + "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" +) + +//go:embed *.json +var ILM embed.FS + +const ( + defaultIlmPolicy = "jaeger-default-ilm-policy" + defaultIsmPolicy = "jaeger-default-ism-policy" + writeAliasFormat = "%s-write" + readAliasFormat = "%s-read" + rolloverIndexFormat = "%s-000001" + ilmVersionSupport = 7 + ilmPolicyFile = "ilm-policy.json" + ismPolicyFile = "ism-policy.json" +) + +type ApplyOn int + +const ( + OnArchive ApplyOn = iota + OnDependency + OnServiceAndSpan + OnSampling +) + +var ErrIlmNotSupported = errors.New("ILM is supported only for ES version 7+") + +type PolicyManager struct { + client es.Client + config *config.Configuration + applyOn ApplyOn +} + +func (p *PolicyManager) Init() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if p.config.Version < ilmVersionSupport { + return ErrIlmNotSupported + } + err := p.createPolicyIfNotExists(ctx) + if err != nil { + return err + } + rolloverIndices := p.rolloverIndices() + for _, index := range rolloverIndices { + err = p.init(ctx, index) + if err != nil { + return err + } + } + return nil +} + +func NewPolicyManager(cl es.Client, configuration *config.Configuration, applyOn ApplyOn) *PolicyManager { + return &PolicyManager{ + client: cl, + config: configuration, + applyOn: applyOn, + } +} + +func (p *PolicyManager) createPolicyIfNotExists(ctx context.Context) error { + if p.config.IsOpenSearch { + policyExists, err := p.client.IsmPolicyExists(ctx, defaultIsmPolicy) + if err != nil { + return err + } + if !policyExists { + policy := loadPolicy(ismPolicyFile) + _, err := p.client.CreateIsmPolicy(ctx, defaultIsmPolicy, policy) + if err != nil { + return err + } + } + } else { + policyExists, err := p.client.IlmPolicyExists(ctx, defaultIlmPolicy) + if err != nil { + return err + } + if !policyExists { + policy := loadPolicy(ilmPolicyFile) + _, err := p.client.CreateIlmPolicy().Policy(defaultIlmPolicy).BodyString(policy).Do(ctx) + if err != nil { + return err + } + } + } + return nil +} + +func (p *PolicyManager) init(ctx context.Context, indexOpt indexOption) error { + if p.config.CreateIndexTemplates { + mappingType, err := mappings.MappingTypeFromString(indexOpt.mapping) + if err != nil { + return err + } + mapping, err := p.getMapping(mappingType) + if err != nil { + return err + } + _, err = p.client.CreateTemplate(indexOpt.templateName(p.config.Indices.IndexPrefix)).Body(mapping).Do(ctx) + if err != nil { + return err + } + } + index := indexOpt.initialRolloverIndex(p.config.Indices.IndexPrefix) + err := p.createIndexIfNotExists(ctx, index) + if err != nil { + return err + } + jaegerIndices, err := p.getJaegerIndices(ctx, indexOpt) + if err != nil { + return err + } + readAlias := indexOpt.readAliasName(p.config.Indices.IndexPrefix) + writeAlias := indexOpt.writeAliasName(p.config.Indices.IndexPrefix) + var aliases []client.Alias + if !filter.AliasExists(jaegerIndices, readAlias) { + aliases = append(aliases, client.Alias{ + Index: index, + Name: readAlias, + IsWriteIndex: false, + }) + } + if !filter.AliasExists(jaegerIndices, writeAlias) { + aliases = append(aliases, client.Alias{ + Index: index, + Name: writeAlias, + IsWriteIndex: p.config.UseILM, + }) + } + for _, alias := range aliases { + _, err = p.client.CreateAlias(alias.Name).Index(alias.Index).IsWriteIndex(alias.IsWriteIndex).Do(ctx) + if err != nil { + return err + } + } + return nil +} + +func (p *PolicyManager) getMapping(mappingType mappings.MappingType) (string, error) { + mappingBuilder := &mappings.MappingBuilder{ + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: p.config.Indices, + EsVersion: p.config.Version, + UseILM: p.config.UseILM, + } + if p.config.IsOpenSearch { + mappingBuilder.IsOpenSearch = true + mappingBuilder.ILMPolicyName = defaultIsmPolicy + } else { + mappingBuilder.ILMPolicyName = defaultIlmPolicy + } + return mappingBuilder.GetMapping(mappingType) +} + +func (p *PolicyManager) createIndexIfNotExists(ctx context.Context, index string) error { + exists, err := p.client.IndexExists(index).Do(ctx) + if err != nil { + return err + } + if !exists { + _, err = p.client.CreateIndex(index).Do(ctx) + if err != nil { + return err + } + } + return nil +} + +func (p *PolicyManager) getJaegerIndices(ctx context.Context, indexOpt indexOption) ([]client.Index, error) { + // One of the example of the prefix is: jaeger-main-jaeger-span-* where jaeger-main is the index prefix + prefix := indexOpt.indexName(p.config.Indices.IndexPrefix) + "-*" + res, err := p.client.GetIndices().Index(prefix).Do(ctx) + if err != nil { + return nil, err + } + indices := make([]client.Index, len(res)) + for idx, opts := range res { + aliases := map[string]bool{} + for alias := range opts.Aliases { + aliases[alias] = true + } + // ignoring error and ok, ES should return valid date in string format + creationDateStr, _ := opts.Settings["index.creation_date"].(string) + creationDate, _ := strconv.ParseInt(creationDateStr, 10, 64) + indices = append(indices, client.Index{ + Index: idx, + CreationTime: time.Unix(0, int64(time.Millisecond)*creationDate), + Aliases: aliases, + }) + } + return indices, nil +} + +type indexOption struct { + indexType string + mapping string +} + +func (i *indexOption) indexName(indexPrefix config.IndexPrefix) string { + return indexPrefix.Apply(i.indexType) +} + +// readAliasName returns read alias name of the index +func (i *indexOption) readAliasName(indexPrefix config.IndexPrefix) string { + return fmt.Sprintf(readAliasFormat, i.indexName(indexPrefix)) +} + +// writeAliasName returns write alias name of the index +func (i *indexOption) writeAliasName(indexPrefix config.IndexPrefix) string { + return fmt.Sprintf(writeAliasFormat, i.indexName(indexPrefix)) +} + +// initialRolloverIndex returns the initial index rollover name +func (i *indexOption) initialRolloverIndex(indexPrefix config.IndexPrefix) string { + return fmt.Sprintf(rolloverIndexFormat, i.indexName(indexPrefix)) +} + +// templateName returns the prefixed template name +func (i *indexOption) templateName(indexPrefix config.IndexPrefix) string { + return indexPrefix.Apply(i.mapping) +} + +func (p *PolicyManager) rolloverIndices() []indexOption { + switch p.applyOn { + case OnArchive: + return []indexOption{ + { + indexType: "jaeger-span-archive", + mapping: "jaeger-span", + }, + } + case OnDependency: + return []indexOption{ + { + mapping: "jaeger-dependencies", + indexType: "jaeger-dependencies", + }, + } + case OnServiceAndSpan: + return []indexOption{ + { + mapping: "jaeger-span", + indexType: "jaeger-span", + }, + { + mapping: "jaeger-service", + indexType: "jaeger-service", + }, + } + case OnSampling: + return []indexOption{ + { + mapping: "jaeger-sampling", + indexType: "jaeger-sampling", + }, + } + } + return []indexOption{} +} + +func loadPolicy(name string) string { + file, _ := ILM.ReadFile(name) + return string(file) +} diff --git a/plugin/storage/es/ilm/ism-policy.json b/plugin/storage/es/ilm/ism-policy.json new file mode 100644 index 00000000000..677cac0bf4b --- /dev/null +++ b/plugin/storage/es/ilm/ism-policy.json @@ -0,0 +1,18 @@ +{ + "policy": { + "description": "Jaeger default ISM Policy", + "default_state": "hot", + "states": [ + { + "name": "hot", + "actions": [ + { + "rollover": { + "min_index_age": "1d" + } + } + ] + } + ] + } +} \ No newline at end of file diff --git a/plugin/storage/es/mappings/jaeger-dependencies-7.json b/plugin/storage/es/mappings/jaeger-dependencies-7.json index 18afe1b056e..e05c1c74e21 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-7.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-7.json @@ -10,12 +10,16 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true - {{- if .UseILM }} - ,"lifecycle": { - "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + {{- else }} + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" } {{- end }} + {{- end }} }, "mappings":{} } diff --git a/plugin/storage/es/mappings/jaeger-dependencies-8.json b/plugin/storage/es/mappings/jaeger-dependencies-8.json index e06d9826a13..55641052b6a 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-8.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-8.json @@ -13,10 +13,14 @@ "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true {{- if .UseILM }}, - "lifecycle": { - "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" - } + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + {{- else }} + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + } + {{- end }} {{- end }} }, "mappings": {} diff --git a/plugin/storage/es/mappings/jaeger-sampling-7.json b/plugin/storage/es/mappings/jaeger-sampling-7.json index 167c1d47928..6d90482f1dd 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-7.json +++ b/plugin/storage/es/mappings/jaeger-sampling-7.json @@ -10,12 +10,16 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":false - {{- if .UseILM }} - ,"lifecycle": { - "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" - } - {{- end }} + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-sapmling-write" + {{- else }} + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + } + {{- end }} + {{- end }} }, "mappings":{} } diff --git a/plugin/storage/es/mappings/jaeger-sampling-8.json b/plugin/storage/es/mappings/jaeger-sampling-8.json index d9adeb8bfa2..9c16b064d9a 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-8.json +++ b/plugin/storage/es/mappings/jaeger-sampling-8.json @@ -12,13 +12,17 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": false - {{- if .UseILM }}, - "lifecycle": { + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + {{- else }} + "lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" } - {{- end }} - }, + {{- end }} + {{- end }} + }, "mappings": {} } } diff --git a/plugin/storage/es/mappings/jaeger-service-7.json b/plugin/storage/es/mappings/jaeger-service-7.json index a0bf5c3f392..d1d55849ffe 100644 --- a/plugin/storage/es/mappings/jaeger-service-7.json +++ b/plugin/storage/es/mappings/jaeger-service-7.json @@ -10,12 +10,16 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true - {{- if .UseILM }} - ,"lifecycle": { + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + {{- else }} + "lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" - } - {{- end }} + } + {{- end }} + {{- end }} }, "mappings":{ "dynamic_templates":[ diff --git a/plugin/storage/es/mappings/jaeger-service-8.json b/plugin/storage/es/mappings/jaeger-service-8.json index 33039fb6be2..c403bd2878a 100644 --- a/plugin/storage/es/mappings/jaeger-service-8.json +++ b/plugin/storage/es/mappings/jaeger-service-8.json @@ -12,12 +12,16 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true - {{- if .UseILM }}, - "lifecycle": { - "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" - } - {{- end }} + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + {{- else }} + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + } + {{- end }} + {{- end }} }, "mappings": { "dynamic_templates": [ diff --git a/plugin/storage/es/mappings/jaeger-span-7.json b/plugin/storage/es/mappings/jaeger-span-7.json index 3e8f7c39358..5074dee093e 100644 --- a/plugin/storage/es/mappings/jaeger-span-7.json +++ b/plugin/storage/es/mappings/jaeger-span-7.json @@ -10,12 +10,16 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true - {{- if .UseILM }} - ,"lifecycle": { - "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" - } - {{- end }} + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + {{- else }} + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + } + {{- end }} + {{- end }} }, "mappings":{ "dynamic_templates":[ diff --git a/plugin/storage/es/mappings/jaeger-span-8.json b/plugin/storage/es/mappings/jaeger-span-8.json index b9ee8a76b8f..8f3116d4d79 100644 --- a/plugin/storage/es/mappings/jaeger-span-8.json +++ b/plugin/storage/es/mappings/jaeger-span-8.json @@ -13,13 +13,17 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true - {{- if .UseILM }}, - "lifecycle": { - "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" - } - {{- end }} - }, + {{- if .UseILM }}, + {{- if .IsOpenSearch }}, + "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + {{- else }} + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + } + {{- end }} + {{- end }} + }, "mappings": { "dynamic_templates": [ { diff --git a/plugin/storage/es/mappings/mapping.go b/plugin/storage/es/mappings/mapping.go index b1d8e061407..b1431ec3c34 100644 --- a/plugin/storage/es/mappings/mapping.go +++ b/plugin/storage/es/mappings/mapping.go @@ -34,6 +34,7 @@ type MappingBuilder struct { EsVersion uint UseILM bool ILMPolicyName string + IsOpenSearch bool } // templateParams holds parameters required to render an elasticsearch index template @@ -44,13 +45,14 @@ type templateParams struct { Shards int64 Replicas int64 Priority int64 + IsOpenSearch bool } func (mb MappingBuilder) getMappingTemplateOptions(mappingType MappingType) templateParams { mappingOpts := templateParams{} mappingOpts.UseILM = mb.UseILM mappingOpts.ILMPolicyName = mb.ILMPolicyName - + mappingOpts.IsOpenSearch = mb.IsOpenSearch switch mappingType { case SpanMapping: mappingOpts.Shards = mb.Indices.Spans.Shards diff --git a/storage_v2/depstore/mocks/Writer.go b/storage_v2/depstore/mocks/Writer.go new file mode 100644 index 00000000000..0c3680c4059 --- /dev/null +++ b/storage_v2/depstore/mocks/Writer.go @@ -0,0 +1,52 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + model "github.com/jaegertracing/jaeger/model" + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// Writer is an autogenerated mock type for the Writer type +type Writer struct { + mock.Mock +} + +// WriteDependencies provides a mock function with given fields: ts, dependencies +func (_m *Writer) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { + ret := _m.Called(ts, dependencies) + + if len(ret) == 0 { + panic("no return value specified for WriteDependencies") + } + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time, []model.DependencyLink) error); ok { + r0 = rf(ts, dependencies) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewWriter creates a new instance of Writer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *Writer { + mock := &Writer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From cc7c5118a7b9723c96dc316d6870f1d82ec4185c Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sat, 25 Jan 2025 02:16:23 +0530 Subject: [PATCH 02/11] opensearch integration Signed-off-by: Manik2708 --- cmd/jaeger/config-opensearch-ilm.yaml | 76 +++++ .../extension/jaegerstorage/extension.go | 2 + .../e2e_elasticsearch_ilm_integration.go | 25 +- pkg/es/wrapper/wrapper.go | 4 +- plugin/storage/es/dependencystore/storage.go | 6 + plugin/storage/es/factory.go | 21 +- plugin/storage/es/factory_test.go | 17 +- plugin/storage/es/ilm/createpolicy.go | 53 ++++ plugin/storage/es/ilm/ilm.go | 284 ------------------ plugin/storage/es/ilm/ism-policy.json | 6 +- plugin/storage/es/ilm/policymanager.go | 147 +++++++++ .../es/mappings/jaeger-dependencies-7.json | 8 +- .../es/mappings/jaeger-dependencies-8.json | 8 +- .../es/mappings/jaeger-sampling-7.json | 8 +- .../es/mappings/jaeger-sampling-8.json | 8 +- .../storage/es/mappings/jaeger-service-7.json | 8 +- .../storage/es/mappings/jaeger-service-8.json | 8 +- plugin/storage/es/mappings/jaeger-span-7.json | 8 +- plugin/storage/es/mappings/jaeger-span-8.json | 8 +- plugin/storage/es/mappings/mapping_test.go | 8 +- plugin/storage/es/samplingstore/storage.go | 6 + plugin/storage/es/spanstore/writer.go | 38 ++- 22 files changed, 401 insertions(+), 356 deletions(-) create mode 100644 cmd/jaeger/config-opensearch-ilm.yaml create mode 100644 plugin/storage/es/ilm/createpolicy.go delete mode 100644 plugin/storage/es/ilm/ilm.go create mode 100644 plugin/storage/es/ilm/policymanager.go diff --git a/cmd/jaeger/config-opensearch-ilm.yaml b/cmd/jaeger/config-opensearch-ilm.yaml new file mode 100644 index 00000000000..cb855cddb44 --- /dev/null +++ b/cmd/jaeger/config-opensearch-ilm.yaml @@ -0,0 +1,76 @@ +service: + extensions: [jaeger_storage, jaeger_query, healthcheckv2] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + telemetry: + resource: + service.name: jaeger + metrics: + level: detailed + address: 0.0.0.0:8888 + logs: + level: debug + # TODO Initialize telemetry tracer once OTEL released new feature. + # https://github.com/open-telemetry/opentelemetry-collector/issues/10663 + +extensions: + healthcheckv2: + use_v2: true + http: + + jaeger_query: + storage: + traces: some_storage + traces_archive: another_storage + ui: + config_file: ./cmd/jaeger/config-ui.json + + jaeger_storage: + backends: + some_storage: + opensearch: + use_ilm: true + create_mappings: true + use_aliases: true + indices: + index_prefix: "jaeger-main" + spans: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + services: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + dependencies: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + sampling: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + another_storage: + opensearch: + indices: + index_prefix: "jaeger-archive" + +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: some_storage diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 53e29a07d7e..5be4f7d1c56 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -155,12 +155,14 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error { s.telset.Logger, ) case cfg.Elasticsearch != nil: + //nolint: contextcheck factory, err = es.NewFactoryWithConfig( *cfg.Elasticsearch, scopedMetricsFactory(storageName, "elasticsearch", "tracestore"), s.telset.Logger, ) case cfg.Opensearch != nil: + //nolint: contextcheck factory, err = es.NewFactoryWithConfig( *cfg.Opensearch, scopedMetricsFactory(storageName, "opensearch", "tracestore"), diff --git a/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go b/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go index 8ec4647b452..e36c20b09d7 100644 --- a/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go +++ b/cmd/jaeger/internal/integration/e2e_elasticsearch_ilm_integration.go @@ -41,16 +41,23 @@ func (e *E2EElasticSearchILMIntegration) RunTests(t *testing.T) { if version < 7 { t.Skip("Automated Rollover is supported only for 7+ versions") } - s := &E2EStorageIntegration{ - ConfigFile: "../../config-elasticsearch-ilm.yaml", - StorageIntegration: integration.StorageIntegration{ - CleanUp: purge, - Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), - }, - } if e.isOpenSearch { + s := &E2EStorageIntegration{ + ConfigFile: "../../config-opensearch-ilm.yaml", + StorageIntegration: integration.StorageIntegration{ + CleanUp: purge, + Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), + }, + } s.e2eInitialize(t, "opensearch") } else { + s := &E2EStorageIntegration{ + ConfigFile: "../../config-elasticsearch-ilm.yaml", + StorageIntegration: integration.StorageIntegration{ + CleanUp: purge, + Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), + }, + } s.e2eInitialize(t, "elasticsearch") } var v8client *esV8.Client @@ -78,7 +85,7 @@ func (e *E2EElasticSearchILMIntegration) checkForIlmPolicyCreation(t *testing.T, } } else { _, err := client.PerformRequest(context.Background(), elastic.PerformRequestOptions{ - Path: "_plugins/_ism/policies/" + defaultIsmPolicy, + Path: "/_plugins/_ism/policies/" + defaultIsmPolicy, Method: http.MethodGet, }) require.NoError(t, err) @@ -177,7 +184,7 @@ func (e *E2EElasticSearchILMIntegration) cleanES(t *testing.T, client *elastic.C } } else { _, err := client.PerformRequest(context.Background(), elastic.PerformRequestOptions{ - Path: "_plugins/_ism/policies/" + defaultIsmPolicy, + Path: "/_plugins/_ism/policies/" + defaultIsmPolicy, Method: http.MethodDelete, }) require.NoError(t, err) diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index 21c9b4c2170..f9a80b0ea77 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -79,7 +79,7 @@ func (c ClientWrapper) CreateIlmPolicy() es.XPackIlmPutLifecycle { func (c ClientWrapper) CreateIsmPolicy(ctx context.Context, id, policy string) (*elastic.Response, error) { return c.client.PerformRequest(ctx, elastic.PerformRequestOptions{ - Path: "_plugins/_ism/policies/" + id, + Path: "/_plugins/_ism/policies/" + id, Method: http.MethodPut, Body: policy, }) @@ -99,7 +99,7 @@ func (c ClientWrapper) IlmPolicyExists(ctx context.Context, id string) (bool, er func (c ClientWrapper) IsmPolicyExists(ctx context.Context, id string) (bool, error) { _, err := c.client.PerformRequest(ctx, elastic.PerformRequestOptions{ - Path: "_opendistro/_ism/policies/" + id, + Path: "/_plugins/_ism/policies/" + id, Method: http.MethodGet, }) if err != nil { diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index ffceb95c98c..148beffe55a 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -18,6 +18,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel" + "github.com/jaegertracing/jaeger/plugin/storage/es/ilm" ) const ( @@ -135,3 +136,8 @@ func (s *DependencyStore) getWriteIndex(ts time.Time) string { } return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) } + +func (s *DependencyStore) CreatePolicy(version uint, isOpenSearch bool) error { + policyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, s.dependencyIndexPrefix) + return policyManager.Init() +} diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 9f9e4aeb101..ec68a2603dd 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -164,6 +164,13 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } } + if f.primaryConfig.UseILM { + err = ilm.CreatePolicyIfNotExists(primaryClient, f.primaryConfig.IsOpenSearch, f.primaryConfig.Version) + if err != nil { + return fmt.Errorf("failed to create ILM policy: %w", err) + } + } + return nil } @@ -302,8 +309,8 @@ func createSpanWriter( ServiceCacheTTL: cfg.ServiceCacheTTL, }) - // Creating a template here would conflict with the one created for ILM resulting to no index rollover - if cfg.CreateIndexTemplates && !cfg.UseILM { + // Creating a template here no matter what is ilm. Creating template before creating policy will not give any error + if cfg.CreateIndexTemplates { mappingBuilder := mappingBuilderFromConfig(cfg) spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() if err != nil { @@ -314,8 +321,7 @@ func createSpanWriter( } } if cfg.UseILM { - policyManager := ilm.NewPolicyManager(clientFn(), cfg, ilm.OnServiceAndSpan) - err = policyManager.Init() + err := writer.CreatePolicy(cfg.Version, cfg.IsOpenSearch) if err != nil { return nil, err } @@ -335,7 +341,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store } store := esSampleStore.NewSamplingStore(params) - if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM { + if f.primaryConfig.CreateIndexTemplates { mappingBuilder := mappingBuilderFromConfig(f.primaryConfig) samplingMapping, err := mappingBuilder.GetSamplingMappings() if err != nil { @@ -346,8 +352,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store } } if f.primaryConfig.UseILM { - policyManager := ilm.NewPolicyManager(f.getPrimaryClient(), f.primaryConfig, ilm.OnSampling) - err := policyManager.Init() + err := store.CreatePolicy(f.primaryConfig.Version, f.primaryConfig.IsOpenSearch) if err != nil { return nil, err } @@ -361,6 +366,8 @@ func mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder Indices: cfg.Indices, EsVersion: cfg.Version, UseILM: cfg.UseILM, + IsOpenSearch: cfg.IsOpenSearch, + ILMPolicyName: ilm.DefaultIlmPolicy, } } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 7dcd103ebf1..e66303af9d7 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -54,6 +54,7 @@ func (m *mockClientBuilder) NewClient(*escfg.Configuration, *zap.Logger, metrics tService.On("Body", mock.Anything).Return(tService) tService.On("Do", context.Background()).Return(nil, m.createTemplateError) c.On("CreateTemplate", mock.Anything).Return(tService) + c.On("IlmPolicyExists", mock.Anything, mock.Anything).Return(true, nil) c.On("GetVersion").Return(uint(6)) c.On("Close").Return(nil) return c, nil @@ -122,11 +123,13 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { f := NewFactory() f.primaryConfig = &escfg.Configuration{ - UseILM: true, + UseILM: true, + Version: 7, } f.archiveConfig = &escfg.Configuration{ Enabled: true, UseILM: true, + Version: 7, } f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -219,18 +222,6 @@ func TestCreateTemplateError(t *testing.T) { require.Error(t, err, "template-error") } -func TestILMDisableTemplateCreation(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} - f.archiveConfig = &escfg.Configuration{} - f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - defer f.Close() - require.NoError(t, err) - _, err = f.CreateSpanWriter() - require.NoError(t, err) // as the createTemplate is not called, CreateSpanWriter should not return an error -} - func TestArchiveDisabled(t *testing.T) { f := NewFactory() f.archiveConfig = &escfg.Configuration{Enabled: false} diff --git a/plugin/storage/es/ilm/createpolicy.go b/plugin/storage/es/ilm/createpolicy.go new file mode 100644 index 00000000000..963d14972b9 --- /dev/null +++ b/plugin/storage/es/ilm/createpolicy.go @@ -0,0 +1,53 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package ilm + +import ( + "context" + "embed" + + "github.com/jaegertracing/jaeger/pkg/es" +) + +//go:embed *.json +var ILM embed.FS + +func CreatePolicyIfNotExists(client es.Client, isOpenSearch bool, version uint) error { + if version < ilmVersionSupport { + return ErrIlmNotSupported + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if !isOpenSearch { + policyExists, err := client.IlmPolicyExists(ctx, DefaultIlmPolicy) + if err != nil { + return err + } + if !policyExists { + policy := loadPolicy(ilmPolicyFile) + _, err = client.CreateIlmPolicy().Policy(DefaultIlmPolicy).BodyString(policy).Do(ctx) + if err != nil { + return err + } + } + } else { + policyExists, err := client.IsmPolicyExists(ctx, DefaultIsmPolicy) + if err != nil { + return err + } + if !policyExists { + policy := loadPolicy(ismPolicyFile) + _, err = client.CreateIsmPolicy(ctx, DefaultIsmPolicy, policy) + if err != nil { + return err + } + } + } + return nil +} + +func loadPolicy(name string) string { + file, _ := ILM.ReadFile(name) + return string(file) +} diff --git a/plugin/storage/es/ilm/ilm.go b/plugin/storage/es/ilm/ilm.go deleted file mode 100644 index f2e13127357..00000000000 --- a/plugin/storage/es/ilm/ilm.go +++ /dev/null @@ -1,284 +0,0 @@ -// Copyright (c) 2025 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package ilm - -import ( - "context" - "embed" - "errors" - "fmt" - "strconv" - "time" - - "github.com/jaegertracing/jaeger/pkg/es" - "github.com/jaegertracing/jaeger/pkg/es/client" - "github.com/jaegertracing/jaeger/pkg/es/config" - "github.com/jaegertracing/jaeger/pkg/es/filter" - "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" -) - -//go:embed *.json -var ILM embed.FS - -const ( - defaultIlmPolicy = "jaeger-default-ilm-policy" - defaultIsmPolicy = "jaeger-default-ism-policy" - writeAliasFormat = "%s-write" - readAliasFormat = "%s-read" - rolloverIndexFormat = "%s-000001" - ilmVersionSupport = 7 - ilmPolicyFile = "ilm-policy.json" - ismPolicyFile = "ism-policy.json" -) - -type ApplyOn int - -const ( - OnArchive ApplyOn = iota - OnDependency - OnServiceAndSpan - OnSampling -) - -var ErrIlmNotSupported = errors.New("ILM is supported only for ES version 7+") - -type PolicyManager struct { - client es.Client - config *config.Configuration - applyOn ApplyOn -} - -func (p *PolicyManager) Init() error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - if p.config.Version < ilmVersionSupport { - return ErrIlmNotSupported - } - err := p.createPolicyIfNotExists(ctx) - if err != nil { - return err - } - rolloverIndices := p.rolloverIndices() - for _, index := range rolloverIndices { - err = p.init(ctx, index) - if err != nil { - return err - } - } - return nil -} - -func NewPolicyManager(cl es.Client, configuration *config.Configuration, applyOn ApplyOn) *PolicyManager { - return &PolicyManager{ - client: cl, - config: configuration, - applyOn: applyOn, - } -} - -func (p *PolicyManager) createPolicyIfNotExists(ctx context.Context) error { - if p.config.IsOpenSearch { - policyExists, err := p.client.IsmPolicyExists(ctx, defaultIsmPolicy) - if err != nil { - return err - } - if !policyExists { - policy := loadPolicy(ismPolicyFile) - _, err := p.client.CreateIsmPolicy(ctx, defaultIsmPolicy, policy) - if err != nil { - return err - } - } - } else { - policyExists, err := p.client.IlmPolicyExists(ctx, defaultIlmPolicy) - if err != nil { - return err - } - if !policyExists { - policy := loadPolicy(ilmPolicyFile) - _, err := p.client.CreateIlmPolicy().Policy(defaultIlmPolicy).BodyString(policy).Do(ctx) - if err != nil { - return err - } - } - } - return nil -} - -func (p *PolicyManager) init(ctx context.Context, indexOpt indexOption) error { - if p.config.CreateIndexTemplates { - mappingType, err := mappings.MappingTypeFromString(indexOpt.mapping) - if err != nil { - return err - } - mapping, err := p.getMapping(mappingType) - if err != nil { - return err - } - _, err = p.client.CreateTemplate(indexOpt.templateName(p.config.Indices.IndexPrefix)).Body(mapping).Do(ctx) - if err != nil { - return err - } - } - index := indexOpt.initialRolloverIndex(p.config.Indices.IndexPrefix) - err := p.createIndexIfNotExists(ctx, index) - if err != nil { - return err - } - jaegerIndices, err := p.getJaegerIndices(ctx, indexOpt) - if err != nil { - return err - } - readAlias := indexOpt.readAliasName(p.config.Indices.IndexPrefix) - writeAlias := indexOpt.writeAliasName(p.config.Indices.IndexPrefix) - var aliases []client.Alias - if !filter.AliasExists(jaegerIndices, readAlias) { - aliases = append(aliases, client.Alias{ - Index: index, - Name: readAlias, - IsWriteIndex: false, - }) - } - if !filter.AliasExists(jaegerIndices, writeAlias) { - aliases = append(aliases, client.Alias{ - Index: index, - Name: writeAlias, - IsWriteIndex: p.config.UseILM, - }) - } - for _, alias := range aliases { - _, err = p.client.CreateAlias(alias.Name).Index(alias.Index).IsWriteIndex(alias.IsWriteIndex).Do(ctx) - if err != nil { - return err - } - } - return nil -} - -func (p *PolicyManager) getMapping(mappingType mappings.MappingType) (string, error) { - mappingBuilder := &mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Indices: p.config.Indices, - EsVersion: p.config.Version, - UseILM: p.config.UseILM, - } - if p.config.IsOpenSearch { - mappingBuilder.IsOpenSearch = true - mappingBuilder.ILMPolicyName = defaultIsmPolicy - } else { - mappingBuilder.ILMPolicyName = defaultIlmPolicy - } - return mappingBuilder.GetMapping(mappingType) -} - -func (p *PolicyManager) createIndexIfNotExists(ctx context.Context, index string) error { - exists, err := p.client.IndexExists(index).Do(ctx) - if err != nil { - return err - } - if !exists { - _, err = p.client.CreateIndex(index).Do(ctx) - if err != nil { - return err - } - } - return nil -} - -func (p *PolicyManager) getJaegerIndices(ctx context.Context, indexOpt indexOption) ([]client.Index, error) { - // One of the example of the prefix is: jaeger-main-jaeger-span-* where jaeger-main is the index prefix - prefix := indexOpt.indexName(p.config.Indices.IndexPrefix) + "-*" - res, err := p.client.GetIndices().Index(prefix).Do(ctx) - if err != nil { - return nil, err - } - indices := make([]client.Index, len(res)) - for idx, opts := range res { - aliases := map[string]bool{} - for alias := range opts.Aliases { - aliases[alias] = true - } - // ignoring error and ok, ES should return valid date in string format - creationDateStr, _ := opts.Settings["index.creation_date"].(string) - creationDate, _ := strconv.ParseInt(creationDateStr, 10, 64) - indices = append(indices, client.Index{ - Index: idx, - CreationTime: time.Unix(0, int64(time.Millisecond)*creationDate), - Aliases: aliases, - }) - } - return indices, nil -} - -type indexOption struct { - indexType string - mapping string -} - -func (i *indexOption) indexName(indexPrefix config.IndexPrefix) string { - return indexPrefix.Apply(i.indexType) -} - -// readAliasName returns read alias name of the index -func (i *indexOption) readAliasName(indexPrefix config.IndexPrefix) string { - return fmt.Sprintf(readAliasFormat, i.indexName(indexPrefix)) -} - -// writeAliasName returns write alias name of the index -func (i *indexOption) writeAliasName(indexPrefix config.IndexPrefix) string { - return fmt.Sprintf(writeAliasFormat, i.indexName(indexPrefix)) -} - -// initialRolloverIndex returns the initial index rollover name -func (i *indexOption) initialRolloverIndex(indexPrefix config.IndexPrefix) string { - return fmt.Sprintf(rolloverIndexFormat, i.indexName(indexPrefix)) -} - -// templateName returns the prefixed template name -func (i *indexOption) templateName(indexPrefix config.IndexPrefix) string { - return indexPrefix.Apply(i.mapping) -} - -func (p *PolicyManager) rolloverIndices() []indexOption { - switch p.applyOn { - case OnArchive: - return []indexOption{ - { - indexType: "jaeger-span-archive", - mapping: "jaeger-span", - }, - } - case OnDependency: - return []indexOption{ - { - mapping: "jaeger-dependencies", - indexType: "jaeger-dependencies", - }, - } - case OnServiceAndSpan: - return []indexOption{ - { - mapping: "jaeger-span", - indexType: "jaeger-span", - }, - { - mapping: "jaeger-service", - indexType: "jaeger-service", - }, - } - case OnSampling: - return []indexOption{ - { - mapping: "jaeger-sampling", - indexType: "jaeger-sampling", - }, - } - } - return []indexOption{} -} - -func loadPolicy(name string) string { - file, _ := ILM.ReadFile(name) - return string(file) -} diff --git a/plugin/storage/es/ilm/ism-policy.json b/plugin/storage/es/ilm/ism-policy.json index 677cac0bf4b..519a1ed524c 100644 --- a/plugin/storage/es/ilm/ism-policy.json +++ b/plugin/storage/es/ilm/ism-policy.json @@ -13,6 +13,10 @@ } ] } - ] + ], + "ism_template": { + "index_patterns": ["*jaeger-*"], + "priority": 100 + } } } \ No newline at end of file diff --git a/plugin/storage/es/ilm/policymanager.go b/plugin/storage/es/ilm/policymanager.go new file mode 100644 index 00000000000..05110b51b51 --- /dev/null +++ b/plugin/storage/es/ilm/policymanager.go @@ -0,0 +1,147 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package ilm + +import ( + "context" + "errors" + "strconv" + "time" + + "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/filter" +) + +const ( + DefaultIlmPolicy = "jaeger-default-ilm-policy" + DefaultIsmPolicy = "jaeger-default-ism-policy" + writeAliasSuffix = "write" + readAliasSuffix = "read" + rolloverIndexSuffix = "000001" + ilmVersionSupport = 7 + ilmPolicyFile = "ilm-policy.json" + ismPolicyFile = "ism-policy.json" +) + +var ErrIlmNotSupported = errors.New("ILM is supported only for ES version 7+") + +type PolicyManager struct { + client func() es.Client + prefixedIndexNameWithSeparator string + version uint + isOpenSearch bool +} + +func (p *PolicyManager) Init() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if p.version < ilmVersionSupport { + return ErrIlmNotSupported + } + err := p.init(ctx) + if err != nil { + return err + } + return nil +} + +func NewPolicyManager(cl func() es.Client, version uint, isOpenSearch bool, prefixedIndexNameWithSeparator string) *PolicyManager { + return &PolicyManager{ + client: cl, + version: version, + isOpenSearch: isOpenSearch, + prefixedIndexNameWithSeparator: prefixedIndexNameWithSeparator, + } +} + +func (p *PolicyManager) init(ctx context.Context) error { + index := p.initialRolloverIndex() + err := p.createIndexIfNotExists(ctx, index) + if err != nil { + return err + } + jaegerIndices, err := p.getJaegerIndices(ctx) + if err != nil { + return err + } + readAlias := p.readAliasName() + writeAlias := p.writeAliasName() + var aliases []client.Alias + if !filter.AliasExists(jaegerIndices, readAlias) { + aliases = append(aliases, client.Alias{ + Index: index, + Name: readAlias, + IsWriteIndex: false, + }) + } + if !filter.AliasExists(jaegerIndices, writeAlias) { + aliases = append(aliases, client.Alias{ + Index: index, + Name: writeAlias, + IsWriteIndex: true, + }) + } + for _, alias := range aliases { + _, err = p.client().CreateAlias(alias.Name).Index(alias.Index).IsWriteIndex(alias.IsWriteIndex).Do(ctx) + if err != nil { + return err + } + } + return nil +} + +func (p *PolicyManager) createIndexIfNotExists(ctx context.Context, index string) error { + exists, err := p.client().IndexExists(index).Do(ctx) + if err != nil { + return err + } + if !exists { + _, err = p.client().CreateIndex(index).Do(ctx) + if err != nil { + return err + } + } + return nil +} + +func (p *PolicyManager) getJaegerIndices(ctx context.Context) ([]client.Index, error) { + // One of the example of the prefix is: jaeger-main-jaeger-span-* where jaeger-main is the index prefix + prefix := p.prefixedIndexNameWithSeparator + "*" + res, err := p.client().GetIndices().Index(prefix).Do(ctx) + if err != nil { + return nil, err + } + indices := make([]client.Index, len(res)) + for idx, opts := range res { + aliases := map[string]bool{} + for alias := range opts.Aliases { + aliases[alias] = true + } + // ignoring error and ok, ES should return valid date in string format + creationDateStr, _ := opts.Settings["index.creation_date"].(string) + creationDate, _ := strconv.ParseInt(creationDateStr, 10, 64) + indices = append(indices, client.Index{ + Index: idx, + CreationTime: time.Unix(0, int64(time.Millisecond)*creationDate), + Aliases: aliases, + }) + } + return indices, nil +} + +// writeAliasName returns write alias name of the index +func (p *PolicyManager) writeAliasName() string { + return p.prefixedIndexNameWithSeparator + writeAliasSuffix +} + +// readAliasName returns read alias name of the index +func (p *PolicyManager) readAliasName() string { + return p.prefixedIndexNameWithSeparator + readAliasSuffix +} + +// initialRolloverIndex returns the initial index rollover name +func (p *PolicyManager) initialRolloverIndex() string { + return p.prefixedIndexNameWithSeparator + rolloverIndexSuffix +} diff --git a/plugin/storage/es/mappings/jaeger-dependencies-7.json b/plugin/storage/es/mappings/jaeger-dependencies-7.json index e05c1c74e21..7034723abe1 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-7.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-7.json @@ -10,11 +10,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" } diff --git a/plugin/storage/es/mappings/jaeger-dependencies-8.json b/plugin/storage/es/mappings/jaeger-dependencies-8.json index 55641052b6a..d206b41782f 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-8.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-8.json @@ -12,11 +12,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" } diff --git a/plugin/storage/es/mappings/jaeger-sampling-7.json b/plugin/storage/es/mappings/jaeger-sampling-7.json index 6d90482f1dd..4a69fb829e4 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-7.json +++ b/plugin/storage/es/mappings/jaeger-sampling-7.json @@ -10,11 +10,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":false - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-sapmling-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-sapmling-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" } diff --git a/plugin/storage/es/mappings/jaeger-sampling-8.json b/plugin/storage/es/mappings/jaeger-sampling-8.json index 9c16b064d9a..d03af298370 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-8.json +++ b/plugin/storage/es/mappings/jaeger-sampling-8.json @@ -12,11 +12,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": false - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" } diff --git a/plugin/storage/es/mappings/jaeger-service-7.json b/plugin/storage/es/mappings/jaeger-service-7.json index d1d55849ffe..cd4e3e047f7 100644 --- a/plugin/storage/es/mappings/jaeger-service-7.json +++ b/plugin/storage/es/mappings/jaeger-service-7.json @@ -10,11 +10,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" } diff --git a/plugin/storage/es/mappings/jaeger-service-8.json b/plugin/storage/es/mappings/jaeger-service-8.json index c403bd2878a..4e52e5b65ec 100644 --- a/plugin/storage/es/mappings/jaeger-service-8.json +++ b/plugin/storage/es/mappings/jaeger-service-8.json @@ -12,11 +12,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" } diff --git a/plugin/storage/es/mappings/jaeger-span-7.json b/plugin/storage/es/mappings/jaeger-span-7.json index 5074dee093e..ddf7be1c12d 100644 --- a/plugin/storage/es/mappings/jaeger-span-7.json +++ b/plugin/storage/es/mappings/jaeger-span-7.json @@ -10,11 +10,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" } diff --git a/plugin/storage/es/mappings/jaeger-span-8.json b/plugin/storage/es/mappings/jaeger-span-8.json index 8f3116d4d79..62e7c5e7a77 100644 --- a/plugin/storage/es/mappings/jaeger-span-8.json +++ b/plugin/storage/es/mappings/jaeger-span-8.json @@ -13,11 +13,11 @@ "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true - {{- if .UseILM }}, - {{- if .IsOpenSearch }}, - "plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + {{- if .UseILM }} + {{- if .IsOpenSearch }} + ,"plugins.index_state_management.rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" {{- else }} - "lifecycle": { + ,"lifecycle": { "name": "{{ .ILMPolicyName }}", "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" } diff --git a/plugin/storage/es/mappings/mapping_test.go b/plugin/storage/es/mappings/mapping_test.go index f7032253c37..9c20e98e278 100644 --- a/plugin/storage/es/mappings/mapping_test.go +++ b/plugin/storage/es/mappings/mapping_test.go @@ -5,10 +5,12 @@ package mappings import ( "embed" + "encoding/json" "errors" "fmt" "io" "os" + "reflect" "testing" "text/template" @@ -68,14 +70,16 @@ func TestMappingBuilderGetMapping(t *testing.T) { UseILM: true, ILMPolicyName: "jaeger-test-policy", } + var wantObj, gotObj map[string]any got, err := mb.GetMapping(tt.mapping) require.NoError(t, err) + require.NoError(t, json.Unmarshal([]byte(got), &gotObj)) var wantbytes []byte fileSuffix := fmt.Sprintf("-%d", tt.esVersion) wantbytes, err = FIXTURES.ReadFile("fixtures/" + templateName + fileSuffix + ".json") require.NoError(t, err) - want := string(wantbytes) - assert.Equal(t, want, got) + require.NoError(t, json.Unmarshal(wantbytes, &wantObj)) + assert.True(t, reflect.DeepEqual(wantObj, gotObj)) }) } } diff --git a/plugin/storage/es/samplingstore/storage.go b/plugin/storage/es/samplingstore/storage.go index db6c960c69c..45d0b07fe81 100644 --- a/plugin/storage/es/samplingstore/storage.go +++ b/plugin/storage/es/samplingstore/storage.go @@ -15,6 +15,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/plugin/storage/es/ilm" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore/dbmodel" "github.com/jaegertracing/jaeger/storage/samplingstore/model" ) @@ -151,6 +152,11 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time, }).Add() } +func (s *SamplingStore) CreatePolicy(version uint, isOpenSearch bool) error { + policyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, s.samplingIndexPrefix) + return policyManager.Init() +} + func getLatestIndices(indexPrefix, indexDateLayout string, clientFn es.Client, rollover time.Duration, maxDuration time.Duration) ([]string, error) { ctx := context.Background() now := time.Now().UTC() diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index f8647a1db0f..8b602c528b1 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -7,6 +7,7 @@ package spanstore import ( "context" "fmt" + "strings" "time" "go.uber.org/zap" @@ -16,6 +17,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/es" cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/storage/es/ilm" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/internal/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics" ) @@ -39,9 +41,10 @@ type SpanWriter struct { logger *zap.Logger writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn // indexCache cache.Cache - serviceWriter serviceWriter - spanConverter dbmodel.FromDomain - spanServiceIndex spanAndServiceIndexFn + serviceWriter serviceWriter + spanConverter dbmodel.FromDomain + spanServiceIndex spanAndServiceIndexFn + spanAndServicePrefixFn spanAndServicePrefixFn } // SpanWriterParams holds constructor parameters for NewSpanWriter @@ -83,9 +86,10 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { writerMetrics: spanWriterMetrics{ indexCreate: spanstoremetrics.NewWriter(p.MetricsFactory, "index_create"), }, - serviceWriter: serviceOperationStorage.Write, - spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p, writeAliasSuffix), + serviceWriter: serviceOperationStorage.Write, + spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), + spanServiceIndex: getSpanAndServiceIndexFn(p, writeAliasSuffix), + spanAndServicePrefixFn: p.getSpanAndServicePrefixFn, } } @@ -104,6 +108,28 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, index return nil } +func (s *SpanWriter) CreatePolicy(version uint, isOpenSearch bool) error { + spanPrefix := s.spanAndServicePrefixFn(spanIndexBaseName) + spanPolicyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, spanPrefix) + err := spanPolicyManager.Init() + if err != nil { + return err + } + servicePrefix := s.spanAndServicePrefixFn(serviceIndexBaseName) + servicePolicyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, servicePrefix) + return servicePolicyManager.Init() +} + +type spanAndServicePrefixFn func(baseIndexName string) string + +func (p SpanWriterParams) getSpanAndServicePrefixFn(baseIndexName string) string { + prefix := p.IndexPrefix.Apply(baseIndexName) + if strings.Contains(p.WriteAliasSuffix, "archive") { + prefix += "archive" + cfg.IndexPrefixSeparator + } + return prefix +} + // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) From 96380d75fe43689193a0597a61cc2bbdfa35a3a0 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sat, 25 Jan 2025 03:26:56 +0530 Subject: [PATCH 03/11] docs added Signed-off-by: Manik2708 --- pkg/es/config/config.go | 4 +++- pkg/es/wrapper/wrapper.go | 25 ++++++++++++++++++++ plugin/storage/es/dependencystore/storage.go | 4 ++-- plugin/storage/es/factory.go | 4 ++-- plugin/storage/es/ilm/createpolicy.go | 1 + plugin/storage/es/ilm/policymanager.go | 10 +++++--- plugin/storage/es/samplingstore/storage.go | 4 ++-- plugin/storage/es/spanstore/writer.go | 6 ++--- 8 files changed, 45 insertions(+), 13 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 3b6556c3308..e505877c3fa 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -134,7 +134,9 @@ type Configuration struct { // latest adaptive sampling probabilities. AdaptiveSamplingLookback time.Duration `mapstructure:"adaptive_sampling_lookback"` Tags TagsAsFields `mapstructure:"tags_as_fields"` - IsOpenSearch bool `mapstructure:"is_open_search"` + // IsOpenSearch stores whether the backend is of opensearch type or not. + // If kept empty, jaeger will automatically identify the distinction. + IsOpenSearch bool `mapstructure:"is_open_search"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` } diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index f9a80b0ea77..d79fc6f304f 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -57,26 +57,31 @@ func (c ClientWrapper) DeleteIndex(index string) es.IndicesDeleteService { return WrapESIndicesDeleteService(c.client.DeleteIndex(index)) } +// GetIndices call this function to internal client func (c ClientWrapper) GetIndices() es.IndicesGetService { indicesGetService := elastic.NewIndicesGetService(c.client) return WrapIndicesGetService(indicesGetService) } +// CreateAlias calls the AliasService in the internal client with AddAction induced in it func (c ClientWrapper) CreateAlias(alias string) es.AliasAddAction { aliasAddAction := elastic.NewAliasAddAction(alias) return WrapAliasAddAction(aliasAddAction, c.client) } +// DeleteAlias calls the AliasService in the internal client with RemoveAction induced in it func (c ClientWrapper) DeleteAlias(alias string) es.AliasRemoveAction { aliasRemoveAction := elastic.NewAliasRemoveAction(alias) return WrapAliasRemoveAction(aliasRemoveAction, c.client) } +// CreateIlmPolicy calls the internal XPackIlmPutLifecycle service func (c ClientWrapper) CreateIlmPolicy() es.XPackIlmPutLifecycle { xPack := elastic.NewXPackIlmPutLifecycleService(c.client) return WrapXPackIlmPutLifecycle(xPack) } +// CreateIsmPolicy creates the Ism Policy which is similar to ILM Policy (but not same) for OpenSearch func (c ClientWrapper) CreateIsmPolicy(ctx context.Context, id, policy string) (*elastic.Response, error) { return c.client.PerformRequest(ctx, elastic.PerformRequestOptions{ Path: "/_plugins/_ism/policies/" + id, @@ -85,6 +90,7 @@ func (c ClientWrapper) CreateIsmPolicy(ctx context.Context, id, policy string) ( }) } +// IlmPolicyExists returns true if policy exists and returns false if not func (c ClientWrapper) IlmPolicyExists(ctx context.Context, id string) (bool, error) { ilmGetService := elastic.NewXPackIlmGetLifecycleService(c.client) _, err := ilmGetService.Policy(id).Do(ctx) @@ -97,6 +103,7 @@ func (c ClientWrapper) IlmPolicyExists(ctx context.Context, id string) (bool, er return true, nil } +// IsmPolicyExists returns true if policy exists and returns false if not func (c ClientWrapper) IsmPolicyExists(ctx context.Context, id string) (bool, error) { _, err := c.client.PerformRequest(ctx, elastic.PerformRequestOptions{ Path: "/_plugins/_ism/policies/" + id, @@ -349,76 +356,94 @@ func (s MultiSearchServiceWrapper) Do(ctx context.Context) (*elastic.MultiSearch return s.multiSearchService.Do(ctx) } +// AliasAddActionWrapper is a wrapper around elastic.AliasAddAction type AliasAddActionWrapper struct { aliasAddAction *elastic.AliasAddAction client *elastic.Client } +// WrapAliasAddAction creates an AliasAddActionWrapper out of *elastic.AliasAddAction. func WrapAliasAddAction(aliasAddAction *elastic.AliasAddAction, client *elastic.Client) AliasAddActionWrapper { return AliasAddActionWrapper{aliasAddAction: aliasAddAction, client: client} } +// Index calls this function to internal service. func (a AliasAddActionWrapper) Index(index ...string) es.AliasAddAction { return WrapAliasAddAction(a.aliasAddAction.Index(index...), a.client) } +// IsWriteIndex calls this function to internal service. func (a AliasAddActionWrapper) IsWriteIndex(flag bool) es.AliasAddAction { return WrapAliasAddAction(a.aliasAddAction.IsWriteIndex(flag), a.client) } +// Do calls this function to internal service. func (a AliasAddActionWrapper) Do(ctx context.Context) (*elastic.AliasResult, error) { return a.client.Alias().Action(a.aliasAddAction).Do(ctx) } +// AliasRemoveActionWrapper is a wrapper around elastic.AliasRemoveAction type AliasRemoveActionWrapper struct { aliasRemoveAction *elastic.AliasRemoveAction client *elastic.Client } +// WrapAliasRemoveAction creates an AliasRemoveActionWrapper out of *elastic.AliasRemoveAction. func WrapAliasRemoveAction(aliasRemoveAction *elastic.AliasRemoveAction, client *elastic.Client) AliasRemoveActionWrapper { return AliasRemoveActionWrapper{aliasRemoveAction: aliasRemoveAction, client: client} } +// Index calls this function to internal service. func (a AliasRemoveActionWrapper) Index(index ...string) es.AliasRemoveAction { return WrapAliasRemoveAction(a.aliasRemoveAction.Index(index...), a.client) } +// Do calls this function to internal service. func (a AliasRemoveActionWrapper) Do(ctx context.Context) (*elastic.AliasResult, error) { return a.client.Alias().Action(a.aliasRemoveAction).Do(ctx) } +// XPackIlmPutLifecycleWrapper is a wrapper around elastic.XPackIlmPutLifecycleService type XPackIlmPutLifecycleWrapper struct { xPackPutLifecycleWrapper *elastic.XPackIlmPutLifecycleService } +// WrapXPackIlmPutLifecycle creates an AliasRemoveActionWrapper out of *elastic.XPackIlmPutLifecycleService. func WrapXPackIlmPutLifecycle(xPackIlmPutLifecycleWrapper *elastic.XPackIlmPutLifecycleService) XPackIlmPutLifecycleWrapper { return XPackIlmPutLifecycleWrapper{xPackPutLifecycleWrapper: xPackIlmPutLifecycleWrapper} } +// BodyString calls this function to internal service. func (x XPackIlmPutLifecycleWrapper) BodyString(body string) es.XPackIlmPutLifecycle { return WrapXPackIlmPutLifecycle(x.xPackPutLifecycleWrapper.BodyString(body)) } +// Policy calls this function to internal service. func (x XPackIlmPutLifecycleWrapper) Policy(policy string) es.XPackIlmPutLifecycle { return WrapXPackIlmPutLifecycle(x.xPackPutLifecycleWrapper.Policy(policy)) } +// Do calls this function to internal service. func (x XPackIlmPutLifecycleWrapper) Do(ctx context.Context) (*elastic.XPackIlmPutLifecycleResponse, error) { return x.xPackPutLifecycleWrapper.Do(ctx) } +// IndicesGetServiceWrapper is a wrapper around elastic.IndicesGetService type IndicesGetServiceWrapper struct { indicesGetService *elastic.IndicesGetService } +// WrapIndicesGetService creates an AliasRemoveActionWrapper out of *elastic.IndicesGetService. func WrapIndicesGetService(indicesGetService *elastic.IndicesGetService) IndicesGetServiceWrapper { return IndicesGetServiceWrapper{indicesGetService: indicesGetService} } +// Index calls this function to internal service. func (i IndicesGetServiceWrapper) Index(indices ...string) es.IndicesGetService { return WrapIndicesGetService(i.indicesGetService.Index(indices...)) } +// Do calls this function to internal service. func (i IndicesGetServiceWrapper) Do(ctx context.Context) (map[string]*elastic.IndicesGetResponse, error) { return i.indicesGetService.Do(ctx) } diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 148beffe55a..2896dfdfa6e 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -137,7 +137,7 @@ func (s *DependencyStore) getWriteIndex(ts time.Time) string { return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) } -func (s *DependencyStore) CreatePolicy(version uint, isOpenSearch bool) error { - policyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, s.dependencyIndexPrefix) +func (s *DependencyStore) CreatePolicy(version uint) error { + policyManager := ilm.NewPolicyManager(s.client, version, s.dependencyIndexPrefix) return policyManager.Init() } diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index ec68a2603dd..8a96474d57b 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -321,7 +321,7 @@ func createSpanWriter( } } if cfg.UseILM { - err := writer.CreatePolicy(cfg.Version, cfg.IsOpenSearch) + err := writer.CreatePolicy(cfg.Version) if err != nil { return nil, err } @@ -352,7 +352,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store } } if f.primaryConfig.UseILM { - err := store.CreatePolicy(f.primaryConfig.Version, f.primaryConfig.IsOpenSearch) + err := store.CreatePolicy(f.primaryConfig.Version) if err != nil { return nil, err } diff --git a/plugin/storage/es/ilm/createpolicy.go b/plugin/storage/es/ilm/createpolicy.go index 963d14972b9..6edf68a2384 100644 --- a/plugin/storage/es/ilm/createpolicy.go +++ b/plugin/storage/es/ilm/createpolicy.go @@ -13,6 +13,7 @@ import ( //go:embed *.json var ILM embed.FS +// CreatePolicyIfNotExists creates ILM or ISM policy depending upon whether the server is OpenSearch or ElasticSearch func CreatePolicyIfNotExists(client es.Client, isOpenSearch bool, version uint) error { if version < ilmVersionSupport { return ErrIlmNotSupported diff --git a/plugin/storage/es/ilm/policymanager.go b/plugin/storage/es/ilm/policymanager.go index 05110b51b51..21dd8a463a3 100644 --- a/plugin/storage/es/ilm/policymanager.go +++ b/plugin/storage/es/ilm/policymanager.go @@ -27,13 +27,15 @@ const ( var ErrIlmNotSupported = errors.New("ILM is supported only for ES version 7+") +// PolicyManager manages the environment for ILM or ISM policy type PolicyManager struct { client func() es.Client prefixedIndexNameWithSeparator string version uint - isOpenSearch bool } +// Init makes the jaeger ready for automatic rollover by using ILM by creating +// initial rollover indices and read-write aliases func (p *PolicyManager) Init() error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -47,11 +49,13 @@ func (p *PolicyManager) Init() error { return nil } -func NewPolicyManager(cl func() es.Client, version uint, isOpenSearch bool, prefixedIndexNameWithSeparator string) *PolicyManager { +// NewPolicyManager creates the policy manager with appropriate version and prefixedIndexNameWithSeparator. +// prefixedIndexNameWithSeparator is the prefix with separator. For example if index prefix is jaeger-main +// and policy manager is called for span indices, then prefixedIndexNameWithSeparator will be jaeger-main-jaeger-span- +func NewPolicyManager(cl func() es.Client, version uint, prefixedIndexNameWithSeparator string) *PolicyManager { return &PolicyManager{ client: cl, version: version, - isOpenSearch: isOpenSearch, prefixedIndexNameWithSeparator: prefixedIndexNameWithSeparator, } } diff --git a/plugin/storage/es/samplingstore/storage.go b/plugin/storage/es/samplingstore/storage.go index 45d0b07fe81..2460710ae73 100644 --- a/plugin/storage/es/samplingstore/storage.go +++ b/plugin/storage/es/samplingstore/storage.go @@ -152,8 +152,8 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time, }).Add() } -func (s *SamplingStore) CreatePolicy(version uint, isOpenSearch bool) error { - policyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, s.samplingIndexPrefix) +func (s *SamplingStore) CreatePolicy(version uint) error { + policyManager := ilm.NewPolicyManager(s.client, version, s.samplingIndexPrefix) return policyManager.Init() } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 8b602c528b1..9096297c8fe 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -108,15 +108,15 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, index return nil } -func (s *SpanWriter) CreatePolicy(version uint, isOpenSearch bool) error { +func (s *SpanWriter) CreatePolicy(version uint) error { spanPrefix := s.spanAndServicePrefixFn(spanIndexBaseName) - spanPolicyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, spanPrefix) + spanPolicyManager := ilm.NewPolicyManager(s.client, version, spanPrefix) err := spanPolicyManager.Init() if err != nil { return err } servicePrefix := s.spanAndServicePrefixFn(serviceIndexBaseName) - servicePolicyManager := ilm.NewPolicyManager(s.client, version, isOpenSearch, servicePrefix) + servicePolicyManager := ilm.NewPolicyManager(s.client, version, servicePrefix) return servicePolicyManager.Init() } From 2a4163bde46fba0dba822120961daadd8a450bbb Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sat, 25 Jan 2025 13:37:18 +0530 Subject: [PATCH 04/11] cleanup Signed-off-by: Manik2708 --- pkg/es/client.go | 4 ++++ plugin/storage/es/dependencystore/storage.go | 4 ++-- plugin/storage/es/factory.go | 6 +++--- plugin/storage/es/ilm/policymanager.go | 7 +------ plugin/storage/es/samplingstore/storage.go | 4 ++-- plugin/storage/es/spanstore/writer.go | 6 +++--- 6 files changed, 15 insertions(+), 16 deletions(-) diff --git a/pkg/es/client.go b/pkg/es/client.go index a13accf28c2..2d8e4360333 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -78,23 +78,27 @@ type MultiSearchService interface { Do(ctx context.Context) (*elastic.MultiSearchResult, error) } +// AliasAddAction is an abstraction for elastic.AliasAddAction type AliasAddAction interface { Index(index ...string) AliasAddAction IsWriteIndex(flag bool) AliasAddAction Do(ctx context.Context) (*elastic.AliasResult, error) } +// AliasRemoveAction is an abstraction for elastic.AliasRemoveAction type AliasRemoveAction interface { Index(index ...string) AliasRemoveAction Do(ctx context.Context) (*elastic.AliasResult, error) } +// XPackIlmPutLifecycle is an abstraction for elastic.XPackIlmPutLifecycle type XPackIlmPutLifecycle interface { BodyString(body string) XPackIlmPutLifecycle Policy(policy string) XPackIlmPutLifecycle Do(ctx context.Context) (*elastic.XPackIlmPutLifecycleResponse, error) } +// IndicesGetService is an abstraction for elastic.IndicesGetService type IndicesGetService interface { Index(indices ...string) IndicesGetService Do(ctx context.Context) (map[string]*elastic.IndicesGetResponse, error) diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 2896dfdfa6e..49673a85a5b 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -137,7 +137,7 @@ func (s *DependencyStore) getWriteIndex(ts time.Time) string { return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) } -func (s *DependencyStore) CreatePolicy(version uint) error { - policyManager := ilm.NewPolicyManager(s.client, version, s.dependencyIndexPrefix) +func (s *DependencyStore) CreatePolicy() error { + policyManager := ilm.NewPolicyManager(s.client, s.dependencyIndexPrefix) return policyManager.Init() } diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 8a96474d57b..dbf52f1135c 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -167,7 +167,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) if f.primaryConfig.UseILM { err = ilm.CreatePolicyIfNotExists(primaryClient, f.primaryConfig.IsOpenSearch, f.primaryConfig.Version) if err != nil { - return fmt.Errorf("failed to create ILM policy: %w", err) + return fmt.Errorf("failed to create index management policy: %w", err) } } @@ -321,7 +321,7 @@ func createSpanWriter( } } if cfg.UseILM { - err := writer.CreatePolicy(cfg.Version) + err := writer.InitializePolicyManager() if err != nil { return nil, err } @@ -352,7 +352,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store } } if f.primaryConfig.UseILM { - err := store.CreatePolicy(f.primaryConfig.Version) + err := store.InitializePolicyManager() if err != nil { return nil, err } diff --git a/plugin/storage/es/ilm/policymanager.go b/plugin/storage/es/ilm/policymanager.go index 21dd8a463a3..d6f9bdef5e8 100644 --- a/plugin/storage/es/ilm/policymanager.go +++ b/plugin/storage/es/ilm/policymanager.go @@ -31,7 +31,6 @@ var ErrIlmNotSupported = errors.New("ILM is supported only for ES version 7+") type PolicyManager struct { client func() es.Client prefixedIndexNameWithSeparator string - version uint } // Init makes the jaeger ready for automatic rollover by using ILM by creating @@ -39,9 +38,6 @@ type PolicyManager struct { func (p *PolicyManager) Init() error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if p.version < ilmVersionSupport { - return ErrIlmNotSupported - } err := p.init(ctx) if err != nil { return err @@ -52,10 +48,9 @@ func (p *PolicyManager) Init() error { // NewPolicyManager creates the policy manager with appropriate version and prefixedIndexNameWithSeparator. // prefixedIndexNameWithSeparator is the prefix with separator. For example if index prefix is jaeger-main // and policy manager is called for span indices, then prefixedIndexNameWithSeparator will be jaeger-main-jaeger-span- -func NewPolicyManager(cl func() es.Client, version uint, prefixedIndexNameWithSeparator string) *PolicyManager { +func NewPolicyManager(cl func() es.Client, prefixedIndexNameWithSeparator string) *PolicyManager { return &PolicyManager{ client: cl, - version: version, prefixedIndexNameWithSeparator: prefixedIndexNameWithSeparator, } } diff --git a/plugin/storage/es/samplingstore/storage.go b/plugin/storage/es/samplingstore/storage.go index 2460710ae73..9a3bb7d6e52 100644 --- a/plugin/storage/es/samplingstore/storage.go +++ b/plugin/storage/es/samplingstore/storage.go @@ -152,8 +152,8 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time, }).Add() } -func (s *SamplingStore) CreatePolicy(version uint) error { - policyManager := ilm.NewPolicyManager(s.client, version, s.samplingIndexPrefix) +func (s *SamplingStore) InitializePolicyManager() error { + policyManager := ilm.NewPolicyManager(s.client, s.samplingIndexPrefix) return policyManager.Init() } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 9096297c8fe..a049d01282c 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -108,15 +108,15 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, index return nil } -func (s *SpanWriter) CreatePolicy(version uint) error { +func (s *SpanWriter) InitializePolicyManager() error { spanPrefix := s.spanAndServicePrefixFn(spanIndexBaseName) - spanPolicyManager := ilm.NewPolicyManager(s.client, version, spanPrefix) + spanPolicyManager := ilm.NewPolicyManager(s.client, spanPrefix) err := spanPolicyManager.Init() if err != nil { return err } servicePrefix := s.spanAndServicePrefixFn(serviceIndexBaseName) - servicePolicyManager := ilm.NewPolicyManager(s.client, version, servicePrefix) + servicePolicyManager := ilm.NewPolicyManager(s.client, servicePrefix) return servicePolicyManager.Init() } From b17d345795221947b32c286d7a5df3358ac80c3c Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sat, 25 Jan 2025 13:43:09 +0530 Subject: [PATCH 05/11] conflicts Signed-off-by: Manik2708 --- storage_v2/depstore/mocks/Writer.go | 52 ----------------------------- 1 file changed, 52 deletions(-) delete mode 100644 storage_v2/depstore/mocks/Writer.go diff --git a/storage_v2/depstore/mocks/Writer.go b/storage_v2/depstore/mocks/Writer.go deleted file mode 100644 index 0c3680c4059..00000000000 --- a/storage_v2/depstore/mocks/Writer.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 -// -// Run 'make generate-mocks' to regenerate. - -// Code generated by mockery. DO NOT EDIT. - -package mocks - -import ( - model "github.com/jaegertracing/jaeger/model" - mock "github.com/stretchr/testify/mock" - - time "time" -) - -// Writer is an autogenerated mock type for the Writer type -type Writer struct { - mock.Mock -} - -// WriteDependencies provides a mock function with given fields: ts, dependencies -func (_m *Writer) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { - ret := _m.Called(ts, dependencies) - - if len(ret) == 0 { - panic("no return value specified for WriteDependencies") - } - - var r0 error - if rf, ok := ret.Get(0).(func(time.Time, []model.DependencyLink) error); ok { - r0 = rf(ts, dependencies) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// NewWriter creates a new instance of Writer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewWriter(t interface { - mock.TestingT - Cleanup(func()) -}) *Writer { - mock := &Writer{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} From bd19543f59bd65d964d0a60699c019e48e54c47f Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sat, 25 Jan 2025 14:09:04 +0530 Subject: [PATCH 06/11] initial unit test Signed-off-by: Manik2708 --- plugin/storage/es/ilm/createpolicy_test.go | 34 ++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 plugin/storage/es/ilm/createpolicy_test.go diff --git a/plugin/storage/es/ilm/createpolicy_test.go b/plugin/storage/es/ilm/createpolicy_test.go new file mode 100644 index 00000000000..4a1ee6b2a7d --- /dev/null +++ b/plugin/storage/es/ilm/createpolicy_test.go @@ -0,0 +1,34 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package ilm + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/mocks" +) + +func mockTestingClient(mockFxn func(cl *mocks.Client)) es.Client { + c := &mocks.Client{} + mockFxn(c) + return c +} + +func mockIlmPolicyExists(exists bool) es.Client { + return mockTestingClient(func(cl *mocks.Client) { + cl.On("IlmPolicyExists", mock.Anything, DefaultIlmPolicy).Return(exists, nil) + }) +} + +func TestCreatePolicyIfNotExists(t *testing.T) { + t.Run("IlmPolicyExists", func(t *testing.T) { + cl := mockIlmPolicyExists(true) + err := CreatePolicyIfNotExists(cl, false, 7) + require.NoError(t, err) + }) +} From 2da40c97de8d142b3f0fd9ddc59df1973b967913 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sat, 25 Jan 2025 14:15:17 +0530 Subject: [PATCH 07/11] goleak Signed-off-by: Manik2708 --- plugin/storage/es/dependencystore/storage.go | 2 +- plugin/storage/es/ilm/package_test.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 plugin/storage/es/ilm/package_test.go diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index efcb5897f18..8b790fb30f2 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -137,7 +137,7 @@ func (s *DependencyStore) getWriteIndex(ts time.Time) string { return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) } -func (s *DependencyStore) CreatePolicy() error { +func (s *DependencyStore) InitializePolicyManager() error { policyManager := ilm.NewPolicyManager(s.client, s.dependencyIndexPrefix) return policyManager.Init() } diff --git a/plugin/storage/es/ilm/package_test.go b/plugin/storage/es/ilm/package_test.go new file mode 100644 index 00000000000..82e0474be6d --- /dev/null +++ b/plugin/storage/es/ilm/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package ilm + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} From fc1a96495db7d11c7d0fe591c3b2985020a901ca Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sun, 26 Jan 2025 08:59:01 +0530 Subject: [PATCH 08/11] conflicts3 Signed-off-by: Manik2708 --- plugin/storage/es/factory.go | 6 ++--- plugin/storage/es/factory_test.go | 41 ------------------------------- 2 files changed, 3 insertions(+), 44 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index fcae156ac46..0a7112ce1ac 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -157,8 +157,8 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.config.UseReadWriteAliases = true } - if f.primaryConfig.UseILM { - err = ilm.CreatePolicyIfNotExists(primaryClient, f.primaryConfig.IsOpenSearch, f.primaryConfig.Version) + if f.config.UseILM { + err = ilm.CreatePolicyIfNotExists(client, f.config.IsOpenSearch, f.config.Version) if err != nil { return fmt.Errorf("failed to create index management policy: %w", err) } @@ -295,7 +295,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store return nil, fmt.Errorf("failed to create template: %w", err) } } - if f.primaryConfig.UseILM { + if f.config.UseILM { err := store.InitializePolicyManager() if err != nil { return nil, err diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 74de9cbaa23..becf661d95d 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -232,47 +232,6 @@ func TestCreateTemplateError(t *testing.T) { require.Error(t, err, "template-error") } -func TestArchiveDisabled(t *testing.T) { - f := NewFactory() - f.archiveConfig = &escfg.Configuration{Enabled: false} - f.newClientFn = (&mockClientBuilder{}).NewClient - w, err := f.CreateArchiveSpanWriter() - assert.Nil(t, w) - require.NoError(t, err) - r, err := f.CreateArchiveSpanReader() - assert.Nil(t, r) - require.NoError(t, err) -} - -func TestArchiveEnabled(t *testing.T) { - tests := []struct { - useReadWriteAliases bool - }{ - { - useReadWriteAliases: false, - }, - { - useReadWriteAliases: true, - }, - } - for _, test := range tests { - t.Run(fmt.Sprintf("useReadWriteAliases=%v", test.useReadWriteAliases), func(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{} - f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: test.useReadWriteAliases} - f.newClientFn = (&mockClientBuilder{}).NewClient - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - defer f.Close() // Ensure resources are cleaned up if initialization is successful - w, err := f.CreateArchiveSpanWriter() - require.NoError(t, err) - assert.NotNil(t, w) - r, err := f.CreateArchiveSpanReader() - require.NoError(t, err) - assert.NotNil(t, r) - }) - } - func TestConfigureFromOptions(t *testing.T) { f := NewFactory() o := &Options{ From c36e38d25f45f840ef5c7b059616e221549c0a3d Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sun, 26 Jan 2025 09:09:36 +0530 Subject: [PATCH 09/11] tests fix Signed-off-by: Manik2708 --- plugin/storage/es/factory_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index becf661d95d..53501f7a6a1 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -144,7 +144,8 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { f := NewFactory() f.config = &escfg.Configuration{ - UseILM: true, + UseILM: true, + Version: 7, } f.newClientFn = (&mockClientBuilder{}).NewClient require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) From bed65019f25eac3a2cc8bbd4ce9d7cf208e051b8 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sun, 26 Jan 2025 16:59:56 +0530 Subject: [PATCH 10/11] archive storage distinction Signed-off-by: Manik2708 --- plugin/storage/es/spanstore/writer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 3579aca1252..91d4d0fb490 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -7,7 +7,6 @@ package spanstore import ( "context" "fmt" - "strings" "time" "go.uber.org/zap" @@ -124,9 +123,6 @@ type spanAndServicePrefixFn func(baseIndexName string) string func (p SpanWriterParams) getSpanAndServicePrefixFn(baseIndexName string) string { prefix := p.IndexPrefix.Apply(baseIndexName) - if strings.Contains(p.WriteAliasSuffix, "archive") { - prefix += "archive" + cfg.IndexPrefixSeparator - } return prefix } From 228c68e974c2aab571f7393c3f8c1294b577b762 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Sun, 26 Jan 2025 17:01:28 +0530 Subject: [PATCH 11/11] archive storage distinction Signed-off-by: Manik2708 --- plugin/storage/es/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 0a7112ce1ac..bad4a66b296 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -253,7 +253,7 @@ func createSpanWriter( ServiceCacheTTL: cfg.ServiceCacheTTL, }) - // Creating a template here no matter what is ilm. Creating template before creating policy will not give any error + // Creating a template here no matter what is ilm. ILM configured mapping has already been injected in mappingBuilderFromConfig if cfg.CreateIndexTemplates { mappingBuilder := mappingBuilderFromConfig(cfg) spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()