Skip to content

feat: Add PulsarTestConsumer #582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Soby Chacko; Chris Bono; Alexander Preuß; Jay Bryant; Christophe Bornet
Soby Chacko; Chris Bono; Alexander Preuß; Jay Bryant; Christophe Bornet; Jonas Geiregat
4 changes: 4 additions & 0 deletions spring-pulsar-test/spring-pulsar-test.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter-api'
implementation 'org.testcontainers:pulsar'
implementation 'org.testcontainers:junit-jupiter'
implementation 'org.awaitility:awaitility'
implementation project(':spring-pulsar')
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support;

/**
* Exception thrown when a test times out.
*
* @author Jonas Geiregat
*/
public class ConditionTimeoutException extends PulsarTestException {

public ConditionTimeoutException(String message, Throwable exception) {
super(message, exception);
}

public ConditionTimeoutException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support;

import java.util.stream.Stream;

import org.apache.pulsar.client.api.Message;

import org.springframework.util.Assert;

/**
* Exposes a set of commonly used conditions to be used in {@link PulsarConsumerTestUtil}.
*
* @author Jonas Geiregat
*/
public interface Conditions {

/**
* Verifies that the consumed messages contain the expected message count.
* @param messageCount the desired message count
* @param <T> the type of the message
* @return the condition
*/
static <T> ConsumedMessagesCondition<T> desiredMessageCount(int messageCount) {
Assert.state(messageCount > 0, "Desired message count must be greater than 0");
return messages -> messages.size() == messageCount;
}

/**
* Verifies that at least one of the consumed messages has a payload that equals the
* specified value.
* @param expectation the expected value
* @param <T> the type of the message
* @return the condition
*/
static <T> ConsumedMessagesCondition<T> containsValue(T expectation) {
return messages -> messages.stream().anyMatch(message -> message.getValue().equals(expectation));
}

/**
* Verifies that the consumed messages value contains at least all expected values.
* @param expectation the expected value
* @param <T> the type of the message
* @return the condition
*/
@SafeVarargs
@SuppressWarnings("varargs")
static <T> ConsumedMessagesCondition<T> containsValues(T... expectation) {
return messages -> {
var values = messages.stream().map(Message::getValue).toList();
return Stream.of(expectation).allMatch(values::contains);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support;

import java.time.Duration;

/**
* Assertions related step in the fluent API for building a Pulsar test consumer.
*
* @param <T> the type of the message payload
* @author Jonas Geiregat
*/
public interface ConditionsSpec<T> {

/**
* The maximum timeout duration to wait for the desired number of messages to be
* reached.
* @param timeout the maximum timeout duration to wait
* @return the next step in the fluent API
*/
ConditionsSpec<T> awaitAtMost(Duration timeout);

/**
* Start consuming until the given condition is met.
* @param consumedMessagesCondition the condition to be met
* @return the next step in the fluent API
*/
ConsumptionSpec<T> until(ConsumedMessagesCondition<T> consumedMessagesCondition);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support;

import java.util.List;

import org.apache.pulsar.client.api.Message;

/**
* A condition to be used in {@link PulsarConsumerTestUtil} to verify if it meets the
* consumed messages.
*
* @param <T> the type of the message
* @author Jonas Geiregat
*/
@FunctionalInterface
public interface ConsumedMessagesCondition<T> {

/**
* Verifies that the consumed messages meet the condition.
* @param messages the consumed messages
* @return {@code true} if the condition is met
*/
boolean meets(List<Message<T>> messages);

default ConsumedMessagesCondition<T> and(ConsumedMessagesCondition<T> other) {
return messages -> meets(messages) && other.meets(messages);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support;

import java.util.List;

import org.apache.pulsar.client.api.Message;

import org.springframework.pulsar.PulsarException;

/**
* Terminal specification for consuming messages.
*
* @param <T> the type of the message payload
* @author Jonas Geiregat
*/
public interface ConsumptionSpec<T> {

/**
*
* Get the consumed messages.
* @return the consumed messages
* @throws ConditionTimeoutException if the condition is not met within the timeout
* @throws PulsarException if the condition is not met within the timeout
*/
List<Message<T>> get();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.util.Assert;

/**
* Fluent API, to be used in tests, for consuming messages from Pulsar topics until a
* certain {@code Condition} has been met.
*
* @param <T> the type of the message payload
* @author Jonas Geiregat
*/
public class PulsarConsumerTestUtil<T> implements TopicSpec, SchemaSpec, ConditionsSpec<T>, ConsumptionSpec<T> {

private final PulsarConsumerFactory<T> consumerFactory;

private ConsumedMessagesCondition<T> condition;

private Schema<T> schema;

private Duration timeout = Duration.ofSeconds(30);

private List<String> topics;

public static <T> TopicSpec consumeMessages(PulsarConsumerFactory<T> pulsarConsumerFactory) {
return new PulsarConsumerTestUtil<>(pulsarConsumerFactory);
}

public PulsarConsumerTestUtil(PulsarConsumerFactory<T> consumerFactory) {
this.consumerFactory = consumerFactory;
}

@Override
public SchemaSpec fromTopic(String topic) {
Assert.state(topic != null, "Topic must not be null");
this.topics = List.of(topic);
return this;
}

@Override
public ConditionsSpec<T> awaitAtMost(Duration timeout) {
this.timeout = timeout;
return this;
}

@Override
public ConsumptionSpec<T> until(ConsumedMessagesCondition<T> condition) {
this.condition = condition;
return this;
}

@Override
@SuppressWarnings("unchecked")
public <E> ConditionsSpec<E> withSchema(Schema<E> schema) {
this.schema = (Schema<T>) schema;
return (ConditionsSpec<E>) this;
}

@Override
public List<Message<T>> get() {
var messages = new ArrayList<Message<T>>();
try (Consumer<T> consumer = consumerFactory.createConsumer(this.schema, this.topics, "test-consumer",
c -> c.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest))) {
long remainingMillis = timeout.toMillis();
do {
long loopStartTime = System.currentTimeMillis();
var message = consumer.receive(200, TimeUnit.MILLISECONDS);
if (message != null) {
messages.add(message);
consumer.acknowledge(message);
}
if (this.condition != null) {
if (this.condition.meets(messages)) {
return messages;
}
}
remainingMillis -= System.currentTimeMillis() - loopStartTime;
}
while (remainingMillis > 0);
}
catch (PulsarClientException ex) {
throw new PulsarException(ex);
}
if (this.condition != null && !this.condition.meets(messages)) {
throw new ConditionTimeoutException("Condition was not met within " + timeout.toSeconds() + " seconds");
}
return messages;
}

}
Loading