Closed
Description
Observed behavior
This is probably related to #6824 and its solution.
Scenario:
- a producer publishes some messages to a stream on the subject `test.subject`
- then a consumer on this stream uses the filter subjects `test.subject.a.x` and `test.subject.a.x.*` (or `test.subject.a.x.>`)
Observed behavior:
- the consumer consumes the messages
Expected behavior
- the consumer does not consume the messages
This happens only if I use both filter subjects together; with only `test.subject.a.x` or only `test.subject.a.x.*` the scenario works as expected. It also works as expected in nats v2.11.1.
Server and client version
- server: v2.11.2 (I'm using the docker image nats:2.11.2-alpine)
- client: java-library io.nats:jnats:2.21.1
Host environment
- macOS 15.4.1
- docker v4.41.0
- java v21.0.7
- kotlin 2.1.20
Steps to reproduce
docker-compose.yaml
# Single NATS server with JetStream enabled, used to reproduce the issue.
volumes:
  nats-data:
    driver: local
services:
  nats:
    image: nats:2.11.2-alpine
    ports:
      - '4222:4222'
      - '6222:6222'
      - '8222:8222'
    # -js enables JetStream, -sd sets its storage directory,
    # -m sets the HTTP monitoring port (used by the healthcheck below).
    command: -js -sd /tmp/nats -m 8222
    volumes:
      - 'nats-data:/tmp/nats'
    healthcheck:
      # Polls the monitoring endpoint on port 8222 inside the container.
      test: wget http://localhost:8222/healthz -q -S -O -
      start_period: 3s
      start_interval: 2s
      timeout: 1s
      retries: 10
Kotlin client code
import io.nats.client.ConsumerContext
import io.nats.client.Message
import io.nats.client.Nats
import io.nats.client.Options
import io.nats.client.api.AckPolicy
import io.nats.client.api.ConsumerConfiguration
import io.nats.client.api.DeliverPolicy
import io.nats.client.api.ReplayPolicy
import io.nats.client.api.StreamConfiguration
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
/**
 * Reproduction test for a JetStream consumer configured with two overlapping
 * filter subjects (`test.subject.a.x` and `test.subject.a.x.*`).
 *
 * Messages are published on `test.subject`, which matches neither filter, so the
 * consumer is expected to receive nothing.
 *
 * Requires a NATS server with JetStream reachable at `nats://localhost:4222`.
 */
class NatsConsumerTest {
    @Test
    fun testSubjectFilter() = runTest {
        val natsOptions = Options.Builder()
            .server("nats://localhost:4222")
            .connectionName("test-connection")
            .build()

        // `use` closes the connection even when the test body throws.
        Nats.connect(natsOptions).use { connection ->
            val streamName = "test-stream"
            val streamConfig = StreamConfiguration.Builder()
                .name(streamName)
                .subjects("test.>")
                .build()
            val management = connection.jetStreamManagement()
            management.addStream(streamConfig)

            try {
                val js = connection.jetStream()
                js.publish("test.subject", "foo message".toByteArray())
                js.publish("test.subject", "bar message".toByteArray())
                js.publish("test.subject", "baz message".toByteArray())
                js.publish("test.subject", "qux message".toByteArray())

                val consumerConfig = ConsumerConfiguration.builder()
                    .durable("test-consumer")
                    .filterSubjects("test.subject.a.x", "test.subject.a.x.*")
                    .ackPolicy(AckPolicy.Explicit)
                    .deliverPolicy(DeliverPolicy.All)
                    .replayPolicy(ReplayPolicy.Instant)
                    .maxAckPending(1)
                    .ackWait(30.seconds.toJavaDuration())
                    .build()
                val consumer = js.getStreamContext(streamName).createOrUpdateConsumer(consumerConfig)

                val messages = consumer.toFlow().collectAll()
                println(messages) // outputs [foo message]; expected: []
            } finally {
                // Always remove the stream so a failed run does not leave state behind
                // that would affect the next run.
                management.deleteStream(streamName)
            }
        }
    }

    /**
     * Exposes the messages of this consumer as a cold [Flow].
     *
     * Polls for the next message with a 250 ms timeout and emits it when one
     * arrives. Polling continues while the consumer info reports pending or
     * un-acked messages; the iterable consumer is closed when the flow ends.
     */
    private fun ConsumerContext.toFlow(): Flow<Message> = flow {
        iterate().use { iterableConsumer ->
            do {
                iterableConsumer.nextMessage(250)?.let { emit(it) }
            } while (with(consumerInfo) { numAckPending > 0 || numPending > 0 })
        }
    }

    /**
     * Collects the flow to completion, acknowledging each message.
     *
     * @return the payload of every collected message decoded as a [String], in arrival order.
     */
    private suspend fun Flow<Message>.collectAll(): List<String> {
        val messages = mutableListOf<String>()
        collect { message ->
            messages.add(String(message.data))
            message.ack()
        }
        return messages.toList()
    }
}