diff --git a/extension/observer/cfgardenobserver/extension.go b/extension/observer/cfgardenobserver/extension.go index 5e247ab19a1e4..753011a3b3d18 100644 --- a/extension/observer/cfgardenobserver/extension.go +++ b/extension/observer/cfgardenobserver/extension.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" ) const ( @@ -34,7 +35,7 @@ const ( ) type cfGardenObserver struct { - *observer.EndpointsWatcher + *endpointswatcher.EndpointsWatcher config *Config doneChan chan struct{} logger *zap.Logger @@ -61,7 +62,7 @@ func newObserver(config *Config, logger *zap.Logger) (extension.Extension, error apps: make(map[string]*resource.App), doneChan: make(chan struct{}), } - g.EndpointsWatcher = observer.NewEndpointsWatcher(g, config.RefreshInterval, logger) + g.EndpointsWatcher = endpointswatcher.New(g, config.RefreshInterval, logger) return g, nil } diff --git a/extension/observer/dockerobserver/extension.go b/extension/observer/dockerobserver/extension.go index 4295fb97a3df0..2f81e19fa2e50 100644 --- a/extension/observer/dockerobserver/extension.go +++ b/extension/observer/dockerobserver/extension.go @@ -17,6 +17,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" dcommon "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/docker" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker" ) @@ -27,13 +28,12 @@ var ( ) var ( - _ extension.Extension = (*dockerObserver)(nil) - _ observer.EndpointsLister = (*dockerObserver)(nil) - _ observer.Observable = (*dockerObserver)(nil) + _ extension.Extension = (*dockerObserver)(nil) + _ observer.Observable = (*dockerObserver)(nil) ) type dockerObserver struct { - *observer.EndpointsWatcher + *endpointswatcher.EndpointsWatcher logger *zap.Logger config *Config cancel context.CancelFunc @@ -49,7 +49,7 @@ func newObserver(logger *zap.Logger, config *Config) (extension.Extension, error // Safe value provided on initialisation }, } - d.EndpointsWatcher = observer.NewEndpointsWatcher(d, time.Second, logger) + d.EndpointsWatcher = endpointswatcher.New(d, time.Second, logger) return d, nil } diff --git a/extension/observer/ecstaskobserver/extension.go b/extension/observer/ecstaskobserver/extension.go index e74394d227713..c74bbcd237b6c 100644 --- a/extension/observer/ecstaskobserver/extension.go +++ b/extension/observer/ecstaskobserver/extension.go @@ -13,6 +13,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil" dcommon "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/docker" ) @@ -20,14 +21,13 @@ import ( const runningStatus = "RUNNING" var ( - _ extension.Extension = (*ecsTaskObserver)(nil) - _ observer.EndpointsLister = (*ecsTaskObserver)(nil) - _ observer.Observable = (*ecsTaskObserver)(nil) + _ extension.Extension = (*ecsTaskObserver)(nil) + _ observer.Observable = (*ecsTaskObserver)(nil) ) type ecsTaskObserver struct { extension.Extension - *observer.EndpointsWatcher + *endpointswatcher.EndpointsWatcher config *Config metadataProvider ecsutil.MetadataProvider telemetry component.TelemetrySettings diff --git a/extension/observer/ecstaskobserver/factory.go b/extension/observer/ecstaskobserver/factory.go index 59ffd899368bd..609e35ea6cf68 100644 --- a/extension/observer/ecstaskobserver/factory.go +++ b/extension/observer/ecstaskobserver/factory.go @@ -11,8 +11,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecstaskobserver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil" ) @@ -63,7 +63,7 @@ func createExtension( e.Extension = baseExtension{ ShutdownFunc: e.Shutdown, } - e.EndpointsWatcher = observer.NewEndpointsWatcher(e, obsCfg.RefreshInterval, params.TelemetrySettings.Logger) + e.EndpointsWatcher = endpointswatcher.New(e, obsCfg.RefreshInterval, params.TelemetrySettings.Logger) return e, nil } diff --git a/extension/observer/ecstaskobserver/factory_test.go b/extension/observer/ecstaskobserver/factory_test.go index 035b60f7f8a68..a96891fb5f1a4 100644 --- a/extension/observer/ecstaskobserver/factory_test.go +++ b/extension/observer/ecstaskobserver/factory_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/extension/extensiontest" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" ) func TestFactoryCreatedExtensionIsEndpointsLister(t *testing.T) { @@ -21,5 +21,5 @@ func TestFactoryCreatedExtensionIsEndpointsLister(t *testing.T) { eto, err := etoFactory.Create(context.Background(), extensiontest.NewNopSettings(etoFactory.Type()), cfg) require.NoError(t, err) require.NotNil(t, eto) - require.Implements(t, (*observer.EndpointsLister)(nil), eto) + require.Implements(t, (*endpointswatcher.EndpointsLister)(nil), eto) } diff --git a/extension/observer/endpoints.go b/extension/observer/endpoints.go index 602d8ae2a14e7..f5e423fab8a9e 100644 --- a/extension/observer/endpoints.go +++ b/extension/observer/endpoints.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "net" - "reflect" ) type ( @@ -105,25 +104,6 @@ func (e *Endpoint) String() string { return fmt.Sprintf("Endpoint{ID: %v, Target: %v, Details: %T%+v}", e.ID, e.Target, e.Details, e.Details) } -func (e Endpoint) equals(other Endpoint) bool { - switch { - case e.ID != other.ID: - return false - case e.Target != other.Target: - return false - case e.Details == nil && other.Details != nil: - return false - case other.Details == nil && e.Details != nil: - return false - case e.Details == nil && other.Details == nil: - return true - case e.Details.Type() != other.Details.Type(): - return false - default: - return reflect.DeepEqual(e.Details.Env(), other.Details.Env()) - } -} - // K8sService is a discovered k8s service. type K8sService struct { // Name of the service. diff --git a/extension/observer/endpoints_test.go b/extension/observer/endpoints_test.go index 9dcb19f474bde..3c9f851f9083a 100644 --- a/extension/observer/endpoints_test.go +++ b/extension/observer/endpoints_test.go @@ -305,178 +305,3 @@ func TestEndpointEnv(t *testing.T) { }) } } - -func TestEndpointEquals(t *testing.T) { - tests := []struct { - name string - first Endpoint - second Endpoint - areEqual bool - }{ - { - name: "equal empty endpoints", - first: Endpoint{}, second: Endpoint{}, - areEqual: true, - }, - { - name: "equal ID", - first: Endpoint{ID: "id"}, - second: Endpoint{ID: "id"}, - areEqual: true, - }, - { - name: "unequal ID", - first: Endpoint{ID: "first"}, - second: Endpoint{ID: "second"}, - areEqual: false, - }, - { - name: "equal Target", - first: Endpoint{Target: "target"}, - second: Endpoint{Target: "target"}, - areEqual: true, - }, - { - name: "unequal Target", - first: Endpoint{Target: "first"}, - second: Endpoint{Target: "second"}, - areEqual: false, - }, - { - name: "equal empty Port", - first: Endpoint{Details: &Port{}}, - second: Endpoint{Details: &Port{}}, - areEqual: true, - }, - { - name: "equal Port Name", - first: Endpoint{Details: &Port{Name: "port_name"}}, - second: Endpoint{Details: &Port{Name: "port_name"}}, - areEqual: true, - }, - { - name: "unequal Port Name", - first: Endpoint{Details: &Port{Name: "first"}}, - second: Endpoint{Details: &Port{Name: "second"}}, - areEqual: false, - }, - { - name: "equal Port Port", - first: Endpoint{Details: &Port{Port: 2379}}, - second: Endpoint{Details: &Port{Port: 2379}}, - areEqual: true, - }, - { - name: "unequal Port Port", - first: Endpoint{Details: &Port{Port: 0}}, - second: Endpoint{Details: &Port{Port: 1}}, - areEqual: false, - }, - { - name: "equal Port Transport", - first: Endpoint{Details: &Port{Transport: "transport"}}, - second: Endpoint{Details: &Port{Transport: "transport"}}, - areEqual: true, - }, - { - name: "unequal Port Transport", - first: Endpoint{Details: &Port{Transport: "first"}}, - second: Endpoint{Details: &Port{Transport: "second"}}, - areEqual: false, - }, - { - name: "equal Port", - first: Endpoint{ - ID: EndpointID("port_id"), - Target: "192.68.73.2", - Details: &Port{ - Name: "port_name", - Pod: Pod{ - Name: "pod_name", - Labels: map[string]string{ - "label_key": "label_val", - }, - Annotations: map[string]string{ - "annotation_1": "value_1", - }, - Namespace: "pod-namespace", - UID: "pod-uid", - }, - Port: 2379, - Transport: ProtocolTCP, - }, - }, - second: Endpoint{ - ID: EndpointID("port_id"), - Target: "192.68.73.2", - Details: &Port{ - Name: "port_name", - Pod: Pod{ - Name: "pod_name", - Labels: map[string]string{ - "label_key": "label_val", - }, - Annotations: map[string]string{ - "annotation_1": "value_1", - }, - Namespace: "pod-namespace", - UID: "pod-uid", - }, - Port: 2379, - Transport: ProtocolTCP, - }, - }, - areEqual: true, - }, - { - name: "unequal Port Pod Label", - first: Endpoint{ - ID: EndpointID("port_id"), - Target: "192.68.73.2", - Details: &Port{ - Name: "port_name", - Pod: Pod{ - Name: "pod_name", - Labels: map[string]string{ - "key_one": "val_one", - }, - Annotations: map[string]string{ - "annotation_1": "value_1", - }, - Namespace: "pod-namespace", - UID: "pod-uid", - }, - Port: 2379, - Transport: ProtocolTCP, - }, - }, - second: Endpoint{ - ID: EndpointID("port_id"), - Target: "192.68.73.2", - Details: &Port{ - Name: "port_name", - Pod: Pod{ - Name: "pod_name", - Labels: map[string]string{ - "key_two": "val_two", - }, - Annotations: map[string]string{ - "annotation_1": "value_1", - }, - Namespace: "pod-namespace", - UID: "pod-uid", - }, - Port: 2379, - Transport: ProtocolTCP, - }, - }, - areEqual: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.first.equals(tt.second), tt.areEqual) - require.Equal(t, tt.second.equals(tt.first), tt.areEqual) - }) - } -} diff --git a/extension/observer/endpointswatcher/doc.go b/extension/observer/endpointswatcher/doc.go new file mode 100644 index 0000000000000..155c1ec9118ad --- /dev/null +++ b/extension/observer/endpointswatcher/doc.go @@ -0,0 +1,7 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package endpointswatcher provides a generic implementation of observer.Observable. +// +// This package is intended for observer implementations, and not observer consumers. +package endpointswatcher // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" diff --git a/extension/observer/endpointswatcher.go b/extension/observer/endpointswatcher/endpointswatcher.go similarity index 71% rename from extension/observer/endpointswatcher.go rename to extension/observer/endpointswatcher/endpointswatcher.go index 237d602f647a1..e968a5770d7c8 100644 --- a/extension/observer/endpointswatcher.go +++ b/extension/observer/endpointswatcher/endpointswatcher.go @@ -1,17 +1,20 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package observer // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +package endpointswatcher // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" import ( "encoding/json" + "reflect" "sync" "time" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" ) -var _ Observable = (*EndpointsWatcher)(nil) +var _ observer.Observable = (*EndpointsWatcher)(nil) // EndpointsWatcher provides a generic mechanism to run EndpointsLister.ListEndpoints every // RefreshInterval and report any new or removed endpoints to Notify instances registered @@ -31,7 +34,7 @@ type EndpointsWatcher struct { logger *zap.Logger } -func NewEndpointsWatcher(endpointsLister EndpointsLister, refreshInterval time.Duration, logger *zap.Logger) *EndpointsWatcher { +func New(endpointsLister EndpointsLister, refreshInterval time.Duration, logger *zap.Logger) *EndpointsWatcher { return &EndpointsWatcher{ EndpointsLister: endpointsLister, RefreshInterval: refreshInterval, @@ -45,7 +48,7 @@ func NewEndpointsWatcher(endpointsLister EndpointsLister, refreshInterval time.D // ListAndWatch runs EndpointsLister.ListEndpoints() on a regular interval and keeps track of the results // for alerting all subscribed Notify's of the based on the differences from the previous call. -func (ew *EndpointsWatcher) ListAndWatch(notify Notify) { +func (ew *EndpointsWatcher) ListAndWatch(notify observer.Notify) { ew.once.Do(func() { go func() { ticker := time.NewTicker(ew.RefreshInterval) @@ -56,9 +59,9 @@ func (ew *EndpointsWatcher) ListAndWatch(notify Notify) { case <-ew.stop: return case <-ticker.C: - var toNotify []NotifyID + var toNotify []observer.NotifyID ew.toNotify.Range(func(notifyID, _ any) bool { - toNotify = append(toNotify, notifyID.(NotifyID)) + toNotify = append(toNotify, notifyID.(observer.NotifyID)) return true }) ew.notifyOfLatestEndpoints(toNotify...) @@ -71,24 +74,24 @@ func (ew *EndpointsWatcher) ListAndWatch(notify Notify) { ew.notifyOfLatestEndpoints(notify.ID()) } -func (ew *EndpointsWatcher) Unsubscribe(notify Notify) { +func (ew *EndpointsWatcher) Unsubscribe(notify observer.Notify) { ew.toNotify.Delete(notify.ID()) ew.existingEndpoints.Delete(notify.ID()) } // notifyOfLatestEndpoints alerts subscribed Notify instances by their NotifyID of latest Endpoint events, // updating their internal store with results of ListEndpoints() call. -func (ew *EndpointsWatcher) notifyOfLatestEndpoints(notifyIDs ...NotifyID) { +func (ew *EndpointsWatcher) notifyOfLatestEndpoints(notifyIDs ...observer.NotifyID) { latestEndpoints := ew.EndpointsLister.ListEndpoints() wg := &sync.WaitGroup{} for _, notifyID := range notifyIDs { - var notify Notify + var notify observer.Notify if n, ok := ew.toNotify.Load(notifyID); !ok { // an Unsubscribe() must have occurred during this call ew.logger.Debug("notifyOfEndpoints() ignoring instruction to notify non-subscribed Notify", zap.Any("notify", notifyID)) continue - } else if notify, ok = n.(Notify); !ok { + } else if notify, ok = n.(observer.Notify); !ok { ew.logger.Warn("failed to obtain notify instance from EndpointsWatcher", zap.Any("notify", n)) continue } @@ -98,7 +101,7 @@ func (ew *EndpointsWatcher) notifyOfLatestEndpoints(notifyIDs ...NotifyID) { wg.Wait() } -func (ew *EndpointsWatcher) updateAndNotifyOfEndpoints(notify Notify, endpoints []Endpoint, done *sync.WaitGroup) { +func (ew *EndpointsWatcher) updateAndNotifyOfEndpoints(notify observer.Notify, endpoints []observer.Endpoint, done *sync.WaitGroup) { defer done.Done() removedEndpoints, addedEndpoints, changedEndpoints := ew.updateEndpoints(notify, endpoints) if len(removedEndpoints) > 0 { @@ -117,23 +120,23 @@ func (ew *EndpointsWatcher) updateAndNotifyOfEndpoints(notify Notify, endpoints } } -func (ew *EndpointsWatcher) updateEndpoints(notify Notify, endpoints []Endpoint) (removed, added, changed []Endpoint) { +func (ew *EndpointsWatcher) updateEndpoints(notify observer.Notify, endpoints []observer.Endpoint) (removed, added, changed []observer.Endpoint) { notifyID := notify.ID() // Create map from ID to endpoint for lookup. - endpointsMap := make(map[EndpointID]struct{}, len(endpoints)) + endpointsMap := make(map[observer.EndpointID]struct{}, len(endpoints)) for _, e := range endpoints { endpointsMap[e.ID] = struct{}{} } - le, _ := ew.existingEndpoints.LoadOrStore(notifyID, map[EndpointID]Endpoint{}) - var storedEndpoints map[EndpointID]Endpoint + le, _ := ew.existingEndpoints.LoadOrStore(notifyID, map[observer.EndpointID]observer.Endpoint{}) + var storedEndpoints map[observer.EndpointID]observer.Endpoint var ok bool - if storedEndpoints, ok = le.(map[EndpointID]Endpoint); !ok { + if storedEndpoints, ok = le.(map[observer.EndpointID]observer.Endpoint); !ok { ew.logger.Warn("failed to load Endpoint store from EndpointsWatcher", zap.Any("endpoints", le)) return } // copy to not modify sync.Map value directly (will be reloaded) - existingEndpoints := map[EndpointID]Endpoint{} + existingEndpoints := map[observer.EndpointID]observer.Endpoint{} for id, endpoint := range storedEndpoints { existingEndpoints[id] = endpoint } @@ -142,11 +145,11 @@ func (ew *EndpointsWatcher) updateEndpoints(notify Notify, endpoints []Endpoint) // to be added or updated in case it is not already available in existingEndpoints or doesn't match // the latest value. for _, e := range endpoints { - var existingEndpoint Endpoint + var existingEndpoint observer.Endpoint if existingEndpoint, ok = existingEndpoints[e.ID]; !ok { existingEndpoints[e.ID] = e added = append(added, e) - } else if !existingEndpoint.equals(e) { + } else if !endpointsEqual(e, existingEndpoint) { // Collect updated endpoints. existingEndpoints[e.ID] = e changed = append(changed, e) @@ -177,10 +180,10 @@ func (ew *EndpointsWatcher) StopListAndWatch() { type EndpointsLister interface { // ListEndpoints provides a list of endpoints and is expected to be // implemented by an observer looking for endpoints. - ListEndpoints() []Endpoint + ListEndpoints() []observer.Endpoint } -func (ew *EndpointsWatcher) logEndpointEvent(msg string, notify Notify, endpoints []Endpoint) { +func (ew *EndpointsWatcher) logEndpointEvent(msg string, notify observer.Notify, endpoints []observer.Endpoint) { if ce := ew.logger.Check(zap.DebugLevel, msg); ce != nil { fields := []zap.Field{zap.Any("notify", notify.ID())} for _, endpoint := range endpoints { @@ -193,3 +196,22 @@ func (ew *EndpointsWatcher) logEndpointEvent(msg string, notify Notify, endpoint ce.Write(fields...) } } + +func endpointsEqual(lhs, rhs observer.Endpoint) bool { + switch { + case lhs.ID != rhs.ID: + return false + case lhs.Target != rhs.Target: + return false + case lhs.Details == nil && rhs.Details != nil: + return false + case rhs.Details == nil && lhs.Details != nil: + return false + case lhs.Details == nil && rhs.Details == nil: + return true + case lhs.Details.Type() != rhs.Details.Type(): + return false + default: + return reflect.DeepEqual(lhs.Details.Env(), rhs.Details.Env()) + } +} diff --git a/extension/observer/endpointswatcher/endpointswatcher_test.go b/extension/observer/endpointswatcher/endpointswatcher_test.go new file mode 100644 index 0000000000000..bef23658cdd7e --- /dev/null +++ b/extension/observer/endpointswatcher/endpointswatcher_test.go @@ -0,0 +1,442 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package endpointswatcher + +import ( + "sort" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +var ( + expectedEndpointZero = observer.Endpoint{ID: observer.EndpointID(strconv.Itoa(0))} + expectedEndpointOne = observer.Endpoint{ID: observer.EndpointID(strconv.Itoa(1))} + expectedEndpointTwo = observer.Endpoint{ID: observer.EndpointID(strconv.Itoa(2))} + expectedEndpointThree = observer.Endpoint{ID: observer.EndpointID(strconv.Itoa(3))} + expectedEndpointFour = observer.Endpoint{ID: observer.EndpointID(strconv.Itoa(4))} +) + +func TestRefreshEndpointsOnStartup(t *testing.T) { + lister, watcher, notify := setup(t) + + lister.addEndpoint(0) + notify.On("OnAdd", []observer.Endpoint{expectedEndpointZero}) + watcher.ListAndWatch(notify) + watcher.StopListAndWatch() + + notify.AssertExpectations(t) + + // Endpoints available before the ListAndWatch call should be + // readily discovered. + expected := map[observer.EndpointID]observer.Endpoint{"0": {ID: "0"}} + require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) +} + +func TestNotifyOfLatestEndpoints(t *testing.T) { + lister, watcher, notify := setup(t) + defer watcher.StopListAndWatch() + + zeroAdded := notify.On("OnAdd", []observer.Endpoint{expectedEndpointZero}) + lister.addEndpoint(0) + watcher.ListAndWatch(notify) + notify.AssertExpectations(t) + zeroAdded.Unset() + + expected := map[observer.EndpointID]observer.Endpoint{"0": {ID: "0"}} + require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) + + oneAndTwoAdded := notify.On("OnAdd", []observer.Endpoint{expectedEndpointOne, expectedEndpointTwo}) + lister.addEndpoint(1) + lister.addEndpoint(2) + + zeroRemoved := notify.On("OnRemove", []observer.Endpoint{expectedEndpointZero}) + lister.removeEndpoint(0) + + watcher.notifyOfLatestEndpoints(notify.ID()) + notify.AssertExpectations(t) + oneAndTwoAdded.Unset() + zeroRemoved.Unset() + + expected["1"] = observer.Endpoint{ID: "1"} + expected["2"] = observer.Endpoint{ID: "2"} + delete(expected, "0") + require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) + + expectedUpdatedTwo := expectedEndpointTwo + expectedUpdatedTwo.Target = "updated_target" + twoChanged := notify.On("OnChange", []observer.Endpoint{expectedUpdatedTwo}) + lister.updateEndpoint(2, "updated_target") + + watcher.notifyOfLatestEndpoints(notify.ID()) + notify.AssertExpectations(t) + twoChanged.Unset() + + expected["2"] = observer.Endpoint{ID: "2", Target: "updated_target"} + require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) + + watcher.Unsubscribe(notify) + require.Nil(t, existingEndpoints(t, watcher, notify.ID())) +} + +func TestNotifyOfLatestEndpointsMultipleNotify(t *testing.T) { + lister, watcher, notifyOne := setup(t) + defer watcher.StopListAndWatch() + + lister.addEndpoint(0) + zeroAdded := notifyOne.On("OnAdd", []observer.Endpoint{expectedEndpointZero}) + watcher.ListAndWatch(notifyOne) + notifyOne.AssertExpectations(t) + zeroAdded.Unset() + + notifyTwo := &mockNotifier{id: "notify2"} + lister.addEndpoint(1) + zeroAndOneAdded := notifyTwo.On("OnAdd", []observer.Endpoint{expectedEndpointZero, expectedEndpointOne}) + watcher.ListAndWatch(notifyTwo) + notifyTwo.AssertExpectations(t) + zeroAndOneAdded.Unset() + + expectedOne := map[observer.EndpointID]observer.Endpoint{"0": {ID: "0"}} + require.Equal(t, expectedOne, existingEndpoints(t, watcher, notifyOne.ID())) + expectedTwo := map[observer.EndpointID]observer.Endpoint{"0": {ID: "0"}, "1": {ID: "1"}} + require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) + + lister.addEndpoint(2) + lister.addEndpoint(3) + oneTwoAndThreeAdded := notifyOne.On("OnAdd", []observer.Endpoint{expectedEndpointOne, expectedEndpointTwo, expectedEndpointThree}) + twoAndThreeAdded := notifyTwo.On("OnAdd", []observer.Endpoint{expectedEndpointTwo, expectedEndpointThree}) + lister.removeEndpoint(0) + notifyOne.On("OnRemove", []observer.Endpoint{expectedEndpointZero}) + notifyTwo.On("OnRemove", []observer.Endpoint{expectedEndpointZero}) + watcher.notifyOfLatestEndpoints(notifyOne.ID(), notifyTwo.ID()) + notifyOne.AssertExpectations(t) + notifyTwo.AssertExpectations(t) + oneTwoAndThreeAdded.Unset() + twoAndThreeAdded.Unset() + + delete(expectedOne, "0") + expectedOne["1"] = observer.Endpoint{ID: "1"} + expectedOne["2"] = observer.Endpoint{ID: "2"} + expectedOne["3"] = observer.Endpoint{ID: "3"} + require.Equal(t, expectedOne, existingEndpoints(t, watcher, notifyOne.ID())) + + delete(expectedTwo, "0") + expectedTwo["2"] = observer.Endpoint{ID: "2"} + expectedTwo["3"] = observer.Endpoint{ID: "3"} + require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) + + expectedUpdatedTwo := expectedEndpointTwo + expectedUpdatedTwo.Target = "updated_target" + oneTwoUpdated := notifyOne.On("OnChange", []observer.Endpoint{expectedUpdatedTwo}) + twoTwoUpdated := notifyTwo.On("OnChange", []observer.Endpoint{expectedUpdatedTwo}) + lister.updateEndpoint(2, "updated_target") + watcher.notifyOfLatestEndpoints(notifyOne.ID(), notifyTwo.ID()) + notifyOne.AssertExpectations(t) + notifyTwo.AssertExpectations(t) + oneTwoUpdated.Unset() + twoTwoUpdated.Unset() + + expectedOne["2"] = observer.Endpoint{ID: "2", Target: "updated_target"} + require.Equal(t, expectedOne, existingEndpoints(t, watcher, notifyOne.ID())) + expectedTwo["2"] = observer.Endpoint{ID: "2", Target: "updated_target"} + require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) + + watcher.Unsubscribe(notifyOne) + require.Nil(t, existingEndpoints(t, watcher, notifyOne.ID())) + + lister.addEndpoint(4) + fourAdded := notifyTwo.On("OnAdd", []observer.Endpoint{expectedEndpointFour}) + watcher.ListAndWatch(notifyTwo) + notifyOne.AssertExpectations(t) + notifyTwo.AssertExpectations(t) + fourAdded.Unset() + + expectedTwo["4"] = observer.Endpoint{ID: "4"} + require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) + + watcher.Unsubscribe(notifyTwo) + require.Nil(t, existingEndpoints(t, watcher, notifyTwo.ID())) +} + +func existingEndpoints(tb testing.TB, watcher *EndpointsWatcher, id observer.NotifyID) map[observer.EndpointID]observer.Endpoint { + if existing, ok := watcher.existingEndpoints.Load(id); ok { + endpoints, ok := existing.(map[observer.EndpointID]observer.Endpoint) + assert.True(tb, ok) + return endpoints + } + return nil +} + +func setup(tb testing.TB) (*mockEndpointsLister, *EndpointsWatcher, *mockNotifier) { + ml := &mockEndpointsLister{ + endpointsMap: map[observer.EndpointID]observer.Endpoint{}, + } + + ew := New(ml, 2*time.Second, zaptest.NewLogger(tb)) + mn := &mockNotifier{id: "mockNotifier"} + + return ml, ew, mn +} + +var _ observer.Notify = (*mockNotifier)(nil) + +type mockNotifier struct { + mock.Mock + id string +} + +func (m *mockNotifier) ID() observer.NotifyID { + return observer.NotifyID(m.id) +} + +func (m *mockNotifier) OnAdd(added []observer.Endpoint) { + m.Called(sortEndpoints(added)) +} + +func (m *mockNotifier) OnRemove(removed []observer.Endpoint) { + m.Called(sortEndpoints(removed)) +} + +func (m *mockNotifier) OnChange(changed []observer.Endpoint) { + m.Called(sortEndpoints(changed)) +} + +func sortEndpoints(endpoints []observer.Endpoint) []observer.Endpoint { + sort.Slice(endpoints, func(i, j int) bool { return endpoints[i].ID < endpoints[j].ID }) + return endpoints +} + +type mockEndpointsLister struct { + sync.Mutex + endpointsMap map[observer.EndpointID]observer.Endpoint +} + +func (m *mockEndpointsLister) addEndpoint(n int) { + m.Lock() + defer m.Unlock() + + id := observer.EndpointID(strconv.Itoa(n)) + e := observer.Endpoint{ID: id} + m.endpointsMap[id] = e +} + +func (m *mockEndpointsLister) removeEndpoint(n int) { + m.Lock() + defer m.Unlock() + + id := observer.EndpointID(strconv.Itoa(n)) + delete(m.endpointsMap, id) +} + +func (m *mockEndpointsLister) updateEndpoint(n int, target string) { + m.Lock() + defer m.Unlock() + + id := observer.EndpointID(strconv.Itoa(n)) + e := observer.Endpoint{ + ID: id, + Target: target, + } + m.endpointsMap[id] = e +} + +func (m *mockEndpointsLister) ListEndpoints() []observer.Endpoint { + m.Lock() + defer m.Unlock() + + out := make([]observer.Endpoint, len(m.endpointsMap)) + + i := 0 + for _, e := range m.endpointsMap { + out[i] = e + i++ + } + + return out +} + +var _ EndpointsLister = (*mockEndpointsLister)(nil) + +func TestEndpointsEqual(t *testing.T) { + tests := []struct { + name string + first observer.Endpoint + second observer.Endpoint + areEqual bool + }{ + { + name: "equal empty endpoints", + first: observer.Endpoint{}, second: observer.Endpoint{}, + areEqual: true, + }, + { + name: "equal ID", + first: observer.Endpoint{ID: "id"}, + second: observer.Endpoint{ID: "id"}, + areEqual: true, + }, + { + name: "unequal ID", + first: observer.Endpoint{ID: "first"}, + second: observer.Endpoint{ID: "second"}, + areEqual: false, + }, + { + name: "equal Target", + first: observer.Endpoint{Target: "target"}, + second: observer.Endpoint{Target: "target"}, + areEqual: true, + }, + { + name: "unequal Target", + first: observer.Endpoint{Target: "first"}, + second: observer.Endpoint{Target: "second"}, + areEqual: false, + }, + { + name: "equal empty observer.Port", + first: observer.Endpoint{Details: &observer.Port{}}, + second: observer.Endpoint{Details: &observer.Port{}}, + areEqual: true, + }, + { + name: "equal observer.Port Name", + first: observer.Endpoint{Details: &observer.Port{Name: "port_name"}}, + second: observer.Endpoint{Details: &observer.Port{Name: "port_name"}}, + areEqual: true, + }, + { + name: "unequal observer.Port Name", + first: observer.Endpoint{Details: &observer.Port{Name: "first"}}, + second: observer.Endpoint{Details: &observer.Port{Name: "second"}}, + areEqual: false, + }, + { + name: "equal observer.Port observer.Port", + first: observer.Endpoint{Details: &observer.Port{Port: 2379}}, + second: observer.Endpoint{Details: &observer.Port{Port: 2379}}, + areEqual: true, + }, + { + name: "unequal observer.Port observer.Port", + first: observer.Endpoint{Details: &observer.Port{Port: 0}}, + second: observer.Endpoint{Details: &observer.Port{Port: 1}}, + areEqual: false, + }, + { + name: "equal observer.Port Transport", + first: observer.Endpoint{Details: &observer.Port{Transport: "transport"}}, + second: observer.Endpoint{Details: &observer.Port{Transport: "transport"}}, + areEqual: true, + }, + { + name: "unequal observer.Port Transport", + first: observer.Endpoint{Details: &observer.Port{Transport: "first"}}, + second: observer.Endpoint{Details: &observer.Port{Transport: "second"}}, + areEqual: false, + }, + { + name: "equal observer.Port", + first: observer.Endpoint{ + ID: observer.EndpointID("port_id"), + Target: "192.68.73.2", + Details: &observer.Port{ + Name: "port_name", + Pod: observer.Pod{ + Name: "pod_name", + Labels: map[string]string{ + "label_key": "label_val", + }, + Annotations: map[string]string{ + "annotation_1": "value_1", + }, + Namespace: "pod-namespace", + UID: "pod-uid", + }, + Port: 2379, + Transport: observer.ProtocolTCP, + }, + }, + second: observer.Endpoint{ + ID: observer.EndpointID("port_id"), + Target: "192.68.73.2", + Details: &observer.Port{ + Name: "port_name", + Pod: observer.Pod{ + Name: "pod_name", + Labels: map[string]string{ + "label_key": "label_val", + }, + Annotations: map[string]string{ + "annotation_1": "value_1", + }, + Namespace: "pod-namespace", + UID: "pod-uid", + }, + Port: 2379, + Transport: observer.ProtocolTCP, + }, + }, + areEqual: true, + }, + { + name: "unequal observer.Port Pod Label", + first: observer.Endpoint{ + ID: observer.EndpointID("port_id"), + Target: "192.68.73.2", + Details: &observer.Port{ + Name: "port_name", + Pod: observer.Pod{ + Name: "pod_name", + Labels: map[string]string{ + "key_one": "val_one", + }, + Annotations: map[string]string{ + "annotation_1": "value_1", + }, + Namespace: "pod-namespace", + UID: "pod-uid", + }, + Port: 2379, + Transport: observer.ProtocolTCP, + }, + }, + second: observer.Endpoint{ + ID: observer.EndpointID("port_id"), + Target: "192.68.73.2", + Details: &observer.Port{ + Name: "port_name", + Pod: observer.Pod{ + Name: "pod_name", + Labels: map[string]string{ + "key_two": "val_two", + }, + Annotations: map[string]string{ + "annotation_1": "value_1", + }, + Namespace: "pod-namespace", + UID: "pod-uid", + }, + Port: 2379, + Transport: observer.ProtocolTCP, + }, + }, + areEqual: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, endpointsEqual(tt.first, tt.second), tt.areEqual) + require.Equal(t, endpointsEqual(tt.second, tt.first), tt.areEqual) + }) + } +} diff --git a/extension/observer/endpointswatcher_test.go b/extension/observer/endpointswatcher_test.go deleted file mode 100644 index 8e4db5ee976ba..0000000000000 --- a/extension/observer/endpointswatcher_test.go +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package observer - -import ( - "sort" - "strconv" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -var ( - expectedEndpointZero = Endpoint{ID: EndpointID(strconv.Itoa(0))} - expectedEndpointOne = Endpoint{ID: EndpointID(strconv.Itoa(1))} - expectedEndpointTwo = Endpoint{ID: EndpointID(strconv.Itoa(2))} - expectedEndpointThree = Endpoint{ID: EndpointID(strconv.Itoa(3))} - expectedEndpointFour = Endpoint{ID: EndpointID(strconv.Itoa(4))} -) - -func TestRefreshEndpointsOnStartup(t *testing.T) { - lister, watcher, notify := setup(t) - - lister.addEndpoint(0) - notify.On("OnAdd", []Endpoint{expectedEndpointZero}) - watcher.ListAndWatch(notify) - watcher.StopListAndWatch() - - notify.AssertExpectations(t) - - // Endpoints available before the ListAndWatch call should be - // readily discovered. - expected := map[EndpointID]Endpoint{"0": {ID: "0"}} - require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) -} - -func TestNotifyOfLatestEndpoints(t *testing.T) { - lister, watcher, notify := setup(t) - defer watcher.StopListAndWatch() - - zeroAdded := notify.On("OnAdd", []Endpoint{expectedEndpointZero}) - lister.addEndpoint(0) - watcher.ListAndWatch(notify) - notify.AssertExpectations(t) - zeroAdded.Unset() - - expected := map[EndpointID]Endpoint{"0": {ID: "0"}} - require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) - - oneAndTwoAdded := notify.On("OnAdd", []Endpoint{expectedEndpointOne, expectedEndpointTwo}) - lister.addEndpoint(1) - lister.addEndpoint(2) - - zeroRemoved := notify.On("OnRemove", []Endpoint{expectedEndpointZero}) - lister.removeEndpoint(0) - - watcher.notifyOfLatestEndpoints(notify.ID()) - notify.AssertExpectations(t) - oneAndTwoAdded.Unset() - zeroRemoved.Unset() - - expected["1"] = Endpoint{ID: "1"} - expected["2"] = Endpoint{ID: "2"} - delete(expected, "0") - require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) - - expectedUpdatedTwo := expectedEndpointTwo - expectedUpdatedTwo.Target = "updated_target" - twoChanged := notify.On("OnChange", []Endpoint{expectedUpdatedTwo}) - lister.updateEndpoint(2, "updated_target") - - watcher.notifyOfLatestEndpoints(notify.ID()) - notify.AssertExpectations(t) - twoChanged.Unset() - - expected["2"] = Endpoint{ID: "2", Target: "updated_target"} - require.Equal(t, expected, existingEndpoints(t, watcher, notify.ID())) - - watcher.Unsubscribe(notify) - require.Nil(t, existingEndpoints(t, watcher, notify.ID())) -} - -func TestNotifyOfLatestEndpointsMultipleNotify(t *testing.T) { - lister, watcher, notifyOne := setup(t) - defer watcher.StopListAndWatch() - - lister.addEndpoint(0) - zeroAdded := notifyOne.On("OnAdd", []Endpoint{expectedEndpointZero}) - watcher.ListAndWatch(notifyOne) - notifyOne.AssertExpectations(t) - zeroAdded.Unset() - - notifyTwo := &mockNotifier{id: "notify2"} - lister.addEndpoint(1) - zeroAndOneAdded := notifyTwo.On("OnAdd", []Endpoint{expectedEndpointZero, expectedEndpointOne}) - watcher.ListAndWatch(notifyTwo) - notifyTwo.AssertExpectations(t) - zeroAndOneAdded.Unset() - - expectedOne := map[EndpointID]Endpoint{"0": {ID: "0"}} - require.Equal(t, expectedOne, existingEndpoints(t, watcher, notifyOne.ID())) - expectedTwo := map[EndpointID]Endpoint{"0": {ID: "0"}, "1": {ID: "1"}} - require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) - - lister.addEndpoint(2) - lister.addEndpoint(3) - oneTwoAndThreeAdded := notifyOne.On("OnAdd", []Endpoint{expectedEndpointOne, expectedEndpointTwo, expectedEndpointThree}) - twoAndThreeAdded := notifyTwo.On("OnAdd", []Endpoint{expectedEndpointTwo, expectedEndpointThree}) - lister.removeEndpoint(0) - notifyOne.On("OnRemove", []Endpoint{expectedEndpointZero}) - notifyTwo.On("OnRemove", []Endpoint{expectedEndpointZero}) - watcher.notifyOfLatestEndpoints(notifyOne.ID(), notifyTwo.ID()) - notifyOne.AssertExpectations(t) - notifyTwo.AssertExpectations(t) - oneTwoAndThreeAdded.Unset() - twoAndThreeAdded.Unset() - - delete(expectedOne, "0") - expectedOne["1"] = Endpoint{ID: "1"} - expectedOne["2"] = Endpoint{ID: "2"} - expectedOne["3"] = Endpoint{ID: "3"} - require.Equal(t, expectedOne, existingEndpoints(t, watcher, notifyOne.ID())) - - delete(expectedTwo, "0") - expectedTwo["2"] = Endpoint{ID: "2"} - expectedTwo["3"] = Endpoint{ID: "3"} - require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) - - expectedUpdatedTwo := expectedEndpointTwo - expectedUpdatedTwo.Target = "updated_target" - oneTwoUpdated := notifyOne.On("OnChange", []Endpoint{expectedUpdatedTwo}) - twoTwoUpdated := notifyTwo.On("OnChange", []Endpoint{expectedUpdatedTwo}) - lister.updateEndpoint(2, "updated_target") - watcher.notifyOfLatestEndpoints(notifyOne.ID(), notifyTwo.ID()) - notifyOne.AssertExpectations(t) - notifyTwo.AssertExpectations(t) - oneTwoUpdated.Unset() - twoTwoUpdated.Unset() - - expectedOne["2"] = Endpoint{ID: "2", Target: "updated_target"} - require.Equal(t, expectedOne, existingEndpoints(t, watcher, notifyOne.ID())) - expectedTwo["2"] = Endpoint{ID: "2", Target: "updated_target"} - require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) - - watcher.Unsubscribe(notifyOne) - require.Nil(t, existingEndpoints(t, watcher, notifyOne.ID())) - - lister.addEndpoint(4) - fourAdded := notifyTwo.On("OnAdd", []Endpoint{expectedEndpointFour}) - watcher.ListAndWatch(notifyTwo) - notifyOne.AssertExpectations(t) - notifyTwo.AssertExpectations(t) - fourAdded.Unset() - - expectedTwo["4"] = Endpoint{ID: "4"} - require.Equal(t, expectedTwo, existingEndpoints(t, watcher, notifyTwo.ID())) - - watcher.Unsubscribe(notifyTwo) - require.Nil(t, existingEndpoints(t, watcher, notifyTwo.ID())) -} - -func existingEndpoints(tb testing.TB, watcher *EndpointsWatcher, id NotifyID) map[EndpointID]Endpoint { - if existing, ok := watcher.existingEndpoints.Load(id); ok { - endpoints, ok := existing.(map[EndpointID]Endpoint) - assert.True(tb, ok) - return endpoints - } - return nil -} - -func setup(tb testing.TB) (*mockEndpointsLister, *EndpointsWatcher, *mockNotifier) { - ml := &mockEndpointsLister{ - endpointsMap: map[EndpointID]Endpoint{}, - } - - ew := NewEndpointsWatcher(ml, 2*time.Second, zaptest.NewLogger(tb)) - mn := &mockNotifier{id: "mockNotifier"} - - return ml, ew, mn -} - -var _ Notify = (*mockNotifier)(nil) - -type mockNotifier struct { - mock.Mock - id string -} - -func (m *mockNotifier) ID() NotifyID { - return NotifyID(m.id) -} - -func (m *mockNotifier) OnAdd(added []Endpoint) { - m.Called(sortEndpoints(added)) -} - -func (m *mockNotifier) OnRemove(removed []Endpoint) { - m.Called(sortEndpoints(removed)) -} - -func (m *mockNotifier) OnChange(changed []Endpoint) { - m.Called(sortEndpoints(changed)) -} - -func sortEndpoints(endpoints []Endpoint) []Endpoint { - sort.Slice(endpoints, func(i, j int) bool { return endpoints[i].ID < endpoints[j].ID }) - return endpoints -} - -type mockEndpointsLister struct { - sync.Mutex - endpointsMap map[EndpointID]Endpoint -} - -func (m *mockEndpointsLister) addEndpoint(n int) { - m.Lock() - defer m.Unlock() - - id := EndpointID(strconv.Itoa(n)) - e := Endpoint{ID: id} - m.endpointsMap[id] = e -} - -func (m *mockEndpointsLister) removeEndpoint(n int) { - m.Lock() - defer m.Unlock() - - id := EndpointID(strconv.Itoa(n)) - delete(m.endpointsMap, id) -} - -func (m *mockEndpointsLister) updateEndpoint(n int, target string) { - m.Lock() - defer m.Unlock() - - id := EndpointID(strconv.Itoa(n)) - e := Endpoint{ - ID: id, - Target: target, - } - m.endpointsMap[id] = e -} - -func (m *mockEndpointsLister) ListEndpoints() []Endpoint { - m.Lock() - defer m.Unlock() - - out := make([]Endpoint, len(m.endpointsMap)) - - i := 0 - for _, e := range m.endpointsMap { - out[i] = e - i++ - } - - return out -} - -var _ EndpointsLister = (*mockEndpointsLister)(nil) diff --git a/extension/observer/hostobserver/extension.go b/extension/observer/hostobserver/extension.go index b3964720d9ffc..781542715c93a 100644 --- a/extension/observer/hostobserver/extension.go +++ b/extension/observer/hostobserver/extension.go @@ -16,10 +16,11 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" ) type hostObserver struct { - *observer.EndpointsWatcher + *endpointswatcher.EndpointsWatcher } type endpointsLister struct { @@ -36,7 +37,7 @@ var _ extension.Extension = (*hostObserver)(nil) func newObserver(params extension.Settings, config *Config) (extension.Extension, error) { h := &hostObserver{ - EndpointsWatcher: observer.NewEndpointsWatcher( + EndpointsWatcher: endpointswatcher.New( endpointsLister{ logger: params.Logger, observerName: params.ID.String(), diff --git a/extension/observer/hostobserver/extension_test.go b/extension/observer/hostobserver/extension_test.go index 964136b84a4b2..d184634d95cee 100644 --- a/extension/observer/hostobserver/extension_test.go +++ b/extension/observer/hostobserver/extension_test.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" ) // Tests observer with real connections on system. @@ -173,7 +174,7 @@ func startAndStopObserver( require.NotNil(t, ml.getProcess) require.NotNil(t, ml.collectProcessDetails) - h := &hostObserver{EndpointsWatcher: observer.NewEndpointsWatcher(ml, 10*time.Second, zaptest.NewLogger(t))} + h := &hostObserver{EndpointsWatcher: endpointswatcher.New(ml, 10*time.Second, zaptest.NewLogger(t))} mn := mockNotifier{map[observer.EndpointID]observer.Endpoint{}} diff --git a/extension/observer/k8sobserver/extension.go b/extension/observer/k8sobserver/extension.go index 907b770e73fe6..cbf484e270ace 100644 --- a/extension/observer/k8sobserver/extension.go +++ b/extension/observer/k8sobserver/extension.go @@ -18,6 +18,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) @@ -27,7 +28,7 @@ var ( ) type k8sObserver struct { - *observer.EndpointsWatcher + *endpointswatcher.EndpointsWatcher telemetry component.TelemetrySettings podListerWatcher cache.ListerWatcher serviceListerWatcher cache.ListerWatcher @@ -138,7 +139,7 @@ func newObserver(config *Config, set extension.Settings) (extension.Extension, e } h := &handler{idNamespace: set.ID.String(), endpoints: &sync.Map{}, logger: set.TelemetrySettings.Logger} obs := &k8sObserver{ - EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger), + EndpointsWatcher: endpointswatcher.New(h, time.Second, set.TelemetrySettings.Logger), telemetry: set.TelemetrySettings, podListerWatcher: podListerWatcher, serviceListerWatcher: serviceListerWatcher, diff --git a/extension/observer/k8sobserver/extension_test.go b/extension/observer/k8sobserver/extension_test.go index 1e8b5e18ad5a3..aec382cb6c298 100644 --- a/extension/observer/k8sobserver/extension_test.go +++ b/extension/observer/k8sobserver/extension_test.go @@ -396,4 +396,5 @@ func TestExtensionObserveIngresses(t *testing.T) { }, sink.removed[0]) require.NoError(t, ext.Shutdown(context.Background())) + obs.StopListAndWatch() } diff --git a/extension/observer/k8sobserver/handler.go b/extension/observer/k8sobserver/handler.go index 7e1e64c03f219..60883e945122d 100644 --- a/extension/observer/k8sobserver/handler.go +++ b/extension/observer/k8sobserver/handler.go @@ -13,11 +13,12 @@ import ( "k8s.io/client-go/tools/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" ) var ( - _ cache.ResourceEventHandler = (*handler)(nil) - _ observer.EndpointsLister = (*handler)(nil) + _ cache.ResourceEventHandler = (*handler)(nil) + _ endpointswatcher.EndpointsLister = (*handler)(nil) ) // handler handles k8s cache informer callbacks. diff --git a/extension/observer/kafkatopicsobserver/extension.go b/extension/observer/kafkatopicsobserver/extension.go index d0da797488f8f..7fb0601b5b8ba 100644 --- a/extension/observer/kafkatopicsobserver/extension.go +++ b/extension/observer/kafkatopicsobserver/extension.go @@ -15,17 +15,17 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" ) var ( - _ extension.Extension = (*kafkaTopicsObserver)(nil) - _ observer.EndpointsLister = (*kafkaTopicsObserver)(nil) - _ observer.Observable = (*kafkaTopicsObserver)(nil) + _ extension.Extension = (*kafkaTopicsObserver)(nil) + _ observer.Observable = (*kafkaTopicsObserver)(nil) ) type kafkaTopicsObserver struct { - *observer.EndpointsWatcher + *endpointswatcher.EndpointsWatcher logger *zap.Logger config *Config doneChan chan struct{} @@ -54,7 +54,7 @@ func newObserver(logger *zap.Logger, config *Config) (extension.Extension, error kafkaAdmin: admin, doneChan: make(chan struct{}), } - d.EndpointsWatcher = observer.NewEndpointsWatcher(d, time.Second, logger) + d.EndpointsWatcher = endpointswatcher.New(d, time.Second, logger) return d, nil } diff --git a/receiver/receivercreator/go.sum b/receiver/receivercreator/go.sum index 36c36e1ea18f2..0f2d8cbdd2ec4 100644 --- a/receiver/receivercreator/go.sum +++ b/receiver/receivercreator/go.sum @@ -104,8 +104,6 @@ github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wx github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=