Skip to content

Commit fd8f448

Browse files
author
Thibault Gilles
committed
Add log in case softWatchLimit is exceeded during a blocking query
1 parent baf672a commit fd8f448

File tree

10 files changed

+69
-18
lines changed

10 files changed

+69
-18
lines changed

agent/config/builder.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
809809
VerifyOutgoing: b.boolVal(c.VerifyOutgoing),
810810
VerifyServerHostname: b.boolVal(c.VerifyServerHostname),
811811
Watches: c.Watches,
812-
WatchSoftLimit: b.intValWithDefault(c.WatchSoftLimit, consul.DefaultSoftWatchLimit),
812+
WatchSoftLimit: b.intValWithDefault(c.Performance.WatchSoftLimit, consul.DefaultSoftWatchLimit),
813813
}
814814

815815
if rt.BootstrapExpect == 1 {

agent/config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,6 @@ type Config struct {
254254
VerifyOutgoing *bool `json:"verify_outgoing,omitempty" hcl:"verify_outgoing" mapstructure:"verify_outgoing"`
255255
VerifyServerHostname *bool `json:"verify_server_hostname,omitempty" hcl:"verify_server_hostname" mapstructure:"verify_server_hostname"`
256256
Watches []map[string]interface{} `json:"watches,omitempty" hcl:"watches" mapstructure:"watches"`
257-
WatchSoftLimit *int `json:"watch_soft_limit,omitempty" hcl:"watch_soft_limit" mapstructure:"watch_soft_limit"`
258257

259258
// This isn't used by Consul but we've documented a feature where users
260259
// can deploy their snapshot agent configs alongside their Consul configs
@@ -554,6 +553,7 @@ type Performance struct {
554553
LeaveDrainTime *string `json:"leave_drain_time,omitempty" hcl:"leave_drain_time" mapstructure:"leave_drain_time"`
555554
RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
556555
RPCHoldTimeout *string `json:"rpc_hold_timeout" hcl:"rpc_hold_timeout" mapstructure:"rpc_hold_timeout"`
556+
WatchSoftLimit *int `json:"watch_soft_limit,omitempty" hcl:"watch_soft_limit" mapstructure:"watch_soft_limit"`
557557
}
558558

559559
type Telemetry struct {

agent/config/runtime_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -3007,7 +3007,8 @@ func TestFullConfig(t *testing.T) {
30073007
"performance": {
30083008
"leave_drain_time": "8265s",
30093009
"raft_multiplier": 5,
3010-
"rpc_hold_timeout": "15707s"
3010+
"rpc_hold_timeout": "15707s",
3011+
"watch_soft_limit": ` + fmt.Sprint(consul.DefaultSoftWatchLimit) + `
30113012
},
30123013
"pid_file": "43xN80Km",
30133014
"ports": {
@@ -3333,8 +3334,7 @@ func TestFullConfig(t *testing.T) {
33333334
"key": "sl3Dffu7",
33343335
"args": ["dltjDJ2a", "flEa7C2d"]
33353336
}
3336-
],
3337-
"watch_soft_limit": ` + fmt.Sprint(consul.DefaultSoftWatchLimit) + `
3337+
]
33383338
}`,
33393339
"hcl": `
33403340
acl_agent_master_token = "furuQD0b"
@@ -3541,6 +3541,7 @@ func TestFullConfig(t *testing.T) {
35413541
leave_drain_time = "8265s"
35423542
raft_multiplier = 5
35433543
rpc_hold_timeout = "15707s"
3544+
watch_soft_limit = ` + fmt.Sprint(consul.DefaultSoftWatchLimit) + `
35443545
}
35453546
pid_file = "43xN80Km"
35463547
ports {
@@ -3864,8 +3865,7 @@ func TestFullConfig(t *testing.T) {
38643865
datacenter = "fYrl3F5d"
38653866
key = "sl3Dffu7"
38663867
args = ["dltjDJ2a", "flEa7C2d"]
3867-
}],
3868-
watch_soft_limit = ` + fmt.Sprint(consul.DefaultSoftWatchLimit) + `
3868+
}]
38693869
`}
38703870

38713871
tail := map[string][]Source{

agent/connect/ca/provider_consul_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) error {
4646
}
4747

4848
func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate {
49-
s, err := state.NewStateStore(nil, 1024)
49+
s, err := state.NewStateStore(nil, 1024, nil)
5050
if err != nil {
5151
t.Fatalf("err: %s", err)
5252
}

agent/consul/fsm/fsm.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,16 @@ type FSM struct {
6969

7070
// New is used to construct a new FSM with a blank state.
7171
func New(gc *state.TombstoneGC, watchLimit int, logOutput io.Writer) (*FSM, error) {
72-
stateNew, err := state.NewStateStore(gc, watchLimit)
72+
logger := log.New(logOutput, "", log.LstdFlags)
73+
74+
stateNew, err := state.NewStateStore(gc, watchLimit, logger)
7375
if err != nil {
7476
return nil, err
7577
}
7678

7779
fsm := &FSM{
7880
logOutput: logOutput,
79-
logger: log.New(logOutput, "", log.LstdFlags),
81+
logger: logger,
8082
apply: make(map[structs.MessageType]command),
8183
state: stateNew,
8284
gc: gc,
@@ -142,7 +144,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
142144
defer old.Close()
143145

144146
// Create a new state store.
145-
stateNew, err := state.NewStateStore(c.gc, c.watchLimit)
147+
stateNew, err := state.NewStateStore(c.gc, c.watchLimit, c.logger)
146148
if err != nil {
147149
return err
148150
}

agent/consul/state/catalog.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,11 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
837837
results[service] = append(results[service], tag)
838838
}
839839
}
840+
841+
if len(ws) >= s.watchLimit {
842+
s.warnSoftLimitReached("service by node meta")
843+
}
844+
840845
return idx, results, nil
841846
}
842847

@@ -905,6 +910,11 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
905910
if err != nil {
906911
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
907912
}
913+
914+
if len(ws) >= s.watchLimit {
915+
s.warnSoftLimitReached("service %s", serviceName)
916+
}
917+
908918
return idx, results, nil
909919
}
910920

@@ -1446,7 +1456,16 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
14461456
}
14471457
ws.Add(iter.WatchCh())
14481458

1449-
return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters)
1459+
idx, checks, err := s.parseChecksByNodeMeta(tx, ws, idx, iter, filters)
1460+
if err != nil {
1461+
return 0, nil, err
1462+
}
1463+
1464+
if len(ws) >= s.watchLimit {
1465+
s.warnSoftLimitReached("service %s", serviceName)
1466+
}
1467+
1468+
return idx, checks, nil
14501469
}
14511470

14521471
// ChecksInState is used to query the state store for all checks
@@ -1780,7 +1799,17 @@ func (s *Store) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDu
17801799
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
17811800
}
17821801
ws.Add(nodes.WatchCh())
1783-
return s.parseNodes(tx, ws, idx, nodes)
1802+
1803+
idx, nodeDump, err := s.parseNodes(tx, ws, idx, nodes)
1804+
if err != nil {
1805+
return 0, nil, err
1806+
}
1807+
1808+
if len(ws) >= s.watchLimit {
1809+
s.warnSoftLimitReached("node %s", node)
1810+
}
1811+
1812+
return idx, nodeDump, nil
17841813
}
17851814

17861815
// NodeDump is used to generate a dump of all nodes. This call is expensive
@@ -1863,3 +1892,12 @@ func (s *Store) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
18631892
}
18641893
return idx, results, nil
18651894
}
1895+
1896+
func (s *Store) warnSoftLimitReached(f string, a ...interface{}) {
1897+
if s.watchLimitWarnCounter%100000 > 0 {
1898+
return
1899+
}
1900+
1901+
s.logger.Printf("[WARN] consul: exceeded soft watch limit of %d for %s, falling back to coarse grained watch", s.watchLimit, fmt.Sprintf(f, a...))
1902+
s.watchLimitWarnCounter++
1903+
}

agent/consul/state/catalog_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,7 @@ func TestStateStore_GetNodes(t *testing.T) {
10151015
}
10161016

10171017
func BenchmarkGetNodes(b *testing.B) {
1018-
s, err := NewStateStore(nil, testWatchLimit)
1018+
s, err := NewStateStore(nil, testWatchLimit, nil)
10191019
if err != nil {
10201020
b.Fatalf("err: %s", err)
10211021
}
@@ -3051,7 +3051,7 @@ func TestStateStore_CheckConnectServiceNodes(t *testing.T) {
30513051
}
30523052

30533053
func BenchmarkCheckServiceNodes(b *testing.B) {
3054-
s, err := NewStateStore(nil, testWatchLimit)
3054+
s, err := NewStateStore(nil, testWatchLimit, nil)
30553055
if err != nil {
30563056
b.Fatalf("err: %s", err)
30573057
}

agent/consul/state/kvs_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestStateStore_GC(t *testing.T) {
2121

2222
// Enable it and attach it to the state store.
2323
gc.SetEnabled(true)
24-
s, err := NewStateStore(gc, testWatchLimit)
24+
s, err := NewStateStore(gc, testWatchLimit, nil)
2525
if err != nil {
2626
t.Fatalf("err: %s", err)
2727
}

agent/consul/state/state_store.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package state
33
import (
44
"errors"
55
"fmt"
6+
"log"
7+
"os"
68

79
"github.com/hashicorp/consul/types"
810
"github.com/hashicorp/go-memdb"
@@ -65,6 +67,10 @@ type Store struct {
6567
// Given the current size of aFew == 32 in memdb's watch_few.go,
6668
// this will allow for up to ~ watchLimit/32 goroutines per blocking query.
6769
watchLimit int
70+
71+
watchLimitWarnCounter uint
72+
73+
logger *log.Logger
6874
}
6975

7076
// Snapshot is used to provide a point-in-time snapshot. It
@@ -99,14 +105,18 @@ type sessionCheck struct {
99105
}
100106

101107
// NewStateStore creates a new in-memory state storage layer.
102-
func NewStateStore(gc *TombstoneGC, watchLimit int) (*Store, error) {
108+
func NewStateStore(gc *TombstoneGC, watchLimit int, logger *log.Logger) (*Store, error) {
103109
// Create the in-memory DB.
104110
schema := stateStoreSchema()
105111
db, err := memdb.NewMemDB(schema)
106112
if err != nil {
107113
return nil, fmt.Errorf("Failed setting up state store: %s", err)
108114
}
109115

116+
if logger == nil {
117+
logger = log.New(os.Stderr, "", log.LstdFlags)
118+
}
119+
110120
// Create and return the state store.
111121
s := &Store{
112122
schema: schema,
@@ -115,6 +125,7 @@ func NewStateStore(gc *TombstoneGC, watchLimit int) (*Store, error) {
115125
kvsGraveyard: NewGraveyard(gc),
116126
lockDelay: NewDelay(),
117127
watchLimit: watchLimit,
128+
logger: logger,
118129
}
119130
return s, nil
120131
}

agent/consul/state/state_store_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func testUUID() string {
3030
}
3131

3232
func testStateStore(t *testing.T) *Store {
33-
s, err := NewStateStore(nil, testWatchLimit)
33+
s, err := NewStateStore(nil, testWatchLimit, nil)
3434
if err != nil {
3535
t.Fatalf("err: %s", err)
3636
}

0 commit comments

Comments
 (0)