Skip to content

Commit bf165e2

Browse files
committed
Implement spans exporting for ClickHouse storage in Jaeger V2
Signed-off-by: haanhvu <[email protected]>
1 parent dd4ccea commit bf165e2

File tree

13 files changed

+627
-23
lines changed

13 files changed

+627
-23
lines changed

cmd/jaeger/internal/exporters/storageexporter/exporter.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,18 @@ import (
1414
"go.uber.org/zap"
1515

1616
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
17+
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
1718
"github.com/jaegertracing/jaeger/storage/spanstore"
1819
)
1920

2021
type storageExporter struct {
2122
config *Config
2223
logger *zap.Logger
2324
spanWriter spanstore.Writer
25+
clickhouse bool
26+
// Separate traces exporting function for ClickHouse storage.
27+
// This is temporary until we have v2 storage API.
28+
chExportTraces func(ctx context.Context, td ptrace.Traces) error
2429
}
2530

2631
func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
@@ -36,8 +41,15 @@ func (exp *storageExporter) start(_ context.Context, host component.Host) error
3641
return fmt.Errorf("cannot find storage factory: %w", err)
3742
}
3843

39-
if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
40-
return fmt.Errorf("cannot create span writer: %w", err)
44+
switch t := f.(type) {
45+
case *ch.Factory:
46+
exp.clickhouse = true
47+
exp.chExportTraces = t.ExportSpans
48+
default:
49+
exp.clickhouse = false
50+
if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
51+
return fmt.Errorf("cannot create span writer: %w", err)
52+
}
4153
}
4254

4355
return nil
@@ -49,6 +61,10 @@ func (*storageExporter) close(_ context.Context) error {
4961
}
5062

5163
func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
64+
if exp.clickhouse {
65+
return exp.chExportTraces(ctx, td)
66+
}
67+
5268
batches, err := otlp2jaeger.ProtoFromTraces(td)
5369
if err != nil {
5470
return fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err)

cmd/jaeger/internal/exporters/storageexporter/factory.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func createTracesExporter(ctx context.Context, set exporter.CreateSettings, conf
4141
// Disable Timeout/RetryOnFailure and SendingQueue
4242
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
4343
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
44-
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
44+
// Enable queue settings for Clickhouse only
45+
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}),
4546
exporterhelper.WithStart(ex.start),
4647
exporterhelper.WithShutdown(ex.close),
4748
)

cmd/jaeger/internal/extension/jaegerstorage/config.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
1212
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
1313
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
14+
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
1415
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc"
1516
)
1617

@@ -22,16 +23,12 @@ type Config struct {
2223
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
2324
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
2425
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
26+
ClickHouse map[string]clickhouse.Config `mapstructure:"clickhouse"`
2527
// TODO add other storage types here
2628
// TODO how will this work with 3rd party storage implementations?
2729
// Option: instead of looking for specific name, check interface.
2830
}
2931

30-
type MemoryStorage struct {
31-
Name string `mapstructure:"name"`
32-
memoryCfg.Configuration
33-
}
34-
3532
func (cfg *Config) Validate() error {
3633
emptyCfg := createDefaultConfig().(*Config)
3734
if reflect.DeepEqual(*cfg, *emptyCfg) {

cmd/jaeger/internal/extension/jaegerstorage/extension.go

+27-12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/jaegertracing/jaeger/pkg/metrics"
1919
"github.com/jaegertracing/jaeger/plugin/storage/badger"
2020
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
21+
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
2122
"github.com/jaegertracing/jaeger/plugin/storage/es"
2223
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
2324
"github.com/jaegertracing/jaeger/plugin/storage/memory"
@@ -71,24 +72,31 @@ func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt
7172
}
7273

7374
type starter[Config any, Factory storage.Factory] struct {
74-
ext *storageExt
75-
storageKind string
76-
cfg map[string]Config
77-
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
75+
ext *storageExt
76+
storageKind string
77+
cfg map[string]Config
78+
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
79+
clickhouseBuilder func(context.Context, Config, *zap.Logger) Factory
7880
}
7981

80-
func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) error {
82+
func (s *starter[Config, Factory]) build(ctx context.Context, _ component.Host) error {
8183
for name, cfg := range s.cfg {
8284
if _, ok := s.ext.factories[name]; ok {
8385
return fmt.Errorf("duplicate %s storage name %s", s.storageKind, name)
8486
}
85-
factory, err := s.builder(
86-
cfg,
87-
metrics.NullFactory,
88-
s.ext.logger.With(zap.String("storage_name", name)),
89-
)
90-
if err != nil {
91-
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
87+
var factory Factory
88+
if s.clickhouseBuilder != nil {
89+
factory = s.clickhouseBuilder(ctx, cfg, s.ext.logger.With(zap.String("storage_name", name)))
90+
} else {
91+
var err error
92+
factory, err = s.builder(
93+
cfg,
94+
metrics.NullFactory,
95+
s.ext.logger.With(zap.String("storage_name", name)),
96+
)
97+
if err != nil {
98+
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
99+
}
92100
}
93101
s.ext.factories[name] = factory
94102
}
@@ -139,6 +147,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
139147
cfg: s.config.Cassandra,
140148
builder: cassandra.NewFactoryWithConfig,
141149
}
150+
clickhouseStarter := &starter[clickhouse.Config, *clickhouse.Factory]{
151+
ext: s,
152+
storageKind: "clickhouse",
153+
cfg: s.config.ClickHouse,
154+
clickhouseBuilder: clickhouse.NewFactory,
155+
}
142156

143157
builders := []func(ctx context.Context, host component.Host) error{
144158
memStarter.build,
@@ -147,6 +161,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
147161
esStarter.build,
148162
osStarter.build,
149163
cassandraStarter.build,
164+
clickhouseStarter.build,
150165
// TODO add support for other backends
151166
}
152167
for _, builder := range builders {

config.yaml

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
service:
2+
extensions: [jaeger_storage, jaeger_query]
3+
pipelines:
4+
traces:
5+
receivers: [otlp, jaeger, zipkin]
6+
processors: [batch]
7+
exporters: [jaeger_storage_exporter]
8+
9+
extensions:
10+
# health_check:
11+
# pprof:
12+
# endpoint: 0.0.0.0:1777
13+
# zpages:
14+
# endpoint: 0.0.0.0:55679
15+
16+
jaeger_query:
17+
trace_storage: ch_store
18+
ui_config: ./cmd/jaeger/config-ui.json
19+
20+
jaeger_storage:
21+
memory:
22+
memstore:
23+
max_traces: 100000
24+
memstore_archive:
25+
max_traces: 100000
26+
clickhouse:
27+
ch_store:
28+
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4
29+
spans_table_name: jaeger_spans
30+
31+
receivers:
32+
otlp:
33+
protocols:
34+
grpc:
35+
endpoint: 127.0.0.1:4317
36+
http:
37+
endpoint: 127.0.0.1:4318
38+
39+
jaeger:
40+
protocols:
41+
grpc:
42+
thrift_binary:
43+
thrift_compact:
44+
thrift_http:
45+
46+
zipkin:
47+
48+
processors:
49+
batch:
50+
51+
exporters:
52+
jaeger_storage_exporter:
53+
trace_storage: ch_store

go.mod

+10-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ require (
8888
)
8989

9090
require (
91+
github.com/ClickHouse/ch-go v0.58.2 // indirect
92+
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
9193
github.com/IBM/sarama v1.43.2 // indirect
94+
github.com/andybalholm/brotli v1.0.6 // indirect
9295
github.com/aws/aws-sdk-go v1.53.11 // indirect
9396
github.com/beorn7/perks v1.0.1 // indirect
9497
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
@@ -97,12 +100,14 @@ require (
97100
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
98101
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
99102
github.com/dgraph-io/ristretto v0.1.1 // indirect
100-
github.com/dustin/go-humanize v1.0.0 // indirect
103+
github.com/dustin/go-humanize v1.0.1 // indirect
101104
github.com/eapache/go-resiliency v1.6.0 // indirect
102105
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
103106
github.com/eapache/queue v1.1.0 // indirect
104107
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
105108
github.com/felixge/httpsnoop v1.0.4 // indirect
109+
github.com/go-faster/city v1.0.1 // indirect
110+
github.com/go-faster/errors v0.6.1 // indirect
106111
github.com/go-logr/logr v1.4.1 // indirect
107112
github.com/go-logr/stdr v1.2.2 // indirect
108113
github.com/go-ole/go-ole v1.2.6 // indirect
@@ -156,6 +161,7 @@ require (
156161
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.102.0 // indirect
157162
github.com/opentracing/opentracing-go v1.2.0 // indirect
158163
github.com/openzipkin/zipkin-go v0.4.3 // indirect
164+
github.com/paulmach/orb v0.10.0 // indirect
159165
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
160166
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
161167
github.com/pierrec/lz4/v4 v4.1.21 // indirect
@@ -170,8 +176,10 @@ require (
170176
github.com/russross/blackfriday/v2 v2.1.0 // indirect
171177
github.com/sagikazarmark/locafero v0.4.0 // indirect
172178
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
179+
github.com/segmentio/asm v1.2.0 // indirect
173180
github.com/shirou/gopsutil/v3 v3.24.4 // indirect
174181
github.com/shoenig/go-m1cpu v0.1.6 // indirect
182+
github.com/shopspring/decimal v1.3.1 // indirect
175183
github.com/sourcegraph/conc v0.3.0 // indirect
176184
github.com/spf13/afero v1.11.0 // indirect
177185
github.com/spf13/cast v1.6.0 // indirect
@@ -195,7 +203,7 @@ require (
195203
go.opentelemetry.io/collector/exporter/debugexporter v0.102.1
196204
go.opentelemetry.io/collector/extension/auth v0.102.1 // indirect
197205
go.opentelemetry.io/collector/featuregate v1.9.0 // indirect
198-
go.opentelemetry.io/collector/semconv v0.102.1 // indirect
206+
go.opentelemetry.io/collector/semconv v0.102.1
199207
go.opentelemetry.io/collector/service v0.102.1 // indirect
200208
go.opentelemetry.io/contrib/config v0.7.0 // indirect
201209
go.opentelemetry.io/contrib/propagators/b3 v1.27.0 // indirect

0 commit comments

Comments
 (0)