Skip to content

Commit ca262b1

Browse files
authored
Merge pull request #241 from Nordix/tapa
Fix TAPA blocked Close if VIPs are updated at the same time
2 parents 7416aa5 + fe6dcc7 commit ca262b1

File tree

1 file changed

+46
-9
lines changed

1 file changed

+46
-9
lines changed

pkg/ambassador/tap/conduit/configuration.go

+46-9
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import (
2828
"github.com/sirupsen/logrus"
2929
)
3030

31+
const (
32+
channelBufferSize = 1
33+
)
34+
3135
type Configuration interface {
3236
Watch()
3337
Stop()
@@ -39,8 +43,9 @@ type configurationImpl struct {
3943
Conduit *nspAPI.Conduit
4044
ConfigurationManagerClient nspAPI.ConfigurationManagerClient
4145
cancel context.CancelFunc
42-
wg sync.WaitGroup
4346
mu sync.Mutex
47+
vipChan chan []string
48+
streamChan chan []*nspAPI.Stream
4449
}
4550

4651
func newConfigurationImpl(setVips func([]string) error,
@@ -61,6 +66,10 @@ func (c *configurationImpl) Watch() {
6166
defer c.mu.Unlock()
6267
var ctx context.Context
6368
ctx, c.cancel = context.WithCancel(context.TODO())
69+
c.vipChan = make(chan []string, channelBufferSize)
70+
c.streamChan = make(chan []*nspAPI.Stream, channelBufferSize)
71+
go c.vipHandler(ctx)
72+
go c.streamHandler(ctx)
6473
go c.watchVIPs(ctx)
6574
go c.watchStreams(ctx)
6675
}
@@ -71,12 +80,34 @@ func (c *configurationImpl) Stop() {
7180
if c.cancel != nil {
7281
c.cancel()
7382
}
74-
c.wg.Wait()
83+
}
84+
85+
func (c *configurationImpl) vipHandler(ctx context.Context) {
86+
for {
87+
select {
88+
case vips := <-c.vipChan:
89+
err := c.SetVips(vips)
90+
if err != nil {
91+
logrus.Warnf("err set vips: %v", err) // todo
92+
}
93+
case <-ctx.Done():
94+
return
95+
}
96+
}
97+
}
98+
99+
func (c *configurationImpl) streamHandler(ctx context.Context) {
100+
for {
101+
select {
102+
case streams := <-c.streamChan:
103+
c.SetStreams(streams)
104+
case <-ctx.Done():
105+
return
106+
}
107+
}
75108
}
76109

77110
func (c *configurationImpl) watchVIPs(ctx context.Context) {
78-
c.wg.Add(1)
79-
defer c.wg.Done()
80111
err := retry.Do(func() error {
81112
toWatch := &nspAPI.Flow{
82113
Stream: &nspAPI.Stream{
@@ -95,10 +126,12 @@ func (c *configurationImpl) watchVIPs(ctx context.Context) {
95126
if err != nil {
96127
return err
97128
}
98-
err = c.SetVips(flowResponseToVIPSlice(response))
99-
if err != nil {
100-
logrus.Warnf("err set vips: %v", err) // todo
129+
// flush previous context in channel
130+
select {
131+
case <-c.vipChan:
132+
default:
101133
}
134+
c.vipChan <- flowResponseToVIPSlice(response)
102135
}
103136
return nil
104137
}, retry.WithContext(ctx),
@@ -110,8 +143,6 @@ func (c *configurationImpl) watchVIPs(ctx context.Context) {
110143
}
111144

112145
func (c *configurationImpl) watchStreams(ctx context.Context) {
113-
c.wg.Add(1)
114-
defer c.wg.Done()
115146
err := retry.Do(func() error {
116147
vipsToWatch := &nspAPI.Stream{
117148
Conduit: c.Conduit,
@@ -129,6 +160,12 @@ func (c *configurationImpl) watchStreams(ctx context.Context) {
129160
return err
130161
}
131162
c.SetStreams(streamResponse.GetStreams())
163+
// flush previous context in channel
164+
select {
165+
case <-c.streamChan:
166+
default:
167+
}
168+
c.streamChan <- streamResponse.GetStreams()
132169
}
133170
return nil
134171
}, retry.WithContext(ctx),

0 commit comments

Comments
 (0)