diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/overview/authors.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/overview/authors.adoc index 2559796f0..7e5ed9055 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/overview/authors.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/overview/authors.adoc @@ -1 +1 @@ -Soby Chacko; Chris Bono; Alexander Preuß; Jay Bryant; Christophe Bornet +Soby Chacko; Chris Bono; Alexander Preuß; Jay Bryant; Christophe Bornet; Jonas Geiregat diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/testing-applications.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/testing-applications.adoc new file mode 100644 index 000000000..7fd2eae8d --- /dev/null +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/testing-applications.adoc @@ -0,0 +1,42 @@ +[[testing-applications]] += Testing Applications +include::../attributes/attributes.adoc[] + +The `spring-pulsar-test` dependency includes some useful utilities when testing your applications. + +== PulsarConsumerTestUtil + +`org.springframework.pulsar.test.support.PulsarConsumerTestUtil` provides a type-safe fluent API for consuming messages from a Pulsar topic within a test. + +[source,java] +---- +List> messages = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("my-topic") + .withSchema(Schema.JSON(MyMessage.class)) + .awaitAtMost(Duration.ofSeconds(5)) + .get(); +---- + +A `until` method is also available to allow you to specify a condition that must be met before the messages are returned. + +[source,java] +---- +List> messages = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("my-topic") + .withSchema(Schema.JSON(MyMessage.class)) + .until(messages -> messages.size() == 5) + .awaitAtMost(Duration.ofSeconds(5)) + .get(); +---- + +A set of commonly used conditions are available in `org.springframework.pulsar.test.support.ConsumedMessagesConditions`. + +[source,java] +---- +List> messages = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("my-topic") + .withSchema(Schema.JSON(MyMessage.class)) + .awaitAtMost(Duration.ofSeconds(5)) + .until(containsExactlyExpectedValues(new MyMessage("foo"), new MyMessage("bar"))) + .get(); +---- diff --git a/spring-pulsar-test/spring-pulsar-test.gradle b/spring-pulsar-test/spring-pulsar-test.gradle index b88d13a3b..adb65e18e 100644 --- a/spring-pulsar-test/spring-pulsar-test.gradle +++ b/spring-pulsar-test/spring-pulsar-test.gradle @@ -8,4 +8,7 @@ dependencies { implementation 'org.junit.jupiter:junit-jupiter-api' implementation 'org.testcontainers:pulsar' implementation 'org.testcontainers:junit-jupiter' + implementation project(':spring-pulsar') + testImplementation 'org.assertj:assertj-core' + testImplementation 'org.junit.jupiter:junit-jupiter' } diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConditionTimeoutException.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConditionTimeoutException.java new file mode 100644 index 000000000..af4aefc8c --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConditionTimeoutException.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +/** + * Exception thrown when a test times out. + * + * @author Jonas Geiregat + */ +public class ConditionTimeoutException extends PulsarTestException { + + public ConditionTimeoutException(String message, Throwable exception) { + super(message, exception); + } + + public ConditionTimeoutException(String message) { + super(message); + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConditionsSpec.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConditionsSpec.java new file mode 100644 index 000000000..dfce3a8fe --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConditionsSpec.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import java.time.Duration; +import java.util.List; + +import org.apache.pulsar.client.api.Message; + +import org.springframework.pulsar.PulsarException; + +/** + * Assertions related step in the fluent API for building a Pulsar test consumer. + * + * @param the type of the message payload + * @author Jonas Geiregat + */ +public interface ConditionsSpec { + + /** + * The maximum timeout duration to wait for the desired number of messages to be + * reached. + * @param timeout the maximum timeout duration to wait + * @return the next step in the fluent API + */ + ConditionsSpec awaitAtMost(Duration timeout); + + /** + * Start consuming until the given condition is met. + * @param consumedMessagesCondition the condition to be met + * @return the next step in the fluent API + */ + ConditionsSpec until(ConsumedMessagesCondition consumedMessagesCondition); + + /** + * + * Terminal operation that will get the consumed messages within the timeout verifying + * the given condition if any. + * @return the consumed messages + * @throws ConditionTimeoutException if the condition is not met within the timeout + * @throws PulsarException if the condition is not met within the timeout + */ + List> get(); + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConsumedMessagesCondition.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConsumedMessagesCondition.java new file mode 100644 index 000000000..b85aeabd5 --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConsumedMessagesCondition.java @@ -0,0 +1,44 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import java.util.List; + +import org.apache.pulsar.client.api.Message; + +/** + * A condition to be used in {@link PulsarConsumerTestUtil} to verify if it meets the + * consumed messages. + * + * @param the type of the message + * @author Jonas Geiregat + */ +@FunctionalInterface +public interface ConsumedMessagesCondition { + + /** + * Verifies that the consumed messages meet the condition. + * @param messages the consumed messages + * @return {@code true} if the condition is met + */ + boolean meets(List> messages); + + default ConsumedMessagesCondition and(ConsumedMessagesCondition other) { + return messages -> meets(messages) && other.meets(messages); + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConsumedMessagesConditions.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConsumedMessagesConditions.java new file mode 100644 index 000000000..af30c8e1e --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/ConsumedMessagesConditions.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import java.util.stream.Stream; + +import org.apache.pulsar.client.api.Message; + +import org.springframework.util.Assert; + +/** + * Exposes a set of commonly used conditions to be used in {@link PulsarConsumerTestUtil}. + * + * @author Jonas Geiregat + */ +public interface ConsumedMessagesConditions { + + /** + * Verifies that the consumed messages contain the expected message count. + * @param messageCount the desired message count + * @param the type of the message + * @return the condition + */ + static ConsumedMessagesCondition desiredMessageCount(int messageCount) { + Assert.state(messageCount > 0, "Desired message count must be greater than 0"); + return messages -> messages.size() == messageCount; + } + + /** + * Verifies that any of the consumed messages has a payload that equals the specified + * value. + * @param expectation the expected value + * @param the type of the message + * @return the condition + */ + static ConsumedMessagesCondition anyMessageMatchesExpected(T expectation) { + return messages -> messages.stream().anyMatch(message -> message.getValue().equals(expectation)); + } + + /** + * Verifies that the consumed messages value contains at all expected values. + * @param expectation the expected values + * @param the type of the message + * @return the condition + */ + @SafeVarargs + @SuppressWarnings("varargs") + static ConsumedMessagesCondition containsAllExpectedValues(T... expectation) { + return messages -> { + var values = messages.stream().map(Message::getValue).toList(); + return Stream.of(expectation).allMatch(values::contains); + }; + } + + /** + * Verifies that the consumed messages value contains exactly the expected values. + * @param expectation the expected values + * @param the type of the message + * @return the condition + */ + @SafeVarargs + @SuppressWarnings("varargs") + static ConsumedMessagesCondition containsExactlyExpectedValues(T... expectation) { + return ConsumedMessagesConditions.desiredMessageCount(expectation.length) + .and(containsAllExpectedValues(expectation)); + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java new file mode 100644 index 000000000..970111558 --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; + +import org.springframework.pulsar.PulsarException; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.util.Assert; + +/** + * Fluent API, to be used in tests, for consuming messages from Pulsar topics until a + * certain {@code Condition} has been met. + * + * @param the type of the message payload + * @author Jonas Geiregat + */ +public class PulsarConsumerTestUtil implements TopicSpec, SchemaSpec, ConditionsSpec { + + private final PulsarConsumerFactory consumerFactory; + + private ConsumedMessagesCondition condition; + + private Schema schema; + + private Duration timeout = Duration.ofSeconds(30); + + private List topics; + + public static TopicSpec consumeMessages(PulsarConsumerFactory pulsarConsumerFactory) { + return new PulsarConsumerTestUtil<>(pulsarConsumerFactory); + } + + public PulsarConsumerTestUtil(PulsarConsumerFactory consumerFactory) { + Assert.notNull(consumerFactory, "PulsarConsumerFactory must not be null"); + this.consumerFactory = consumerFactory; + } + + @Override + public SchemaSpec fromTopic(String topic) { + Assert.notNull(topic, "Topic must not be null"); + this.topics = List.of(topic); + return this; + } + + @Override + public ConditionsSpec awaitAtMost(Duration timeout) { + Assert.notNull(timeout, "Timeout must not be null"); + this.timeout = timeout; + return this; + } + + @Override + public ConditionsSpec until(ConsumedMessagesCondition condition) { + this.condition = condition; + return this; + } + + @Override + public ConditionsSpec withSchema(Schema schema) { + this.schema = schema; + return this; + } + + @Override + public List> get() { + var messages = new ArrayList>(); + try { + String subscriptionName = UUID.randomUUID() + "-test-consumer"; + try (Consumer consumer = consumerFactory.createConsumer(this.schema, this.topics, subscriptionName, + c -> c.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest))) { + long remainingMillis = timeout.toMillis(); + do { + long loopStartTime = System.currentTimeMillis(); + var message = consumer.receive(200, TimeUnit.MILLISECONDS); + if (message != null) { + messages.add(message); + consumer.acknowledge(message); + } + if (this.condition != null) { + if (this.condition.meets(messages)) { + return messages; + } + } + remainingMillis -= System.currentTimeMillis() - loopStartTime; + } + while (remainingMillis > 0); + } + } + catch (PulsarClientException ex) { + throw new PulsarException(ex); + } + if (this.condition != null && !this.condition.meets(messages)) { + throw new ConditionTimeoutException("Condition was not met within " + timeout.toSeconds() + " seconds"); + } + return messages; + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarTestException.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarTestException.java new file mode 100644 index 000000000..fd0375075 --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarTestException.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +/** + * Exception thrown when a test fails. + * + * @author Jonas Geiregat + */ +public class PulsarTestException extends RuntimeException { + + public PulsarTestException(String message, Throwable exception) { + super(message, exception); + } + + @SuppressWarnings("unused") + public PulsarTestException(String message) { + super(message); + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/SchemaSpec.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/SchemaSpec.java new file mode 100644 index 000000000..8172a05d6 --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/SchemaSpec.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import org.apache.pulsar.client.api.Schema; + +/** + * Specification for defining the schema to use for the {@link PulsarConsumerTestUtil}. + * + * @author Jonas Geiregat + */ +public interface SchemaSpec { + + /** + * Define the schema to use for the {@link PulsarConsumerTestUtil}. + * @param string the schema + * @return the conditions specification + */ + ConditionsSpec withSchema(Schema string); + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/TopicSpec.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/TopicSpec.java new file mode 100644 index 000000000..6fd73b13b --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/TopicSpec.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +/** + * Specification for defining the topic to consume from for the + * {@link PulsarConsumerTestUtil}. + * + * @author Jonas Geiregat + */ +public interface TopicSpec { + + /** + * Define the topic to consume from. + * @param topic the topic to consume from + * @return the schema selection specification + */ + SchemaSpec fromTopic(String topic); + +} diff --git a/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/ConsumedMessagesConditionTest.java b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/ConsumedMessagesConditionTest.java new file mode 100644 index 000000000..d47d6014c --- /dev/null +++ b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/ConsumedMessagesConditionTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link ConsumedMessagesCondition}. + * + * @author Jonas Geiregat + */ +class ConsumedMessagesConditionTest { + + @Test + void bothConditionShouldBeMetInOrderForAChainedAndConditionToBeMet() { + var trueCondition = testCondition(true); + var falseCondition = testCondition(false); + assertThat(trueCondition.and(trueCondition).meets(List.of())).isTrue(); + assertThat(falseCondition.and(trueCondition).meets(List.of())).isFalse(); + assertThat(trueCondition.and(falseCondition).meets(List.of())).isFalse(); + assertThat(falseCondition.and(falseCondition).meets(List.of())).isFalse(); + } + + private ConsumedMessagesCondition testCondition(boolean b) { + return messages -> b; + } + +} diff --git a/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/ConsumedMessagesConditionsTest.java b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/ConsumedMessagesConditionsTest.java new file mode 100644 index 000000000..a891fd480 --- /dev/null +++ b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/ConsumedMessagesConditionsTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import static org.springframework.pulsar.test.support.ConsumedMessagesConditions.desiredMessageCount; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * Tests for {@link ConsumedMessagesConditions}. + * + * @author Jonas Geiregat + */ +class ConsumedMessagesConditionsTest { + + private List> createStringMessages(int count) { + return IntStream.range(0, count) + .>mapToObj(i -> MessageImpl.create(new MessageMetadata(), + ByteBuffer.wrap(("message-" + i).getBytes()), Schema.STRING, "topic")) + .toList(); + } + + @Nested + class DesiredMessageCountTests { + + @Test + void receivedMessageCountMeetCondition() { + ConsumedMessagesCondition consumedMessagesCondition = desiredMessageCount(2); + + boolean result = consumedMessagesCondition.meets(createStringMessages(2)); + + Assertions.assertThat(result).isTrue(); + } + + @Test + void receivedMessageCountDoesNotMeetCondition() { + ConsumedMessagesCondition consumedMessagesCondition = desiredMessageCount(3); + + boolean result = consumedMessagesCondition.meets(createStringMessages(2)); + + Assertions.assertThat(result).isFalse(); + } + + @ParameterizedTest + @ValueSource(ints = { 0, -1 }) + void throwExceptionWhenDesiredMessageCountEqualOrLessThanZero(int messageCount) { + Assertions.assertThatThrownBy(() -> desiredMessageCount(messageCount)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Desired message count must be greater than 0"); + } + + } + + @Nested + class AnyMessageMatchesExpectedConditionTests { + + @Test + void messageValuesContainsExpectation() { + ConsumedMessagesCondition consumedMessagesCondition = ConsumedMessagesConditions + .anyMessageMatchesExpected("message-1"); + + boolean result = consumedMessagesCondition.meets(createStringMessages(3)); + + Assertions.assertThat(result).isTrue(); + } + + @Test + void messageValuesDoesNotContainExpectation() { + ConsumedMessagesCondition consumedMessagesCondition = ConsumedMessagesConditions + .anyMessageMatchesExpected("message-3"); + + boolean result = consumedMessagesCondition.meets(createStringMessages(3)); + + Assertions.assertThat(result).isFalse(); + } + + } + + @Nested + class ContainsAllExpectedValuesConditionTests { + + @Test + void messageValuesContainsExpectation() { + ConsumedMessagesCondition consumedMessagesCondition = ConsumedMessagesConditions + .containsAllExpectedValues("message-1", "message-2"); + + boolean result = consumedMessagesCondition.meets(createStringMessages(3)); + + Assertions.assertThat(result).isTrue(); + } + + @Test + void messageValuesDoesNotContainExpectation() { + ConsumedMessagesCondition consumedMessagesCondition = ConsumedMessagesConditions + .containsAllExpectedValues("message-3", "message-4"); + + boolean result = consumedMessagesCondition.meets(createStringMessages(3)); + + Assertions.assertThat(result).isFalse(); + } + + } + + @Nested + class ContainsExactlyExpectedValuesConditionTests { + + @Test + void messageValuesContainsExpectations() { + ConsumedMessagesCondition consumedMessagesCondition = ConsumedMessagesConditions + .containsExactlyExpectedValues("message-0", "message-1"); + + boolean result = consumedMessagesCondition.meets(createStringMessages(2)); + + Assertions.assertThat(result).isTrue(); + } + + @Test + void messageValuesDoesNotContainExpectation() { + ConsumedMessagesCondition consumedMessagesCondition = ConsumedMessagesConditions + .containsExactlyExpectedValues("message-0", "message-1"); + + boolean result = consumedMessagesCondition.meets(createStringMessages(3)); + + Assertions.assertThat(result).isFalse(); + } + + } + +} diff --git a/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarTestConsumerTestUtilTest.java b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarTestConsumerTestUtilTest.java new file mode 100644 index 000000000..238d07ba1 --- /dev/null +++ b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarTestConsumerTestUtilTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.springframework.pulsar.test.support.ConsumedMessagesConditions.desiredMessageCount; + +import java.time.Duration; +import java.util.List; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; + +/** + * Tests for {@link PulsarConsumerTestUtil}. + * + * @author Jonas Geiregat + */ +class PulsarTestConsumerTestUtilTest implements PulsarTestContainerSupport { + + private PulsarTemplate pulsarTemplate; + + private DefaultPulsarConsumerFactory pulsarConsumerFactory; + + @BeforeEach + void setup() throws PulsarClientException { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + this.pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()); + this.pulsarTemplate = new PulsarTemplate<>(new DefaultPulsarProducerFactory<>(pulsarClient)); + } + + @Test + void consumerFactoryCannotBeNull() { + assertThatThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("PulsarConsumerFactory must not be null"); + } + + @Test + void topicCannotBeNull() { + assertThatThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory).fromTopic(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Topic must not be null"); + } + + @Test + void awaitAtMostTimeoutCannotBeNull() { + assertThatThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("topic-a") + .withSchema(Schema.STRING) + .awaitAtMost(null)).isInstanceOf(IllegalArgumentException.class).hasMessage("Timeout must not be null"); + } + + @Test + void untilConditionCanBeNull() { + var testConsumer = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("topic-b") + .withSchema(Schema.STRING) + .awaitAtMost(Duration.ofSeconds(2)) + .until(null); + + pulsarTemplate.send("topic-b", "message"); + + assertThat(testConsumer.get()).hasSize(1).map(Message::getValue).containsExactly("message"); + } + + @Test + void consumerReturnsWhenConditionIsMet() { + var testConsumer = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("topic-c") + .withSchema(Schema.STRING); + + IntStream.range(0, 10).forEach(i -> pulsarTemplate.send("topic-c", "message-" + i)); + + List> messages = testConsumer.until(desiredMessageCount(2)).get(); + + assertThat(messages).hasSize(2).map(Message::getValue).containsExactly("message-0", "message-1"); + } + + @Test + void consumerReturnsAllMessagesWhenNoConditionIsPresent() { + var testConsumer = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("topic-d") + .withSchema(Schema.STRING); + + IntStream.range(0, 10).forEach(i -> pulsarTemplate.send("topic-d", "message-" + i)); + + List> messages = testConsumer.get(); + + assertThat(messages).hasSize(10) + .map(Message::getValue) + .containsExactlyElementsOf(IntStream.range(0, 10).mapToObj(i -> "message-" + i).toList()); + } + + @Test + void throwExceptionWhenConditionIsNotMet() { + var testConsumer = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + .fromTopic("topic-e") + .withSchema(Schema.STRING) + .awaitAtMost(Duration.ofSeconds(5)); + + ThrowingCallable consume = () -> testConsumer.until(desiredMessageCount(20)).get(); + + assertThatThrownBy(consume).isInstanceOf(ConditionTimeoutException.class) + .hasMessage("Condition was not met within 5 seconds"); + } + +}