Skip to content

Commit 27d7b1f

Browse files
authored
feat(telemetrygen): added support for delta temporality (open-telemetry#38146)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR adds support for configuring metric temporality (delta or cumulative) in the telemetrygen tool. This allows users to generate metrics with different temporality types, which is particularly useful for testing different metric collection scenarios. ##### Changes - Added `temporalityType` flag for metrics pipeline that accepts values: `delta` or `cumulative` <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#38073 <!--Describe what testing was performed and which tests were added.--> #### Testing Testing was performed by setting up as simple collector with `otlp `receiver and `debug` exporter ##### Sum Datapoint ``` Resource SchemaURL: https://opentelemetry.io/schemas/1.13.0 Resource attributes: -> service.name: Str(telemetrygen) ScopeMetrics #0 ScopeMetrics SchemaURL: InstrumentationScope Metric #0 Descriptor: -> Name: gen -> Description: -> Unit: -> DataType: Histogram -> AggregationTemporality: Delta ``` ##### Histogram Datapoint ``` Count: 0 Sum: 3940.000000 ExplicitBounds #0: 0.000000 ExplicitBounds #1: 5.000000 ExplicitBounds #2: 10.000000 ExplicitBounds #3: 25.000000 ExplicitBounds #4: 50.000000 ExplicitBounds #5: 75.000000 ExplicitBounds #6: 100.000000 ExplicitBounds #7: 250.000000 ExplicitBounds #8: 500.000000 ExplicitBounds #9: 750.000000 ExplicitBounds open-telemetry#10: 1000.000000 ExplicitBounds open-telemetry#11: 2500.000000 ExplicitBounds open-telemetry#12: 5000.000000 ExplicitBounds open-telemetry#13: 7500.000000 ExplicitBounds open-telemetry#14: 10000.000000 Buckets #0, Count: 0 Buckets #1, Count: 0 Buckets #2, Count: 1 Buckets #3, Count: 0 Buckets #4, Count: 0 Buckets #5, Count: 0 Buckets #6, Count: 3 Buckets #7, Count: 4 Buckets #8, Count: 1 Buckets #9, Count: 1 Buckets open-telemetry#10, Count: 0 Buckets open-telemetry#11, Count: 0 Buckets open-telemetry#12, Count: 0 Buckets open-telemetry#13, Count: 0 Buckets open-telemetry#14, Count: 0 ``` - [x] Need to add tests for the config --------- Signed-off-by: Nikos Angelopoulos <[email protected]>
1 parent 9ab7c66 commit 27d7b1f

File tree

6 files changed

+181
-29
lines changed

6 files changed

+181
-29
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: telemetrygen
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add support for `aggregation-temporality` flag in telemetrygen. Supported values (delta or cumulative)"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38073]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package metrics
2+
3+
import (
4+
"fmt"
5+
6+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
7+
)
8+
9+
type AggregationTemporality metricdata.Temporality
10+
11+
func (t *AggregationTemporality) Set(v string) error {
12+
switch v {
13+
case "delta":
14+
*t = AggregationTemporality(metricdata.DeltaTemporality)
15+
return nil
16+
case "cumulative":
17+
*t = AggregationTemporality(metricdata.CumulativeTemporality)
18+
return nil
19+
default:
20+
return fmt.Errorf(`temporality must be one of "delta" or "cumulative"`)
21+
}
22+
}
23+
24+
func (t *AggregationTemporality) String() string {
25+
return string(metricdata.Temporality(*t))
26+
}
27+
28+
func (t *AggregationTemporality) Type() string {
29+
return "temporality"
30+
}
31+
32+
// AsTemporality converts the AggregationTemporality to metricdata.Temporality
33+
func (t AggregationTemporality) AsTemporality() metricdata.Temporality {
34+
return metricdata.Temporality(t)
35+
}

cmd/telemetrygen/pkg/metrics/config.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,20 @@ import (
77
"fmt"
88

99
"github.com/spf13/pflag"
10+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1011

1112
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
1213
)
1314

1415
// Config describes the test scenario.
1516
type Config struct {
1617
common.Config
17-
NumMetrics int
18-
MetricName string
19-
MetricType MetricType
20-
SpanID string
21-
TraceID string
18+
NumMetrics int
19+
MetricName string
20+
MetricType MetricType
21+
AggregationTemporality AggregationTemporality
22+
SpanID string
23+
TraceID string
2224
}
2325

2426
// NewConfig creates a new Config with default values.
@@ -34,11 +36,13 @@ func (c *Config) Flags(fs *pflag.FlagSet) {
3436

3537
fs.StringVar(&c.HTTPPath, "otlp-http-url-path", c.HTTPPath, "Which URL path to write to")
3638

37-
fs.Var(&c.MetricType, "metric-type", "Metric type enum. must be one of 'Gauge' or 'Sum'")
3839
fs.IntVar(&c.NumMetrics, "metrics", c.NumMetrics, "Number of metrics to generate in each worker (ignored if duration is provided)")
3940

4041
fs.StringVar(&c.TraceID, "trace-id", c.TraceID, "TraceID to use as exemplar")
4142
fs.StringVar(&c.SpanID, "span-id", c.SpanID, "SpanID to use as exemplar")
43+
44+
fs.Var(&c.MetricType, "metric-type", "Metric type enum. must be one of 'Gauge' or 'Sum'")
45+
fs.Var(&c.AggregationTemporality, "aggregation-temporality", "aggregation-temporality for metrics. Must be one of 'delta' or 'cumulative'")
4246
}
4347

4448
// SetDefaults sets the default values for the configuration
@@ -49,9 +53,12 @@ func (c *Config) SetDefaults() {
4953
c.HTTPPath = "/v1/metrics"
5054
c.NumMetrics = 1
5155

56+
c.MetricName = "gen"
5257
// Use Gauge as default metric type.
5358
c.MetricType = MetricTypeGauge
54-
c.MetricName = "gen"
59+
// Use cumulative temporality as default.
60+
c.AggregationTemporality = AggregationTemporality(metricdata.CumulativeTemporality)
61+
5562
c.TraceID = ""
5663
c.SpanID = ""
5764
}

cmd/telemetrygen/pkg/metrics/metrics.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,17 @@ func run(c *Config, expF exporterFunc, logger *zap.Logger) error {
6666
for i := 0; i < c.WorkerCount; i++ {
6767
wg.Add(1)
6868
w := worker{
69-
numMetrics: c.NumMetrics,
70-
metricName: c.MetricName,
71-
metricType: c.MetricType,
72-
exemplars: exemplarsFromConfig(c),
73-
limitPerSecond: limit,
74-
totalDuration: c.TotalDuration,
75-
running: running,
76-
wg: &wg,
77-
logger: logger.With(zap.Int("worker", i)),
78-
index: i,
69+
numMetrics: c.NumMetrics,
70+
metricName: c.MetricName,
71+
metricType: c.MetricType,
72+
aggregationTemporality: c.AggregationTemporality,
73+
exemplars: exemplarsFromConfig(c),
74+
limitPerSecond: limit,
75+
totalDuration: c.TotalDuration,
76+
running: running,
77+
wg: &wg,
78+
logger: logger.With(zap.Int("worker", i)),
79+
index: i,
7980
}
8081
exp, err := expF()
8182
if err != nil {

cmd/telemetrygen/pkg/metrics/worker.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@ import (
1818
)
1919

2020
type worker struct {
21-
running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test
22-
metricName string // name of metric to generate
23-
metricType MetricType // type of metric to generate
24-
exemplars []metricdata.Exemplar[int64] // exemplars to attach to the metric
25-
numMetrics int // how many metrics the worker has to generate (only when duration==0)
26-
totalDuration time.Duration // how long to run the test for (overrides `numMetrics`)
27-
limitPerSecond rate.Limit // how many metrics per second to generate
28-
wg *sync.WaitGroup // notify when done
29-
logger *zap.Logger // logger
30-
index int // worker index
21+
running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test
22+
metricName string // name of metric to generate
23+
metricType MetricType // type of metric to generate
24+
aggregationTemporality AggregationTemporality // Temporality type to use
25+
exemplars []metricdata.Exemplar[int64] // exemplars to attach to the metric
26+
numMetrics int // how many metrics the worker has to generate (only when duration==0)
27+
totalDuration time.Duration // how long to run the test for (overrides `numMetrics`)
28+
limitPerSecond rate.Limit // how many metrics per second to generate
29+
wg *sync.WaitGroup // notify when done
30+
logger *zap.Logger // logger
31+
index int // worker index
3132
}
3233

3334
var histogramBucketSamples = []struct {
@@ -103,7 +104,7 @@ func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Expor
103104
Name: w.metricName,
104105
Data: metricdata.Sum[int64]{
105106
IsMonotonic: true,
106-
Temporality: metricdata.CumulativeTemporality,
107+
Temporality: w.aggregationTemporality.AsTemporality(),
107108
DataPoints: []metricdata.DataPoint[int64]{
108109
{
109110
StartTime: time.Now().Add(-1 * time.Second),
@@ -122,7 +123,7 @@ func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Expor
122123
metrics = append(metrics, metricdata.Metrics{
123124
Name: w.metricName,
124125
Data: metricdata.Histogram[int64]{
125-
Temporality: metricdata.CumulativeTemporality,
126+
Temporality: w.aggregationTemporality.AsTemporality(),
126127
DataPoints: []metricdata.HistogramDataPoint[int64]{
127128
{
128129
StartTime: time.Now().Add(-1 * time.Second),

cmd/telemetrygen/pkg/metrics/worker_test.go

+81
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,21 @@ func (m *mockExporter) Shutdown(_ context.Context) error {
5050
return nil
5151
}
5252

53+
func checkMetricTemporality(t *testing.T, ms metricdata.Metrics, metricType MetricType, expectedAggregationTemporality metricdata.Temporality) {
54+
switch metricType {
55+
case MetricTypeSum:
56+
sumData, ok := ms.Data.(metricdata.Sum[int64])
57+
require.True(t, ok, "expected Sum data type")
58+
assert.Equal(t, expectedAggregationTemporality, sumData.Temporality)
59+
case MetricTypeHistogram:
60+
histogramData, ok := ms.Data.(metricdata.Histogram[int64])
61+
require.True(t, ok, "expected Histogram data type")
62+
assert.Equal(t, expectedAggregationTemporality, histogramData.Temporality)
63+
default:
64+
t.Fatalf("unsupported metric type: %v", metricType)
65+
}
66+
}
67+
5368
func TestFixedNumberOfMetrics(t *testing.T) {
5469
// arrange
5570
cfg := &Config{
@@ -98,6 +113,72 @@ func TestRateOfMetrics(t *testing.T) {
98113
assert.LessOrEqual(t, len(m.rms), 20, "there should have been less than 20 metrics, had %d", len(m.rms))
99114
}
100115

116+
func TestMetricsWithTemporality(t *testing.T) {
117+
tests := []struct {
118+
name string
119+
metricType MetricType
120+
aggregationTemporality AggregationTemporality
121+
expectedAggregationTemporality metricdata.Temporality
122+
}{
123+
{
124+
name: "Sum: delta temporality",
125+
metricType: MetricTypeSum,
126+
aggregationTemporality: AggregationTemporality(metricdata.DeltaTemporality),
127+
expectedAggregationTemporality: metricdata.DeltaTemporality,
128+
},
129+
{
130+
name: "Sum: cumulative temporality",
131+
metricType: MetricTypeSum,
132+
aggregationTemporality: AggregationTemporality(metricdata.CumulativeTemporality),
133+
expectedAggregationTemporality: metricdata.CumulativeTemporality,
134+
},
135+
{
136+
name: "Histogram: delta temporality",
137+
metricType: MetricTypeHistogram,
138+
aggregationTemporality: AggregationTemporality(metricdata.DeltaTemporality),
139+
expectedAggregationTemporality: metricdata.DeltaTemporality,
140+
},
141+
{
142+
name: "Histogram: cumulative temporality",
143+
metricType: MetricTypeHistogram,
144+
aggregationTemporality: AggregationTemporality(metricdata.CumulativeTemporality),
145+
expectedAggregationTemporality: metricdata.CumulativeTemporality,
146+
},
147+
}
148+
149+
for _, tt := range tests {
150+
t.Run(tt.name, func(t *testing.T) {
151+
// arrange
152+
cfg := &Config{
153+
Config: common.Config{
154+
WorkerCount: 1,
155+
},
156+
NumMetrics: 1,
157+
MetricName: "test",
158+
MetricType: tt.metricType,
159+
AggregationTemporality: tt.aggregationTemporality,
160+
}
161+
m := &mockExporter{}
162+
expFunc := func() (sdkmetric.Exporter, error) {
163+
return m, nil
164+
}
165+
166+
// act
167+
logger, _ := zap.NewDevelopment()
168+
require.NoError(t, run(cfg, expFunc, logger))
169+
170+
time.Sleep(1 * time.Second)
171+
172+
// assert
173+
require.Len(t, m.rms, 1)
174+
ms := m.rms[0].ScopeMetrics[0].Metrics[0]
175+
assert.Equal(t, "test", ms.Name)
176+
177+
checkMetricTemporality(t, ms, tt.metricType, tt.expectedAggregationTemporality)
178+
})
179+
}
180+
}
181+
101182
func TestUnthrottled(t *testing.T) {
102183
// arrange
103184
cfg := &Config{

0 commit comments

Comments
 (0)