Skip to content

Commit 73af200

Browse files
authored
fix: races in filter protocol and tests (#260)
* Fix races in filter protocol and tests * only RLock needed
1 parent 8e295a2 commit 73af200

File tree

3 files changed

+33
-23
lines changed

3 files changed

+33
-23
lines changed

waku/v2/protocol/filter/filter_subscribers.go

+18
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,17 @@ func (sub *Subscribers) Append(s Subscriber) int {
3636
return len(sub.subscribers)
3737
}
3838

39+
func (subs *Subscribers) SubscriberHasContentTopic(sub Subscriber, topic string) bool {
40+
subs.RLock()
41+
defer subs.RUnlock()
42+
for _, filter := range sub.filter.ContentFilters {
43+
if filter.ContentTopic == topic {
44+
return true
45+
}
46+
}
47+
return false
48+
}
49+
3950
func (sub *Subscribers) Items() <-chan Subscriber {
4051
c := make(chan Subscriber)
4152

@@ -59,6 +70,13 @@ func (sub *Subscribers) Length() int {
5970
return len(sub.subscribers)
6071
}
6172

73+
func (sub *Subscribers) IsFailedPeer(peerID peer.ID) bool {
74+
sub.RLock()
75+
defer sub.RUnlock()
76+
_, ok := sub.failedPeers[peerID]
77+
return ok
78+
}
79+
6280
func (sub *Subscribers) FlagAsSuccess(peerID peer.ID) {
6381
sub.Lock()
6482
defer sub.Unlock()

waku/v2/protocol/filter/waku_filter.go

+11-15
Original file line numberDiff line numberDiff line change
@@ -199,21 +199,17 @@ func (wf *WakuFilter) FilterListener() {
199199
continue
200200
}
201201

202-
for _, filter := range subscriber.filter.ContentFilters {
203-
if msg.ContentTopic == filter.ContentTopic {
204-
logger.Info("found matching content topic", zap.Stringer("filter", filter))
205-
// Do a message push to light node
206-
logger.Info("pushing message to light node")
207-
g.Go(func() (err error) {
208-
err = wf.pushMessage(subscriber, msg)
209-
if err != nil {
210-
logger.Error("pushing message", zap.Error(err))
211-
}
212-
return err
213-
})
214-
// Break if we have found a match
215-
break
216-
}
202+
if wf.subscribers.SubscriberHasContentTopic(subscriber, msg.ContentTopic) {
203+
logger.Info("found matching content topic", zap.String("contentTopic", msg.ContentTopic))
204+
// Do a message push to light node
205+
logger.Info("pushing message to light node")
206+
g.Go(func() (err error) {
207+
err = wf.pushMessage(subscriber, msg)
208+
if err != nil {
209+
logger.Error("pushing message", zap.Error(err))
210+
}
211+
return err
212+
})
217213
}
218214
}
219215

waku/v2/protocol/filter/waku_filter_test.go

+4-8
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
175175
// Sleep to make sure the filter is subscribed
176176
time.Sleep(2 * time.Second)
177177

178-
_, ok := node2Filter.subscribers.failedPeers[host1.ID()]
179-
require.True(t, ok)
178+
require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID()))
180179

181180
var wg sync.WaitGroup
182181

@@ -187,8 +186,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
187186
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
188187

189188
// Failure is removed
190-
_, ok := node2Filter.subscribers.failedPeers[host1.ID()]
191-
require.False(t, ok)
189+
require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID()))
192190

193191
}()
194192

@@ -207,17 +205,15 @@ func TestWakuFilterPeerFailure(t *testing.T) {
207205

208206
// TODO: find out how to eliminate this sleep
209207
time.Sleep(1 * time.Second)
210-
_, ok = node2Filter.subscribers.failedPeers[host1.ID()]
211-
require.True(t, ok)
208+
require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID()))
212209

213210
time.Sleep(3 * time.Second)
214211

215212
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
216213
require.NoError(t, err)
217214

218215
time.Sleep(1 * time.Second)
219-
_, ok = node2Filter.subscribers.failedPeers[host1.ID()]
220-
require.False(t, ok) // Failed peer has been removed
216+
require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) // Failed peer has been removed
221217

222218
for subscriber := range node2Filter.subscribers.Items() {
223219
if subscriber.peer == node1.h.ID() {

0 commit comments

Comments
 (0)