Skip to content

Commit 233ea99

Browse files
AMQP: added custom delayed queue config (#796)
* AMQP: added custom delayed queue config * added documentation for delayed queue
1 parent 26776f9 commit 233ea99

File tree

5 files changed

+76
-36
lines changed

5 files changed

+76
-36
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ RabbitMQ related configuration. Not necessary if you are using other broker/back
343343
* `QueueBindingArguments`: an optional map of additional arguments used when binding to an AMQP queue
344344
* `BindingKey`: The queue is bind to the exchange with this key, e.g. `machinery_task`
345345
* `PrefetchCount`: How many tasks to prefetch (set to `1` if you have long running tasks)
346+
* `DelayedQueue`: delayed queue name to be used for task retry or delayed task (if empty it will follow auto create and delate delayed queues)
346347

347348
#### DynamoDB
348349

v1/brokers/amqp/amqp.go

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -354,24 +354,47 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
354354
return fmt.Errorf("JSON marshal error: %s", err)
355355
}
356356

357-
// It's necessary to redeclare the queue each time (to zero its TTL timer).
358-
queueName := fmt.Sprintf(
359-
"delay.%d.%s.%s",
360-
delayMs, // delay duration in mileseconds
361-
b.GetConfig().AMQP.Exchange,
362-
signature.RoutingKey, // routing key
363-
)
357+
queueName := b.GetConfig().AMQP.DelayedQueue
364358
declareQueueArgs := amqp.Table{
365359
// Exchange where to send messages after TTL expiration.
366360
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
367361
// Routing key which use when resending expired messages.
368362
"x-dead-letter-routing-key": signature.RoutingKey,
369-
// Time in milliseconds
370-
// after that message will expire and be sent to destination.
371-
"x-message-ttl": delayMs,
372-
// Time after that the queue will be deleted.
373-
"x-expires": delayMs * 2,
374363
}
364+
messageProperties := amqp.Publishing{
365+
Headers: amqp.Table(signature.Headers),
366+
ContentType: "application/json",
367+
Body: message,
368+
DeliveryMode: amqp.Persistent,
369+
Expiration: fmt.Sprint(delayMs),
370+
}
371+
if queueName == "" {
372+
// It's necessary to redeclare the queue each time (to zero its TTL timer).
373+
queueName = fmt.Sprintf(
374+
"delay.%d.%s.%s",
375+
delayMs, // delay duration in mileseconds
376+
b.GetConfig().AMQP.Exchange,
377+
signature.RoutingKey, // routing key
378+
)
379+
declareQueueArgs = amqp.Table{
380+
// Exchange where to send messages after TTL expiration.
381+
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
382+
// Routing key which use when resending expired messages.
383+
"x-dead-letter-routing-key": signature.RoutingKey,
384+
// Time in milliseconds
385+
// after that message will expire and be sent to destination.
386+
"x-message-ttl": delayMs,
387+
// Time after that the queue will be deleted.
388+
"x-expires": delayMs * 2,
389+
}
390+
messageProperties = amqp.Publishing{
391+
Headers: amqp.Table(signature.Headers),
392+
ContentType: "application/json",
393+
Body: message,
394+
DeliveryMode: amqp.Persistent,
395+
}
396+
}
397+
375398
conn, channel, _, _, _, err := b.Connect(
376399
b.GetConfig().Broker,
377400
b.GetConfig().MultipleBrokerSeparator,
@@ -397,12 +420,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
397420
queueName, // routing key
398421
false, // mandatory
399422
false, // immediate
400-
amqp.Publishing{
401-
Headers: amqp.Table(signature.Headers),
402-
ContentType: "application/json",
403-
Body: message,
404-
DeliveryMode: amqp.Persistent,
405-
},
423+
messageProperties,
406424
); err != nil {
407425
return err
408426
}

v1/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ type AMQPConfig struct {
8585
BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"`
8686
PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"`
8787
AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"`
88+
DelayedQueue string `yaml:"delayed_queue" envconfig:"AMQP_DELAYED_QUEUE"`
8889
}
8990

9091
// DynamoDBConfig wraps DynamoDB related configuration

v2/brokers/amqp/amqp.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -354,24 +354,48 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
354354
return fmt.Errorf("JSON marshal error: %s", err)
355355
}
356356

357-
// It's necessary to redeclare the queue each time (to zero its TTL timer).
358-
queueName := fmt.Sprintf(
359-
"delay.%d.%s.%s",
360-
delayMs, // delay duration in mileseconds
361-
b.GetConfig().AMQP.Exchange,
362-
signature.RoutingKey, // routing key
363-
)
357+
queueName := b.GetConfig().AMQP.DelayedQueue
364358
declareQueueArgs := amqp.Table{
365359
// Exchange where to send messages after TTL expiration.
366360
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
367361
// Routing key which use when resending expired messages.
368362
"x-dead-letter-routing-key": signature.RoutingKey,
369-
// Time in milliseconds
370-
// after that message will expire and be sent to destination.
371-
"x-message-ttl": delayMs,
372-
// Time after that the queue will be deleted.
373-
"x-expires": delayMs * 2,
374363
}
364+
messageProperties := amqp.Publishing{
365+
Headers: amqp.Table(signature.Headers),
366+
ContentType: "application/json",
367+
Body: message,
368+
DeliveryMode: amqp.Persistent,
369+
Expiration: fmt.Sprint(delayMs),
370+
}
371+
372+
if queueName == "" {
373+
// It's necessary to redeclare the queue each time (to zero its TTL timer).
374+
queueName = fmt.Sprintf(
375+
"delay.%d.%s.%s",
376+
delayMs, // delay duration in mileseconds
377+
b.GetConfig().AMQP.Exchange,
378+
signature.RoutingKey, // routing key
379+
)
380+
declareQueueArgs = amqp.Table{
381+
// Exchange where to send messages after TTL expiration.
382+
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
383+
// Routing key which use when resending expired messages.
384+
"x-dead-letter-routing-key": signature.RoutingKey,
385+
// Time in milliseconds
386+
// after that message will expire and be sent to destination.
387+
"x-message-ttl": delayMs,
388+
// Time after that the queue will be deleted.
389+
"x-expires": delayMs * 2,
390+
}
391+
messageProperties = amqp.Publishing{
392+
Headers: amqp.Table(signature.Headers),
393+
ContentType: "application/json",
394+
Body: message,
395+
DeliveryMode: amqp.Persistent,
396+
}
397+
}
398+
375399
conn, channel, _, _, _, err := b.Connect(
376400
b.GetConfig().Broker,
377401
b.GetConfig().MultipleBrokerSeparator,
@@ -397,12 +421,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
397421
queueName, // routing key
398422
false, // mandatory
399423
false, // immediate
400-
amqp.Publishing{
401-
Headers: amqp.Table(signature.Headers),
402-
ContentType: "application/json",
403-
Body: message,
404-
DeliveryMode: amqp.Persistent,
405-
},
424+
messageProperties,
406425
); err != nil {
407426
return err
408427
}

v2/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ type AMQPConfig struct {
8585
BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"`
8686
PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"`
8787
AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"`
88+
DelayedQueue string `yaml:"delayed_queue" envconfig:"AMQP_DELAYED_QUEUE"`
8889
}
8990

9091
// DynamoDBConfig wraps DynamoDB related configuration

0 commit comments

Comments
 (0)