Skip to content

Add proof of async tasks with dramatic and periodiq #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.docker
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
REDIS_URL=redis://redis:6379/0
DATABASE_URL=psql://postgres:postgres@db:5432/postgres
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_QUEUE_NAME=default
3 changes: 3 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=psql://postgres:postgres@localhost:5432/postgres
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_QUEUE_NAME=default
3 changes: 3 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=psql://postgres:postgres@localhost:5432/postgres
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_QUEUE_NAME=default
3 changes: 3 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class Settings(BaseSettings):
)
REDIS_URL: str = "redis://"
DATABASE_URL: str = "psql://postgres:"
RABBITMQ_HOST: str = "rabbitmq"
RABBITMQ_PORT: int = 5672
RABBITMQ_QUEUE_NAME: str = ""


settings = Settings()
Empty file added app/consumers/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions app/consumers/queue_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import aio_pika
from aio_pika.abc import AbstractIncomingMessage

from app.config import settings
from app.workers.tasks import example_task


class QueueConsumer:

@staticmethod
async def process_incoming_message(message: AbstractIncomingMessage) -> None:
await message.ack()
body = message.body
if body:
example_task.send(body.decode())

async def consume(self, loop):
connection = await aio_pika.connect_robust(
host=settings.RABBITMQ_HOST, port=settings.RABBITMQ_PORT, loop=loop
)
channel = await connection.channel()
queue = await channel.declare_queue(settings.RABBITMQ_QUEUE_NAME)
await queue.consume(self.process_incoming_message, no_ack=False)
return connection
10 changes: 10 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio

from fastapi import APIRouter, FastAPI

from . import VERSION
from .consumers.queue_consumer import QueueConsumer
from .routers import about, default

app = FastAPI(
Expand All @@ -18,3 +21,10 @@
api_v1_router.include_router(about.router)
app.include_router(api_v1_router)
app.include_router(default.router)


@app.on_event("startup")
async def startup():
loop = asyncio.get_running_loop()
task = loop.create_task(QueueConsumer().consume(loop))
await task
Empty file added app/workers/__init__.py
Empty file.
28 changes: 28 additions & 0 deletions app/workers/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
import time
from datetime import datetime

import dramatiq
from dramatiq.brokers.redis import RedisBroker
from periodiq import PeriodiqMiddleware, cron

from app.config import settings

redis_broker = RedisBroker(url=settings.REDIS_URL)
redis_broker.add_middleware(PeriodiqMiddleware(skip_delay=60))

dramatiq.set_broker(redis_broker)


@dramatiq.actor
def example_task(message: str) -> None:
time.sleep(10) # Network delay simulation
logging.info(f"processed! -> {message}")
return


@dramatiq.actor(periodic=cron("*/2 * * * *"))
def scheduled_example_task() -> None:
time.sleep(10) # Network delay simulation
logging.info(f"processed with crontab! at {datetime.now()}")
return
29 changes: 28 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ services:
command:
- --appendonly yes

rabbitmq:
image: rabbitmq:alpine
ports:
- "5672:5672"
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 30s
retries: 5

dramatiq-worker:
build:
context: .
dockerfile: docker/web/Dockerfile
env_file:
- .env
environment:
RABBITMQ_QUEUE_NAME: "default"
depends_on:
- redis
command:
docker/web/dramatiq/worker/run.sh

web:
build:
context: .
Expand All @@ -43,4 +66,8 @@ services:
- "8888:8888"
volumes:
- nginx-shared:/nginx
command: docker/web/run_web.sh
command:
docker/web/run_web.sh
depends_on:
rabbitmq:
condition: service_healthy
8 changes: 8 additions & 0 deletions docker/web/dramatiq/worker/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

set -euo pipefail

dramatiq app.workers.tasks & # dramatiq async actors
periodiq -v app.workers.tasks & # cron dramatiq async actors

wait
3 changes: 2 additions & 1 deletion docker/web/run_web.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ rm -rf $DOCKER_SHARED_DIR/*
cp -r static/ $DOCKER_SHARED_DIR/

echo "==> $(date +%H:%M:%S) ==> Running Uvicorn... "
exec uvicorn app.main:app --host 0.0.0.0 --port 8888 --proxy-headers --uds $DOCKER_SHARED_DIR/uvicorn.socket
exec uvicorn app.main:app --host 0.0.0.0 --port 8888 --proxy-headers --uds $DOCKER_SHARED_DIR/uvicorn.socket

4 changes: 4 additions & 0 deletions requirements/prod.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
aio-pika==9.5.1
dramatiq[redis, watch]==1.17.1
fastapi[all]==0.115.5
periodiq==0.13.0
pika==1.3.2
pydantic-settings==2.6.1
redis[hiredis]==5.2.0
12 changes: 12 additions & 0 deletions scripts/send_test_message_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import random

import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost", port=5672),
)
channel = connection.channel()
channel.queue_declare(queue="default")
channel.basic_publish(
exchange="", routing_key="default", body=f"message with ID-{random.randint(1, 100)}"
)
Loading