Skip to content

Commit c208ea2

Browse files
Add statsdreceiver Unixgram Support (#36608)
#### Description Adds `unixgram` transport for the `statsdreceiver`. Additionally, creates a new `packetServer` base class for both `UDS` and `UDP*` transport types #### Link to tracking issue #21385 #### Testing Added a unit test --------- Co-authored-by: Christos Markou <[email protected]>
1 parent 23306ea commit c208ea2

File tree

9 files changed

+204
-64
lines changed

9 files changed

+204
-64
lines changed

.chloggen/statsdreceiver-uds.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: statsdreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add UDS support to statsdreceiver
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [21385]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/statsdreceiver/README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ Use case: it does not support horizontal pool of collectors. Desired work case i
1919

2020
## Configuration
2121

22-
The following settings are required:
23-
24-
- `endpoint` (default = `localhost:8125`): Address and port to listen on.
22+
The Following settings are optional:
2523

24+
- `endpoint`: Address and port to listen on.
25+
- For `udp` and `tcp` based `transport`, this config will default to `localhost:8125`
26+
- For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock`
2627

27-
The Following settings are optional:
28+
- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/internal/transport/transport.go).
2829

2930
- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)
3031

receiver/statsdreceiver/internal/transport/client/client.go

+9
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ func (s *StatsD) connect() error {
5151
if err != nil {
5252
return err
5353
}
54+
case "unixgram":
55+
unixAddr, err := net.ResolveUnixAddr(s.transport, s.address)
56+
if err != nil {
57+
return err
58+
}
59+
s.conn, err = net.DialUnix(s.transport, nil, unixAddr)
60+
if err != nil {
61+
return err
62+
}
5463
default:
5564
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
5665
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
5+
6+
import (
7+
"errors"
8+
"net"
9+
10+
"go.opentelemetry.io/collector/consumer"
11+
)
12+
13+
type packetServer struct {
14+
packetConn net.PacketConn
15+
transport Transport
16+
}
17+
18+
// ListenAndServe starts the server ready to receive metrics.
19+
func (u *packetServer) ListenAndServe(
20+
nextConsumer consumer.Metrics,
21+
reporter Reporter,
22+
transferChan chan<- Metric,
23+
) error {
24+
if nextConsumer == nil || reporter == nil {
25+
return errNilListenAndServeParameters
26+
}
27+
28+
buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
29+
for {
30+
n, addr, err := u.packetConn.ReadFrom(buf)
31+
if addr == nil && u.transport == UDS {
32+
addr = &udsAddr{
33+
network: u.transport.String(),
34+
address: u.packetConn.LocalAddr().String(),
35+
}
36+
}
37+
38+
if n > 0 {
39+
u.handlePacket(n, buf, addr, transferChan)
40+
}
41+
if err != nil {
42+
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
43+
u.transport,
44+
u.packetConn.LocalAddr(),
45+
err)
46+
var netErr net.Error
47+
if errors.As(err, &netErr) {
48+
if netErr.Timeout() {
49+
continue
50+
}
51+
}
52+
return err
53+
}
54+
}
55+
}
56+
57+
// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
58+
func (u *packetServer) handlePacket(
59+
numBytes int,
60+
data []byte,
61+
addr net.Addr,
62+
transferChan chan<- Metric,
63+
) {
64+
splitPacket := NewSplitBytes(data[:numBytes], '\n')
65+
for splitPacket.Next() {
66+
chunk := splitPacket.Chunk()
67+
if len(chunk) > 0 {
68+
transferChan <- Metric{string(chunk), addr}
69+
}
70+
}
71+
}
72+
73+
type udsAddr struct {
74+
network string
75+
address string
76+
}
77+
78+
func (u *udsAddr) Network() string {
79+
return u.network
80+
}
81+
82+
func (u *udsAddr) String() string {
83+
return u.address
84+
}

receiver/statsdreceiver/internal/transport/transport.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
TCP Transport = "tcp"
2222
TCP4 Transport = "tcp4"
2323
TCP6 Transport = "tcp6"
24+
UDS Transport = "unixgram"
2425
)
2526

2627
// NewTransport creates a Transport based on the transport string or returns an empty Transport.
@@ -31,14 +32,16 @@ func NewTransport(ts string) Transport {
3132
return trans
3233
case TCP, TCP4, TCP6:
3334
return trans
35+
case UDS:
36+
return trans
3437
}
3538
return Transport("")
3639
}
3740

3841
// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise.
3942
func (trans Transport) String() string {
4043
switch trans {
41-
case UDP, UDP4, UDP6, TCP, TCP4, TCP6:
44+
case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS:
4245
return string(trans)
4346
}
4447
return ""
@@ -47,7 +50,7 @@ func (trans Transport) String() string {
4750
// IsPacketTransport returns true if the transport is packet based.
4851
func (trans Transport) IsPacketTransport() bool {
4952
switch trans {
50-
case UDP, UDP4, UDP6:
53+
case UDP, UDP4, UDP6, UDS:
5154
return true
5255
}
5356
return false

receiver/statsdreceiver/internal/transport/udp_server.go

+5-55
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,12 @@
44
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
55

66
import (
7-
"errors"
87
"fmt"
98
"net"
10-
11-
"go.opentelemetry.io/collector/consumer"
129
)
1310

1411
type udpServer struct {
15-
packetConn net.PacketConn
16-
transport Transport
12+
packetServer
1713
}
1814

1915
// Ensure that Server is implemented on UDP Server.
@@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) {
3127
}
3228

3329
return &udpServer{
34-
packetConn: conn,
35-
transport: transport,
30+
packetServer: packetServer{
31+
packetConn: conn,
32+
transport: transport,
33+
},
3634
}, nil
3735
}
3836

39-
// ListenAndServe starts the server ready to receive metrics.
40-
func (u *udpServer) ListenAndServe(
41-
nextConsumer consumer.Metrics,
42-
reporter Reporter,
43-
transferChan chan<- Metric,
44-
) error {
45-
if nextConsumer == nil || reporter == nil {
46-
return errNilListenAndServeParameters
47-
}
48-
49-
buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
50-
for {
51-
n, addr, err := u.packetConn.ReadFrom(buf)
52-
if n > 0 {
53-
u.handlePacket(n, buf, addr, transferChan)
54-
}
55-
if err != nil {
56-
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
57-
u.transport,
58-
u.packetConn.LocalAddr(),
59-
err)
60-
var netErr net.Error
61-
if errors.As(err, &netErr) {
62-
if netErr.Timeout() {
63-
continue
64-
}
65-
}
66-
return err
67-
}
68-
}
69-
}
70-
7137
// Close closes the server.
7238
func (u *udpServer) Close() error {
7339
return u.packetConn.Close()
7440
}
75-
76-
// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
77-
func (u *udpServer) handlePacket(
78-
numBytes int,
79-
data []byte,
80-
addr net.Addr,
81-
transferChan chan<- Metric,
82-
) {
83-
splitPacket := NewSplitBytes(data[:numBytes], '\n')
84-
for splitPacket.Next() {
85-
chunk := splitPacket.Chunk()
86-
if len(chunk) > 0 {
87-
transferChan <- Metric{string(chunk), addr}
88-
}
89-
}
90-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
5+
6+
import (
7+
"fmt"
8+
"net"
9+
"os"
10+
)
11+
12+
type udsServer struct {
13+
packetServer
14+
}
15+
16+
// Ensure that Server is implemented on UDS Server.
17+
var _ (Server) = (*udsServer)(nil)
18+
19+
// NewUDSServer creates a transport.Server using Unixgram as its transport.
20+
func NewUDSServer(transport Transport, socketPath string) (Server, error) {
21+
if !transport.IsPacketTransport() {
22+
return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
23+
}
24+
25+
conn, err := net.ListenPacket(transport.String(), socketPath)
26+
if err != nil {
27+
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
28+
}
29+
30+
return &udsServer{
31+
packetServer: packetServer{
32+
packetConn: conn,
33+
transport: transport,
34+
},
35+
}, nil
36+
}
37+
38+
// Close closes the server.
39+
func (u *udsServer) Close() error {
40+
os.Remove(u.packetConn.LocalAddr().String())
41+
return u.packetConn.Close()
42+
}

receiver/statsdreceiver/receiver.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,21 @@ func newReceiver(
4646
config Config,
4747
nextConsumer consumer.Metrics,
4848
) (receiver.Metrics, error) {
49+
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
50+
4951
if config.NetAddr.Endpoint == "" {
50-
config.NetAddr.Endpoint = "localhost:8125"
52+
if trans == transport.UDS {
53+
config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock"
54+
} else {
55+
config.NetAddr.Endpoint = "localhost:8125"
56+
}
5157
}
5258

5359
rep, err := newReporter(set)
5460
if err != nil {
5561
return nil, err
5662
}
5763

58-
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
5964
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
6065
LongLivedCtx: true,
6166
ReceiverID: set.ID,
@@ -80,13 +85,14 @@ func newReceiver(
8085
}
8186

8287
func buildTransportServer(config Config) (transport.Server, error) {
83-
// TODO: Add unix socket transport implementations
8488
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
8589
switch trans {
8690
case transport.UDP, transport.UDP4, transport.UDP6:
8791
return transport.NewUDPServer(trans, config.NetAddr.Endpoint)
8892
case transport.TCP, transport.TCP4, transport.TCP6:
8993
return transport.NewTCPServer(trans, config.NetAddr.Endpoint)
94+
case transport.UDS:
95+
return transport.NewUDSServer(trans, config.NetAddr.Endpoint)
9096
}
9197

9298
return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))

receiver/statsdreceiver/receiver_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
106106
return c
107107
},
108108
},
109+
{
110+
name: "UDS server with 4s interval",
111+
addr: "/tmp/statsd_test.sock",
112+
configFn: func() *Config {
113+
return &Config{
114+
NetAddr: confignet.AddrConfig{
115+
Endpoint: "/tmp/statsd_test.sock",
116+
Transport: confignet.TransportTypeUnixgram,
117+
},
118+
AggregationInterval: 4 * time.Second,
119+
}
120+
},
121+
clientFn: func(t *testing.T, addr string) *client.StatsD {
122+
c, err := client.NewStatsD("unixgram", addr)
123+
require.NoError(t, err)
124+
return c
125+
},
126+
},
109127
}
110128
for _, tt := range tests {
111129
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)