Skip to content

Commit d8f19d8

Browse files
[AGNTLOG-174 ] Add telemetry for log compression kind (#36612)
Co-authored-by: DeForest Richards <[email protected]>
1 parent f6a4d4f commit d8f19d8

File tree

6 files changed

+72
-5
lines changed

6 files changed

+72
-5
lines changed

comp/core/agenttelemetry/impl/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ var defaultProfiles = `
213213
- name: logs.decoded
214214
- name: logs.dropped
215215
- name: logs.encoded_bytes_sent
216+
aggregate_tags:
217+
- compression_kind
216218
- name: logs.sender_latency
217219
- name: logs.auto_multi_line_aggregator_flush
218220
aggregate_tags:

pkg/logs/client/http/destination.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,11 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
303303
}
304304
metrics.BytesSent.Add(int64(payload.UnencodedSize))
305305
var sourceTag string
306+
compressionKind := "none"
307+
308+
if d.endpoint.UseCompression {
309+
compressionKind = d.endpoint.CompressionKind
310+
}
306311

307312
if strings.Contains(d.Metadata().TelemetryName(), "logs") {
308313
sourceTag = "logs"
@@ -312,7 +317,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
312317

313318
metrics.TlmBytesSent.Add(float64(payload.UnencodedSize), sourceTag)
314319
metrics.EncodedBytesSent.Add(int64(len(payload.Encoded)))
315-
metrics.TlmEncodedBytesSent.Add(float64(len(payload.Encoded)), sourceTag)
320+
metrics.TlmEncodedBytesSent.Add(float64(len(payload.Encoded)), sourceTag, compressionKind)
316321

317322
req, err := http.NewRequest("POST", d.url, bytes.NewReader(payload.Encoded))
318323
if err != nil {

pkg/logs/client/http/destination_test.go

+56-2
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ func TestDestinationSourceTagBasedOnTelemetryName(t *testing.T) {
573573
// Create telemetry mock
574574
telemetryMock := fxutil.Test[telemetry.Component](t, telemetryimpl.MockModule())
575575
metrics.TlmBytesSent = telemetryMock.NewCounter("logs", "bytes_sent", []string{"source"}, "")
576-
metrics.TlmEncodedBytesSent = telemetryMock.NewCounter("logs", "encoded_bytes_sent", []string{"source"}, "")
576+
metrics.TlmEncodedBytesSent = telemetryMock.NewCounter("logs", "encoded_bytes_sent", []string{"source", "compression_kind"}, "")
577577

578578
// Create a new server
579579
server := NewTestServer(200, cfg)
@@ -610,7 +610,7 @@ func TestDestinationSourceTagEPForwarder(t *testing.T) {
610610
// Create telemetry mock
611611
telemetryMock := fxutil.Test[telemetry.Component](t, telemetryimpl.MockModule())
612612
metrics.TlmBytesSent = telemetryMock.NewCounter("logs", "bytes_sent", []string{"source"}, "")
613-
metrics.TlmEncodedBytesSent = telemetryMock.NewCounter("logs", "encoded_bytes_sent", []string{"source"}, "")
613+
metrics.TlmEncodedBytesSent = telemetryMock.NewCounter("logs", "encoded_bytes_sent", []string{"source", "compression_kind"}, "")
614614

615615
// Create a new server
616616
server := NewTestServer(200, cfg)
@@ -638,3 +638,57 @@ func TestDestinationSourceTagEPForwarder(t *testing.T) {
638638
assert.Len(t, metric, 1)
639639
assert.Equal(t, "epforwarder", metric[0].Tags()["source"])
640640
}
641+
642+
// TestDestinationCompression tests what the compression kind is set when compression is used
643+
func TestDestinationCompression(t *testing.T) {
644+
cfg := configmock.New(t)
645+
646+
// Create telemetry mock
647+
telemetryMock := fxutil.Test[telemetry.Component](t, telemetryimpl.MockModule())
648+
metrics.TlmEncodedBytesSent = telemetryMock.NewCounter("logs", "encoded_bytes_sent", []string{"source", "compression_kind"}, "")
649+
650+
// Create a new server with compression enabled
651+
server := NewTestServer(200, cfg)
652+
defer server.httpServer.Close()
653+
654+
// Enable compression and set zstdcompression kind
655+
server.Destination.endpoint.UseCompression = true
656+
server.Destination.endpoint.CompressionKind = "zstd"
657+
658+
// Test case 1: Telemetry uses zstd compression
659+
server.Destination.destMeta = client.NewDestinationMetadata("dbm", "1", "reliable", "0")
660+
payload := &message.Payload{
661+
Encoded: []byte("payload"),
662+
UnencodedSize: 7, // len("payload")
663+
}
664+
665+
err := server.Destination.unconditionalSend(payload)
666+
assert.Nil(t, err)
667+
assert.Equal(t, "dbm_1_reliable_0", server.Destination.destMeta.TelemetryName())
668+
669+
// Verify the compression tag is set correctly
670+
metric, err := telemetryMock.(telemetry.Mock).GetCountMetric("logs", "encoded_bytes_sent")
671+
assert.NoError(t, err)
672+
assert.Len(t, metric, 1)
673+
assert.Equal(t, "zstd", metric[0].Tags()["compression_kind"])
674+
675+
// Test case 2: Telemetry uses gzip compression
676+
// Enable compression and set gzip compression kind
677+
server.Destination.endpoint.CompressionKind = "gzip"
678+
679+
server.Destination.destMeta = client.NewDestinationMetadata("dbm", "2", "reliable", "0")
680+
payload2 := &message.Payload{
681+
Encoded: []byte("payload"),
682+
UnencodedSize: 7,
683+
}
684+
685+
err = server.Destination.unconditionalSend(payload2)
686+
assert.Nil(t, err)
687+
assert.Equal(t, "dbm_2_reliable_0", server.Destination.destMeta.TelemetryName())
688+
689+
// Verify the compression tag is set correctly
690+
metric, err = telemetryMock.(telemetry.Mock).GetCountMetric("logs", "encoded_bytes_sent")
691+
assert.NoError(t, err)
692+
assert.Len(t, metric, 2)
693+
assert.Equal(t, "gzip", metric[0].Tags()["compression_kind"])
694+
}

pkg/logs/client/tcp/destination.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag
124124

125125
// TCP is only used for logs data, so we always use "logs" as the source tag
126126
sourceTag := "logs"
127+
// Default Compression for TCP is none
128+
compressionKind := "none"
127129

128130
metrics.TlmBytesSent.Add(float64(payload.UnencodedSize), sourceTag)
129131
metrics.EncodedBytesSent.Add(int64(len(payload.Encoded)))
130-
metrics.TlmEncodedBytesSent.Add(float64(len(payload.Encoded)), sourceTag)
132+
metrics.TlmEncodedBytesSent.Add(float64(len(payload.Encoded)), sourceTag, compressionKind)
131133
output <- payload
132134

133135
if d.connManager.ShouldReset(d.connCreationTime) {

pkg/logs/metrics/metrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ var (
5757
EncodedBytesSent = expvar.Int{}
5858
// TlmEncodedBytesSent is the total number of sent bytes after encoding if any
5959
TlmEncodedBytesSent = telemetry.NewCounter("logs", "encoded_bytes_sent",
60-
[]string{"source"}, "Total number of sent bytes after encoding if any")
60+
[]string{"source", "compression_kind"}, "Total number of sent bytes after encoding if any")
6161
// BytesMissed is the number of bytes lost before they could be consumed by the agent, such as after a log rotation
6262
BytesMissed = expvar.Int{}
6363
// TlmBytesMissed is the number of bytes lost before they could be consumed by the agent, such as after log rotation
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
enhancements:
3+
- |
4+
Adds a compression_kind tag to the ``logs.encoded_bytes_sent`` telemetry metric, enabling aggregation and monitoring of log compression type usage during rollout.

0 commit comments

Comments
 (0)