Skip to content
This repository was archived by the owner on Jul 22, 2024. It is now read-only.

Commit 52cfc85

Browse files
committed
Code refactor
1 parent cb009e5 commit 52cfc85

File tree

3 files changed

+319
-239
lines changed

3 files changed

+319
-239
lines changed

pkg/per-node-resources/grpc_server.go

+35-158
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,18 @@ import (
2424
"time"
2525

2626
"google.golang.org/grpc"
27-
corev1 "k8s.io/api/core/v1"
28-
"k8s.io/client-go/informers"
2927
"k8s.io/client-go/kubernetes"
30-
"k8s.io/client-go/tools/cache"
3128
ctrl "sigs.k8s.io/controller-runtime"
3229

3330
resourcemonitors "github.com/liqotech/liqo/pkg/liqo-controller-manager/resource-request-controller/resource-monitors"
34-
"github.com/liqotech/liqo/pkg/utils"
35-
"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
3631
)
3732

3833
type server struct {
3934
Server *grpc.Server
4035
resourcemonitors.ResourceReaderServer
41-
subscribers sync.Map
42-
allocatable corev1.ResourceList
43-
nodesHandler *NodesHandler
44-
createPodInformer func(nodeName string)
36+
subscribers sync.Map
37+
nodesHandler *NodesHandler
38+
notifyChan chan bool
4539
}
4640

4741
// ListenAndServeGRPCServer creates the gRPC server and makes it listen on the given port.
@@ -56,48 +50,20 @@ func ListenAndServeGRPCServer(port int, clientset *kubernetes.Clientset,
5650
}
5751

5852
s := server{
59-
Server: grpc.NewServer(),
60-
allocatable: corev1.ResourceList{},
61-
nodesHandler: NewNodesHandler(),
53+
Server: grpc.NewServer(),
54+
notifyChan: make(chan bool, 1),
6255
}
56+
nh := NewNodesHandler(ctx, clientset, resyncPeriod, &s)
57+
s.nodesHandler = nh
6358

64-
nodeFactory := informers.NewSharedInformerFactoryWithOptions(
65-
clientset, resyncPeriod, informers.WithTweakListOptions(noVirtualNodesFilter),
66-
)
67-
nodeInformer := nodeFactory.Core().V1().Nodes().Informer()
68-
69-
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
70-
AddFunc: s.onNodeAdd,
71-
UpdateFunc: s.onNodeUpdate,
72-
DeleteFunc: s.onNodeDelete,
73-
})
74-
75-
nodeFactory.Start(ctx.Done())
76-
nodeFactory.WaitForCacheSync(ctx.Done())
77-
78-
s.createPodInformer = func(nodeName string) {
79-
informerCtx, cancel := context.WithCancel(ctx)
80-
podFactory := informers.NewSharedInformerFactoryWithOptions(
81-
clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter),
82-
informers.WithTweakListOptions(filterByNodeName(nodeName)),
83-
)
84-
podInformer := podFactory.Core().V1().Pods().Informer()
85-
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
86-
AddFunc: s.onPodAdd,
87-
// We do not care about update events, since resources are immutable.
88-
DeleteFunc: s.onPodDelete,
89-
})
90-
//TODO: is the cancel function still required?
91-
s.nodesHandler.setCancelFunctionFor(nodeName, cancel)
92-
podFactory.Start(informerCtx.Done())
93-
// wait to synch the cache before write the resource and notify
94-
podFactory.WaitForCacheSync(informerCtx.Done())
95-
s.nodesHandler.setPodInformerReadyFor(nodeName)
96-
}
59+
// This is a custom implementation, it waits for pods informers cache to be ready
60+
nh.WaitForCacheSync()
9761

9862
// register this server using the register interface defined in liqo
9963
resourcemonitors.RegisterResourceReaderServer(s.Server, &s)
10064
log.Printf("info: external resource monitor listening at %v", lis.Addr())
65+
go s.notifier()
66+
log.Printf("info: notifier started")
10167
if err := s.Server.Serve(lis); err != nil {
10268
return fmt.Errorf("grpc server failed to serve: %w", err)
10369
}
@@ -134,133 +100,44 @@ func (s *server) Subscribe(req *resourcemonitors.Empty, srv resourcemonitors.Res
134100
}
135101
}
136102

137-
// NotifyChange uses the cached streams to notify liqo that some resources changed. This method receives a clusterID inside req
138-
// which can be a real clusterID or resourcemonitors.AllClusterIDs which tells to liqo to refresh all the resources
139-
// of all the peered clusters.
140-
func (s *server) NotifyChange(ctx context.Context, req *resourcemonitors.ClusterIdentity) error {
141-
log.Printf("info: sending notification to liqo controller manager for cluster %q", req.ClusterID)
142-
var err error
143-
144-
s.subscribers.Range(func(key, value interface{}) bool {
145-
stream := value.(resourcemonitors.ResourceReader_SubscribeServer)
146-
147-
err = stream.Send(req)
148-
if err != nil {
149-
err = fmt.Errorf("error: error during sending a notification %w", err)
150-
}
151-
return true
152-
})
153-
if err != nil {
154-
fmt.Printf("%s", err)
155-
return err
156-
}
157-
158-
log.Printf("info: notification sent to liqo controller manager for cluster %q", req.ClusterID)
159-
return err
160-
}
161-
162-
func (s *server) NotifyAll() error {
163-
err := s.NotifyChange(context.Background(), &resourcemonitors.ClusterIdentity{ClusterID: resourcemonitors.AllClusterIDs})
164-
if err != nil {
165-
return err
166-
}
167-
return nil
168-
}
169-
170103
// RemoveCluster is useful to clean cluster's information if it exists when a cluster is upeered. This method receives
171104
// a clusterID which identifies the cluster that has been removed. We believe that this method is useful in custom
172105
// implementation, for example where a database is involved in the implementation.
173106
func (s *server) RemoveCluster(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.Empty, error) {
174107
log.Printf("info: removing cluster having clusterID %s", req.ClusterID)
175108

176-
s.nodesHandler.deleteClusterByClusterID(req.ClusterID)
109+
s.nodesHandler.deletePodsByClusterID(req.ClusterID)
177110
return &resourcemonitors.Empty{}, nil
178111
}
179112

180-
// react to a Node Creation/First informer run.
181-
func (s *server) onNodeAdd(obj interface{}) {
182-
node := obj.(*corev1.Node)
183-
go s.createPodInformer(node.Name)
184-
if utils.IsNodeReady(node) {
185-
log.Printf("Adding Node %s", node.Name)
186-
s.nodesHandler.insertNewReadyNode(node.Name, node.Status.Allocatable)
187-
err := s.NotifyAll()
113+
// Notify uses the cached streams to notify liqo that some resources changed.
114+
// It notifies Liqo every second if an event that requires a resources update occurred.
115+
func (s *server) notifier() {
116+
log.Printf("info: sending notification to liqo controller manager for all clusters")
117+
var err error
118+
for {
119+
<-s.notifyChan
120+
s.subscribers.Range(func(key, value interface{}) bool {
121+
stream := value.(resourcemonitors.ResourceReader_SubscribeServer)
122+
123+
err = stream.Send(&resourcemonitors.ClusterIdentity{ClusterID: resourcemonitors.AllClusterIDs})
124+
if err != nil {
125+
err = fmt.Errorf("error: error during sending a notification %w", err)
126+
}
127+
return true
128+
})
188129
if err != nil {
189-
log.Printf("error: error during notifying a change")
190-
}
191-
}
192-
}
193-
194-
// react to a Node Update.
195-
func (s *server) onNodeUpdate(oldObj, newObj interface{}) {
196-
oldNode := oldObj.(*corev1.Node)
197-
newNode := newObj.(*corev1.Node)
198-
newNodeResources := newNode.Status.Allocatable
199-
log.Printf("Updating Node %s", oldNode.Name)
200-
if utils.IsNodeReady(newNode) {
201-
// node was already Ready, update with possible new resources.
202-
if utils.IsNodeReady(oldNode) {
203-
s.nodesHandler.setAllocatableForNode(oldNode.Name, newNodeResources)
204-
} else {
205-
s.nodesHandler.insertNewReadyNode(newNode.Name, newNodeResources)
206-
go s.createPodInformer(newNode.Name)
130+
fmt.Printf("%s", err)
207131
}
208-
// node is terminating or stopping, delete all its resources.
209-
} else if utils.IsNodeReady(oldNode) && !utils.IsNodeReady(newNode) {
210-
s.nodesHandler.turnNodeOff(oldNode.Name)
211-
}
212-
err := s.NotifyAll()
213-
if err != nil {
214-
log.Printf("error: error during notifying a change")
215-
}
216-
}
217-
218-
// react to a Node Delete.
219-
func (s *server) onNodeDelete(obj interface{}) {
220-
node := obj.(*corev1.Node)
221-
if utils.IsNodeReady(node) {
222-
log.Printf("info: Deleting Node %s", node.Name)
223-
s.nodesHandler.turnNodeOff(node.Name)
224-
}
225-
err := s.NotifyAll()
226-
if err != nil {
227-
log.Printf("error: error during notifying a change")
228-
}
229-
}
230132

231-
func (s *server) onPodAdd(obj interface{}) {
232-
// Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running.
233-
podAdded := obj.(*corev1.Pod)
234-
log.Printf("info: OnPodAdd: Add for pod %s:%s", podAdded.Namespace, podAdded.Name)
235-
236-
podResources := extractPodResources(podAdded)
237-
s.nodesHandler.decreaseAllocatableForNode(podAdded.Spec.NodeName, podResources)
238-
239-
if clusterID := podAdded.Labels[forge.LiqoOriginClusterIDKey]; clusterID != "" {
240-
log.Printf("info: OnPodAdd: Pod %s:%s passed ClusterID check. ClusterID = %s", podAdded.Namespace, podAdded.Name, clusterID)
241-
s.nodesHandler.addPodToNode(podAdded.Spec.NodeName, clusterID, podResources)
242-
}
243-
err := s.NotifyAll()
244-
if err != nil {
245-
log.Printf("error: error during notifying a change")
133+
log.Printf("info: notification sent to liqo controller manager for all clusters")
134+
time.Sleep(time.Second)
246135
}
247136
}
248137

249-
func (s *server) onPodDelete(obj interface{}) {
250-
// Thanks to the filters at the informer level, delete events are received only when
251-
// pods previously running on a physical node are no longer running.
252-
podDeleted := obj.(*corev1.Pod)
253-
log.Printf("info: OnPodDelete: Delete for pod %s:%s", podDeleted.Namespace, podDeleted.Name)
254-
255-
podResources := extractPodResources(podDeleted)
256-
s.nodesHandler.increaseAllocatableForNode(podDeleted.Spec.NodeName, podResources)
257-
258-
if clusterID := podDeleted.Labels[forge.LiqoOriginClusterIDKey]; clusterID != "" {
259-
log.Printf("info: OnPodDelete: Pod %s:%s passed ClusterID check. ClusterID = %s", podDeleted.Namespace, podDeleted.Name, clusterID)
260-
s.nodesHandler.deletePodFromNode(podDeleted.Spec.NodeName, clusterID, podResources)
261-
}
262-
err := s.NotifyAll()
263-
if err != nil {
264-
log.Printf("error: error during notifying a change")
138+
// Notify send a message in the notifyChannel to let the notifier know that it must send a notification.
139+
func (s *server) Notify() {
140+
if len(s.notifyChan) == 0 {
141+
s.notifyChan <- false
265142
}
266143
}

0 commit comments

Comments
 (0)