Skip to content

fix: patch broker within testbroker context only #1619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 4, 2024

Conversation

sfran96
Copy link
Contributor

@sfran96 sfran96 commented Jul 24, 2024

Description

When using a pre-started broker, the broker gets patched only within the context manager, and once it exists the previous connection is still usable.

import pytest

from faststream.rabbit import TestRabbitBroker, RabbitBroker


@pytest.mark.asyncio
async def test_fix():
    broker = RabbitBroker(url="amqp://guest:[email protected]:5672/")
    await broker.start()
    publisher = broker.publisher()

    async with TestRabbitBroker(broker):
        for i in range(10):
            await publisher.publish(f"message {i}")

        assert publisher.mock.call_count == 10

    assert getattr(publisher, "mock", None) is None
    await publisher.publish(f"message 11")

Fixes #1479

Some of the other changes in the PR are related to using the pre-commit hooks, or not all tests working in < py3.10.

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh
  • I have ensured that static analysis tests are passing by running scripts/static-analysis.sh
  • I have included code examples to illustrate the modifications

@Lancetnik
Copy link
Collaborator

Seems like some tests are broken now due new changes

@sfran96
Copy link
Contributor Author

sfran96 commented Jul 25, 2024

I saw now, something with patching one of the functions incorrectly. I also added more tests for the with_real option as per my understanding of the use of the flag.

Lancetnik and others added 2 commits July 25, 2024 11:56
test: update signature of testclient test in redis
@sfran96
Copy link
Contributor Author

sfran96 commented Jul 25, 2024

Some tests were, weirdly, still failing. I've checked the subscriber handlers for NATS and Redis and both seem to create multiple listeners for the same queue/topic if broker.start() is called multiple times 🤔.

I'm guessing this is not intended behavior, because it doesn't seem to happen on the Kafka or Rabbit brokers.

I can't seem to get the confluent kafka broker with test test_broker_with_real_patches_subscribers_and_subscribers, I'm a bit puzzled.

Redis

from faststream.redis import RedisBroker

broker = RedisBroker()
queue = "test_queue"


async def main():
    event1 = asyncio.Event()
    event2 = asyncio.Event()

    @broker.subscriber(queue)
    async def m(msg):
        if event1.is_set():
            event2.set()
        else:
            event1.set()

    await broker.start()
    await broker.start()
    await broker.publish("hello", queue)

    await asyncio.wait(
        (
            asyncio.create_task(event1.wait()),
            asyncio.create_task(event2.wait()),
        ),
        timeout=1,
    )

    assert event1.is_set() and event2.is_set() is False


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

NATS

from faststream.nats import NatsBroker

broker = NatsBroker()
queue = "test_queue"


async def main():
    event1 = asyncio.Event()
    event2 = asyncio.Event()

    @broker.subscriber(queue)
    async def m(msg):
        if event1.is_set():
            event2.set()
        else:
            event1.set()

    await broker.start()
    await broker.start()
    await broker.publish("hello", queue)

    await asyncio.wait(
        (
            asyncio.create_task(event1.wait()),
            asyncio.create_task(event2.wait()),
        ),
        timeout=1,
    )

    assert event1.is_set() and event2.is_set() is False


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

I'm pushing the changes and you can evaluate if this is the intended behavior, or event just to decide to move it to another PR.

@Lancetnik
Copy link
Collaborator

@sfran96 broker wasn't designed for multiple start calls. Of course, we should refactor whole TestBroker behavior and I am planning this to 0.6.0. But can we make smth right here, without big changes?

Lancetnik
Lancetnik previously approved these changes Aug 4, 2024
@Lancetnik
Copy link
Collaborator

Seems like we can merge it already, tu

@Lancetnik Lancetnik enabled auto-merge August 4, 2024 11:32
Lancetnik
Lancetnik previously approved these changes Aug 4, 2024
@Lancetnik Lancetnik added this pull request to the merge queue Aug 4, 2024
Merged via the queue into ag2ai:main with commit d705076 Aug 4, 2024
28 of 29 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Bug: starting the broker breaks in-memory behaviour
2 participants