Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

pkg/*: fix data races in PubSub and ticker test #4155

Merged
merged 1 commit into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/k8s/events/event_pubsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package events

import (
"sync"

"github.com/cskr/pubsub"

"github.com/openservicemesh/osm/pkg/announcements"
Expand All @@ -14,6 +16,9 @@ const (
var (
// Globally accessible instance, through singleton pattern using getPubSubInstance()
pubSubInstance *pubsub.PubSub

// pubSubOnce is used to ensure PubSub object creation happens just once
pubSubOnce sync.Once
)

// Subscribe is the Subscribe implementation for PubSub
Expand Down Expand Up @@ -59,8 +64,8 @@ func Unsub(unsubChan chan interface{}) {
// Note that spawning the instance is not thread-safe. First call should happen on
// a single-routine context to avoid races.
func getPubSubInstance() *pubsub.PubSub {
if pubSubInstance == nil {
pubSubOnce.Do(func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

pubSubInstance = pubsub.New(defaultAnnouncementChannelSize)
}
})
return pubSubInstance
}
8 changes: 8 additions & 0 deletions pkg/ticker/ticker_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ticker

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -44,14 +45,17 @@ func TestTicker(t *testing.T) {
broadcastEvents := events.Subscribe(announcements.ScheduleProxyBroadcast)
defer events.Unsub(broadcastEvents)

var counterMutex sync.Mutex
broadcastsReceived := 0
stop := make(chan struct{})
defer close(stop)
go func() {
for {
select {
case <-broadcastEvents:
counterMutex.Lock()
broadcastsReceived++
counterMutex.Unlock()
case <-stop:
return
}
Expand All @@ -73,6 +77,8 @@ func TestTicker(t *testing.T) {

// broadcast events should increase in the next few seconds
assert.Eventually(func() bool {
counterMutex.Lock()
defer counterMutex.Unlock()
return broadcastsReceived > 0
}, 3*time.Second, 500*time.Millisecond)

Expand All @@ -83,6 +89,8 @@ func TestTicker(t *testing.T) {

// Should stop increasing
assert.Eventually(func() bool {
counterMutex.Lock()
defer counterMutex.Unlock()
firstRead := broadcastsReceived
time.Sleep(1 * time.Second)
secondRead := broadcastsReceived
Expand Down