diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d45aa3287ec..bbc3acedeed 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -26124,3 +26124,142 @@ func TestJetStreamUpgradeConsumerVersioning(t *testing.T) { }) } } + +func TestJetStreamMirrorCrossAccountWithFilteredSubjectAndSubjectTransform(t *testing.T) { + conf := createConfFile(t, fmt.Appendf(nil, ` + listen: "127.0.0.1:-1" + jetstream: {store_dir: %q} + accounts: { + PUBLIC_ACCOUNT: { + jetstream: enabled + exports: [ + { service: "$JS.API.CONSUMER.CREATE.S.*.public.a", response_type: stream, accounts: ["INTERNAL_ACCOUNT"] }, + { service: "$JS.API.CONSUMER.CREATE.S.*.public.b", response_type: stream, accounts: ["INTERNAL_ACCOUNT"] }, + { service: "$JS.API.CONSUMER.CREATE.S.*.public.c", response_type: stream, accounts: ["INTERNAL_ACCOUNT"] }, + { service: "$JS.FC.>" }, + { stream: "shared.public.>", accounts: ["INTERNAL_ACCOUNT"] } + ] + users: [ {user: "public", password: "pwd"} ] + }, + + INTERNAL_ACCOUNT: { + jetstream: enabled + imports: [ + { service: { account: "PUBLIC_ACCOUNT", subject: "$JS.API.CONSUMER.CREATE.S.*.public.a" }, to: "JS.PUBLIC_ACCOUNT.CONSUMER.CREATE.S.*.public.a" }, + { service: { account: "PUBLIC_ACCOUNT", subject: "$JS.API.CONSUMER.CREATE.S.*.public.b" }, to: "JS.PUBLIC_ACCOUNT.CONSUMER.CREATE.S.*.public.b" }, + { service: { account: "PUBLIC_ACCOUNT", subject: "$JS.API.CONSUMER.CREATE.S.*.public.c" }, to: "JS.PUBLIC_ACCOUNT.CONSUMER.CREATE.S.*.public.c" }, + { service: { account: "PUBLIC_ACCOUNT", subject: "$JS.FC.>" }, to: "$JS.FC.>" }, + { stream: { account: "PUBLIC_ACCOUNT", subject: "shared.public.>" }, to: "shared.public.>" } + ] + users: [ {user: "internal", password: "pwd"} ] + } + } + `, t.TempDir())) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + pc, pjs := jsClientConnect(t, s, nats.UserInfo("public", "pwd")) + defer pc.Close() + + _, err := pjs.AddStream(&nats.StreamConfig{ + Name: "S", + Subjects: []string{"public.>"}, + }) + require_NoError(t, err) + + ic, ijs := jsClientConnect(t, s, nats.UserInfo("internal", "pwd")) + defer ic.Close() + + addStream := func(name, fs string, transform *nats.SubjectTransformConfig) { + t.Helper() + sc := &nats.StreamConfig{ + Name: name, + Mirror: &nats.StreamSource{ + Name: "S", + External: &nats.ExternalStream{ + APIPrefix: "JS.PUBLIC_ACCOUNT", + DeliverPrefix: "shared.public", + }, + }, + } + if fs != _EMPTY_ { + sc.Mirror.FilterSubject = fs + } else { + sc.Mirror.SubjectTransforms = append(sc.Mirror.SubjectTransforms, *transform) + } + _, err = ijs.AddStream(sc) + require_NoError(t, err) + } + + checkMirror := func(name string, expected int) { + t.Helper() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + si, err := ijs.StreamInfo(name) + if err != nil { + return err + } + if n := int(si.State.Msgs); n != expected { + return fmt.Errorf("Expected %v mirrored message, got %v", expected, n) + } + return nil + }) + } + + // Just a subject filter + addStream("M1", "public.a", nil) + natsPub(t, pc, "public.a", []byte("hello")) + checkMirror("M1", 1) + + // Now try with equivalent subject transform (no destination). + addStream("M2", _EMPTY_, &nats.SubjectTransformConfig{Source: "public.b"}) + natsPub(t, pc, "public.b", []byte("hello")) + checkMirror("M2", 1) + + // And now with a transform destination. + addStream("M3", _EMPTY_, &nats.SubjectTransformConfig{Source: "public.c", Destination: "public.d"}) + natsPub(t, pc, "public.c", []byte("hello")) + checkMirror("M3", 1) + + checkMsg := func(stream, subj string, seq uint64) { + t.Helper() + msg, err := ijs.GetMsg(stream, seq) + require_NoError(t, err) + require_Equal(t, msg.Subject, subj) + } + checkMsg("M1", "public.a", 1) + checkMsg("M2", "public.b", 2) + checkMsg("M3", "public.d", 3) + + ic.Close() + pc.Close() + s.Shutdown() + s, _ = RunServerWithConfig(conf) + defer s.Shutdown() + + ic, ijs = jsClientConnect(t, s, nats.UserInfo("internal", "pwd")) + defer ic.Close() + + checkMirror("M1", 1) + checkMirror("M2", 1) + checkMirror("M3", 1) + checkMsg("M1", "public.a", 1) + checkMsg("M2", "public.b", 2) + checkMsg("M3", "public.d", 3) + + pc, _ = jsClientConnect(t, s, nats.UserInfo("public", "pwd")) + defer pc.Close() + natsPub(t, pc, "public.a", []byte("hello")) + natsPub(t, pc, "public.b", []byte("hello")) + natsPub(t, pc, "public.c", []byte("hello")) + natsFlush(t, pc) + + checkMirror("M1", 2) + checkMirror("M2", 2) + checkMirror("M3", 2) + checkMsg("M1", "public.a", 1) + checkMsg("M2", "public.b", 2) + checkMsg("M3", "public.d", 3) + checkMsg("M1", "public.a", 4) + checkMsg("M2", "public.b", 5) + checkMsg("M3", "public.d", 6) +} diff --git a/server/stream.go b/server/stream.go index 7341bf24bde..9ce3098da97 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2836,7 +2836,14 @@ func (mset *stream) setupMirrorConsumer() error { } mirror.sfs = sfs mirror.trs = trs - req.Config.FilterSubjects = sfs + // If there was no explicit FilterSubject defined and we have a single + // subject transform, use Config.FilterSubject instead of FilterSubjects + // so that we can use the extended consumer create API down below. + if req.Config.FilterSubject == _EMPTY_ && len(sfs) == 1 { + req.Config.FilterSubject = sfs[0] + } else { + req.Config.FilterSubjects = sfs + } } respCh := make(chan *JSApiConsumerCreateResponse, 1) @@ -2862,8 +2869,6 @@ func (mset *stream) setupMirrorConsumer() error { return nil } - b, _ := json.Marshal(req) - var subject string if req.Config.FilterSubject != _EMPTY_ { req.Config.Name = fmt.Sprintf("mirror-%s", createConsumerName()) @@ -2876,6 +2881,9 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.ReplaceAll(subject, "..", ".") } + // Marshal now that we are done with `req`. + b, _ := json.Marshal(req) + // Reset mirror.msgs = nil mirror.err = nil