Skip to content

Commit 399c860

Browse files
authored
Memberlist now forwards only changes, not full original received message (#4419)
* Memberlist now forwards only changes, not full original received message. Signed-off-by: Peter Štibraný <[email protected]> * CHANGELOG.md entry Signed-off-by: Peter Štibraný <[email protected]> * Ignore linter here. Signed-off-by: Peter Štibraný <[email protected]>
1 parent ea0633e commit 399c860

File tree

3 files changed

+97
-14
lines changed

3 files changed

+97
-14
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Users only have control of the HTTP header when Cortex is not frontend by an auth proxy validating the tenant IDs
2121
* [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394
2222
* [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
23+
* [CHANGE] Memberlist: forward only changes, not entire original message. #4419
2324
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
2425
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
2526
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342

pkg/ring/kv/memberlist/memberlist_client.go

+2-14
Original file line numberDiff line numberDiff line change
@@ -980,20 +980,8 @@ func (m *KV) NotifyMsg(msg []byte) {
980980
} else if version > 0 {
981981
m.notifyWatchers(kvPair.Key)
982982

983-
m.addSentMessage(message{
984-
Time: time.Now(),
985-
Size: len(msg),
986-
Pair: kvPair,
987-
Version: version,
988-
Changes: changes,
989-
})
990-
991-
// Forward this message
992-
// Memberlist will modify message once this function returns, so we need to make a copy
993-
msgCopy := append([]byte(nil), msg...)
994-
995-
// forward this message further
996-
m.queueBroadcast(kvPair.Key, mod.MergeContent(), version, msgCopy)
983+
// Don't resend original message, but only changes.
984+
m.broadcastNewValue(kvPair.Key, mod, version, codec)
997985
}
998986
}
999987

pkg/ring/kv/memberlist/memberlist_client_test.go

+94
Original file line numberDiff line numberDiff line change
@@ -1074,3 +1074,97 @@ func TestMessageBuffer(t *testing.T) {
10741074
assert.Len(t, buf, 2)
10751075
assert.Equal(t, size, 75)
10761076
}
1077+
1078+
func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
1079+
codec := dataCodec{}
1080+
1081+
cfg := KVConfig{}
1082+
// We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor.
1083+
cfg.RetransmitMult = 1
1084+
cfg.Codecs = append(cfg.Codecs, codec)
1085+
1086+
kv := NewKV(cfg, log.NewNopLogger())
1087+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
1088+
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck
1089+
1090+
client, err := NewClient(kv, codec)
1091+
require.NoError(t, err)
1092+
1093+
// No broadcast messages from KV at the beginning.
1094+
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))
1095+
1096+
now := time.Now()
1097+
1098+
require.NoError(t, client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) {
1099+
d := getOrCreateData(in)
1100+
d.Members["a"] = member{Timestamp: now.Unix(), State: JOINING}
1101+
d.Members["b"] = member{Timestamp: now.Unix(), State: JOINING}
1102+
return d, true, nil
1103+
}))
1104+
1105+
// Check that new instance is broadcasted about just once.
1106+
assert.Equal(t, 1, len(kv.GetBroadcasts(0, math.MaxInt32)))
1107+
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))
1108+
1109+
kv.NotifyMsg(marshalKeyValuePair(t, key, codec, &data{
1110+
Members: map[string]member{
1111+
"a": {Timestamp: now.Unix() - 5, State: ACTIVE},
1112+
"b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
1113+
"c": {Timestamp: now.Unix(), State: ACTIVE},
1114+
}}))
1115+
1116+
// Check two things here:
1117+
// 1) state of value in KV store
1118+
// 2) broadcast message only has changed members
1119+
1120+
d := getData(t, client, key)
1121+
assert.Equal(t, &data{
1122+
Members: map[string]member{
1123+
"a": {Timestamp: now.Unix(), State: JOINING, Tokens: []uint32{}}, // unchanged, timestamp too old
1124+
"b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
1125+
"c": {Timestamp: now.Unix(), State: ACTIVE, Tokens: []uint32{}},
1126+
}}, d)
1127+
1128+
bs := kv.GetBroadcasts(0, math.MaxInt32)
1129+
require.Equal(t, 1, len(bs))
1130+
1131+
d = decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec)
1132+
assert.Equal(t, &data{
1133+
Members: map[string]member{
1134+
// "a" is not here, because it wasn't changed by the message.
1135+
"b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
1136+
"c": {Timestamp: now.Unix(), State: ACTIVE},
1137+
}}, d)
1138+
}
1139+
1140+
func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data {
1141+
kvp := KeyValuePair{}
1142+
require.NoError(t, kvp.Unmarshal(marshalledKVP))
1143+
require.Equal(t, key, kvp.Key)
1144+
1145+
val, err := codec.Decode(kvp.Value)
1146+
require.NoError(t, err)
1147+
d, ok := val.(*data)
1148+
require.True(t, ok)
1149+
return d
1150+
}
1151+
1152+
func marshalKeyValuePair(t *testing.T, key string, codec codec.Codec, value interface{}) []byte {
1153+
data, err := codec.Encode(value)
1154+
require.NoError(t, err)
1155+
1156+
kvp := KeyValuePair{Key: key, Codec: codec.CodecID(), Value: data}
1157+
data, err = kvp.Marshal()
1158+
require.NoError(t, err)
1159+
return data
1160+
}
1161+
1162+
func getOrCreateData(in interface{}) *data {
1163+
// Modify value that was passed as a parameter.
1164+
// Client takes care of concurrent modifications.
1165+
r, ok := in.(*data)
1166+
if !ok || r == nil {
1167+
return &data{Members: map[string]member{}}
1168+
}
1169+
return r
1170+
}

0 commit comments

Comments
 (0)