Skip to content

[chore] extension/observer: move Endpoints{Lister,Watcher} to a new package #38416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions extension/observer/cfgardenobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher"
)

const (
Expand All @@ -34,7 +35,7 @@ const (
)

type cfGardenObserver struct {
*observer.EndpointsWatcher
*endpointswatcher.EndpointsWatcher
config *Config
doneChan chan struct{}
logger *zap.Logger
Expand All @@ -61,7 +62,7 @@ func newObserver(config *Config, logger *zap.Logger) (extension.Extension, error
apps: make(map[string]*resource.App),
doneChan: make(chan struct{}),
}
g.EndpointsWatcher = observer.NewEndpointsWatcher(g, config.RefreshInterval, logger)
g.EndpointsWatcher = endpointswatcher.New(g, config.RefreshInterval, logger)
return g, nil
}

Expand Down
10 changes: 5 additions & 5 deletions extension/observer/dockerobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher"
dcommon "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/docker"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker"
)
Expand All @@ -27,13 +28,12 @@ var (
)

var (
_ extension.Extension = (*dockerObserver)(nil)
_ observer.EndpointsLister = (*dockerObserver)(nil)
_ observer.Observable = (*dockerObserver)(nil)
_ extension.Extension = (*dockerObserver)(nil)
_ observer.Observable = (*dockerObserver)(nil)
)

type dockerObserver struct {
*observer.EndpointsWatcher
*endpointswatcher.EndpointsWatcher
logger *zap.Logger
config *Config
cancel context.CancelFunc
Expand All @@ -49,7 +49,7 @@ func newObserver(logger *zap.Logger, config *Config) (extension.Extension, error
// Safe value provided on initialisation
},
}
d.EndpointsWatcher = observer.NewEndpointsWatcher(d, time.Second, logger)
d.EndpointsWatcher = endpointswatcher.New(d, time.Second, logger)
return d, nil
}

Expand Down
8 changes: 4 additions & 4 deletions extension/observer/ecstaskobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil"
dcommon "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/docker"
)

const runningStatus = "RUNNING"

var (
_ extension.Extension = (*ecsTaskObserver)(nil)
_ observer.EndpointsLister = (*ecsTaskObserver)(nil)
_ observer.Observable = (*ecsTaskObserver)(nil)
_ extension.Extension = (*ecsTaskObserver)(nil)
_ observer.Observable = (*ecsTaskObserver)(nil)
)

type ecsTaskObserver struct {
extension.Extension
*observer.EndpointsWatcher
*endpointswatcher.EndpointsWatcher
config *Config
metadataProvider ecsutil.MetadataProvider
telemetry component.TelemetrySettings
Expand Down
4 changes: 2 additions & 2 deletions extension/observer/ecstaskobserver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecstaskobserver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil"
)

Expand Down Expand Up @@ -63,7 +63,7 @@ func createExtension(
e.Extension = baseExtension{
ShutdownFunc: e.Shutdown,
}
e.EndpointsWatcher = observer.NewEndpointsWatcher(e, obsCfg.RefreshInterval, params.TelemetrySettings.Logger)
e.EndpointsWatcher = endpointswatcher.New(e, obsCfg.RefreshInterval, params.TelemetrySettings.Logger)

return e, nil
}
Expand Down
4 changes: 2 additions & 2 deletions extension/observer/ecstaskobserver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/extension/extensiontest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher"
)

func TestFactoryCreatedExtensionIsEndpointsLister(t *testing.T) {
Expand All @@ -21,5 +21,5 @@ func TestFactoryCreatedExtensionIsEndpointsLister(t *testing.T) {
eto, err := etoFactory.Create(context.Background(), extensiontest.NewNopSettings(etoFactory.Type()), cfg)
require.NoError(t, err)
require.NotNil(t, eto)
require.Implements(t, (*observer.EndpointsLister)(nil), eto)
require.Implements(t, (*endpointswatcher.EndpointsLister)(nil), eto)
}
20 changes: 0 additions & 20 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"net"
"reflect"
)

type (
Expand Down Expand Up @@ -105,25 +104,6 @@ func (e *Endpoint) String() string {
return fmt.Sprintf("Endpoint{ID: %v, Target: %v, Details: %T%+v}", e.ID, e.Target, e.Details, e.Details)
}

func (e Endpoint) equals(other Endpoint) bool {
switch {
case e.ID != other.ID:
return false
case e.Target != other.Target:
return false
case e.Details == nil && other.Details != nil:
return false
case other.Details == nil && e.Details != nil:
return false
case e.Details == nil && other.Details == nil:
return true
case e.Details.Type() != other.Details.Type():
return false
default:
return reflect.DeepEqual(e.Details.Env(), other.Details.Env())
}
}

// K8sService is a discovered k8s service.
type K8sService struct {
// Name of the service.
Expand Down
175 changes: 0 additions & 175 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,178 +305,3 @@ func TestEndpointEnv(t *testing.T) {
})
}
}

func TestEndpointEquals(t *testing.T) {
tests := []struct {
name string
first Endpoint
second Endpoint
areEqual bool
}{
{
name: "equal empty endpoints",
first: Endpoint{}, second: Endpoint{},
areEqual: true,
},
{
name: "equal ID",
first: Endpoint{ID: "id"},
second: Endpoint{ID: "id"},
areEqual: true,
},
{
name: "unequal ID",
first: Endpoint{ID: "first"},
second: Endpoint{ID: "second"},
areEqual: false,
},
{
name: "equal Target",
first: Endpoint{Target: "target"},
second: Endpoint{Target: "target"},
areEqual: true,
},
{
name: "unequal Target",
first: Endpoint{Target: "first"},
second: Endpoint{Target: "second"},
areEqual: false,
},
{
name: "equal empty Port",
first: Endpoint{Details: &Port{}},
second: Endpoint{Details: &Port{}},
areEqual: true,
},
{
name: "equal Port Name",
first: Endpoint{Details: &Port{Name: "port_name"}},
second: Endpoint{Details: &Port{Name: "port_name"}},
areEqual: true,
},
{
name: "unequal Port Name",
first: Endpoint{Details: &Port{Name: "first"}},
second: Endpoint{Details: &Port{Name: "second"}},
areEqual: false,
},
{
name: "equal Port Port",
first: Endpoint{Details: &Port{Port: 2379}},
second: Endpoint{Details: &Port{Port: 2379}},
areEqual: true,
},
{
name: "unequal Port Port",
first: Endpoint{Details: &Port{Port: 0}},
second: Endpoint{Details: &Port{Port: 1}},
areEqual: false,
},
{
name: "equal Port Transport",
first: Endpoint{Details: &Port{Transport: "transport"}},
second: Endpoint{Details: &Port{Transport: "transport"}},
areEqual: true,
},
{
name: "unequal Port Transport",
first: Endpoint{Details: &Port{Transport: "first"}},
second: Endpoint{Details: &Port{Transport: "second"}},
areEqual: false,
},
{
name: "equal Port",
first: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
second: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
areEqual: true,
},
{
name: "unequal Port Pod Label",
first: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"key_one": "val_one",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
second: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"key_two": "val_two",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
areEqual: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.first.equals(tt.second), tt.areEqual)
require.Equal(t, tt.second.equals(tt.first), tt.areEqual)
})
}
}
7 changes: 7 additions & 0 deletions extension/observer/endpointswatcher/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package endpointswatcher provides a generic implementation of observer.Observable.
//
// This package is intended for observer implementations, and not observer consumers.
package endpointswatcher // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher"
Loading