ProxiedKafkaCompanionResource cannot use dev service #47112

Open
vsevel opened this issue Apr 1, 2025 · 2 comments

vsevel commented Apr 1, 2025

Describe the bug

I have created a simple kafka application, where tests are using the default redpanda dev service.
The application is created with https://code.quarkus.io/?e=messaging-kafka

I am adding the following test from the documentation:

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class MyTest {

    @InjectKafkaCompanion
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("words", UUID.randomUUID().toString()));

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("words", 10);
        orders.awaitCompletion();
        assertEquals(10, orders.count());
    }

}

all tests pass.

now I am adding quarkus-test-kafka-companion, plus the following test:

@QuarkusTest
@QuarkusTestResource(ProxiedKafkaCompanionResource.class)
public class MyToxiProxyTest {

    Logger logger = LoggerFactory.getLogger(MyToxiProxyTest.class);

    @InjectKafkaProxy
    KafkaProxy proxy;

    // not injected, see https://github.com/quarkusio/quarkus/issues/47109
    // @InjectKafkaCompanion
    KafkaCompanion companion;

    @Test
    void testProcessor() throws IOException {

        proxy.toxi.toxics().bandwidth("CUT_CONNECTION_UPSTREAM", ToxicDirection.UPSTREAM, 0L);
        proxy.toxi.toxics().bandwidth("CUT_CONNECTION_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0L);

        // see https://github.com/quarkusio/quarkus/issues/47109
        companion = new KafkaCompanion(proxy.getProxyBootstrapServers());

        logger.info("toxics: {}", proxy.toxi.toxics().getAll());

        // the following is expected to fail because of cut connection

        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("words", UUID.randomUUID().toString()));

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("words", 10);
        orders.awaitCompletion();
        assertEquals(10, orders.count());
    }
}

if I execute mvn package, this fails with:

[ERROR] org.acme.MyToxiProxyTest.testProcessor -- Time elapsed: 0.016 s <<< ERROR!
java.lang.NullPointerException: Cannot read field "toxi" because "this.proxy" is null
        at org.acme.MyToxiProxyTest.testProcessor(MyToxiProxyTest.java:41)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at io.quarkus.test.junit.QuarkusTestExtension.runExtensionMethod(QuarkusTestExtension.java:960)
        at io.quarkus.test.junit.QuarkusTestExtension.interceptTestMethod(QuarkusTestExtension.java:810)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

this happens because in KafkaCompanionResource, we have:

    @Override
    public void setIntegrationTestContext(DevServicesContext context) {
        Map<String, String> devServicesProperties = context.devServicesProperties();
        String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers");
        if (bootstrapServers != null) {
            kafkaCompanion = new KafkaCompanion(bootstrapServers);

then we won't be creating a container in:

    @Override
    public void init(Map<String, String> initArgs) {
        if (kafkaCompanion == null) {
            strimziKafkaContainerImage = initArgs.get(STRIMZI_KAFKA_IMAGE_KEY);
            String portString = initArgs.get(KAFKA_PORT_KEY);
            kafkaPort = portString == null ? null : Integer.parseInt(portString);
            kraft = Boolean.parseBoolean(initArgs.get(KRAFT_KEY));
            kafka = createContainer(strimziKafkaContainerImage);

as a result, the proxied kafka won't be created in ProxiedKafkaCompanionResource:

    @Override
    protected StrimziKafkaContainer createContainer(String imageName) {
        if (imageName == null) {
            proxiedKafka = new ProxiedStrimziKafkaContainer();
        } else {
            proxiedKafka = new ProxiedStrimziKafkaContainer(imageName);
        }
        return proxiedKafka;
    }
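
The chain of events above can be reproduced with a small stand-alone model. This is only a sketch of the control flow. `LifecycleModel`, `LifecycleDemo` and `injectedProxy` are hypothetical names, plain strings stand in for `KafkaCompanion` and `ProxiedStrimziKafkaContainer`, and it assumes that `setIntegrationTestContext` runs before `init` and that the injected proxy comes from the proxied container, as the snippets suggest.

```java
import java.util.Map;

// Simplified stand-in for the resource lifecycle; none of this is real Quarkus code.
class LifecycleModel {
    String companion;     // stands in for KafkaCompanion
    String proxiedKafka;  // stands in for ProxiedStrimziKafkaContainer

    // Mirrors setIntegrationTestContext: a dev service address creates the companion early.
    void setIntegrationTestContext(Map<String, String> devServicesProperties) {
        String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers");
        if (bootstrapServers != null) {
            companion = "companion@" + bootstrapServers;
        }
    }

    // Mirrors init: the container is only created when no companion exists yet.
    void init() {
        if (companion == null) {
            proxiedKafka = createContainer();
        }
    }

    // Mirrors the overridden createContainer, which is the only place the proxy is set up.
    String createContainer() {
        return "proxied-kafka-container";
    }

    // Assumption: the value injected for @InjectKafkaProxy comes from the proxied container.
    String injectedProxy() {
        return proxiedKafka;
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        // Dev service enabled: the companion exists, so init skips createContainer.
        LifecycleModel withDevService = new LifecycleModel();
        withDevService.setIntegrationTestContext(Map.of("kafka.bootstrap.servers", "localhost:32768"));
        withDevService.init();
        System.out.println("with dev service, proxy = " + withDevService.injectedProxy());

        // Dev service disabled: init creates the proxied container.
        LifecycleModel withoutDevService = new LifecycleModel();
        withoutDevService.setIntegrationTestContext(Map.of());
        withoutDevService.init();
        System.out.println("without dev service, proxy = " + withoutDevService.injectedProxy());
    }
}
```

In the first case the proxy stays null, which matches the NullPointerException seen in the test.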

to get the proxy to be created, we would need to disable the kafka dev service, but then the other tests would fail.

what we want is to reuse the kafka dev service in ProxiedKafkaCompanionResource if enabled.

Expected behavior

the ProxiedKafkaCompanionResource should be able to reuse the dev service.
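
As a rough illustration of that expectation (not a proposed patch), the model below always sets up a proxy and only decides which broker it points at. `ReusingResourceModel`, `startOwnBroker`, `proxyTarget` and the addresses are made up for this sketch, and strings stand in for the real broker and proxy objects.

```java
import java.util.Map;

// Hypothetical model of the requested behavior; not the Quarkus implementation.
class ReusingResourceModel {
    String upstream;     // broker address the proxy forwards to
    boolean ownsBroker;  // true only when the resource had to start its own broker
    String proxy;        // stands in for the toxiproxy-backed KafkaProxy

    // Remember the dev service broker, if there is one.
    void setIntegrationTestContext(Map<String, String> devServicesProperties) {
        upstream = devServicesProperties.get("kafka.bootstrap.servers");
    }

    // Start a broker only when no dev service is available, then always create the proxy.
    void init() {
        if (upstream == null) {
            upstream = startOwnBroker();
            ownsBroker = true;
        }
        proxy = "proxy -> " + upstream;
    }

    // Placeholder for starting a dedicated container; the address is illustrative.
    String startOwnBroker() {
        return "localhost:9092";
    }

    String proxyTarget() {
        return upstream;
    }
}

class ReusingDemo {
    public static void main(String[] args) {
        ReusingResourceModel resource = new ReusingResourceModel();
        resource.setIntegrationTestContext(Map.of("kafka.bootstrap.servers", "localhost:32768"));
        resource.init();
        System.out.println(resource.proxy + ", owns broker: " + resource.ownsBroker);
    }
}
```

With this shape, proxy tests and non-proxy tests would share the same dev service broker, and the proxy would never be null.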

Actual behavior

it does not, which leaves us with no options if we have both non-proxy tests and proxy tests.
this is qualified as a bug for that reason. arguably this could be considered an improvement request.
nevertheless, we find the companion and the toxi proxy integration to be very useful, and we would like to mix those tests with the default kafka dev service broker.

How to Reproduce?

follow the initial description.

Output of uname -a or ver

No response

Output of java -version

No response

Quarkus version or git rev

3.21

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

@vsevel vsevel added the kind/bug Something isn't working label Apr 1, 2025

quarkus-bot bot commented Apr 1, 2025

/cc @alesj (kafka), @cescoffier (kafka), @geoand (devservices), @ozangunalp (kafka)


vsevel commented Apr 1, 2025

the code example is identical to the one in #47109, because the other issue was found trying to reproduce the current one. but they are not related.
however the ultimate goal would be to be able to run proxy and non proxy tests with the companion, and regular kafka tests, with the dev service.
