Skip to content

Add Spring Pulsar smoke tests #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions integration/spring-pulsar-reactive/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id "java"
id "org.springframework.boot"
id "org.springframework.cr.smoke-test"
}

dependencies {
implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))
implementation("org.springframework.boot:spring-boot-starter-pulsar-reactive")
// TODO remove constraints once Boot goes back to snapshots
constraints {
implementation('org.springframework.pulsar:spring-pulsar:1.0.0-SNAPSHOT')
implementation('org.springframework.pulsar:spring-pulsar-reactive:1.0.0-SNAPSHOT')
}

implementation("org.crac:crac:$cracVersion")
implementation(project(":cr-listener"))

testImplementation("org.springframework.boot:spring-boot-starter-test")

appTestImplementation(project(":cr-smoke-test-support"))
appTestImplementation("org.awaitility:awaitility:4.2.0")
}
10 changes: 10 additions & 0 deletions integration/spring-pulsar-reactive/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: '3'
services:
pulsar:
image: apachepulsar/pulsar:3.1.0
ports:
- '8080'
- '6650'
command: bin/pulsar standalone
healthcheck:
test: curl http://127.0.0.1:8080/admin/v2/namespaces/public/default
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.example.pulsar.reactive;

import java.time.Duration;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import org.springframework.cr.smoketest.support.assertj.AssertableOutput;
import org.springframework.cr.smoketest.support.junit.ApplicationTest;

import static org.assertj.core.api.Assertions.assertThat;

@ApplicationTest
public class SpringPulsarApplicationTests {

@Test
void reactivePulsarTemplateSendsMessage(AssertableOutput output) {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output)
.hasSingleLineContaining("Message Sent (ReactivePulsarTemplate): Greeting[message=Hello from CRaC!]"));
}

@Test
void reactivePulsarListenerMethodReceivesMessage(AssertableOutput output) {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output).hasSingleLineContaining(
"Message Received (@ReactivePulsarListener): Greeting[message=Hello from CRaC!]"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.example.pulsar.reactive;

import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;

@SpringBootApplication
public class SpringPulsarApplication {

public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}

@Autowired
private ReactivePulsarTemplate<Greeting> reactivePulsarTemplate;

@EventListener
public void applicationReady(ApplicationReadyEvent ignored) {
String topic = "crac-demo-reactive-topic1";
Greeting message = new Greeting("Hello from CRaC!");
reactivePulsarTemplate.send(topic, message, Schema.JSON(Greeting.class)).subscribe();
System.out.println("Message Sent (ReactivePulsarTemplate): " + message);
}

@ReactivePulsarListener(topics = "crac-demo-reactive-topic1", subscriptionName = "crac-demo-reactive-sub1")
Mono<Void> receiveMessageFromTopic(Greeting message) {
System.out.println("Message Received (@ReactivePulsarListener): " + message);
return Mono.empty();
}

@Bean
PulsarTopic listenerTopic() {
return PulsarTopic.builder("crac-demo-reactive-topic1").numberOfPartitions(1).build();
}

record Greeting(String message) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
spring.pulsar.client.service-url=pulsar://${PULSAR_HOST:localhost}:${PULSAR_PORT_6650:6650}
spring.pulsar.admin.service-url=http://${PULSAR_HOST:localhost}:${PULSAR_PORT_8080:8080}
spring.pulsar.consumer.subscription.initial-position=earliest

logging.level.root=INFO
logging.level.org.apache.pulsar=WARN
logging.level.org.apache.pulsar.common.util.netty.DnsResolverUtil=ERROR
logging.level.org.springframework.pulsar=DEBUG
logging.level.org.springframework.pulsar.function=WARN
22 changes: 22 additions & 0 deletions integration/spring-pulsar/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins {
id "java"
id "org.springframework.boot"
id "org.springframework.cr.smoke-test"
}

dependencies {
implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))
implementation("org.springframework.boot:spring-boot-starter-pulsar")
// TODO remove constraints once Boot goes back to snapshots
constraints {
implementation('org.springframework.pulsar:spring-pulsar:1.0.0-SNAPSHOT')
}

implementation("org.crac:crac:$cracVersion")
implementation(project(":cr-listener"))

testImplementation("org.springframework.boot:spring-boot-starter-test")

appTestImplementation(project(":cr-smoke-test-support"))
appTestImplementation("org.awaitility:awaitility:4.2.0")
}
10 changes: 10 additions & 0 deletions integration/spring-pulsar/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: '3'
services:
pulsar:
image: apachepulsar/pulsar:3.1.0
ports:
- '8080'
- '6650'
command: bin/pulsar standalone
healthcheck:
test: curl http://127.0.0.1:8080/admin/v2/namespaces/public/default
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.example.pulsar;

import java.time.Duration;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import org.springframework.cr.smoketest.support.assertj.AssertableOutput;
import org.springframework.cr.smoketest.support.junit.ApplicationTest;

import static org.assertj.core.api.Assertions.assertThat;

@ApplicationTest
public class SpringPulsarApplicationTests {

@Test
void pulsarTemplateSendsMessageToListenerTopic(AssertableOutput output) {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output)
.hasSingleLineContaining("Message Sent (@PulsarListener): Greeting[message=Hello from CRaC!]"));
}

@Test
void pulsarTemplateSendsMessageToReaderTopic(AssertableOutput output) {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output)
.hasSingleLineContaining("Message Sent (@PulsarReader): Greeting[message=Hello from CRaC!]"));
}

@Test
void pulsarListenerMethodReceivesMessage(AssertableOutput output) {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output)
.hasSingleLineContaining("Message Received (@PulsarListener): Greeting[message=Hello from CRaC!]"));
}

@Test
void pulsarReaderMethodReceivesMessage(AssertableOutput output) {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output)
.hasSingleLineContaining("Message Received (@PulsarReader): Greeting[message=Hello from CRaC!]"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.example.pulsar;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopic;

@SpringBootApplication
public class SpringPulsarApplication {

public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}

@Autowired
private PulsarTemplate<Greeting> pulsarTemplate;

@EventListener
public void applicationReady(ApplicationReadyEvent ignored) throws PulsarClientException {
Greeting message = new Greeting("Hello from CRaC!");
pulsarTemplate.send("crac-demo-topic1", message, Schema.JSON(Greeting.class));
System.out.println("Message Sent (@PulsarListener): " + message);

pulsarTemplate.send("crac-demo-topic2", message, Schema.JSON(Greeting.class));
System.out.println("Message Sent (@PulsarReader): " + message);
}

@PulsarListener(topics = "crac-demo-topic1", subscriptionName = "crac-demo-sub1")
void receiveMessageFromTopic(Greeting message) {
System.out.println("Message Received (@PulsarListener): " + message);
}

@PulsarReader(topics = "crac-demo-topic2", subscriptionName = "crac-demo-sub2", startMessageId = "earliest")
void readMessageFromTopic(Greeting message) {
System.out.println("Message Received (@PulsarReader): " + message);
}

@Bean
PulsarTopic readerTopic() {
return PulsarTopic.builder("crac-demo-topic2").numberOfPartitions(1).build();
}

record Greeting(String message) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
spring.pulsar.client.service-url=pulsar://${PULSAR_HOST:localhost}:${PULSAR_PORT_6650:6650}
spring.pulsar.admin.service-url=http://${PULSAR_HOST:localhost}:${PULSAR_PORT_8080:8080}
spring.pulsar.consumer.subscription.initial-position=earliest

logging.level.root=INFO
logging.level.org.apache.pulsar=WARN
logging.level.org.apache.pulsar.common.util.netty.DnsResolverUtil=ERROR
logging.level.org.springframework.pulsar=DEBUG
logging.level.org.springframework.pulsar.function=WARN