Skip to content

Commit 2d94011

Browse files
authored
Complete ADS snapshot cache implementation (openservicemesh#5017)
Signed-off-by: Sean Teeling <[email protected]>
1 parent 5270634 commit 2d94011

File tree

6 files changed

+48
-50
lines changed

6 files changed

+48
-50
lines changed

pkg/envoy/ads/cache_callbacks.go

+44-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
88

99
"github.com/openservicemesh/osm/pkg/envoy"
10+
"github.com/openservicemesh/osm/pkg/messaging"
1011
"github.com/openservicemesh/osm/pkg/metricsstore"
1112
"github.com/openservicemesh/osm/pkg/utils"
1213
)
@@ -44,18 +45,59 @@ func (s *Server) OnStreamOpen(ctx context.Context, streamID int64, typ string) e
4445
}
4546

4647
s.proxyRegistry.RegisterProxy(proxy)
48+
go func() {
49+
// Register for proxy config updates broadcasted by the message broker
50+
proxyUpdatePubSub := s.msgBroker.GetProxyUpdatePubSub()
51+
proxyUpdateChan := proxyUpdatePubSub.Sub(messaging.ProxyUpdateTopic, messaging.GetPubSubTopicForProxyUUID(proxy.UUID.String()))
52+
defer s.msgBroker.Unsub(proxyUpdatePubSub, proxyUpdateChan)
53+
54+
certRotations, unsubRotations := s.certManager.SubscribeRotations(proxy.Identity.String())
55+
defer unsubRotations()
56+
57+
for {
58+
select {
59+
case <-proxyUpdateChan:
60+
log.Debug().Str("proxy", proxy.String()).Msg("Broadcast update received")
61+
s.update(proxy)
62+
case <-certRotations:
63+
log.Debug().Str("proxy", proxy.String()).Msg("Certificate has been updated for proxy")
64+
s.update(proxy)
65+
case <-ctx.Done():
66+
return
67+
}
68+
}
69+
}()
4770
return nil
4871
}
4972

73+
func (s *Server) update(proxy *envoy.Proxy) {
74+
ch := s.workqueues.AddJob(&proxyResponseJob{
75+
proxy: proxy,
76+
xdsServer: s,
77+
typeURIs: envoy.XDSResponseOrder,
78+
done: make(chan struct{}),
79+
})
80+
<-ch
81+
close(ch)
82+
}
83+
5084
// OnStreamClosed is called on stream closed
5185
func (s *Server) OnStreamClosed(streamID int64) {
5286
log.Debug().Msgf("OnStreamClosed id: %d", streamID)
5387
s.proxyRegistry.UnregisterProxy(streamID)
88+
89+
metricsstore.DefaultMetricsStore.ProxyConnectCount.Dec()
5490
}
5591

56-
// OnStreamRequest is called when a request happens on an open string
57-
func (s *Server) OnStreamRequest(a int64, req *discovery.DiscoveryRequest) error {
92+
// OnStreamRequest is called when a request happens on an open connection
93+
func (s *Server) OnStreamRequest(streamID int64, req *discovery.DiscoveryRequest) error {
5894
log.Debug().Msgf("OnStreamRequest node: %s, type: %s, v: %s, nonce: %s, resNames: %s", req.Node.Id, req.TypeUrl, req.VersionInfo, req.ResponseNonce, req.ResourceNames)
95+
96+
proxy := s.proxyRegistry.GetConnectedProxy(streamID)
97+
if proxy != nil {
98+
metricsstore.DefaultMetricsStore.ProxyXDSRequestCount.WithLabelValues(proxy.UUID.String(), proxy.Identity.String(), req.TypeUrl).Inc()
99+
}
100+
59101
return nil
60102
}
61103

pkg/envoy/ads/cache_stream.go

-38
Original file line numberDiff line numberDiff line change
@@ -8,46 +8,8 @@ import (
88
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
99

1010
"github.com/openservicemesh/osm/pkg/envoy"
11-
"github.com/openservicemesh/osm/pkg/messaging"
1211
)
1312

14-
// Routine which fulfills listening to proxy broadcasts
15-
func (s *Server) watchForUpdates(ctx context.Context) {
16-
// Register for proxy config updates broadcasted by the message broker
17-
proxyUpdatePubSub := s.msgBroker.GetProxyUpdatePubSub()
18-
proxyUpdateChan := proxyUpdatePubSub.Sub(messaging.ProxyUpdateTopic)
19-
defer s.msgBroker.Unsub(proxyUpdatePubSub, proxyUpdateChan)
20-
21-
for {
22-
select {
23-
case <-proxyUpdateChan:
24-
s.allPodUpdater()
25-
// TODO(2683): listen to specific pod updates and cert rotations
26-
case <-ctx.Done():
27-
return
28-
}
29-
}
30-
}
31-
32-
func (s *Server) allPodUpdater() {
33-
for _, proxy := range s.proxyRegistry.ListConnectedProxies() {
34-
s.update(proxy)
35-
}
36-
}
37-
38-
func (s *Server) update(proxy *envoy.Proxy) {
39-
// Queue update for this proxy/pod
40-
job := proxyResponseJob{
41-
proxy: proxy,
42-
adsStream: nil, // Since it goes in the cache, stream is not needed
43-
request: nil, // No request is used, as we fill all verticals
44-
xdsServer: s,
45-
typeURIs: envoy.XDSResponseOrder,
46-
done: make(chan struct{}),
47-
}
48-
s.workqueues.AddJob(&job)
49-
}
50-
5113
// RecordFullSnapshot stores a group of resources as a new Snapshot with a new version in the cache.
5214
// It also runs a consistency check on the snapshot (will warn if there are missing resources referenced in
5315
// the snapshot)

pkg/envoy/ads/jobs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type proxyResponseJob struct {
2222
}
2323

2424
// GetDoneCh returns the channel, which when closed, indicates the job has been finished.
25-
func (proxyJob *proxyResponseJob) GetDoneCh() <-chan struct{} {
25+
func (proxyJob *proxyResponseJob) GetDoneCh() chan struct{} {
2626
return proxyJob.done
2727
}
2828

pkg/envoy/ads/server.go

-6
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,6 @@ func (s *Server) Start(ctx context.Context, cancel context.CancelFunc, port int,
8787
return fmt.Errorf("error starting ADS server: %w", err)
8888
}
8989

90-
if s.cacheEnabled {
91-
// Start broadcast listener thread when cache is enabled and we are ready to start handling
92-
// proxy broadcast updates
93-
go s.watchForUpdates(ctx)
94-
}
95-
9690
s.ready = true
9791

9892
return nil

pkg/workerpool/workerpool.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type Job interface {
2929
Run()
3030

3131
// GetDoneCh returns the channel, which when closed, indicates that the job was finished.
32-
GetDoneCh() <-chan struct{}
32+
GetDoneCh() chan struct{}
3333
}
3434

3535
// NewWorkerPool creates a new work group.
@@ -60,7 +60,7 @@ func NewWorkerPool(nWorkers int) *WorkerPool {
6060

6161
// AddJob posts the job on a worker queue
6262
// Uses Hash underneath to choose worker to post the job to
63-
func (wp *WorkerPool) AddJob(job Job) <-chan struct{} {
63+
func (wp *WorkerPool) AddJob(job Job) chan struct{} {
6464
wp.jobs <- job
6565
return job.GetDoneCh()
6666
}

pkg/workerpool/workerpool_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type testJob struct {
2525
hash uint64
2626
}
2727

28-
func (tj *testJob) GetDoneCh() <-chan struct{} {
28+
func (tj *testJob) GetDoneCh() chan struct{} {
2929
return tj.jobDone
3030
}
3131

0 commit comments

Comments
 (0)