Skip to content

Commit 9b27d5c

Browse files
committed
making services multi thread safe
1 parent dfa084c commit 9b27d5c

File tree

5 files changed

+62
-13
lines changed

5 files changed

+62
-13
lines changed

netmaster/daemon.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -441,5 +441,5 @@ func (d *daemon) services(id string) ([]core.State, error) {
441441
return []core.State{core.State(svc)}, nil
442442
}
443443

444-
return nil, core.Errorf("Unexpected code path")
444+
return nil, err
445445
}

netmaster/master/api.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ func ServiceProviderUpdateHandler(w http.ResponseWriter, r *http.Request, vars m
363363
provider.Labels[k] = v
364364
epCfg.Labels[k] = v
365365
}
366+
366367
//maintain the containerId in endpointstat for recovery
367368
epCfg.ContainerID = svcProvUpdReq.ContainerID
368369

@@ -379,7 +380,9 @@ func ServiceProviderUpdateHandler(w http.ResponseWriter, r *http.Request, vars m
379380
}
380381

381382
//update provider db
383+
mastercfg.SvcMutex.Lock()
382384
mastercfg.ProviderDb[providerDbID] = provider
385+
383386
for serviceID, service := range mastercfg.ServiceLBDb {
384387
count := 0
385388
if service.Tenant == svcProvUpdReq.Tenant {
@@ -395,19 +398,18 @@ func ServiceProviderUpdateHandler(w http.ResponseWriter, r *http.Request, vars m
395398
append(mastercfg.ProviderDb[providerDbID].Services, serviceID)
396399
//Update ServiceDB
397400
mastercfg.ServiceLBDb[serviceID].Providers[providerID] = provider
398-
stateDriver, err := utils.GetStateDriver()
399-
if err != nil {
400-
return nil, err
401-
}
401+
402402
serviceLbState := &mastercfg.CfgServiceLBState{}
403403
serviceLbState.StateDriver = stateDriver
404404
err = serviceLbState.Read(serviceID)
405405
if err != nil {
406+
mastercfg.SvcMutex.Unlock()
406407
return nil, err
407408
}
408409
serviceLbState.Providers[providerID] = provider
409410
serviceLbState.Write()
410411
SvcProviderUpdate(serviceID, false)
412+
break
411413
}
412414
}
413415
}
@@ -416,13 +418,16 @@ func ServiceProviderUpdateHandler(w http.ResponseWriter, r *http.Request, vars m
416418
} else if svcProvUpdReq.Event == "die" {
417419
//Received a container die event. If it was a service provider -
418420
//clear the provider db and the service db and change the etcd state
421+
419422
providerDbID := svcProvUpdReq.ContainerID
420423
if providerDbID == "" {
421424
return nil, fmt.Errorf("Invalid containerID in SvcProvUpdateRequest:(nil)")
422425
}
423426

427+
mastercfg.SvcMutex.Lock()
424428
provider := mastercfg.ProviderDb[providerDbID]
425429
if provider == nil {
430+
mastercfg.SvcMutex.Unlock()
426431
// It is not a provider . Ignore event
427432
return nil, nil
428433
}
@@ -431,6 +436,7 @@ func ServiceProviderUpdateHandler(w http.ResponseWriter, r *http.Request, vars m
431436
service := mastercfg.ServiceLBDb[serviceID]
432437
providerID := getProviderID(provider)
433438
if providerID == "" {
439+
mastercfg.SvcMutex.Unlock()
434440
return nil, fmt.Errorf("Invalid ProviderID from providerInfo:{%v}", provider)
435441
}
436442
if service.Providers[providerID] != nil {
@@ -440,16 +446,19 @@ func ServiceProviderUpdateHandler(w http.ResponseWriter, r *http.Request, vars m
440446
serviceLbState.StateDriver = stateDriver
441447
err = serviceLbState.Read(serviceID)
442448
if err != nil {
449+
mastercfg.SvcMutex.Unlock()
443450
return nil, err
444451
}
445452
delete(serviceLbState.Providers, providerID)
446453
serviceLbState.Write()
447454
delete(mastercfg.ProviderDb, providerDbID)
448455
SvcProviderUpdate(serviceID, false)
456+
449457
}
450458
}
451459

452460
}
461+
mastercfg.SvcMutex.Unlock()
453462
srvUpdResp := &SvcProvUpdateResponse{
454463
IPAddress: svcProvUpdReq.IPAddress,
455464
}

netmaster/master/servicelb.go

+31-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,16 @@ func CreateServiceLB(stateDriver core.StateDriver, serviceLbCfg *intent.ConfigSe
3030

3131
var providersPresent bool
3232
serviceIP := serviceLbCfg.IPAddress
33+
3334
log.Infof("Recevied Create Service Load Balancer config {%v}", serviceLbCfg)
35+
36+
//Check if service already exists.
3437
svcID := getServiceID(serviceLbCfg.ServiceName, serviceLbCfg.Tenant)
38+
39+
mastercfg.SvcMutex.RLock()
3540
oldServiceInfo := mastercfg.ServiceLBDb[svcID]
41+
mastercfg.SvcMutex.RUnlock()
42+
3643
if oldServiceInfo != nil {
3744
//ServiceInfo Exists
3845
if reflect.DeepEqual(oldServiceInfo.Ports, serviceLbCfg.Ports) &&
@@ -43,6 +50,8 @@ func CreateServiceLB(stateDriver core.StateDriver, serviceLbCfg *intent.ConfigSe
4350
serviceIP = oldServiceInfo.IPAddress
4451
DeleteServiceLB(stateDriver, oldServiceInfo.ServiceName, oldServiceInfo.Tenant)
4552
}
53+
54+
//New Service
4655
serviceLbState := &mastercfg.CfgServiceLBState{}
4756
serviceLbState.ServiceName = serviceLbCfg.ServiceName
4857
serviceLbState.Tenant = serviceLbCfg.Tenant
@@ -73,10 +82,11 @@ func CreateServiceLB(stateDriver core.StateDriver, serviceLbCfg *intent.ConfigSe
7382
return err
7483
}
7584
serviceLbState.IPAddress = addr
76-
85+
mastercfg.SvcMutex.Lock()
7786
err = serviceLbState.Write()
7887

7988
if err != nil {
89+
mastercfg.SvcMutex.Unlock()
8090
return err
8191
}
8292

@@ -112,10 +122,14 @@ func CreateServiceLB(stateDriver core.StateDriver, serviceLbCfg *intent.ConfigSe
112122
}
113123
}
114124

125+
//Write into cluster store
115126
err = serviceLbState.Write()
127+
116128
if err != nil {
129+
mastercfg.SvcMutex.Unlock()
117130
return err
118131
}
132+
mastercfg.SvcMutex.Unlock()
119133

120134
if providersPresent {
121135
err = SvcProviderUpdate(serviceID, false)
@@ -130,16 +144,21 @@ func CreateServiceLB(stateDriver core.StateDriver, serviceLbCfg *intent.ConfigSe
130144

131145
//DeleteServiceLB deletes from etcd state
132146
func DeleteServiceLB(stateDriver core.StateDriver, serviceName string, tenantName string) error {
133-
log.Infof("Receiver Delete Service Load Balancer %s on %s", serviceName, tenantName)
147+
148+
log.Infof("Received Delete Service Load Balancer %s on %s", serviceName, tenantName)
134149
serviceLBState := &mastercfg.CfgServiceLBState{}
135150
serviceLBState.StateDriver = stateDriver
136151
serviceLBState.ID = getServiceID(serviceName, tenantName)
137152

153+
mastercfg.SvcMutex.RLock()
138154
err := serviceLBState.Read(serviceLBState.ID)
139155
if err != nil {
156+
mastercfg.SvcMutex.RUnlock()
140157
log.Errorf("Error reading service lb config for service %s in tenant %s", serviceName, tenantName)
141158
return err
142159
}
160+
mastercfg.SvcMutex.RUnlock()
161+
143162
// find the network from network id
144163
nwCfg := &mastercfg.CfgNetworkState{}
145164
nwCfg.StateDriver = stateDriver
@@ -156,6 +175,7 @@ func DeleteServiceLB(stateDriver core.StateDriver, serviceName string, tenantNam
156175

157176
serviceID := getServiceID(serviceLBState.ServiceName, serviceLBState.Tenant)
158177

178+
mastercfg.SvcMutex.Lock()
159179
//Remove the service ID from the provider cache
160180
for _, providerInfo := range mastercfg.ServiceLBDb[serviceID].Providers {
161181
containerID := providerInfo.ContainerID
@@ -174,9 +194,12 @@ func DeleteServiceLB(stateDriver core.StateDriver, serviceName string, tenantNam
174194

175195
err = serviceLBState.Clear()
176196
if err != nil {
197+
mastercfg.SvcMutex.Unlock()
198+
177199
log.Errorf("Error deleing service lb config for service %s in tenant %s", serviceName, tenantName)
178200
return err
179201
}
202+
mastercfg.SvcMutex.Unlock()
180203

181204
return nil
182205

@@ -199,7 +222,9 @@ func RestoreServiceProviderLBDb() {
199222
}
200223
svcLBState.StateDriver = stateDriver
201224
svcLBCfgs, err := svcLBState.ReadAll()
225+
202226
if err == nil {
227+
mastercfg.SvcMutex.Lock()
203228
for _, svcLBCfg := range svcLBCfgs {
204229
svcLB := svcLBCfg.(*mastercfg.CfgServiceLBState)
205230
//mastercfg.ServiceLBDb = make(map[string]*mastercfg.ServiceLBInfo)
@@ -225,9 +250,11 @@ func RestoreServiceProviderLBDb() {
225250
mastercfg.ProviderDb[providerDBId] = providerInfo
226251
}
227252
}
253+
mastercfg.SvcMutex.Unlock()
228254
} else {
229255
log.Errorf("Error reading service load balancer state from cluster store")
230256
}
257+
231258
//Recover from endpoint state as well .
232259
epCfgState := mastercfg.CfgEndpointState{}
233260
epCfgState.StateDriver = stateDriver
@@ -248,7 +275,9 @@ func RestoreServiceProviderLBDb() {
248275
for k, v := range ep.Labels {
249276
providerInfo.Labels[k] = v
250277
}
278+
mastercfg.SvcMutex.Lock()
251279
mastercfg.ProviderDb[providerDBId] = providerInfo
280+
mastercfg.SvcMutex.Unlock()
252281
}
253282
}
254283
} else {

netmaster/mastercfg/providerState.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,21 @@ type Provider struct {
4545
Network string
4646
Services []string
4747
Container string //container endpoint id
48+
4849
}
4950

5051
// Write the state
5152
func (s *SvcProvider) Write() error {
5253
key := fmt.Sprintf(svcProviderPath, s.ID)
53-
return s.StateDriver.WriteState(key, s, json.Marshal)
54+
err := s.StateDriver.WriteState(key, s, json.Marshal)
55+
return err
5456
}
5557

5658
// Read the state in for a given ID.
5759
func (s *SvcProvider) Read(id string) error {
5860
key := fmt.Sprintf(svcProviderPath, id)
59-
return s.StateDriver.ReadState(key, s, json.Unmarshal)
61+
err := s.StateDriver.ReadState(key, s, json.Unmarshal)
62+
return err
6063
}
6164

6265
// ReadAll reads all the state for master bgp configurations and returns it.
@@ -67,7 +70,8 @@ func (s *SvcProvider) ReadAll() ([]core.State, error) {
6770
// Clear removes the configuration from the state store.
6871
func (s *SvcProvider) Clear() error {
6972
key := fmt.Sprintf(svcProviderPath, s.ID)
70-
return s.StateDriver.ClearState(key)
73+
err := s.StateDriver.ClearState(key)
74+
return err
7175
}
7276

7377
// WatchAll state transitions and send them through the channel.

netmaster/mastercfg/servicelbState.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/json"
2020
"fmt"
2121
"github.com/contiv/netplugin/core"
22+
"sync"
2223
)
2324

2425
const (
@@ -40,6 +41,9 @@ type ServiceLBInfo struct {
4041
//ServiceLBDb is map of all services
4142
var ServiceLBDb = make(map[string]*ServiceLBInfo) //DB for all services keyed by servicename.tenant
4243

44+
//SvcDbMutex is mutex for service transaction
45+
var SvcMutex sync.RWMutex
46+
4347
// CfgServiceLBState is the service object configuration
4448
type CfgServiceLBState struct {
4549
core.CommonState
@@ -55,13 +59,15 @@ type CfgServiceLBState struct {
5559
// Write the state
5660
func (s *CfgServiceLBState) Write() error {
5761
key := fmt.Sprintf(serviceLBConfigPath, s.ID)
58-
return s.StateDriver.WriteState(key, s, json.Marshal)
62+
err := s.StateDriver.WriteState(key, s, json.Marshal)
63+
return err
5964
}
6065

6166
// Read the state in for a given ID.
6267
func (s *CfgServiceLBState) Read(id string) error {
6368
key := fmt.Sprintf(serviceLBConfigPath, id)
64-
return s.StateDriver.ReadState(key, s, json.Unmarshal)
69+
err := s.StateDriver.ReadState(key, s, json.Unmarshal)
70+
return err
6571
}
6672

6773
// ReadAll reads all the state for master bgp configurations and returns it.
@@ -72,7 +78,8 @@ func (s *CfgServiceLBState) ReadAll() ([]core.State, error) {
7278
// Clear removes the configuration from the state store.
7379
func (s *CfgServiceLBState) Clear() error {
7480
key := fmt.Sprintf(serviceLBConfigPath, s.ID)
75-
return s.StateDriver.ClearState(key)
81+
err := s.StateDriver.ClearState(key)
82+
return err
7683
}
7784

7885
// WatchAll state transitions and send them through the channel.

0 commit comments

Comments
 (0)