Skip to content

Commit 494a1bb

Browse files
committed
add a SyncChangeSource for BGP peer watcher, fire it from status_file_watcher
1 parent 0f7bf32 commit 494a1bb

File tree

3 files changed

+21
-11
lines changed

3 files changed

+21
-11
lines changed

confd/pkg/backends/calico/client.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func NewCalicoClient(confdConfig *config.Config) (*client, error) {
192192
c.localBGPPeerWatcher.Start()
193193

194194
// Create a conditional that we use to wake up all of the watcher threads when there
195-
// may some actionable updates.
195+
// may be some actionable updates.
196196
c.watcherCond = sync.NewCond(&c.cacheLock)
197197

198198
// Increment the waitForSync wait group. This blocks the GetValues call until the
@@ -292,9 +292,11 @@ func NewCalicoClient(confdConfig *config.Config) (*client, error) {
292292
return c, nil
293293
}
294294

295+
// Strings for keying in-sync messages by their upstream source.
295296
var (
296-
SourceSyncer string = "SourceSyncer"
297-
SourceRouteGenerator string = "SourceRouteGenerator"
297+
SourceSyncer string = "SourceSyncer"
298+
SourceRouteGenerator string = "SourceRouteGenerator"
299+
SourceLocalBGPPeerWatcher string = "LocalBGPPeerWatcher"
298300
)
299301

300302
// client implements the StoreClient interface for confd, and also implements the
@@ -425,7 +427,7 @@ func (c *client) ExcludeServiceAdvertisement() bool {
425427
return false
426428
}
427429

428-
// OnInSync handles multiplexing in-sync messages from multiple data sources
430+
// OnSyncChange handles multiplexing in-sync messages from multiple data sources
429431
// into a single representation of readiness.
430432
func (c *client) OnSyncChange(source string, ready bool) {
431433
c.cacheLock.Lock()
@@ -439,13 +441,13 @@ func (c *client) OnSyncChange(source string, ready bool) {
439441
log.Infof("Source %v readiness changed, ready=%v", source, ready)
440442

441443
// Check if we are fully in sync, before applying this change.
442-
oldFullSync := c.sourceReady[SourceSyncer] && c.sourceReady[SourceRouteGenerator]
444+
oldFullSync := c.sourceReady[SourceSyncer] && c.sourceReady[SourceRouteGenerator] && c.sourceReady[SourceLocalBGPPeerWatcher]
443445

444446
// Apply the change.
445447
c.sourceReady[source] = ready
446448

447449
// Check if we are fully in sync now.
448-
newFullSync := c.sourceReady[SourceSyncer] && c.sourceReady[SourceRouteGenerator]
450+
newFullSync := c.sourceReady[SourceSyncer] && c.sourceReady[SourceRouteGenerator] && c.sourceReady[SourceLocalBGPPeerWatcher]
449451

450452
if newFullSync == oldFullSync {
451453
log.Debugf("No change to full sync status (%v)", newFullSync)

confd/pkg/backends/calico/local_bgp_peer_watcher.go

+7
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func NewLocalBGPPeerWatcher(client *client, prefix string, pollIntervalSeconds i
5353
OnFileCreation: w.OnFileCreation,
5454
OnFileUpdate: w.OnFileUpdate,
5555
OnFileDeletion: w.OnFileDeletion,
56+
OnInSync: w.OnInSync,
5657
})
5758

5859
return w, nil
@@ -99,6 +100,12 @@ func (w *localBGPPeerWatcher) OnFileDeletion(fileName string) {
99100
w.client.recheckPeerConfig()
100101
}
101102

103+
func (w *localBGPPeerWatcher) OnInSync(inSync bool) {
104+
log.WithField("newValue", inSync).Debug("Received new inSync msg from upstream")
105+
106+
w.client.OnSyncChange(SourceLocalBGPPeerWatcher, inSync)
107+
}
108+
102109
func (w *localBGPPeerWatcher) updateEpStatus(fileName string, epStatus *epstatus.WorkloadEndpointStatus) {
103110
w.mutex.Lock()
104111
defer w.mutex.Unlock()

libcalico-go/lib/epstatusfile/status_file_watcher.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Callbacks struct {
3232
OnFileCreation func(fileName string)
3333
OnFileUpdate func(fileName string)
3434
OnFileDeletion func(fileName string)
35+
OnInSync func(inSync bool)
3536
}
3637

3738
type FileWatcher struct {
@@ -101,11 +102,6 @@ func (w *FileWatcher) newFsnotifyWatcher() error {
101102
}
102103

103104
func (w *FileWatcher) runFsnotifyWatcher(watcher *fsnotify.Watcher) error {
104-
w.fsnotifyActive = true
105-
106-
// Get current state of the directory and emit initial events.
107-
w.scanDirectory()
108-
109105
// Listen for events and loop until error occurs.
110106
for {
111107
select {
@@ -138,6 +134,7 @@ func (w *FileWatcher) runFsnotifyWatcher(watcher *fsnotify.Watcher) error {
138134
}
139135
}
140136
}
137+
w.callbacks.OnInSync(true)
141138

142139
case err, ok := <-watcher.Errors:
143140
if !ok {
@@ -168,6 +165,9 @@ func (w *FileWatcher) runWatcher() {
168165
}
169166

170167
if w.fsWatcher != nil {
168+
// Get current state of the directory and emit initial events.
169+
w.fsnotifyActive = true
170+
w.scanDirectory()
171171
// Run fsnotify watcher loop if possible.
172172
err := w.runFsnotifyWatcher(w.fsWatcher)
173173
w.fsnotifyActive = false
@@ -239,6 +239,7 @@ func (w *FileWatcher) scanDirectory() {
239239

240240
// Update lastState for next iteration.
241241
w.lastState = currentState
242+
w.callbacks.OnInSync(true)
242243
}
243244

244245
// Stop stops the watcher.

0 commit comments

Comments
 (0)