Skip to content

Commit 9017f7a

Browse files
[FIXED] Remove expected headers when mirroring (#6961)
When publishing a message into a stream with an `Nats-Expected-Stream` (or other) header, it prevents this message from being stored by a mirror. Previously this was also prevented when sourcing, but that was fixed since 2.10.14 in #5256. Resolves #5865 Signed-off-by: Maurice van Veen <[email protected]>
2 parents 79e399e + 48fecd2 commit 9017f7a

File tree

2 files changed

+55
-0
lines changed

2 files changed

+55
-0
lines changed

server/jetstream_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9511,6 +9511,57 @@ func TestJetStreamMirrorBasics(t *testing.T) {
95119511

95129512
}
95139513

9514+
func TestJetStreamMirrorStripExpectedHeaders(t *testing.T) {
9515+
s := RunBasicJetStreamServer(t)
9516+
defer s.Shutdown()
9517+
9518+
// Client for API requests.
9519+
nc, js := jsClientConnect(t, s)
9520+
defer nc.Close()
9521+
9522+
// Create source and mirror streams.
9523+
_, err := js.AddStream(&nats.StreamConfig{
9524+
Name: "S",
9525+
Subjects: []string{"foo"},
9526+
})
9527+
require_NoError(t, err)
9528+
_, err = js.AddStream(&nats.StreamConfig{
9529+
Name: "M",
9530+
Mirror: &nats.StreamSource{Name: "S"},
9531+
})
9532+
require_NoError(t, err)
9533+
9534+
m := nats.NewMsg("foo")
9535+
pubAck, err := js.PublishMsg(m)
9536+
require_NoError(t, err)
9537+
require_Equal(t, pubAck.Sequence, 1)
9538+
9539+
// Mirror should get message.
9540+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
9541+
if si, err := js.StreamInfo("M"); err != nil {
9542+
return err
9543+
} else if si.State.Msgs != 1 {
9544+
return fmt.Errorf("expected 1 mirrored msg, got %d", si.State.Msgs)
9545+
}
9546+
return nil
9547+
})
9548+
9549+
m.Header.Set("Nats-Expected-Stream", "S")
9550+
pubAck, err = js.PublishMsg(m)
9551+
require_NoError(t, err)
9552+
require_Equal(t, pubAck.Sequence, 2)
9553+
9554+
// Mirror should strip expected headers and store the message.
9555+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
9556+
if si, err := js.StreamInfo("M"); err != nil {
9557+
return err
9558+
} else if si.State.Msgs != 2 {
9559+
return fmt.Errorf("expected 2 mirrored msgs, got %d", si.State.Msgs)
9560+
}
9561+
return nil
9562+
})
9563+
}
9564+
95149565
func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) {
95159566
s := RunBasicJetStreamServer(t)
95169567
defer s.Shutdown()

server/stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2960,6 +2960,10 @@ func (mset *stream) setupMirrorConsumer() error {
29602960
msgs := mirror.msgs
29612961
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29622962
hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy.
2963+
if len(hdr) > 0 {
2964+
// Remove any Nats-Expected- headers as we don't want to validate them.
2965+
hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-")
2966+
}
29632967
mset.queueInbound(msgs, subject, reply, hdr, msg, nil, nil)
29642968
mirror.last.Store(time.Now().UnixNano())
29652969
})

0 commit comments

Comments
 (0)