Skip to content

Commit 4cf2cfc

Browse files
Add the RabbitMQ consumer (#24)
* Add the RabbitMQ consumer * Fix env var name Co-authored-by: Moisés <[email protected]> * Remove static methods in service * Add queue exceptions file * Update events test cases --------- Co-authored-by: Moisés <[email protected]>
1 parent 4bf14f7 commit 4cf2cfc

File tree

18 files changed

+362
-7
lines changed

18 files changed

+362
-7
lines changed

.env.docker

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
REDIS_URL=redis://redis:6379/0
2-
DATABASE_URL=psql://postgres:postgres@db:5432/postgres
2+
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
3+
RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/

.env.sample

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
REDIS_URL=redis://redis:6379/0
2-
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
2+
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
3+
RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/

.env.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ TEST=True
22
REDIS_URL=redis://localhost:6379/0
33
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/postgres
44
DATABASE_POOL_CLASS=NullPool
5+
RABBITMQ_AMQP_URL=amqp://guest:guest@localhost:5672/

.github/workflows/ci.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ jobs:
5353
--health-retries 5
5454
ports:
5555
- 5432:5432
56+
rabbitmq:
57+
image: rabbitmq:alpine
58+
ports:
59+
- 5672:5672
60+
options: >-
61+
--health-cmd "rabbitmqctl await_startup"
62+
--health-interval 10s
63+
--health-timeout 5s
64+
--health-retries 5
5665
steps:
5766
- uses: actions/checkout@v4
5867
- name: Set up Python ${{ matrix.python-version }}

app/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@ class Settings(BaseSettings):
1414
extra="allow",
1515
case_sensitive=True,
1616
)
17+
TEST: bool = False
1718
REDIS_URL: str = "redis://"
1819
DATABASE_URL: str = "psql://postgres:"
1920
DATABASE_POOL_CLASS: str = "AsyncAdaptedQueuePool"
2021
DATABASE_POOL_SIZE: int = 10
21-
TEST: bool = False
22+
RABBITMQ_AMPQ_URL: str = "amqp://guest:guest@"
23+
RABBITMQ_AMQP_EXCHANGE: str = "safe-transaction-service-events"
24+
RABBITMQ_DECODER_EVENTS_QUEUE_NAME: str = "safe-decoder-service"
2225

2326

2427
settings = Settings()

app/datasources/queue/__init__.py

Whitespace-only changes.

app/datasources/queue/exceptions.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
class QueueProviderException(Exception):
2+
"""
3+
Generic exception for QueueProvider errors.
4+
"""
5+
6+
pass
7+
8+
9+
class QueueProviderUnableToConnectException(QueueProviderException):
10+
"""
11+
Raised when a connection to RabbitMQ cannot be established.
12+
"""
13+
14+
pass
15+
16+
17+
class QueueProviderNotConnectedException(QueueProviderException):
18+
"""
19+
Raised when no connection is established.
20+
"""
21+
22+
pass
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
from asyncio import AbstractEventLoop
2+
from typing import Any, Callable
3+
4+
import aio_pika
5+
from aio_pika.abc import (
6+
AbstractExchange,
7+
AbstractIncomingMessage,
8+
AbstractQueue,
9+
AbstractRobustConnection,
10+
ConsumerTag,
11+
ExchangeType,
12+
)
13+
14+
from app.config import settings
15+
16+
from .exceptions import (
17+
QueueProviderNotConnectedException,
18+
QueueProviderUnableToConnectException,
19+
)
20+
21+
22+
class QueueProvider:
23+
24+
_connection: AbstractRobustConnection | None
25+
_exchange: AbstractExchange | None
26+
_events_queue: AbstractQueue | None
27+
28+
def __init__(self) -> None:
29+
"""
30+
Initializes the QueueProvider instance with default values.
31+
"""
32+
self._connection = None
33+
self._exchange = None
34+
self._events_queue = None
35+
36+
async def _connect(self, loop: AbstractEventLoop) -> None:
37+
"""
38+
Establishes a connection to RabbitMQ and sets up the exchange and queue.
39+
40+
:param loop: The asyncio event loop used for the connection.
41+
:return:
42+
"""
43+
try:
44+
self._connection = await aio_pika.connect_robust(
45+
url=settings.RABBITMQ_AMPQ_URL, loop=loop
46+
)
47+
except aio_pika.exceptions.AMQPConnectionError as e:
48+
raise QueueProviderUnableToConnectException(e)
49+
50+
channel = await self._connection.channel()
51+
self._exchange = await channel.declare_exchange(
52+
settings.RABBITMQ_AMQP_EXCHANGE, ExchangeType.FANOUT
53+
)
54+
self._events_queue = await channel.declare_queue(
55+
settings.RABBITMQ_DECODER_EVENTS_QUEUE_NAME, durable=True
56+
)
57+
if self._events_queue:
58+
await self._events_queue.bind(self._exchange)
59+
60+
async def connect(self, loop: AbstractEventLoop) -> None:
61+
"""
62+
Ensures that the RabbitMQ connection is established.
63+
64+
:param loop: The asyncio event loop used to establish the connection.
65+
:return:
66+
"""
67+
if not self._connection:
68+
await self._connect(loop)
69+
70+
def is_connected(self) -> bool:
71+
"""
72+
Verifies if the connection to RabbitMQ is established.
73+
74+
:return: True` if the connection is established, `False` otherwise.
75+
"""
76+
return self._connection is not None
77+
78+
async def disconnect(self) -> None:
79+
"""
80+
Safely closes the RabbitMQ connection and cleans up resources.
81+
82+
:return:
83+
"""
84+
if self._connection:
85+
if self._events_queue and self._exchange:
86+
await self._events_queue.unbind(exchange=self._exchange)
87+
await self._events_queue.delete(if_unused=False, if_empty=False)
88+
await self._connection.close()
89+
self._exchange = None
90+
self._connection = None
91+
self._events_queue = None
92+
93+
async def consume(self, callback: Callable[[str], Any]) -> ConsumerTag:
94+
"""
95+
Starts consuming messages from the declared queue.
96+
97+
- Each message is processed using the provided callback function.
98+
99+
:param callback: A function to process incoming messages.
100+
:return: A tag identifying the active consumer.
101+
:raises QueueProviderNotConnectedException: if no connection or queue is initialized.
102+
"""
103+
if not self._connection or not self._events_queue:
104+
raise QueueProviderNotConnectedException()
105+
106+
async def wrapped_callback(message: AbstractIncomingMessage) -> None:
107+
"""
108+
Wrapper for processing the message and handling ACKs.
109+
110+
:param message: The incoming RabbitMQ message.
111+
"""
112+
await message.ack()
113+
body = message.body
114+
if body:
115+
callback(body.decode("utf-8"))
116+
117+
return await self._events_queue.consume(wrapped_callback)

app/main.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,50 @@
1+
import asyncio
2+
import logging
3+
from contextlib import asynccontextmanager
4+
15
from fastapi import APIRouter, FastAPI
26

37
from . import VERSION
8+
from .datasources.queue.exceptions import QueueProviderUnableToConnectException
9+
from .datasources.queue.queue_provider import QueueProvider
410
from .routers import about, contracts, default
11+
from .services.events import EventsService
12+
13+
14+
@asynccontextmanager
15+
async def lifespan(app: FastAPI):
16+
"""
17+
Define the lifespan of the application:
18+
- Connects to the QueueProvider at startup.
19+
- Disconnects from the QueueProvider at shutdown.
20+
"""
21+
queue_provider = QueueProvider()
22+
consume_task = None
23+
try:
24+
loop = asyncio.get_running_loop()
25+
try:
26+
await queue_provider.connect(loop)
27+
except QueueProviderUnableToConnectException as e:
28+
logging.error(f"Unable to connect to Queue Provider: {e}")
29+
if queue_provider.is_connected():
30+
events_service = EventsService()
31+
consume_task = asyncio.create_task(
32+
queue_provider.consume(events_service.process_event)
33+
)
34+
yield
35+
finally:
36+
if consume_task:
37+
consume_task.cancel()
38+
await queue_provider.disconnect()
39+
540

641
app = FastAPI(
742
title="Safe Decoder Service",
843
description="Safe Core{API} decoder service",
944
version=VERSION,
1045
docs_url=None,
1146
redoc_url=None,
47+
lifespan=lifespan,
1248
)
1349

1450
# Router configuration

app/services/events.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import json
2+
import logging
3+
from typing import Dict
4+
5+
6+
class EventsService:
7+
8+
def process_event(self, message: str) -> None:
9+
"""
10+
Processes the incoming event message.
11+
12+
:param message: The incoming message to process, expected to be a JSON string.
13+
"""
14+
try:
15+
tx_service_event = json.loads(message)
16+
17+
if self.is_event_valid(tx_service_event):
18+
# TODO: process event!
19+
pass
20+
else:
21+
logging.error(
22+
f"Unsupported message. A valid message should have at least 'chainId' and 'type': {message}"
23+
)
24+
except json.JSONDecodeError:
25+
logging.error(f"Unsupported message. Cannot parse as JSON: {message}")
26+
27+
def is_event_valid(self, tx_service_event: Dict) -> bool:
28+
"""
29+
Validates if the event has the required fields 'chainId' and 'type' as strings.
30+
31+
:param tx_service_event: The event object to validate.
32+
:return: True if the event is valid (both 'chainId' and 'type' are strings), False otherwise.
33+
"""
34+
return isinstance(tx_service_event.get("chainId"), str) and isinstance(
35+
tx_service_event.get("type"), str
36+
)

app/tests/datasources/__init__.py

Whitespace-only changes.

app/tests/datasources/queue/__init__.py

Whitespace-only changes.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import asyncio
2+
import unittest
3+
from unittest.mock import patch
4+
5+
import aio_pika
6+
from aio_pika.abc import AbstractRobustConnection
7+
8+
from app.config import settings
9+
from app.datasources.queue.exceptions import QueueProviderUnableToConnectException
10+
from app.datasources.queue.queue_provider import QueueProvider
11+
12+
13+
class TestQueueProviderIntegration(unittest.IsolatedAsyncioTestCase):
14+
async def asyncSetUp(self):
15+
self.provider = QueueProvider()
16+
self.loop = asyncio.get_event_loop()
17+
18+
async def test_connect_success(self):
19+
self.assertFalse(self.provider.is_connected())
20+
await self.provider.connect(self.loop)
21+
self.assertTrue(self.provider.is_connected())
22+
await self.provider.disconnect()
23+
self.assertFalse(self.provider.is_connected())
24+
25+
async def test_connect_failure(self):
26+
provider = QueueProvider()
27+
28+
with patch("app.config.settings.RABBITMQ_AMPQ_URL", "amqp://invalid-url"):
29+
with self.assertRaises(QueueProviderUnableToConnectException):
30+
await provider.connect(self.loop)
31+
32+
async def test_consume(self):
33+
await self.provider.connect(self.loop)
34+
assert isinstance(self.provider._connection, AbstractRobustConnection)
35+
message = "Test message"
36+
channel = await self.provider._connection.channel()
37+
exchange = await channel.declare_exchange(
38+
settings.RABBITMQ_AMQP_EXCHANGE, aio_pika.ExchangeType.FANOUT
39+
)
40+
41+
await exchange.publish(
42+
aio_pika.Message(body=message.encode("utf-8")),
43+
routing_key="",
44+
)
45+
46+
received_messages = []
47+
48+
def callback(message: str):
49+
received_messages.append(message)
50+
51+
await self.provider.consume(callback)
52+
53+
# Wait to make sure the message is consumed.
54+
await asyncio.sleep(1)
55+
56+
self.assertIn(message, received_messages)
57+
await self.provider.disconnect()

app/tests/services/__init__.py

Whitespace-only changes.

app/tests/services/test_events.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import unittest
2+
from unittest.mock import patch
3+
4+
from app.services.events import EventsService
5+
6+
7+
class TestEventsService(unittest.TestCase):
8+
9+
def test_is_event_valid(self):
10+
valid_event = {"chainId": "123", "type": "transaction"}
11+
self.assertTrue(EventsService().is_event_valid(valid_event))
12+
13+
invalid_event_missing_chain_id = {"type": "transaction"}
14+
self.assertFalse(EventsService().is_event_valid(invalid_event_missing_chain_id))
15+
16+
invalid_event_missing_type = {"chainId": "123"}
17+
self.assertFalse(EventsService().is_event_valid(invalid_event_missing_type))
18+
19+
invalid_event_invalid_chain_id = {"chainId": 123, "type": "transaction"}
20+
self.assertFalse(EventsService().is_event_valid(invalid_event_invalid_chain_id))
21+
22+
invalid_event_invalid_type = {"chainId": "123", "type": 123}
23+
self.assertFalse(EventsService().is_event_valid(invalid_event_invalid_type))
24+
25+
@patch("logging.error")
26+
def test_process_event_valid_message(self, mock_log):
27+
valid_message = '{"chainId": "123", "type": "transaction"}'
28+
29+
EventsService().process_event(valid_message)
30+
31+
mock_log.assert_not_called()
32+
33+
@patch("logging.error")
34+
def test_process_event_invalid_json(self, mock_log):
35+
invalid_message = '{"chainId": "123", "type": "transaction"'
36+
37+
EventsService().process_event(invalid_message)
38+
39+
mock_log.assert_called_with(
40+
'Unsupported message. Cannot parse as JSON: {"chainId": "123", "type": "transaction"'
41+
)
42+
43+
invalid_message_invalid_type = '{"chainId": "123", "type": 123}'
44+
45+
EventsService().process_event(invalid_message_invalid_type)
46+
47+
mock_log.assert_called_with(
48+
'Unsupported message. A valid message should have at least \'chainId\' and \'type\': {"chainId": "123", "type": 123}'
49+
)

0 commit comments

Comments
 (0)