Skip to content

feat(inputs.chrony): Add probing #16861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/includes/probing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
This plugin supports probing through the following configuration:

```toml
startup_error_behavior = "probe"
```
8 changes: 8 additions & 0 deletions plugins/inputs/chrony/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Probing <!-- @/docs/includes/probing.md -->

This plugin supports probing through the following configuration:

```toml
startup_error_behavior = "probe"
```

## Configuration

```toml @sample.conf
Expand Down
9 changes: 9 additions & 0 deletions plugins/inputs/chrony/chrony.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error {
return nil
}

func (c *Chrony) Probe() error {
sourcesReq := fbchrony.NewSourcesPacket()
_, err := c.client.Communicate(sourcesReq)
if err != nil {
return fmt.Errorf("querying sources failed: %w", err)
}
Comment on lines +150 to +153
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_, err := c.client.Communicate(sourcesReq)
if err != nil {
return fmt.Errorf("querying sources failed: %w", err)
}
if _, err := c.client.Communicate(sourcesReq); err != nil {
return fmt.Errorf("querying sources failed: %w", err)
}

return nil
}

func (c *Chrony) Gather(acc telegraf.Accumulator) error {
for _, m := range c.Metrics {
switch m {
Expand Down
124 changes: 124 additions & 0 deletions plugins/inputs/chrony/chrony_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,63 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
)

func TestProbe(t *testing.T) {
for _, tt := range []struct {
name string
shutdownServer bool
expectError bool
}{
{
name: "probe success",
shutdownServer: false,
expectError: false,
},
{
name: "probe error",
shutdownServer: true,
expectError: true,
},
} {
t.Run(tt.name, func(t *testing.T) {
server := Server{}
addr, err := server.Listen(t)
require.NoError(t, err)
defer server.Shutdown()

// Setup the plugin
plugin := &Chrony{
Server: "udp://" + addr,
Metrics: []string{"activity"},
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())

model := models.NewRunningInput(plugin, &models.InputConfig{
Name: "chrony",
StartupErrorBehavior: "probe",
})

var acc testutil.Accumulator
require.NoError(t, model.Start(&acc))
defer plugin.Stop()

if tt.shutdownServer {
server.Shutdown()
}
err = model.Probe()
if tt.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func TestGatherActivity(t *testing.T) {
// Setup a mock server
server := Server{
Expand Down Expand Up @@ -78,6 +132,76 @@ func TestGatherActivity(t *testing.T) {
testutil.RequireMetricsEqual(t, expected, actual, options...)
}

func TestProbeFailure(t *testing.T) {
// We start the server to make sure that the initial dial succeeds so that
// Start() does not fail. A failure of `Start()` when `startup_error_behavior=probe`
// is specified would be treated as an ignore anyway, but that's not what
// we're testing here.
server := Server{}
addr, err := server.Listen(t)
require.NoError(t, err)
defer server.Shutdown()

// Setup the plugin
plugin := &Chrony{
Server: "udp://" + addr,
Metrics: []string{"tracking"},
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

// Shutdown the server
server.Shutdown()

// Perform the actual test. Probing should fail here.
require.Error(t, plugin.Probe())
}

func TestProbeSuccess(t *testing.T) {
// Setup a mock server
server := Server{
SourcesInfo: []source{
{
name: "ntp1.my.org",
data: &fbchrony.SourceData{
IPAddr: net.IPv4(192, 168, 0, 1),
Poll: 64,
Stratum: 16,
State: fbchrony.SourceStateSync,
Mode: fbchrony.SourceModePeer,
Flags: 0,
Reachability: 0,
SinceSample: 0,
OrigLatestMeas: 1.22354,
LatestMeas: 1.22354,
LatestMeasErr: 0.00423,
},
},
},
}
addr, err := server.Listen(t)
require.NoError(t, err)
defer server.Shutdown()

// Setup the plugin
plugin := &Chrony{
Server: "udp://" + addr,
Metrics: []string{"tracking"},
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

require.NoError(t, plugin.Probe())
}

func TestGatherTracking(t *testing.T) {
// Setup a mock server
server := Server{
Expand Down
Loading