Skip to content

Commit af06f08

Browse files
Cherry-picks for 2.11.5-RC.3 (#7002)
Includes the following: - #6990 - #6967 - #6995 - #6999 - #7001 Signed-off-by: Neil Twigg <[email protected]>
2 parents fc9c7e9 + f419a94 commit af06f08

20 files changed

+684
-108
lines changed

server/accounts.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,15 @@ var maxSubLimitReportThreshold = defaultMaxSubLimitReportThreshold
5050
// Account are subject namespace definitions. By default no messages are shared between accounts.
5151
// You can share via Exports and Imports of Streams and Services.
5252
type Account struct {
53-
stats
53+
// Total stats for the account.
54+
stats struct {
55+
sync.Mutex
56+
stats // Totals
57+
gw stats // Gateways
58+
rt stats // Routes
59+
ln stats // Leafnodes
60+
}
61+
5462
gwReplyMapping
5563
Name string
5664
Nkey string

server/client.go

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1446,14 +1446,25 @@ func (c *client) readLoop(pre []byte) {
14461446
// Updates stats for client and server that were collected
14471447
// from parsing through the buffer.
14481448
if c.in.msgs > 0 {
1449-
atomic.AddInt64(&c.inMsgs, int64(c.in.msgs))
1450-
atomic.AddInt64(&c.inBytes, int64(c.in.bytes))
1449+
inMsgs := int64(c.in.msgs)
1450+
inBytes := int64(c.in.bytes)
1451+
1452+
atomic.AddInt64(&c.inMsgs, inMsgs)
1453+
atomic.AddInt64(&c.inBytes, inBytes)
1454+
14511455
if acc != nil {
1452-
atomic.AddInt64(&acc.inMsgs, int64(c.in.msgs))
1453-
atomic.AddInt64(&acc.inBytes, int64(c.in.bytes))
1456+
acc.stats.Lock()
1457+
acc.stats.inMsgs += inMsgs
1458+
acc.stats.inBytes += inBytes
1459+
if c.kind == LEAF {
1460+
acc.stats.ln.inMsgs += int64(inMsgs)
1461+
acc.stats.ln.inBytes += int64(inBytes)
1462+
}
1463+
acc.stats.Unlock()
14541464
}
1455-
atomic.AddInt64(&s.inMsgs, int64(c.in.msgs))
1456-
atomic.AddInt64(&s.inBytes, int64(c.in.bytes))
1465+
1466+
atomic.AddInt64(&s.inMsgs, inMsgs)
1467+
atomic.AddInt64(&s.inBytes, inBytes)
14571468
}
14581469

14591470
// Signal to writeLoop to flush to socket.
@@ -1806,7 +1817,9 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
18061817
c.srv.scStats.leafs.Add(1)
18071818
}
18081819
if c.acc != nil {
1809-
atomic.AddInt64(&c.acc.slowConsumers, 1)
1820+
c.acc.stats.Lock()
1821+
c.acc.stats.slowConsumers++
1822+
c.acc.stats.Unlock()
18101823
}
18111824
c.Noticef("Slow Consumer %s: WriteDeadline of %v exceeded with %d chunks of %d total bytes.",
18121825
scState, c.out.wdl, numChunks, attempted)
@@ -2356,7 +2369,9 @@ func (c *client) queueOutbound(data []byte) {
23562369
atomic.AddInt64(&c.srv.slowConsumers, 1)
23572370
c.srv.scStats.clients.Add(1)
23582371
if c.acc != nil {
2359-
atomic.AddInt64(&c.acc.slowConsumers, 1)
2372+
c.acc.stats.Lock()
2373+
c.acc.stats.slowConsumers++
2374+
c.acc.stats.Unlock()
23602375
}
23612376
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
23622377
c.markConnAsClosed(SlowConsumerPendingBytes)
@@ -4735,6 +4750,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
47354750
// by having an extra size
47364751
var dlvMsgs int64
47374752
var dlvExtraSize int64
4753+
var dlvRouteMsgs int64
4754+
var dlvLeafMsgs int64
47384755

47394756
// We need to know if this is a MQTT producer because they send messages
47404757
// without CR_LF (we otherwise remove the size of CR_LF from message size).
@@ -4744,15 +4761,33 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
47444761
if dlvMsgs == 0 {
47454762
return
47464763
}
4764+
47474765
totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSize
4766+
routeBytes := dlvRouteMsgs*int64(len(msg)) + dlvExtraSize
4767+
leafBytes := dlvLeafMsgs*int64(len(msg)) + dlvExtraSize
4768+
47484769
// For non MQTT producers, remove the CR_LF * number of messages
47494770
if !prodIsMQTT {
47504771
totalBytes -= dlvMsgs * int64(LEN_CR_LF)
4772+
routeBytes -= dlvRouteMsgs * int64(LEN_CR_LF)
4773+
leafBytes -= dlvLeafMsgs * int64(LEN_CR_LF)
47514774
}
4775+
47524776
if acc != nil {
4753-
atomic.AddInt64(&acc.outMsgs, dlvMsgs)
4754-
atomic.AddInt64(&acc.outBytes, totalBytes)
4777+
acc.stats.Lock()
4778+
acc.stats.outMsgs += dlvMsgs
4779+
acc.stats.outBytes += totalBytes
4780+
if dlvRouteMsgs > 0 {
4781+
acc.stats.rt.outMsgs += dlvRouteMsgs
4782+
acc.stats.rt.outBytes += routeBytes
4783+
}
4784+
if dlvLeafMsgs > 0 {
4785+
acc.stats.ln.outMsgs += dlvLeafMsgs
4786+
acc.stats.ln.outBytes += leafBytes
4787+
}
4788+
acc.stats.Unlock()
47554789
}
4790+
47564791
if srv := c.srv; srv != nil {
47574792
atomic.AddInt64(&srv.outMsgs, dlvMsgs)
47584793
atomic.AddInt64(&srv.outBytes, totalBytes)
@@ -5073,6 +5108,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
50735108
// Update only if not skipped.
50745109
if !skipDelivery && sub.icb == nil {
50755110
dlvMsgs++
5111+
switch sub.client.kind {
5112+
case ROUTER:
5113+
dlvRouteMsgs++
5114+
case LEAF:
5115+
dlvLeafMsgs++
5116+
}
50765117
}
50775118
// Do the rest even when message delivery was skipped.
50785119
didDeliver = true
@@ -5163,6 +5204,12 @@ sendToRoutesOrLeafs:
51635204
if c.deliverMsg(prodIsMQTT, rt.sub, acc, subject, reply, mh, dmsg, false) {
51645205
if rt.sub.icb == nil {
51655206
dlvMsgs++
5207+
switch dc.kind {
5208+
case ROUTER:
5209+
dlvRouteMsgs++
5210+
case LEAF:
5211+
dlvLeafMsgs++
5212+
}
51665213
dlvExtraSize += int64(len(dmsg) - len(msg))
51675214
}
51685215
didDeliver = true

server/consumer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3304,11 +3304,10 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
33043304
// Check if we are filtered, and if so check if this is even applicable to us.
33053305
if isFiltered {
33063306
if subj == _EMPTY_ {
3307-
var svp StoreMsg
3308-
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
3307+
var err error
3308+
if subj, err = o.mset.store.SubjectForSeq(sseq); err != nil {
33093309
return false
33103310
}
3311-
subj = svp.subj
33123311
}
33133312
if !o.isFilteredMatch(subj) {
33143313
return false

server/events.go

Lines changed: 94 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -400,12 +400,19 @@ type GatewayStat struct {
400400
NumInbound int `json:"inbound_connections"`
401401
}
402402

403-
// DataStats reports how may msg and bytes. Applicable for both sent and received.
404-
type DataStats struct {
403+
type dataStats struct {
405404
Msgs int64 `json:"msgs"`
406405
Bytes int64 `json:"bytes"`
407406
}
408407

408+
// DataStats reports how may msg and bytes. Applicable for both sent and received.
409+
type DataStats struct {
410+
dataStats
411+
Gateways dataStats `json:"gateways,omitempty"`
412+
Routes dataStats `json:"routes,omitempty"`
413+
Leafs dataStats `json:"leafs,omitempty"`
414+
}
415+
409416
// Used for internally queueing up messages that the server wants to send.
410417
type pubMsg struct {
411418
c *client
@@ -842,12 +849,16 @@ func routeStat(r *client) *RouteStat {
842849
rs := &RouteStat{
843850
ID: r.cid,
844851
Sent: DataStats{
845-
Msgs: r.outMsgs,
846-
Bytes: r.outBytes,
852+
dataStats: dataStats{
853+
Msgs: r.outMsgs,
854+
Bytes: r.outBytes,
855+
},
847856
},
848857
Received: DataStats{
849-
Msgs: atomic.LoadInt64(&r.inMsgs),
850-
Bytes: atomic.LoadInt64(&r.inBytes),
858+
dataStats: dataStats{
859+
Msgs: atomic.LoadInt64(&r.inMsgs),
860+
Bytes: atomic.LoadInt64(&r.inBytes),
861+
},
851862
},
852863
Pending: int(r.out.pb),
853864
}
@@ -946,8 +957,10 @@ func (s *Server) sendStatsz(subj string) {
946957
// Note that *client.out[Msgs|Bytes] are not set using atomic,
947958
// unlike the in[Msgs|bytes].
948959
gs.Sent = DataStats{
949-
Msgs: c.outMsgs,
950-
Bytes: c.outBytes,
960+
dataStats: dataStats{
961+
Msgs: c.outMsgs,
962+
Bytes: c.outBytes,
963+
},
951964
}
952965
c.mu.Unlock()
953966
// Gather matching inbound connections
@@ -2402,22 +2415,57 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
24022415
func (a *Account) statz() *AccountStat {
24032416
localConns := a.numLocalConnections()
24042417
leafConns := a.numLocalLeafNodes()
2405-
return &AccountStat{
2406-
Account: a.Name,
2407-
Name: a.getNameTagLocked(),
2408-
Conns: localConns,
2409-
LeafNodes: leafConns,
2410-
TotalConns: localConns + leafConns,
2411-
NumSubs: a.sl.Count(),
2412-
Received: DataStats{
2413-
Msgs: atomic.LoadInt64(&a.inMsgs),
2414-
Bytes: atomic.LoadInt64(&a.inBytes),
2418+
2419+
a.stats.Lock()
2420+
received := DataStats{
2421+
dataStats: dataStats{
2422+
Msgs: a.stats.inMsgs,
2423+
Bytes: a.stats.inBytes,
24152424
},
2416-
Sent: DataStats{
2417-
Msgs: atomic.LoadInt64(&a.outMsgs),
2418-
Bytes: atomic.LoadInt64(&a.outBytes),
2425+
Gateways: dataStats{
2426+
Msgs: a.stats.gw.inMsgs,
2427+
Bytes: a.stats.gw.inBytes,
2428+
},
2429+
Routes: dataStats{
2430+
Msgs: a.stats.rt.inMsgs,
2431+
Bytes: a.stats.rt.inBytes,
2432+
},
2433+
Leafs: dataStats{
2434+
Msgs: a.stats.ln.inMsgs,
2435+
Bytes: a.stats.ln.inBytes,
24192436
},
2420-
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
2437+
}
2438+
sent := DataStats{
2439+
dataStats: dataStats{
2440+
Msgs: a.stats.outMsgs,
2441+
Bytes: a.stats.outBytes,
2442+
},
2443+
Gateways: dataStats{
2444+
Msgs: a.stats.gw.outMsgs,
2445+
Bytes: a.stats.gw.outBytes,
2446+
},
2447+
Routes: dataStats{
2448+
Msgs: a.stats.rt.outMsgs,
2449+
Bytes: a.stats.rt.outBytes,
2450+
},
2451+
Leafs: dataStats{
2452+
Msgs: a.stats.ln.outMsgs,
2453+
Bytes: a.stats.ln.outBytes,
2454+
},
2455+
}
2456+
slowConsumers := a.stats.slowConsumers
2457+
a.stats.Unlock()
2458+
2459+
return &AccountStat{
2460+
Account: a.Name,
2461+
Name: a.getNameTagLocked(),
2462+
Conns: localConns,
2463+
LeafNodes: leafConns,
2464+
TotalConns: localConns + leafConns,
2465+
NumSubs: a.sl.Count(),
2466+
Received: received,
2467+
Sent: sent,
2468+
SlowConsumers: slowConsumers,
24212469
}
24222470
}
24232471

@@ -2533,12 +2581,16 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
25332581
MQTTClient: c.getMQTTClientID(),
25342582
},
25352583
Sent: DataStats{
2536-
Msgs: atomic.LoadInt64(&c.inMsgs),
2537-
Bytes: atomic.LoadInt64(&c.inBytes),
2584+
dataStats: dataStats{
2585+
Msgs: atomic.LoadInt64(&c.inMsgs),
2586+
Bytes: atomic.LoadInt64(&c.inBytes),
2587+
},
25382588
},
25392589
Received: DataStats{
2540-
Msgs: c.outMsgs,
2541-
Bytes: c.outBytes,
2590+
dataStats: dataStats{
2591+
Msgs: c.outMsgs,
2592+
Bytes: c.outBytes,
2593+
},
25422594
},
25432595
Reason: reason,
25442596
}
@@ -2587,12 +2639,16 @@ func (s *Server) sendAuthErrorEvent(c *client) {
25872639
MQTTClient: c.getMQTTClientID(),
25882640
},
25892641
Sent: DataStats{
2590-
Msgs: c.inMsgs,
2591-
Bytes: c.inBytes,
2642+
dataStats: dataStats{
2643+
Msgs: c.inMsgs,
2644+
Bytes: c.inBytes,
2645+
},
25922646
},
25932647
Received: DataStats{
2594-
Msgs: c.outMsgs,
2595-
Bytes: c.outBytes,
2648+
dataStats: dataStats{
2649+
Msgs: c.outMsgs,
2650+
Bytes: c.outBytes,
2651+
},
25962652
},
25972653
Reason: AuthenticationViolation.String(),
25982654
}
@@ -2645,12 +2701,16 @@ func (s *Server) sendAccountAuthErrorEvent(c *client, acc *Account, reason strin
26452701
MQTTClient: c.getMQTTClientID(),
26462702
},
26472703
Sent: DataStats{
2648-
Msgs: c.inMsgs,
2649-
Bytes: c.inBytes,
2704+
dataStats: dataStats{
2705+
Msgs: c.inMsgs,
2706+
Bytes: c.inBytes,
2707+
},
26502708
},
26512709
Received: DataStats{
2652-
Msgs: c.outMsgs,
2653-
Bytes: c.outBytes,
2710+
dataStats: dataStats{
2711+
Msgs: c.outMsgs,
2712+
Bytes: c.outBytes,
2713+
},
26542714
},
26552715
Reason: reason,
26562716
}

server/events_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,8 +1292,8 @@ func TestAccountReqMonitoring(t *testing.T) {
12921292
// query statz/conns for account
12931293
resp, err = ncSys.Request(statz(acc.Name), nil, time.Second)
12941294
require_NoError(t, err)
1295-
respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`,
1296-
`"received":{"msgs":0,"bytes":0}`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
1295+
respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0`,
1296+
`"received":{"msgs":0,"bytes":0`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
12971297
require_Contains(t, string(resp.Data), respContentAcc...)
12981298

12991299
rIb := ncSys.NewRespInbox()
@@ -1350,11 +1350,11 @@ func TestAccountReqMonitoring(t *testing.T) {
13501350

13511351
// Since we now have processed our own message, sent msgs will be at least 1.
13521352
payload := string(resp.Data)
1353-
respContentAcc = []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":1,"bytes":0}`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
1353+
respContentAcc = []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":1,"bytes":0`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
13541354
require_Contains(t, payload, respContentAcc...)
13551355

13561356
// Depending on timing, statz message could be accounted too.
1357-
receivedOK := strings.Contains(payload, `"received":{"msgs":1,"bytes":0}`) || strings.Contains(payload, `"received":{"msgs":2,"bytes":0}`)
1357+
receivedOK := strings.Contains(payload, `"received":{"msgs":1,"bytes":0`) || strings.Contains(payload, `"received":{"msgs":2,"bytes":0`)
13581358
require_True(t, receivedOK)
13591359
_, err = rSub.NextMsg(200 * time.Millisecond)
13601360
require_Error(t, err)

server/filestore.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7399,6 +7399,24 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
73997399
return sm, nil
74007400
}
74017401

7402+
// SubjectForSeq will return what the subject is for this sequence if found.
7403+
func (fs *fileStore) SubjectForSeq(seq uint64) (string, error) {
7404+
fs.mu.RLock()
7405+
if seq < fs.state.FirstSeq {
7406+
fs.mu.RUnlock()
7407+
return _EMPTY_, ErrStoreMsgNotFound
7408+
}
7409+
var smv StoreMsg
7410+
mb := fs.selectMsgBlock(seq)
7411+
fs.mu.RUnlock()
7412+
if mb != nil {
7413+
if sm, _, _ := mb.fetchMsgNoCopy(seq, &smv); sm != nil {
7414+
return sm.subj, nil
7415+
}
7416+
}
7417+
return _EMPTY_, ErrStoreMsgNotFound
7418+
}
7419+
74027420
// LoadMsg will lookup the message by sequence number and return it if found.
74037421
func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
74047422
return fs.msgForSeq(seq, sm)

0 commit comments

Comments
 (0)