Skip to content

Commit 2d61c30

Browse files
committed
lrs: use JSON for locality's String representation (#4135)
1 parent f60ed8a commit 2d61c30

File tree

7 files changed

+105
-40
lines changed

7 files changed

+105
-40
lines changed

xds/internal/balancer/edsbalancer/eds_impl.go

+24-9
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,17 @@ func (edsImpl *edsBalancerImpl) handleChildPolicy(name string, config json.RawMe
139139
continue
140140
}
141141
for lid, config := range bgwc.configs {
142+
lidJSON, err := lid.ToString()
143+
if err != nil {
144+
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
145+
continue
146+
}
142147
// TODO: (eds) add support to balancer group to support smoothly
143148
// switching sub-balancers (keep old balancer around until new
144149
// balancer becomes ready).
145-
bgwc.bg.Remove(lid.String())
146-
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
147-
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
150+
bgwc.bg.Remove(lidJSON)
151+
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
152+
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
148153
ResolverState: resolver.State{Addresses: config.addrs},
149154
})
150155
// This doesn't need to manually update picker, because the new
@@ -282,6 +287,11 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
282287
// One balancer for each locality.
283288

284289
lid := locality.ID
290+
lidJSON, err := lid.ToString()
291+
if err != nil {
292+
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
293+
continue
294+
}
285295
newLocalitiesSet[lid] = struct{}{}
286296

287297
newWeight := locality.Weight
@@ -316,8 +326,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
316326
config, ok := bgwc.configs[lid]
317327
if !ok {
318328
// A new balancer, add it to balancer group and balancer map.
319-
bgwc.stateAggregator.Add(lid.String(), newWeight)
320-
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
329+
bgwc.stateAggregator.Add(lidJSON, newWeight)
330+
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
321331
config = &localityConfig{
322332
weight: newWeight,
323333
}
@@ -340,23 +350,28 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
340350

341351
if weightChanged {
342352
config.weight = newWeight
343-
bgwc.stateAggregator.UpdateWeight(lid.String(), newWeight)
353+
bgwc.stateAggregator.UpdateWeight(lidJSON, newWeight)
344354
rebuildStateAndPicker = true
345355
}
346356

347357
if addrsChanged {
348358
config.addrs = newAddrs
349-
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
359+
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
350360
ResolverState: resolver.State{Addresses: newAddrs},
351361
})
352362
}
353363
}
354364

355365
// Delete localities that are removed in the latest response.
356366
for lid := range bgwc.configs {
367+
lidJSON, err := lid.ToString()
368+
if err != nil {
369+
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
370+
continue
371+
}
357372
if _, ok := newLocalitiesSet[lid]; !ok {
358-
bgwc.stateAggregator.Remove(lid.String())
359-
bgwc.bg.Remove(lid.String())
373+
bgwc.stateAggregator.Remove(lidJSON)
374+
bgwc.bg.Remove(lidJSON)
360375
delete(bgwc.configs, lid)
361376
edsImpl.logger.Infof("Locality %v deleted", lid)
362377
rebuildStateAndPicker = true

xds/internal/balancer/edsbalancer/eds_impl_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -726,12 +726,14 @@ func (s) TestEDS_LoadReport(t *testing.T) {
726726
// We expect the 10 picks to be split between the localities since they are
727727
// of equal weight. And since we only mark the picks routed to sc2 as done,
728728
// the picks on sc1 should show up as inProgress.
729+
locality1JSON, _ := locality1.ToString()
730+
locality2JSON, _ := locality2.ToString()
729731
wantStoreData := []*load.Data{{
730732
Cluster: testClusterNames[0],
731733
Service: "",
732734
LocalityStats: map[string]load.LocalityData{
733-
locality1.String(): {RequestStats: load.RequestData{InProgress: 5}},
734-
locality2.String(): {RequestStats: load.RequestData{Succeeded: 5}},
735+
locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
736+
locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
735737
},
736738
}}
737739
for i := 0; i < 10; i++ {

xds/internal/balancer/lrs/balancer.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"google.golang.org/grpc/balancer"
2929
"google.golang.org/grpc/internal/grpclog"
3030
"google.golang.org/grpc/serviceconfig"
31-
"google.golang.org/grpc/xds/internal"
3231
xdsinternal "google.golang.org/grpc/xds/internal"
3332
"google.golang.org/grpc/xds/internal/client/load"
3433
)
@@ -93,7 +92,12 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
9392
if b.lb != nil {
9493
b.lb.Close()
9594
}
96-
b.lb = bb.Build(newCCWrapper(b.cc, b.client.loadStore(), newConfig.Locality), b.buildOpts)
95+
lidJSON, err := newConfig.Locality.ToString()
96+
if err != nil {
97+
return fmt.Errorf("failed to marshal LocalityID: %#v", newConfig.Locality)
98+
}
99+
ccWrapper := newCCWrapper(b.cc, b.client.loadStore(), lidJSON)
100+
b.lb = bb.Build(ccWrapper, b.buildOpts)
97101
}
98102
b.config = newConfig
99103

@@ -126,20 +130,20 @@ func (b *lrsBalancer) Close() {
126130

127131
type ccWrapper struct {
128132
balancer.ClientConn
129-
loadStore load.PerClusterReporter
130-
localityID *internal.LocalityID
133+
loadStore load.PerClusterReporter
134+
localityIDJSON string
131135
}
132136

133-
func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityID *internal.LocalityID) *ccWrapper {
137+
func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityIDJSON string) *ccWrapper {
134138
return &ccWrapper{
135-
ClientConn: cc,
136-
loadStore: loadStore,
137-
localityID: localityID,
139+
ClientConn: cc,
140+
loadStore: loadStore,
141+
localityIDJSON: localityIDJSON,
138142
}
139143
}
140144

141145
func (ccw *ccWrapper) UpdateState(s balancer.State) {
142-
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
146+
s.Picker = newLoadReportPicker(s.Picker, ccw.localityIDJSON, ccw.loadStore)
143147
ccw.ClientConn.UpdateState(s)
144148
}
145149

xds/internal/balancer/lrs/balancer_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ func TestLoadReporting(t *testing.T) {
124124
if sd.Cluster != testClusterName || sd.Service != testServiceName {
125125
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
126126
}
127-
localityData, ok := sd.LocalityStats[testLocality.String()]
127+
testLocalityJSON, _ := testLocality.ToString()
128+
localityData, ok := sd.LocalityStats[testLocalityJSON]
128129
if !ok {
129130
t.Fatalf("loads for %v not found in store", testLocality)
130131
}

xds/internal/balancer/lrs/picker.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package lrs
2121
import (
2222
orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
2323
"google.golang.org/grpc/balancer"
24-
"google.golang.org/grpc/xds/internal"
2524
)
2625

2726
const (
@@ -43,10 +42,10 @@ type loadReportPicker struct {
4342
loadStore loadReporter
4443
}
4544

46-
func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore loadReporter) *loadReportPicker {
45+
func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
4746
return &loadReportPicker{
4847
p: p,
49-
locality: id.String(),
48+
locality: id,
5049
loadStore: loadStore,
5150
}
5251
}

xds/internal/internal.go

+15-16
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
package internal
2121

2222
import (
23+
"encoding/json"
2324
"fmt"
24-
"strings"
2525
)
2626

2727
type clientID string
@@ -41,23 +41,22 @@ type LocalityID struct {
4141
SubZone string `json:"subZone,omitempty"`
4242
}
4343

44-
// String generates a string representation of LocalityID by adding ":" between
45-
// the components of the LocalityID.
46-
func (l LocalityID) String() string {
47-
return fmt.Sprintf("%s:%s:%s", l.Region, l.Zone, l.SubZone)
44+
// ToString generates a string representation of LocalityID by marshalling it into
45+
// json. Not calling it String() so printf won't call it.
46+
func (l LocalityID) ToString() (string, error) {
47+
b, err := json.Marshal(l)
48+
if err != nil {
49+
return "", err
50+
}
51+
return string(b), nil
4852
}
4953

50-
// LocalityIDFromString converts a string representation of locality, of the
51-
// form region:zone:sub-zone (as generated by the above String() method), into a
54+
// LocalityIDFromString converts a json representation of locality, into a
5255
// LocalityID struct.
53-
func LocalityIDFromString(l string) (LocalityID, error) {
54-
parts := strings.Split(l, ":")
55-
if len(parts) != 3 {
56-
return LocalityID{}, fmt.Errorf("%s is not a well formatted locality ID", l)
56+
func LocalityIDFromString(s string) (ret LocalityID, _ error) {
57+
err := json.Unmarshal([]byte(s), &ret)
58+
if err != nil {
59+
return LocalityID{}, fmt.Errorf("%s is not a well formatted locality ID, error: %v", s, err)
5760
}
58-
return LocalityID{
59-
Region: parts[0],
60-
Zone: parts[1],
61-
SubZone: parts[2],
62-
}, nil
61+
return ret, nil
6362
}

xds/internal/internal_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,48 @@ func (s) TestLocalityMatchProtoMessage(t *testing.T) {
7070
t.Fatalf("internal type and proto message have different fields: (-got +want):\n%+v", diff)
7171
}
7272
}
73+
74+
func TestLocalityToAndFromJSON(t *testing.T) {
75+
tests := []struct {
76+
name string
77+
localityID LocalityID
78+
str string
79+
wantErr bool
80+
}{
81+
{
82+
name: "3 fields",
83+
localityID: LocalityID{Region: "r:r", Zone: "z#z", SubZone: "s^s"},
84+
str: `{"region":"r:r","zone":"z#z","subZone":"s^s"}`,
85+
},
86+
{
87+
name: "2 fields",
88+
localityID: LocalityID{Region: "r:r", Zone: "z#z"},
89+
str: `{"region":"r:r","zone":"z#z"}`,
90+
},
91+
{
92+
name: "1 field",
93+
localityID: LocalityID{Region: "r:r"},
94+
str: `{"region":"r:r"}`,
95+
},
96+
}
97+
for _, tt := range tests {
98+
t.Run(tt.name, func(t *testing.T) {
99+
gotStr, err := tt.localityID.ToString()
100+
if err != nil {
101+
t.Errorf("failed to marshal LocalityID: %#v", tt.localityID)
102+
}
103+
if gotStr != tt.str {
104+
t.Errorf("%#v.String() = %q, want %q", tt.localityID, gotStr, tt.str)
105+
}
106+
107+
gotID, err := LocalityIDFromString(tt.str)
108+
if (err != nil) != tt.wantErr {
109+
t.Errorf("LocalityIDFromString(%q) error = %v, wantErr %v", tt.str, err, tt.wantErr)
110+
return
111+
}
112+
if diff := cmp.Diff(gotID, tt.localityID); diff != "" {
113+
t.Errorf("LocalityIDFromString() got = %v, want %v, diff: %s", gotID, tt.localityID, diff)
114+
}
115+
})
116+
}
117+
}

0 commit comments

Comments
 (0)