Skip to content

Commit 5e6bce8

Browse files
committed
implement review feedback
1 parent 5eb521e commit 5e6bce8

File tree

6 files changed

+29
-54
lines changed

6 files changed

+29
-54
lines changed

connector/spanmetricsconnector/README.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,13 @@ The following settings can be optionally configured:
114114
- `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
115115
- `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
116116
- `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
117+
- `metric_timestamp_cache_size` (default `1000`): Only relevant for delta temporality span metrics. Controls the size of the cache used to keep track of a metric's TimestampUnixNano the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0.
117118
- `exemplars`: Use to configure how to attach exemplars to histograms
118119
- `enabled` (default: `false`): enabling will add spans as Exemplars.
119120
- `events`: Use to configure the events metric.
120121
- `enabled`: (default: `false`): enabling will add the events metric.
121122
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
122123
- `resource_metrics_key_attributes`: Filter the resource attributes used to produce the resource metrics key map hash. Use this in case changing resource attributes (e.g. process id) are breaking counter metrics.
123-
- `delta_temporality`: Configuration exclusive to generating delta temporality span metrics
124-
- `metric_timestamp_cache_size` (default `10000`): Size of the cache used to keep track of a metric's TimestampUnixNano the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0.
125124

126125
## Examples
127126

connector/spanmetricsconnector/config.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var defaultHistogramBucketsMs = []float64{
2323
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
2424
}
2525

26-
var defaultDeltaTimestampCacheSize = 10000
26+
var defaultDeltaTimestampCacheSize = 1000
2727

2828
// Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
2929
type Dimension struct {
@@ -73,6 +73,9 @@ type Config struct {
7373
// Default value (0) means that the metrics will never expire.
7474
MetricsExpiration time.Duration `mapstructure:"metrics_expiration"`
7575

76+
// TimestampCacheSize controls the size of the cache used to keep track of delta metrics' TimestampUnixNano the last time it was flushed
77+
TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`
78+
7679
// Namespace is the namespace of the metrics emitted by the connector.
7780
Namespace string `mapstructure:"namespace"`
7881

@@ -81,9 +84,6 @@ type Config struct {
8184

8285
// Events defines the configuration for events section of spans.
8386
Events EventsConfig `mapstructure:"events"`
84-
85-
// DeltaTemporalityConfig is configuration that's exclusive to generating delta span metrics
86-
DeltaTemporalityConfig *DeltaTemporalityConfig `mapstructure:"delta_temporality"`
8787
}
8888

8989
type HistogramConfig struct {
@@ -115,8 +115,6 @@ type EventsConfig struct {
115115
}
116116

117117
type DeltaTemporalityConfig struct {
118-
// TimestampCacheSize controls the size of the cache used to keep track of a metric's TimestampUnixNano the last time it was flushed
119-
TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`
120118
}
121119

122120
var _ component.ConfigValidator = (*Config)(nil)
@@ -169,8 +167,8 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
169167
}
170168

171169
func (c Config) GetDeltaTimestampCacheSize() int {
172-
if c.DeltaTemporalityConfig != nil && c.DeltaTemporalityConfig.TimestampCacheSize != nil {
173-
return *c.DeltaTemporalityConfig.TimestampCacheSize
170+
if c.TimestampCacheSize != nil {
171+
return *c.TimestampCacheSize
174172
}
175173
return defaultDeltaTimestampCacheSize
176174
}

connector/spanmetricsconnector/config_test.go

+1-14
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func TestLoadConfig(t *testing.T) {
131131
id: component.NewIDWithName(metadata.Type, "custom_delta_timestamp_cache_size"),
132132
expected: &Config{
133133
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
134-
DeltaTemporalityConfig: &DeltaTemporalityConfig{TimestampCacheSize: &customTimestampCacheSize},
134+
TimestampCacheSize: &customTimestampCacheSize,
135135
DimensionsCacheSize: defaultDimensionsCacheSize,
136136
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
137137
MetricsFlushInterval: 15 * time.Second,
@@ -151,19 +151,6 @@ func TestLoadConfig(t *testing.T) {
151151
assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
152152
},
153153
},
154-
{
155-
id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size2"),
156-
expected: &Config{
157-
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
158-
DimensionsCacheSize: defaultDimensionsCacheSize,
159-
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
160-
MetricsFlushInterval: 15 * time.Second,
161-
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
162-
},
163-
extraAssertions: func(config *Config) {
164-
assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
165-
},
166-
},
167154
{
168155
id: component.NewIDWithName(metadata.Type, "invalid_delta_timestamp_cache_size"),
169156
errorMessage: "invalid delta timestamp cache size: 0, the maximum number of the items in the cache should be positive",

connector/spanmetricsconnector/connector.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
131131

132132
var lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
133133
if cfg.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
134-
lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, v pcommon.Timestamp) {
134+
lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, _ pcommon.Timestamp) {
135135
logger.Info("Evicting cached delta timestamp", zap.String("key", string(k)))
136136
})
137137
if err != nil {

connector/spanmetricsconnector/connector_test.go

+18-21
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ func disabledHistogramsConfig() HistogramConfig {
452452
}
453453
}
454454

455-
func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTemporalityConfig *DeltaTemporalityConfig, excludedDimensions ...string) (*connectorImp, *clock.Mock, error) {
455+
func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTimestampCacheSize int, excludedDimensions ...string) (*connectorImp, *clock.Mock, error) {
456456
cfg := &Config{
457457
AggregationTemporality: temporality,
458458
Histogram: histogramConfig(),
@@ -477,9 +477,9 @@ func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramC
477477
// Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc.
478478
{regionResourceAttrName, nil},
479479
},
480-
Events: eventsConfig(),
481-
MetricsExpiration: expiration,
482-
DeltaTemporalityConfig: deltaTemporalityConfig,
480+
Events: eventsConfig(),
481+
MetricsExpiration: expiration,
482+
TimestampCacheSize: &deltaTimestampCacheSize,
483483
}
484484

485485
mockClock := clock.NewMock(time.Now())
@@ -641,7 +641,7 @@ func TestConcurrentShutdown(t *testing.T) {
641641
core, observedLogs := observer.New(zapcore.InfoLevel)
642642

643643
// Test
644-
p, _, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil)
644+
p, _, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
645645
require.NoError(t, err)
646646
// Override the default no-op consumer and logger for testing.
647647
p.metricsConsumer = new(consumertest.MetricsSink)
@@ -719,7 +719,7 @@ func TestConsumeMetricsErrors(t *testing.T) {
719719
logger := zap.New(core)
720720

721721
var wg sync.WaitGroup
722-
p, mockClock, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil)
722+
p, mockClock, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
723723
require.NoError(t, err)
724724
// Override the default no-op consumer and logger for testing.
725725
p.metricsConsumer = &errConsumer{
@@ -884,7 +884,7 @@ func TestConsumeTraces(t *testing.T) {
884884
// Prepare
885885

886886
mcon := &consumertest.MetricsSink{}
887-
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, nil)
887+
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, 1000)
888888
require.NoError(t, err)
889889
// Override the default no-op consumer with metrics sink for testing.
890890
p.metricsConsumer = mcon
@@ -911,7 +911,7 @@ func TestConsumeTraces(t *testing.T) {
911911
}
912912

913913
func TestMetricKeyCache(t *testing.T) {
914-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil)
914+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
915915
require.NoError(t, err)
916916
traces := buildSampleTrace()
917917

@@ -940,7 +940,7 @@ func TestMetricKeyCache(t *testing.T) {
940940
}
941941

942942
func TestResourceMetricsCache(t *testing.T) {
943-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil)
943+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
944944
require.NoError(t, err)
945945

946946
// Test
@@ -977,7 +977,7 @@ func TestResourceMetricsCache(t *testing.T) {
977977
}
978978

979979
func TestResourceMetricsExpiration(t *testing.T) {
980-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, nil)
980+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, 1000)
981981
require.NoError(t, err)
982982

983983
// Test
@@ -1002,7 +1002,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) {
10021002
"service.name",
10031003
}
10041004

1005-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, nil)
1005+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, 1000)
10061006
require.NoError(t, err)
10071007

10081008
// Test
@@ -1040,7 +1040,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) {
10401040

10411041
func BenchmarkConnectorConsumeTraces(b *testing.B) {
10421042
// Prepare
1043-
conn, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil)
1043+
conn, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
10441044
require.NoError(b, err)
10451045

10461046
traces := buildSampleTrace()
@@ -1054,7 +1054,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {
10541054

10551055
func TestExcludeDimensionsConsumeTraces(t *testing.T) {
10561056
excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
1057-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil, excludeDimensions...)
1057+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, excludeDimensions...)
10581058
require.NoError(t, err)
10591059
traces := buildSampleTrace()
10601060

@@ -1184,7 +1184,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
11841184
wg.Add(len(wantDataPointCounts))
11851185

11861186
// Note: default dimension key cache size is 2.
1187-
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, nil)
1187+
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
11881188
require.NoError(t, err)
11891189
// Override the default no-op consumer with metrics sink for testing.
11901190
p.metricsConsumer = mcon
@@ -1269,7 +1269,7 @@ func TestConnectorConsumeTracesExpiredMetrics(t *testing.T) {
12691269
mcon := &consumertest.MetricsSink{}
12701270

12711271
// Creating a connector with a very short metricsTTL to ensure that the metrics are expired.
1272-
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, nil)
1272+
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, 1000)
12731273
require.NoError(t, err)
12741274
// Override the default no-op consumer with metrics sink for testing.
12751275
p.metricsConsumer = mcon
@@ -1521,7 +1521,7 @@ func TestSpanMetrics_Events(t *testing.T) {
15211521
}
15221522
}
15231523
func TestExemplarsForSumMetrics(t *testing.T) {
1524-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, 0, []string{}, nil)
1524+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, 0, []string{}, 1000)
15251525
require.NoError(t, err)
15261526
traces := buildSampleTrace()
15271527

@@ -1579,7 +1579,7 @@ func TestTimestampsForUninterruptedStream(t *testing.T) {
15791579

15801580
for _, tt := range tests {
15811581
t.Run(tt.temporality, func(t *testing.T) {
1582-
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, nil)
1582+
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000)
15831583
require.NoError(t, err)
15841584
p.metricsConsumer = &consumertest.MetricsSink{}
15851585

@@ -1677,10 +1677,7 @@ func verifyAndCollectCommonTimestamps(t *testing.T, m pmetric.Metrics) (start pc
16771677

16781678
func TestDeltaTimestampCacheExpiry(t *testing.T) {
16791679
timestampCacheSize := 1
1680-
deltaConfig := &DeltaTemporalityConfig{
1681-
TimestampCacheSize: &timestampCacheSize,
1682-
}
1683-
p, _, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, deltaConfig)
1680+
p, _, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, timestampCacheSize)
16841681
require.NoError(t, err)
16851682
p.metricsConsumer = &consumertest.MetricsSink{}
16861683

connector/spanmetricsconnector/testdata/config.yaml

+2-8
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,11 @@ spanmetrics/resource_metrics_key_attributes:
8181

8282
spanmetrics/custom_delta_timestamp_cache_size:
8383
aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"
84-
delta_temporality:
85-
metric_timestamp_cache_size: 123
84+
metric_timestamp_cache_size: 123
8685

8786
spanmetrics/invalid_delta_timestamp_cache_size:
8887
aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"
89-
delta_temporality:
90-
metric_timestamp_cache_size: 0
88+
metric_timestamp_cache_size: 0
9189

9290
spanmetrics/default_delta_timestamp_cache_size:
9391
aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"
94-
95-
spanmetrics/default_delta_timestamp_cache_size2:
96-
aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"
97-
delta_temporality:

0 commit comments

Comments
 (0)