Skip to content

Commit 08e0bf0

Browse files
Cherry-picks for 2.10.29-RC.1 (#6850)
Includes the following: - #6835 - #6845 [skip ci] Signed-off-by: Neil Twigg <[email protected]>
2 parents e0b99bd + 59bfd07 commit 08e0bf0

File tree

9 files changed

+221
-62
lines changed

9 files changed

+221
-62
lines changed

server/consumer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2694,13 +2694,14 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
26942694
TimeStamp: time.Now().UTC(),
26952695
}
26962696

2697-
// If we are replicated, we need to pull certain data from our store.
2698-
if rg != nil && rg.node != nil && o.store != nil {
2697+
// We always need to pull certain data from our store.
2698+
if o.store != nil {
26992699
state, err := o.store.BorrowState()
27002700
if err != nil {
27012701
o.mu.Unlock()
27022702
return nil
27032703
}
2704+
27042705
// If we are the leader we could have o.sseq that is skipped ahead.
27052706
// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
27062707
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream

server/gsl/gsl.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package gsl
1616
import (
1717
"errors"
1818
"sync"
19+
"unsafe"
1920

2021
"github.com/nats-io/nats-server/v2/server/stree"
2122
)
@@ -479,16 +480,6 @@ func IntersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], sl *Generi
479480
}
480481

481482
func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T2], subj []byte, cb func(subj []byte, entry *T1)) {
482-
if r.numNodes() == 0 {
483-
// For wildcards we can't avoid Match, but if it's a literal subject at
484-
// this point, using Find is considerably cheaper.
485-
if subjectHasWildcard(string(subj)) {
486-
st.Match(subj, cb)
487-
} else if e, ok := st.Find(subj); ok {
488-
cb(subj, e)
489-
}
490-
return
491-
}
492483
nsubj := subj
493484
if len(nsubj) > 0 {
494485
nsubj = append(subj, '.')
@@ -504,15 +495,28 @@ func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T
504495
// check whether there's interest at this level (without triggering dupes) and
505496
// match if so.
506497
nsubj := append(nsubj, '*')
507-
if len(r.pwc.subs) > 0 && r.pwc.next != nil && r.pwc.next.numNodes() > 0 {
498+
if len(r.pwc.subs) > 0 {
508499
st.Match(nsubj, cb)
509500
}
510-
intersectStree(st, r.pwc.next, nsubj, cb)
511-
case r.numNodes() > 0:
501+
if r.pwc.next != nil && r.pwc.next.numNodes() > 0 {
502+
intersectStree(st, r.pwc.next, nsubj, cb)
503+
}
504+
default:
512505
// Normal node with subject literals, keep iterating.
513506
for t, n := range r.nodes {
514507
nsubj := append(nsubj, t...)
515-
intersectStree(st, n.next, nsubj, cb)
508+
if len(n.subs) > 0 {
509+
if subjectHasWildcard(bytesToString(nsubj)) {
510+
st.Match(nsubj, cb)
511+
} else {
512+
if e, ok := st.Find(nsubj); ok {
513+
cb(nsubj, e)
514+
}
515+
}
516+
}
517+
if n.next != nil && n.next.numNodes() > 0 {
518+
intersectStree(st, n.next, nsubj, cb)
519+
}
516520
}
517521
}
518522
}
@@ -530,3 +534,13 @@ func subjectHasWildcard(subject string) bool {
530534
}
531535
return false
532536
}
537+
538+
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
539+
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
540+
func bytesToString(b []byte) string {
541+
if len(b) == 0 {
542+
return _EMPTY_
543+
}
544+
p := unsafe.SliceData(b)
545+
return unsafe.String(p, len(b))
546+
}

server/gsl/gsl_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,11 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) {
300300
st.Insert([]byte("one.two.six"), struct{}{})
301301
st.Insert([]byte("one.two.seven"), struct{}{})
302302
st.Insert([]byte("eight.nine"), struct{}{})
303+
st.Insert([]byte("stream.A"), struct{}{})
304+
st.Insert([]byte("stream.A.child"), struct{}{})
303305

304306
require_NoDuplicates := func(t *testing.T, got map[string]int) {
307+
t.Helper()
305308
for _, c := range got {
306309
require_Equal(t, c, 1)
307310
}
@@ -354,7 +357,7 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) {
354357
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
355358
got[string(subj)]++
356359
})
357-
require_Len(t, len(got), 5)
360+
require_Len(t, len(got), 7)
358361
require_NoDuplicates(t, got)
359362
})
360363

@@ -381,14 +384,26 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) {
381384
require_NoDuplicates(t, got)
382385
})
383386

387+
t.Run("FWCExtended", func(t *testing.T) {
388+
got := map[string]int{}
389+
sl := NewSublist[int]()
390+
require_NoError(t, sl.Insert("stream.A.>", 11))
391+
require_NoError(t, sl.Insert("stream.A", 22))
392+
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
393+
got[string(subj)]++
394+
})
395+
require_Len(t, len(got), 2)
396+
require_NoDuplicates(t, got)
397+
})
398+
384399
t.Run("FWCAll", func(t *testing.T) {
385400
got := map[string]int{}
386401
sl := NewSublist[int]()
387402
require_NoError(t, sl.Insert(">", 11))
388403
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
389404
got[string(subj)]++
390405
})
391-
require_Len(t, len(got), 5)
406+
require_Len(t, len(got), 7)
392407
require_NoDuplicates(t, got)
393408
})
394409

@@ -413,6 +428,18 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) {
413428
})
414429
require_Len(t, len(got), 0)
415430
})
431+
432+
t.Run("NoMatchPartial", func(t *testing.T) {
433+
got := map[string]int{}
434+
sl := NewSublist[int]()
435+
require_NoError(t, sl.Insert("stream.A.not-child", 11))
436+
require_NoError(t, sl.Insert("stream.A.child.>", 22))
437+
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
438+
got[string(subj)]++
439+
})
440+
require_Len(t, len(got), 0)
441+
require_NoDuplicates(t, got)
442+
})
416443
}
417444

418445
// --- TEST HELPERS ---

server/jetstream_cluster_1_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6763,7 +6763,8 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
67636763
sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
67646764
require_NoError(t, err)
67656765

6766-
js.Publish("foo.bar", []byte("HELLO"))
6766+
_, err = js.Publish("foo.bar", []byte("HELLO"))
6767+
require_NoError(t, err)
67676768

67686769
si, err := js.StreamInfo("TEST")
67696770
require_NoError(t, err)
@@ -6792,8 +6793,8 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
67926793
require_NoError(t, err)
67936794
require_Equal(t, ci.Delivered.Consumer, 1)
67946795
require_Equal(t, ci.Delivered.Stream, 1)
6795-
require_Equal(t, ci.AckFloor.Consumer, 1)
6796-
require_Equal(t, ci.AckFloor.Stream, 1)
6796+
require_Equal(t, ci.AckFloor.Consumer, 0)
6797+
require_Equal(t, ci.AckFloor.Stream, 0)
67976798
require_Equal(t, ci.NumAckPending, 0)
67986799
require_Equal(t, ci.NumRedelivered, 1)
67996800
require_Equal(t, ci.NumPending, 0)

server/jetstream_cluster_3_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5418,14 +5418,9 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
54185418
t.Helper()
54195419
require_Equal(t, a.Delivered.Consumer, 10)
54205420
require_Equal(t, a.Delivered.Stream, 10)
5421-
// If replicated, agreed upon state is used. Otherwise, o.asflr and o.adflr would be skipped ahead for R1.
5422-
if replicated {
5423-
require_Equal(t, a.AckFloor.Consumer, 0)
5424-
require_Equal(t, a.AckFloor.Stream, 0)
5425-
} else {
5426-
require_Equal(t, a.AckFloor.Consumer, 10)
5427-
require_Equal(t, a.AckFloor.Stream, 10)
5428-
}
5421+
// Agreed upon state is always used. Otherwise, o.asflr and o.adflr would be skipped ahead.
5422+
require_Equal(t, a.AckFloor.Consumer, 0)
5423+
require_Equal(t, a.AckFloor.Stream, 0)
54295424
require_Equal(t, a.NumPending, 40)
54305425
require_Equal(t, a.NumRedelivered, 10)
54315426
a.Cluster, b.Cluster = nil, nil

server/jetstream_consumer_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,3 +1817,96 @@ func TestJetStreamConsumerDeliverAllOverlappingFilterSubjects(t *testing.T) {
18171817
}
18181818
}
18191819
}
1820+
1821+
// https://github.com/nats-io/nats-server/issues/6844
1822+
func TestJetStreamConsumerDeliverAllNonOverlappingFilterSubjects(t *testing.T) {
1823+
s := RunBasicJetStreamServer(t)
1824+
defer s.Shutdown()
1825+
1826+
nc, js := jsClientConnectNewAPI(t, s)
1827+
defer nc.Close()
1828+
1829+
ctx := context.Background()
1830+
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
1831+
Name: "TEST",
1832+
Subjects: []string{"stream.>"},
1833+
})
1834+
require_NoError(t, err)
1835+
1836+
publishMessageCount := 10
1837+
for i := 0; i < publishMessageCount; i++ {
1838+
_, err = js.Publish(ctx, "stream.subject", nil)
1839+
require_NoError(t, err)
1840+
}
1841+
1842+
// Create consumer
1843+
consumer, err := js.CreateOrUpdateConsumer(ctx, "TEST", jetstream.ConsumerConfig{
1844+
DeliverPolicy: jetstream.DeliverAllPolicy,
1845+
FilterSubjects: []string{
1846+
"stream.subject.A",
1847+
"stream.subject.A.>",
1848+
},
1849+
})
1850+
require_NoError(t, err)
1851+
1852+
i, err := consumer.Info(ctx)
1853+
require_NoError(t, err)
1854+
require_Equal(t, i.NumPending, 0)
1855+
}
1856+
1857+
func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) {
1858+
s := RunBasicJetStreamServer(t)
1859+
defer s.Shutdown()
1860+
1861+
nc, js := jsClientConnect(t, s)
1862+
defer nc.Close()
1863+
1864+
_, err := js.AddStream(&nats.StreamConfig{
1865+
Name: "TEST",
1866+
Subjects: []string{"foo.>"},
1867+
})
1868+
require_NoError(t, err)
1869+
1870+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
1871+
Durable: "CONSUMER",
1872+
AckWait: 2 * time.Second,
1873+
AckPolicy: nats.AckExplicitPolicy,
1874+
FilterSubject: "foo.bar",
1875+
})
1876+
require_NoError(t, err)
1877+
1878+
// Publish two messages, one the consumer is interested in.
1879+
_, err = js.Publish("foo.bar", nil)
1880+
require_NoError(t, err)
1881+
_, err = js.Publish("foo.other", nil)
1882+
require_NoError(t, err)
1883+
1884+
sub, err := js.PullSubscribe("foo.bar", "CONSUMER")
1885+
require_NoError(t, err)
1886+
defer sub.Drain()
1887+
1888+
// Consumer info should start empty.
1889+
ci, err := js.ConsumerInfo("TEST", "CONSUMER")
1890+
require_NoError(t, err)
1891+
require_Equal(t, ci.Delivered.Stream, 0)
1892+
require_Equal(t, ci.AckFloor.Stream, 0)
1893+
1894+
// Fetch more messages than match our filter.
1895+
msgs, err := sub.Fetch(2, nats.MaxWait(time.Second))
1896+
require_NoError(t, err)
1897+
require_Len(t, len(msgs), 1)
1898+
1899+
// We have received, but not acknowledged, consumer info must reflect that.
1900+
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
1901+
require_NoError(t, err)
1902+
require_Equal(t, ci.Delivered.Stream, 1)
1903+
require_Equal(t, ci.AckFloor.Stream, 0)
1904+
1905+
// Now we acknowledge the message and expect our delivered/ackfloor to be correct.
1906+
require_NoError(t, msgs[0].AckSync())
1907+
1908+
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
1909+
require_NoError(t, err)
1910+
require_Equal(t, ci.Delivered.Stream, 1)
1911+
require_Equal(t, ci.AckFloor.Stream, 1)
1912+
}

server/jetstream_test.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18870,7 +18870,7 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
1887018870
})
1887118871
require_NoError(t, err)
1887218872

18873-
for i := 0; i < 100; i++ {
18873+
for i := 0; i < 10; i++ {
1887418874
sendStreamMsg(t, nc, "foo", "msg")
1887518875
}
1887618876

@@ -18885,7 +18885,8 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
1888518885

1888618886
sub := natsSubSync(t, nc, inbox)
1888718887
for i := 0; i < 10; i++ {
18888-
natsNexMsg(t, sub, time.Second)
18888+
msg := natsNexMsg(t, sub, time.Second)
18889+
require_NoError(t, msg.AckSync())
1888918890
}
1889018891

1889118892
acc, err := s.lookupAccount(globalAccountName)
@@ -18911,6 +18912,13 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
1891118912
require_True(t, si.State.FirstSeq == 1_000_000)
1891218913
require_True(t, si.State.LastSeq == 999_999)
1891318914

18915+
acc, err = s.lookupAccount(globalAccountName)
18916+
require_NoError(t, err)
18917+
mset, err = acc.lookupStream("TEST")
18918+
require_NoError(t, err)
18919+
o = mset.lookupConsumer("dur")
18920+
require_True(t, o != nil)
18921+
1891418922
natsSubSync(t, nc, inbox)
1891518923
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
1891618924
ci, err := js.ConsumerInfo("TEST", "dur")
@@ -18920,8 +18928,19 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
1892018928
if ci.NumAckPending != 0 {
1892118929
return fmt.Errorf("NumAckPending should be 0, got %v", ci.NumAckPending)
1892218930
}
18923-
if ci.Delivered.Stream != 999_999 {
18924-
return fmt.Errorf("Delivered.Stream should be 999,999, got %v", ci.Delivered.Stream)
18931+
// Delivered stays the same because it reflects what was actually delivered.
18932+
// And must not be influenced by purges/compacts.
18933+
if ci.Delivered.Stream != 10 {
18934+
return fmt.Errorf("Delivered.Stream should be 10, got %v", ci.Delivered.Stream)
18935+
}
18936+
18937+
// Starting sequence should be skipped ahead, respecting the compact.
18938+
o.mu.RLock()
18939+
18940+
sseq := o.sseq
18941+
o.mu.RUnlock()
18942+
if sseq != 1_000_000 {
18943+
return fmt.Errorf("o.sseq should be 1,000,000, got %v", sseq)
1892518944
}
1892618945
return nil
1892718946
})
@@ -19720,10 +19739,10 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) {
1972019739
info, err := js.ConsumerInfo("name", durable)
1972119740
require_NoError(t, err)
1972219741

19723-
require_True(t, info.NumAckPending == 0)
19724-
require_True(t, info.AckFloor.Stream == 8)
19725-
require_True(t, info.AckFloor.Consumer == 1)
19726-
require_True(t, info.NumPending == 0)
19742+
require_Equal(t, info.NumAckPending, 0)
19743+
require_Equal(t, info.AckFloor.Stream, 6)
19744+
require_Equal(t, info.AckFloor.Consumer, 1)
19745+
require_Equal(t, info.NumPending, 0)
1972719746
}
1972819747

1972919748
func TestJetStreamConsumerMultipleSubjectsLastPerSubject(t *testing.T) {
@@ -19798,10 +19817,10 @@ func TestJetStreamConsumerMultipleSubjectsLastPerSubject(t *testing.T) {
1979819817
info, err := js.ConsumerInfo("name", durable)
1979919818
require_NoError(t, err)
1980019819

19801-
require_True(t, info.AckFloor.Consumer == 2)
19802-
require_True(t, info.AckFloor.Stream == 9)
19803-
require_True(t, info.Delivered.Stream == 12)
19804-
require_True(t, info.Delivered.Consumer == 3)
19820+
require_Equal(t, info.AckFloor.Consumer, 2)
19821+
require_Equal(t, info.AckFloor.Stream, 9)
19822+
require_Equal(t, info.Delivered.Stream, 10)
19823+
require_Equal(t, info.Delivered.Consumer, 3)
1980519824

1980619825
require_NoError(t, err)
1980719826

0 commit comments

Comments
 (0)