Regression in FilterSubject handling between v2.11.1 and v2.11.2 #6844

Closed
@obecker

Observed behavior

This is probably related to #6824 and its fix.

Scenario:

  • a producer publishes several messages to a stream on the subject test.subject
  • a consumer on this stream is then created with two filter subjects: test.subject.a.x and test.subject.a.x.* (or test.subject.a.x.>)

Observed behavior:

  • the consumer is delivered messages published on test.subject, although that subject matches neither filter subject

Expected behavior

  • the consumer does not consume any messages

This happens only when both filter subjects are configured together: with only test.subject.a.x or only test.subject.a.x.*, the scenario works as expected. With both filter subjects on NATS v2.11.1, it also works as expected.
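To make the expectation concrete, here is a minimal sketch of NATS-style subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The helper `subjectMatches` is hypothetical and written only for illustration. It is not the server's implementation and is not part of jnats.

```kotlin
// Hypothetical helper for illustration only; not the nats-server matching code.
// `*` matches exactly one token, `>` matches one or more trailing tokens.
fun subjectMatches(filter: String, subject: String): Boolean {
    val f = filter.split('.')
    val s = subject.split('.')
    for ((i, token) in f.withIndex()) {
        // `>` is only valid as the last token and needs at least one token left to match.
        if (token == ">") return i == f.lastIndex && s.size > i
        // The subject ran out of tokens before the filter did.
        if (i >= s.size) return false
        if (token != "*" && token != s[i]) return false
    }
    // Without a trailing `>`, both must have the same number of tokens.
    return f.size == s.size
}

fun main() {
    val published = "test.subject"
    val filters = listOf("test.subject.a.x", "test.subject.a.x.*", "test.subject.a.x.>")
    for (filter in filters) {
        // Prints "false" for every filter: none of them matches test.subject.
        println("$filter matches $published: ${subjectMatches(filter, published)}")
    }
}
```

Under these matching rules, none of the filter subjects matches test.subject, either alone or in combination, so a consumer configured with them should receive nothing from the messages published in this scenario.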

Server and client version

  • server: v2.11.2 (I'm using the docker image nats:2.11.2-alpine)
  • client: java-library io.nats:jnats:2.21.1

Host environment

  • macOS 15.4.1
  • docker v4.41.0
  • java v21.0.7
  • kotlin 2.1.20

Steps to reproduce

docker-compose.yaml

volumes:
  nats-data:
    driver: local

services:
  nats:
    image: nats:2.11.2-alpine
    ports:
      - '4222:4222'
      - '6222:6222'
      - '8222:8222'
    command: -js -sd /tmp/nats -m 8222
    volumes:
      - 'nats-data:/tmp/nats'
    healthcheck:
      test: wget http://localhost:8222/healthz -q -S -O -
      start_period: 3s
      start_interval: 2s
      timeout: 1s
      retries: 10

Kotlin client code

import io.nats.client.ConsumerContext
import io.nats.client.Message
import io.nats.client.Nats
import io.nats.client.Options
import io.nats.client.api.AckPolicy
import io.nats.client.api.ConsumerConfiguration
import io.nats.client.api.DeliverPolicy
import io.nats.client.api.ReplayPolicy
import io.nats.client.api.StreamConfiguration
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

class NatsConsumerTest {

    @Test
    fun testSubjectFilter() = runTest {

        val natsOptions = Options.Builder()
            .server("nats://localhost:4222")
            .connectionName("test-connection")
            .build()
        val connection = Nats.connect(natsOptions)

        val streamName = "test-stream"
        val streamConfig = StreamConfiguration.Builder()
            .name(streamName)
            .subjects("test.>")
            .build()

        connection.jetStreamManagement().addStream(streamConfig)

        val js = connection.jetStream()

        js.publish("test.subject", "foo message".toByteArray())
        js.publish("test.subject", "bar message".toByteArray())
        js.publish("test.subject", "baz message".toByteArray())
        js.publish("test.subject", "qux message".toByteArray())

        val consumerConfig = ConsumerConfiguration.builder()
            .durable("test-consumer")
            .filterSubjects("test.subject.a.x", "test.subject.a.x.*")
            .ackPolicy(AckPolicy.Explicit)
            .deliverPolicy(DeliverPolicy.All)
            .replayPolicy(ReplayPolicy.Instant)
            .maxAckPending(1)
            .ackWait(30.seconds.toJavaDuration())
            .build()

        val consumer = js.getStreamContext(streamName).createOrUpdateConsumer(consumerConfig)

        val flow = consumer.toFlow()

        val messages = flow.collectAll()

        // Expected: [] because test.subject matches neither filter subject.
        println(messages) // v2.11.2 outputs [foo message]

        connection.jetStreamManagement().deleteStream(streamName)
    }

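    // Emits messages until the server reports nothing pending and nothing awaiting ack.
    private fun ConsumerContext.toFlow(): Flow<Message> = flow {
        iterate().use {
            do {
                // Wait up to 250 ms for the next message; null means the wait timed out.
                it.nextMessage(250)?.run { emit(this) }
                // consumerInfo re-queries the server on each loop iteration.
            } while (with(consumerInfo) { numAckPending > 0 || numPending > 0 })
        }
    }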

    private suspend fun Flow<Message>.collectAll(): List<String> {
        val messages = mutableListOf<String>()
        collect { message ->
            messages.add(String(message.data))
            message.ack()
        }
        return messages.toList()
    }
}

Labels

defect: Suspected defect such as a bug or regression