diff --git a/cmd/osm-controller/log_handler_test.go b/cmd/osm-controller/log_handler_test.go index 0e5eeb5958..5623b7657e 100644 --- a/cmd/osm-controller/log_handler_test.go +++ b/cmd/osm-controller/log_handler_test.go @@ -27,7 +27,7 @@ func TestGlobalLogLevelHandler(t *testing.T) { // Set log level through a meshconfig event mockConfigurator.EXPECT().GetOSMLogLevel().Return("warn").Times(1) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.MeshConfigUpdated, + Kind: announcements.MeshConfigUpdated, }) assert.Eventually(func() bool { @@ -37,7 +37,7 @@ func TestGlobalLogLevelHandler(t *testing.T) { // Reset back mockConfigurator.EXPECT().GetOSMLogLevel().Return("trace").Times(1) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.MeshConfigUpdated, + Kind: announcements.MeshConfigUpdated, }) assert.Eventually(func() bool { diff --git a/pkg/announcements/types.go b/pkg/announcements/types.go index ee1068e99d..1d78ffc3fe 100644 --- a/pkg/announcements/types.go +++ b/pkg/announcements/types.go @@ -2,184 +2,184 @@ // Kubernetes API server that are propagated internally within the control plane to trigger configuration changes. package announcements -// AnnouncementType is used to record the type of announcement -type AnnouncementType string +// Kind is used to record the kind of announcement +type Kind string -func (at AnnouncementType) String() string { +func (at Kind) String() string { return string(at) } const ( // ScheduleProxyBroadcast is used by other modules to request the dispatcher to schedule a global proxy broadcast - ScheduleProxyBroadcast AnnouncementType = "schedule-proxy-broadcast" + ScheduleProxyBroadcast Kind = "schedule-proxy-broadcast" // TickerStart starts Ticker to trigger time-based proxy updates - TickerStart AnnouncementType = "ticker-start" + TickerStart Kind = "ticker-start" // TickerStop stops Ticker to stop time-based proxy updates - TickerStop AnnouncementType = "ticker-stop" + TickerStop Kind = "ticker-stop" // ProxyBroadcast is used to notify all Proxy streams that they need to trigger an update - ProxyBroadcast AnnouncementType = "proxy-broadcast" + ProxyBroadcast Kind = "proxy-broadcast" // PodAdded is the type of announcement emitted when we observe an addition of a Kubernetes Pod - PodAdded AnnouncementType = "pod-added" + PodAdded Kind = "pod-added" // PodDeleted the type of announcement emitted when we observe the deletion of a Kubernetes Pod - PodDeleted AnnouncementType = "pod-deleted" + PodDeleted Kind = "pod-deleted" // PodUpdated is the type of announcement emitted when we observe an update to a Kubernetes Pod - PodUpdated AnnouncementType = "pod-updated" + PodUpdated Kind = "pod-updated" // --- // EndpointAdded is the type of announcement emitted when we observe an addition of a Kubernetes Endpoint - EndpointAdded AnnouncementType = "endpoint-added" + EndpointAdded Kind = "endpoint-added" // EndpointDeleted the type of announcement emitted when we observe the deletion of a Kubernetes Endpoint - EndpointDeleted AnnouncementType = "endpoint-deleted" + EndpointDeleted Kind = "endpoint-deleted" // EndpointUpdated is the type of announcement emitted when we observe an update to a Kubernetes Endpoint - EndpointUpdated AnnouncementType = "endpoint-updated" + EndpointUpdated Kind = "endpoint-updated" // --- // NamespaceAdded is the type of announcement emitted when we observe an addition of a Kubernetes Namespace - NamespaceAdded AnnouncementType = "namespace-added" + NamespaceAdded Kind = "namespace-added" // NamespaceDeleted the type of announcement emitted when we observe the deletion of a Kubernetes Namespace - NamespaceDeleted AnnouncementType = "namespace-deleted" + NamespaceDeleted Kind = "namespace-deleted" // NamespaceUpdated is the type of announcement emitted when we observe an update to a Kubernetes Namespace - NamespaceUpdated AnnouncementType = "namespace-updated" + NamespaceUpdated Kind = "namespace-updated" // --- // ServiceAdded is the type of announcement emitted when we observe an addition of a Kubernetes Service - ServiceAdded AnnouncementType = "service-added" + ServiceAdded Kind = "service-added" // ServiceDeleted the type of announcement emitted when we observe the deletion of a Kubernetes Service - ServiceDeleted AnnouncementType = "service-deleted" + ServiceDeleted Kind = "service-deleted" // ServiceUpdated is the type of announcement emitted when we observe an update to a Kubernetes Service - ServiceUpdated AnnouncementType = "service-updated" + ServiceUpdated Kind = "service-updated" // --- // ServiceAccountAdded is the type of announcement emitted when we observe an addition of a Kubernetes Service Account - ServiceAccountAdded AnnouncementType = "serviceaccount-added" + ServiceAccountAdded Kind = "serviceaccount-added" // ServiceAccountDeleted the type of announcement emitted when we observe the deletion of a Kubernetes Service Account - ServiceAccountDeleted AnnouncementType = "serviceaccount-deleted" + ServiceAccountDeleted Kind = "serviceaccount-deleted" // ServiceAccountUpdated is the type of announcement emitted when we observe an update to a Kubernetes Service - ServiceAccountUpdated AnnouncementType = "serviceaccount-updated" + ServiceAccountUpdated Kind = "serviceaccount-updated" // --- // TrafficSplitAdded is the type of announcement emitted when we observe an addition of a Kubernetes TrafficSplit - TrafficSplitAdded AnnouncementType = "trafficsplit-added" + TrafficSplitAdded Kind = "trafficsplit-added" // TrafficSplitDeleted the type of announcement emitted when we observe the deletion of a Kubernetes TrafficSplit - TrafficSplitDeleted AnnouncementType = "trafficsplit-deleted" + TrafficSplitDeleted Kind = "trafficsplit-deleted" // TrafficSplitUpdated is the type of announcement emitted when we observe an update to a Kubernetes TrafficSplit - TrafficSplitUpdated AnnouncementType = "trafficsplit-updated" + TrafficSplitUpdated Kind = "trafficsplit-updated" // --- // RouteGroupAdded is the type of announcement emitted when we observe an addition of a Kubernetes RouteGroup - RouteGroupAdded AnnouncementType = "routegroup-added" + RouteGroupAdded Kind = "routegroup-added" // RouteGroupDeleted the type of announcement emitted when we observe the deletion of a Kubernetes RouteGroup - RouteGroupDeleted AnnouncementType = "routegroup-deleted" + RouteGroupDeleted Kind = "routegroup-deleted" // RouteGroupUpdated is the type of announcement emitted when we observe an update to a Kubernetes RouteGroup - RouteGroupUpdated AnnouncementType = "routegroup-updated" + RouteGroupUpdated Kind = "routegroup-updated" // --- // TCPRouteAdded is the type of announcement emitted when we observe an addition of a Kubernetes TCPRoute - TCPRouteAdded AnnouncementType = "tcproute-added" + TCPRouteAdded Kind = "tcproute-added" // TCPRouteDeleted the type of announcement emitted when we observe the deletion of a Kubernetes TCPRoute - TCPRouteDeleted AnnouncementType = "tcproute-deleted" + TCPRouteDeleted Kind = "tcproute-deleted" // TCPRouteUpdated is the type of announcement emitted when we observe an update to a Kubernetes TCPRoute - TCPRouteUpdated AnnouncementType = "tcproute-updated" + TCPRouteUpdated Kind = "tcproute-updated" // --- // TrafficTargetAdded is the type of announcement emitted when we observe an addition of a Kubernetes TrafficTarget - TrafficTargetAdded AnnouncementType = "traffictarget-added" + TrafficTargetAdded Kind = "traffictarget-added" // TrafficTargetDeleted the type of announcement emitted when we observe the deletion of a Kubernetes TrafficTarget - TrafficTargetDeleted AnnouncementType = "traffictarget-deleted" + TrafficTargetDeleted Kind = "traffictarget-deleted" // TrafficTargetUpdated is the type of announcement emitted when we observe an update to a Kubernetes TrafficTarget - TrafficTargetUpdated AnnouncementType = "traffictarget-updated" + TrafficTargetUpdated Kind = "traffictarget-updated" // --- // IngressAdded is the type of announcement emitted when we observe an addition of a Kubernetes Ingress - IngressAdded AnnouncementType = "ingress-added" + IngressAdded Kind = "ingress-added" // IngressDeleted the type of announcement emitted when we observe the deletion of a Kubernetes Ingress - IngressDeleted AnnouncementType = "ingress-deleted" + IngressDeleted Kind = "ingress-deleted" // IngressUpdated is the type of announcement emitted when we observe an update to a Kubernetes Ingress - IngressUpdated AnnouncementType = "ingress-updated" + IngressUpdated Kind = "ingress-updated" // --- // CertificateRotated is the type of announcement emitted when a certificate is rotated by the certificate provider - CertificateRotated AnnouncementType = "certificate-rotated" + CertificateRotated Kind = "certificate-rotated" // --- // MeshConfigAdded is the type of announcement emitted when we observe an addition of a Kubernetes MeshConfig - MeshConfigAdded AnnouncementType = "meshconfig-added" + MeshConfigAdded Kind = "meshconfig-added" // MeshConfigDeleted the type of announcement emitted when we observe the deletion of a Kubernetes MeshConfig - MeshConfigDeleted AnnouncementType = "meshconfig-deleted" + MeshConfigDeleted Kind = "meshconfig-deleted" // MeshConfigUpdated is the type of announcement emitted when we observe an update to a Kubernetes MeshConfig - MeshConfigUpdated AnnouncementType = "meshconfig-updated" + MeshConfigUpdated Kind = "meshconfig-updated" // --- policy.openservicemesh.io API events // EgressAdded is the type of announcement emitted when we observe an addition of egresses.policy.openservicemesh.io - EgressAdded AnnouncementType = "egress-added" + EgressAdded Kind = "egress-added" // EgressDeleted the type of announcement emitted when we observe a deletion of egresses.policy.openservicemesh.io - EgressDeleted AnnouncementType = "egress-deleted" + EgressDeleted Kind = "egress-deleted" // EgressUpdated is the type of announcement emitted when we observe an update to egresses.policy.openservicemesh.io - EgressUpdated AnnouncementType = "egress-updated" + EgressUpdated Kind = "egress-updated" // IngressBackendAdded is the type of announcement emitted when we observe an addition of ingressbackends.policy.openservicemesh.io - IngressBackendAdded AnnouncementType = "ingressbackend-added" + IngressBackendAdded Kind = "ingressbackend-added" // IngressBackendDeleted the type of announcement emitted when we observe a deletion of ingressbackends.policy.openservicemesh.io - IngressBackendDeleted AnnouncementType = "ingressbackend-deleted" + IngressBackendDeleted Kind = "ingressbackend-deleted" // IngressBackendUpdated is the type of announcement emitted when we observe an update to ingressbackends.policy.openservicemesh.io - IngressBackendUpdated AnnouncementType = "ingressbackend-updated" + IngressBackendUpdated Kind = "ingressbackend-updated" // --- // MultiClusterServiceAdded is the type of announcement emitted when we observe an addition of a multiclusterservice.config.openservicemesh.io - MultiClusterServiceAdded AnnouncementType = "multiclusterservice-added" + MultiClusterServiceAdded Kind = "multiclusterservice-added" // MultiClusterServiceDeleted is the type of announcement emitted when we observe an deletion of a multiclusterservice.config.openservicemesh.io - MultiClusterServiceDeleted AnnouncementType = "multiclusterservice-deleted" + MultiClusterServiceDeleted Kind = "multiclusterservice-deleted" // MultiClusterServiceUpdated is the type of announcement emitted when we observe an update of a multiclusterservice.config.openservicemesh.io - MultiClusterServiceUpdated AnnouncementType = "multiclusterservice-updated" + MultiClusterServiceUpdated Kind = "multiclusterservice-updated" ) // Announcement is a struct for messages between various components of OSM signaling a need for a change in Envoy proxy configuration type Announcement struct { - Type AnnouncementType + Type Kind ReferencedObjectID interface{} } diff --git a/pkg/catalog/dispatcher.go b/pkg/catalog/dispatcher.go index 37635233b9..10172e4c18 100644 --- a/pkg/catalog/dispatcher.go +++ b/pkg/catalog/dispatcher.go @@ -21,7 +21,7 @@ const ( // isDeltaUpdate assesses and returns if a pubsub message contains an actual delta in config func isDeltaUpdate(psubMsg events.PubSubMessage) bool { - return !(strings.HasSuffix(psubMsg.AnnouncementType.String(), "updated") && + return !(strings.HasSuffix(psubMsg.Kind.String(), "updated") && reflect.DeepEqual(psubMsg.OldObj, psubMsg.NewObj)) } @@ -76,12 +76,12 @@ func (mc *MeshCatalog) dispatcher() { // Identify if this is an actual delta, or just resync delta := isDeltaUpdate(psubMessage) - log.Debug().Msgf("[Pubsub] %s - delta: %v", psubMessage.AnnouncementType, delta) + log.Debug().Msgf("[Pubsub] %s - delta: %v", psubMessage.Kind, delta) // Schedule an envoy broadcast update if we either: // - detected a config delta // - another module requested a broadcast through ScheduleProxyBroadcast - if delta || psubMessage.AnnouncementType == a.ScheduleProxyBroadcast { + if delta || psubMessage.Kind == a.ScheduleProxyBroadcast { if !broadcastScheduled { broadcastScheduled = true chanMaxDeadline = time.After(maxBroadcastDeadlineTime) @@ -100,7 +100,7 @@ func (mc *MeshCatalog) dispatcher() { case <-chanMovingDeadline: log.Info().Msgf("Moving deadline trigger - Broadcast envoy update") events.Publish(events.PubSubMessage{ - AnnouncementType: a.ProxyBroadcast, + Kind: a.ProxyBroadcast, }) metricsstore.DefaultMetricsStore.ProxyBroadcastEventCount.Inc() @@ -112,7 +112,7 @@ func (mc *MeshCatalog) dispatcher() { case <-chanMaxDeadline: log.Info().Msgf("Max deadline trigger - Broadcast envoy update") events.Publish(events.PubSubMessage{ - AnnouncementType: a.ProxyBroadcast, + Kind: a.ProxyBroadcast, }) metricsstore.DefaultMetricsStore.ProxyBroadcastEventCount.Inc() diff --git a/pkg/certificate/providers/certmanager/certificate_manager.go b/pkg/certificate/providers/certmanager/certificate_manager.go index 25fe631ede..e214621195 100644 --- a/pkg/certificate/providers/certmanager/certificate_manager.go +++ b/pkg/certificate/providers/certmanager/certificate_manager.go @@ -100,9 +100,9 @@ func (cm *CertManager) RotateCertificate(cn certificate.CommonName) (certificate cm.cacheLock.Unlock() events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.CertificateRotated, - NewObj: newCert, - OldObj: oldCert, + Kind: announcements.CertificateRotated, + NewObj: newCert, + OldObj: oldCert, }) log.Debug().Msgf("Rotated certificate (old SerialNumber=%s) with new SerialNumber=%s; took %+v", oldCert.GetSerialNumber(), newCert.GetSerialNumber(), time.Since(start)) diff --git a/pkg/certificate/providers/tresor/certificate_manager.go b/pkg/certificate/providers/tresor/certificate_manager.go index 7945b63889..501a467e29 100644 --- a/pkg/certificate/providers/tresor/certificate_manager.go +++ b/pkg/certificate/providers/tresor/certificate_manager.go @@ -188,9 +188,9 @@ func (cm *CertManager) RotateCertificate(cn certificate.CommonName) (certificate cm.cache.Store(cn, newCert) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.CertificateRotated, - NewObj: newCert, - OldObj: oldCert.(certificate.Certificater), + Kind: announcements.CertificateRotated, + NewObj: newCert, + OldObj: oldCert.(certificate.Certificater), }) log.Debug().Msgf("Rotated certificate (old SerialNumber=%s) with new SerialNumber=%s took %+v", oldCert.(certificate.Certificater).GetSerialNumber(), newCert.GetSerialNumber(), time.Since(start)) diff --git a/pkg/certificate/providers/vault/certificate_manager.go b/pkg/certificate/providers/vault/certificate_manager.go index ad669f909e..058c83eb01 100644 --- a/pkg/certificate/providers/vault/certificate_manager.go +++ b/pkg/certificate/providers/vault/certificate_manager.go @@ -193,9 +193,9 @@ func (cm *CertManager) RotateCertificate(cn certificate.CommonName) (certificate cm.cache.Store(cn, newCert) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.CertificateRotated, - NewObj: newCert, - OldObj: oldCert.(certificate.Certificater), + Kind: announcements.CertificateRotated, + NewObj: newCert, + OldObj: oldCert.(certificate.Certificater), }) log.Debug().Msgf("Rotated certificate (old SerialNumber=%s) with new SerialNumber=%s took %+v", oldCert.(certificate.Certificater).GetSerialNumber(), newCert.GetSerialNumber(), time.Since(start)) diff --git a/pkg/configurator/client.go b/pkg/configurator/client.go index 0f14c1c679..9a05208e21 100644 --- a/pkg/configurator/client.go +++ b/pkg/configurator/client.go @@ -80,7 +80,7 @@ func (c *client) runMeshConfigListener(stop <-chan struct{}) { continue } - switch psubMsg.AnnouncementType { + switch psubMsg.Kind { case announcements.MeshConfigAdded: meshConfigAddedMessageHandler(&psubMsg) case announcements.MeshConfigDeleted: @@ -110,22 +110,22 @@ func (c *client) run(stop <-chan struct{}) { func meshConfigAddedMessageHandler(psubMsg *events.PubSubMessage) { log.Debug().Msgf("[%s] OSM MeshConfig added event triggered a global proxy broadcast", - psubMsg.AnnouncementType) + psubMsg.Kind) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ScheduleProxyBroadcast, - OldObj: nil, - NewObj: nil, + Kind: announcements.ScheduleProxyBroadcast, + OldObj: nil, + NewObj: nil, }) } func meshConfigDeletedMessageHandler(psubMsg *events.PubSubMessage) { // Ignore deletion. We expect config to be present log.Debug().Msgf("[%s] OSM MeshConfig deleted event triggered a global proxy broadcast", - psubMsg.AnnouncementType) + psubMsg.Kind) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ScheduleProxyBroadcast, - OldObj: nil, - NewObj: nil, + Kind: announcements.ScheduleProxyBroadcast, + OldObj: nil, + NewObj: nil, }) } @@ -135,7 +135,7 @@ func meshConfigUpdatedMessageHandler(psubMsg *events.PubSubMessage) { newMeshConfig, okNewCast := psubMsg.NewObj.(*v1alpha1.MeshConfig) if !okPrevCast || !okNewCast { log.Error().Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMeshConfigStructCasting)).Msgf("[%s] Error casting old/new MeshConfigs objects (%v %v)", - psubMsg.AnnouncementType, okPrevCast, okNewCast) + psubMsg.Kind, okPrevCast, okNewCast) return } @@ -171,15 +171,15 @@ func meshConfigUpdatedMessageHandler(psubMsg *events.PubSubMessage) { if triggerGlobalBroadcast { log.Debug().Msgf("[%s] OSM MeshConfig update triggered global proxy broadcast", - psubMsg.AnnouncementType) + psubMsg.Kind) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ScheduleProxyBroadcast, - OldObj: nil, - NewObj: nil, + Kind: announcements.ScheduleProxyBroadcast, + OldObj: nil, + NewObj: nil, }) } else { log.Trace().Msgf("[%s] OSM MeshConfig update, NOT triggering global proxy broadcast", - psubMsg.AnnouncementType) + psubMsg.Kind) } } diff --git a/pkg/envoy/registry/announcement_handlers.go b/pkg/envoy/registry/announcement_handlers.go index e7ef7c7e07..e0b9580871 100644 --- a/pkg/envoy/registry/announcement_handlers.go +++ b/pkg/envoy/registry/announcement_handlers.go @@ -42,9 +42,9 @@ func (pr *ProxyRegistry) ReleaseCertificateHandler(certManager certificate.Manag // Request a broadcast update, just for security. // Dispatcher code also handles PodDelete, so probably the two will get coalesced. events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ScheduleProxyBroadcast, - NewObj: nil, - OldObj: nil, + Kind: announcements.ScheduleProxyBroadcast, + NewObj: nil, + OldObj: nil, }) } else { log.Warn().Msgf("Pod with UID %s not found in Mesh Catalog", podUID) diff --git a/pkg/envoy/registry/announcement_handlers_test.go b/pkg/envoy/registry/announcement_handlers_test.go index 56d155b69e..b04682772a 100644 --- a/pkg/envoy/registry/announcement_handlers_test.go +++ b/pkg/envoy/registry/announcement_handlers_test.go @@ -79,8 +79,8 @@ var _ = Describe("Test Announcement Handlers", func() { // Publish a podDeleted event events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.PodDeleted, - NewObj: nil, + Kind: announcements.PodDeleted, + NewObj: nil, OldObj: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: types.UID(podUID), @@ -116,8 +116,8 @@ var _ = Describe("Test Announcement Handlers", func() { // Publish some event unrelated to podDeleted events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.IngressAdded, - NewObj: nil, + Kind: announcements.IngressAdded, + NewObj: nil, OldObj: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: types.UID(proxy.PodMetadata.UID), diff --git a/pkg/envoy/registry/services.go b/pkg/envoy/registry/services.go index 8c9ac1a66e..10bf683e84 100644 --- a/pkg/envoy/registry/services.go +++ b/pkg/envoy/registry/services.go @@ -113,7 +113,7 @@ func (k *AsyncKubeProxyServiceMapper) Run(stop <-chan struct{}) { continue } k.cacheLock.Lock() - switch event.AnnouncementType { + switch event.Kind { case announcements.PodAdded, announcements.PodUpdated: pod := event.NewObj.(*v1.Pod) k.handlePodUpdate(pod) @@ -129,7 +129,7 @@ func (k *AsyncKubeProxyServiceMapper) Run(stop <-chan struct{}) { } k.cacheLock.Unlock() events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ScheduleProxyBroadcast, + Kind: announcements.ScheduleProxyBroadcast, }) } } diff --git a/pkg/envoy/registry/services_test.go b/pkg/envoy/registry/services_test.go index d74c7a3156..1b6328bced 100644 --- a/pkg/envoy/registry/services_test.go +++ b/pkg/envoy/registry/services_test.go @@ -306,16 +306,16 @@ func TestAsyncKubeProxyServiceMapperRun(t *testing.T) { assert.NotEmpty(svcs) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ServiceDeleted, - OldObj: svc, + Kind: announcements.ServiceDeleted, + OldObj: svc, }) <-broadcast assert.Empty(k.servicesForCN[cn]) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.PodDeleted, - OldObj: pod, + Kind: announcements.PodDeleted, + OldObj: pod, }) <-broadcast @@ -323,8 +323,8 @@ func TestAsyncKubeProxyServiceMapperRun(t *testing.T) { kubeController.EXPECT().ListServices().Return(nil).Times(1) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.PodAdded, - NewObj: pod, + Kind: announcements.PodAdded, + NewObj: pod, }) <-broadcast @@ -334,8 +334,8 @@ func TestAsyncKubeProxyServiceMapperRun(t *testing.T) { kubeController.EXPECT().ListPods().Return([]*v1.Pod{pod}).Times(1) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ServiceAdded, - NewObj: svc, + Kind: announcements.ServiceAdded, + NewObj: svc, }) <-broadcast @@ -1777,23 +1777,23 @@ func TestAsyncKubeProxyServiceMapperRace(t *testing.T) { for i := 0; i < numWriteLoopIters; i++ { kubeController.EXPECT().ListServices().Return(nil).Times(1) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.PodAdded, - NewObj: pod, + Kind: announcements.PodAdded, + NewObj: pod, }) kubeController.EXPECT().ListPods().Return([]*v1.Pod{pod}).Times(1) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ServiceAdded, - NewObj: svc, + Kind: announcements.ServiceAdded, + NewObj: svc, }) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.ServiceDeleted, - OldObj: svc, + Kind: announcements.ServiceDeleted, + OldObj: svc, }) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.PodDeleted, - OldObj: pod, + Kind: announcements.PodDeleted, + OldObj: pod, }) } } diff --git a/pkg/ingress/gateway_test.go b/pkg/ingress/gateway_test.go index ff85439fe2..eda8e039e1 100644 --- a/pkg/ingress/gateway_test.go +++ b/pkg/ingress/gateway_test.go @@ -332,9 +332,9 @@ func TestHandleCertificateChange(t *testing.T) { if tc.updatedMeshConfig != nil { events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.MeshConfigUpdated, - NewObj: tc.updatedMeshConfig, - OldObj: tc.previousMeshConfig, + Kind: announcements.MeshConfigUpdated, + NewObj: tc.updatedMeshConfig, + OldObj: tc.previousMeshConfig, }) } diff --git a/pkg/k8s/announcement_handlers_test.go b/pkg/k8s/announcement_handlers_test.go index a3e322c485..62ad37d307 100644 --- a/pkg/k8s/announcement_handlers_test.go +++ b/pkg/k8s/announcement_handlers_test.go @@ -79,9 +79,9 @@ var _ = Describe("Test Announcement Handlers", func() { // Publish a podAdded event events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.PodAdded, - NewObj: &pod, - OldObj: nil, + Kind: announcements.PodAdded, + NewObj: &pod, + OldObj: nil, }) expectedOwnerReference := metav1.OwnerReference{ diff --git a/pkg/k8s/event_handlers.go b/pkg/k8s/event_handlers.go index 767b1d8afb..bba819c757 100644 --- a/pkg/k8s/event_handlers.go +++ b/pkg/k8s/event_handlers.go @@ -16,9 +16,9 @@ type observeFilter func(obj interface{}) bool // EventTypes is a struct helping pass the correct types to GetKubernetesEventHandlers type EventTypes struct { - Add announcements.AnnouncementType - Update announcements.AnnouncementType - Delete announcements.AnnouncementType + Add announcements.Kind + Update announcements.Kind + Delete announcements.Kind } // GetKubernetesEventHandlers creates Kubernetes events handlers. @@ -35,9 +35,9 @@ func GetKubernetesEventHandlers(informerName, providerName string, shouldObserve ns := getNamespace(obj) metricsstore.DefaultMetricsStore.K8sAPIEventCounter.WithLabelValues(eventTypes.Add.String(), ns).Inc() events.Publish(events.PubSubMessage{ - AnnouncementType: eventTypes.Add, - NewObj: obj, - OldObj: nil, + Kind: eventTypes.Add, + NewObj: obj, + OldObj: nil, }) }, @@ -48,9 +48,9 @@ func GetKubernetesEventHandlers(informerName, providerName string, shouldObserve ns := getNamespace(newObj) metricsstore.DefaultMetricsStore.K8sAPIEventCounter.WithLabelValues(eventTypes.Update.String(), ns).Inc() events.Publish(events.PubSubMessage{ - AnnouncementType: eventTypes.Update, - NewObj: newObj, - OldObj: oldObj, + Kind: eventTypes.Update, + NewObj: newObj, + OldObj: oldObj, }) }, @@ -61,9 +61,9 @@ func GetKubernetesEventHandlers(informerName, providerName string, shouldObserve ns := getNamespace(obj) metricsstore.DefaultMetricsStore.K8sAPIEventCounter.WithLabelValues(eventTypes.Delete.String(), ns).Inc() events.Publish(events.PubSubMessage{ - AnnouncementType: eventTypes.Delete, - NewObj: nil, - OldObj: obj, + Kind: eventTypes.Delete, + NewObj: nil, + OldObj: obj, }) }, } diff --git a/pkg/k8s/event_handlers_test.go b/pkg/k8s/event_handlers_test.go index 8aec74ccec..fb099af8f1 100644 --- a/pkg/k8s/event_handlers_test.go +++ b/pkg/k8s/event_handlers_test.go @@ -47,7 +47,7 @@ func TestGetKubernetesEventHandlers(t *testing.T) { // Pubsub msg pubsubMsg, castOk := an.(events.PubSubMessage) a.True(castOk) - a.Equal(pubsubMsg.AnnouncementType, announcements.PodAdded) + a.Equal(pubsubMsg.Kind, announcements.PodAdded) a.Nil(pubsubMsg.OldObj) // Cast New obj, expect v1.Pod diff --git a/pkg/k8s/events/event_pubsub.go b/pkg/k8s/events/event_pubsub.go index d3bebbc08b..63e3792990 100644 --- a/pkg/k8s/events/event_pubsub.go +++ b/pkg/k8s/events/event_pubsub.go @@ -17,7 +17,7 @@ var ( ) // Subscribe is the Subscribe implementation for PubSub -func Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} { +func Subscribe(aTypes ...announcements.Kind) chan interface{} { var subTypes []string for _, v := range aTypes { subTypes = append(subTypes, string(v)) @@ -28,7 +28,7 @@ func Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} { // Publish is the Publish implementation for PubSub func Publish(message PubSubMessage) { - getPubSubInstance().Pub(message, message.AnnouncementType.String()) + getPubSubInstance().Pub(message, message.Kind.String()) } // Unsub is the Unsub implementation for PubSub. diff --git a/pkg/k8s/events/event_pubsub_test.go b/pkg/k8s/events/event_pubsub_test.go index 992addf00d..666b668aff 100644 --- a/pkg/k8s/events/event_pubsub_test.go +++ b/pkg/k8s/events/event_pubsub_test.go @@ -17,16 +17,16 @@ func TestPubSubEvents(t *testing.T) { defer mockCtrl.Finish() testCases := []struct { - register announcements.AnnouncementType + register announcements.Kind publish PubSubMessage expectMessage bool }{ { register: announcements.EndpointAdded, publish: PubSubMessage{ - AnnouncementType: announcements.EndpointAdded, - NewObj: nil, - OldObj: "randomString", + Kind: announcements.EndpointAdded, + NewObj: nil, + OldObj: "randomString", }, expectMessage: true, }, @@ -61,7 +61,7 @@ func TestPubSubClose(t *testing.T) { // publish something Publish(PubSubMessage{ - AnnouncementType: announcements.EndpointUpdated, + Kind: announcements.EndpointUpdated, }) // make sure channel is drained and closed diff --git a/pkg/k8s/events/types.go b/pkg/k8s/events/types.go index 3c3735add9..2697d18e48 100644 --- a/pkg/k8s/events/types.go +++ b/pkg/k8s/events/types.go @@ -30,21 +30,21 @@ const ( // PubSubMessage represents a common messages abstraction to pass through the PubSub interface type PubSubMessage struct { - AnnouncementType announcements.AnnouncementType - OldObj interface{} - NewObj interface{} + Kind announcements.Kind + OldObj interface{} + NewObj interface{} } // PubSub is a simple interface to call for pubsub functionality in front of a pubsub implementation type PubSub interface { // Subscribe returns a channel subscribed to the specific type/s of announcement/s passed by parameter - Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} + Subscribe(...announcements.Kind) chan interface{} - // Publish publishes the message to all subscribers that have subscribed to topic - Publish(message PubSubMessage) + // Publish publishes the message to all subscribers that have subscribed to topic + Publish(PubSubMessage) // Unsub unsubscribes and closes the channel on pubsub backend // Note this is a necessary step to ensure a channel can be // garbage collected when it is freed. - Unsub(unsubChan chan interface{}) + Unsub(chan interface{}) } diff --git a/pkg/ticker/ticker.go b/pkg/ticker/ticker.go index 1d679838fb..257e0d9312 100644 --- a/pkg/ticker/ticker.go +++ b/pkg/ticker/ticker.go @@ -70,8 +70,8 @@ func tickerConfigListener(cfg configurator.Configurator, ready chan struct{}, st // Initial config if currentDuration >= minimumTickerDuration { events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.TickerStart, - NewObj: currentDuration, + Kind: announcements.TickerStart, + NewObj: currentDuration, }) } close(ready) @@ -90,15 +90,15 @@ func tickerConfigListener(cfg configurator.Configurator, ready chan struct{}, st // Notify to re/start ticker log.Warn().Msgf("Interval %s >= %s, issuing start ticker.", newResyncInterval, minimumTickerDuration) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.TickerStart, - NewObj: newResyncInterval, + Kind: announcements.TickerStart, + NewObj: newResyncInterval, }) } else { // Notify to ticker to stop log.Warn().Msgf("Interval %s < %s, issuing ticker stop.", newResyncInterval, minimumTickerDuration) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.TickerStop, - NewObj: newResyncInterval, + Kind: announcements.TickerStop, + NewObj: newResyncInterval, }) } currentDuration = newResyncInterval @@ -145,7 +145,7 @@ func ticker(ready chan struct{}, stop <-chan struct{}) { log.Info().Msgf("Ticker requesting broadcast proxy update") events.Publish( events.PubSubMessage{ - AnnouncementType: announcements.ScheduleProxyBroadcast, + Kind: announcements.ScheduleProxyBroadcast, }, ) case <-stop: diff --git a/pkg/ticker/ticker_test.go b/pkg/ticker/ticker_test.go index 17fccb2f1b..1d1584a1b7 100644 --- a/pkg/ticker/ticker_test.go +++ b/pkg/ticker/ticker_test.go @@ -67,8 +67,8 @@ func TestTicker(t *testing.T) { // Start ticker, tick at 100ms rate events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.TickerStart, - NewObj: time.Duration(100 * time.Millisecond), + Kind: announcements.TickerStart, + NewObj: time.Duration(100 * time.Millisecond), }) // broadcast events should increase in the next few seconds @@ -78,7 +78,7 @@ func TestTicker(t *testing.T) { // Stop the ticker events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.TickerStop, + Kind: announcements.TickerStop, }) // Should stop increasing @@ -128,7 +128,7 @@ func TestTickerConfigurator(t *testing.T) { // Simulate a meshconfig change, expect the right calls if it is enabled mockConfigurator.EXPECT().GetConfigResyncInterval().Return(test.mockTickerDurationVal) events.Publish(events.PubSubMessage{ - AnnouncementType: announcements.MeshConfigUpdated, + Kind: announcements.MeshConfigUpdated, }) receivedStartEvent := 0