Skip to content

Commit 2a2a44b

Browse files
committed
feature: use watch instead of client get
Signed-off-by: googs1025 <[email protected]>
1 parent 4b74359 commit 2a2a44b

File tree

2 files changed

+54
-19
lines changed

2 files changed

+54
-19
lines changed

pkg/controller/inference/service_controller.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"reflect"
23+
"sync"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -39,6 +40,7 @@ import (
3940
"sigs.k8s.io/controller-runtime/pkg/handler"
4041
"sigs.k8s.io/controller-runtime/pkg/log"
4142
"sigs.k8s.io/controller-runtime/pkg/predicate"
43+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4244
lws "sigs.k8s.io/lws/api/leaderworkerset/v1"
4345
applyconfigurationv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1"
4446

@@ -52,8 +54,10 @@ import (
5254
// ServiceReconciler reconciles a Service object
5355
type ServiceReconciler struct {
5456
client.Client
55-
Scheme *runtime.Scheme
56-
Record record.EventRecorder
57+
Scheme *runtime.Scheme
58+
Record record.EventRecorder
59+
GlobalConfigMutex sync.RWMutex
60+
GlobalConfig *helper.GlobalConfig
5761
}
5862

5963
func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *ServiceReconciler {
@@ -86,24 +90,21 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
8690

8791
logger.V(10).Info("reconcile Service", "Service", klog.KObj(service))
8892

89-
cm := &corev1.ConfigMap{}
90-
if err := r.Get(ctx, types.NamespacedName{Name: "llmaz-global-config", Namespace: "llmaz-system"}, cm); err != nil {
91-
if client.IgnoreNotFound(err) != nil {
92-
return ctrl.Result{}, fmt.Errorf("failed to get llmaz-global-config configmap: %w", err)
93-
}
94-
}
95-
configs, err := helper.ParseGlobalConfigmap(cm)
96-
if err != nil {
97-
return ctrl.Result{}, fmt.Errorf("failed to parse global configurations: %w", err)
93+
r.GlobalConfigMutex.RLock()
94+
config := r.GlobalConfig
95+
r.GlobalConfigMutex.RUnlock()
96+
97+
if config == nil {
98+
return ctrl.Result{}, fmt.Errorf("globel configs not init")
9899
}
99100

100101
// Set the global configurations to the service.
101-
if configs.SchedulerName != "" {
102+
if config.SchedulerName != "" {
102103
if service.Spec.WorkloadTemplate.LeaderTemplate != nil && service.Spec.WorkloadTemplate.LeaderTemplate.Spec.SchedulerName == "" {
103-
service.Spec.WorkloadTemplate.LeaderTemplate.Spec.SchedulerName = configs.SchedulerName
104+
service.Spec.WorkloadTemplate.LeaderTemplate.Spec.SchedulerName = config.SchedulerName
104105
}
105106
if service.Spec.WorkloadTemplate.WorkerTemplate.Spec.SchedulerName == "" {
106-
service.Spec.WorkloadTemplate.WorkerTemplate.Spec.SchedulerName = configs.SchedulerName
107+
service.Spec.WorkloadTemplate.WorkerTemplate.Spec.SchedulerName = config.SchedulerName
107108
}
108109

109110
if err := r.Client.Update(ctx, service); err != nil {
@@ -156,9 +157,36 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
156157
return !reflect.DeepEqual(oldBar.Status, newBar.Status)
157158
},
158159
})).
160+
Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.updateGlobalConfig),
161+
builder.WithPredicates(predicate.Funcs{
162+
UpdateFunc: func(e event.UpdateEvent) bool {
163+
cm := e.ObjectOld.(*corev1.ConfigMap)
164+
return cm.Name == helper.GlobalConfigMapName && cm.Namespace == helper.GlobalConfigMapNamespace
165+
},
166+
CreateFunc: func(e event.CreateEvent) bool {
167+
cm := e.Object.(*corev1.ConfigMap)
168+
return cm.Name == helper.GlobalConfigMapName && cm.Namespace == helper.GlobalConfigMapNamespace
169+
},
170+
})).
159171
Complete(r)
160172
}
161173

174+
func (r *ServiceReconciler) updateGlobalConfig(ctx context.Context, obj client.Object) []reconcile.Request {
175+
logger := log.FromContext(ctx)
176+
cm := obj.(*corev1.ConfigMap)
177+
178+
newConfig, err := helper.ParseGlobalConfigmap(cm)
179+
if err != nil {
180+
logger.Error(err, "failed to parse global config")
181+
return nil
182+
}
183+
r.GlobalConfigMutex.Lock()
184+
defer r.GlobalConfigMutex.Unlock()
185+
r.GlobalConfig = newConfig
186+
logger.Info("global config updated", "config", newConfig)
187+
return nil
188+
}
189+
162190
func buildWorkloadApplyConfiguration(service *inferenceapi.Service, models []*coreapi.OpenModel) *applyconfigurationv1.LeaderWorkerSetApplyConfiguration {
163191
workload := applyconfigurationv1.LeaderWorkerSet(service.Name, service.Namespace)
164192

pkg/controller_helper/configmap.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,29 @@ import (
2323
corev1 "k8s.io/api/core/v1"
2424
)
2525

26-
type GlobalConfigs struct {
26+
const (
27+
GlobalConfigMapName = "llmaz-global-config"
28+
GlobalConfigMapNamespace = "llmaz-system"
29+
)
30+
31+
// GlobalConfig defines the global configuration parameters used across services.
32+
// These configurations are typically provided via a ConfigMap named "llmaz-global-config"
33+
type GlobalConfig struct {
2734
SchedulerName string `yaml:"scheduler-name"`
2835
InitContainerImage string `yaml:"init-container-image"`
2936
}
3037

31-
func ParseGlobalConfigmap(cm *corev1.ConfigMap) (*GlobalConfigs, error) {
38+
func ParseGlobalConfigmap(cm *corev1.ConfigMap) (*GlobalConfig, error) {
3239
rawConfig, ok := cm.Data["config.data"]
3340
if !ok {
3441
return nil, fmt.Errorf("config.data not found in ConfigMap")
3542
}
3643

37-
var configs GlobalConfigs
38-
err := yaml.Unmarshal([]byte(rawConfig), &configs)
44+
var config GlobalConfig
45+
err := yaml.Unmarshal([]byte(rawConfig), &config)
3946
if err != nil {
4047
return nil, fmt.Errorf("failed to unmarshal config.data: %v", err)
4148
}
4249

43-
return &configs, nil
50+
return &config, nil
4451
}

0 commit comments

Comments
 (0)