Skip to content

Commit 639db20

Browse files
danielnelsonidohalevi
authored and
idohalevi
committed
Support passive queue declaration in amqp_consumer (influxdata#5831)
1 parent 24c6930 commit 639db20

File tree

5 files changed

+99
-51
lines changed

5 files changed

+99
-51
lines changed

plugins/inputs/amqp_consumer/README.md

+6-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ The following defaults are known to work with RabbitMQ:
2727
# username = ""
2828
# password = ""
2929

30-
## Exchange to declare and consume from.
30+
## Name of the exchange to declare. If unset, no exchange will be declared.
3131
exchange = "telegraf"
3232

3333
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
@@ -49,7 +49,11 @@ The following defaults are known to work with RabbitMQ:
4949
## AMQP queue durability can be "transient" or "durable".
5050
queue_durability = "durable"
5151

52-
## Binding Key
52+
## If true, queue will be passively declared.
53+
# queue_passive = false
54+
55+
## A binding between the exchange and queue using this binding key is
56+
## created. If unset, no binding is created.
5357
binding_key = "#"
5458

5559
## Maximum number of messages server should give to the worker.

plugins/inputs/amqp_consumer/amqp_consumer.go

+87-47
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type AMQPConsumer struct {
4141
// Queue Name
4242
Queue string `toml:"queue"`
4343
QueueDurability string `toml:"queue_durability"`
44+
QueuePassive bool `toml:"queue_passive"`
4445

4546
// Binding Key
4647
BindingKey string `toml:"binding_key"`
@@ -101,7 +102,7 @@ func (a *AMQPConsumer) SampleConfig() string {
101102
# username = ""
102103
# password = ""
103104
104-
## Exchange to declare and consume from.
105+
## Name of the exchange to declare. If unset, no exchange will be declared.
105106
exchange = "telegraf"
106107
107108
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
@@ -123,7 +124,11 @@ func (a *AMQPConsumer) SampleConfig() string {
123124
## AMQP queue durability can be "transient" or "durable".
124125
queue_durability = "durable"
125126
126-
## Binding Key.
127+
## If true, queue will be passively declared.
128+
# queue_passive = false
129+
130+
## A binding between the exchange and queue using this binding key is
131+
## created. If unset, no binding is created.
127132
binding_key = "#"
128133
129134
## Maximum number of messages server should give to the worker.
@@ -286,59 +291,52 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
286291
return nil, fmt.Errorf("Failed to open a channel: %s", err)
287292
}
288293

289-
var exchangeDurable = true
290-
switch a.ExchangeDurability {
291-
case "transient":
292-
exchangeDurable = false
293-
default:
294-
exchangeDurable = true
295-
}
294+
if a.Exchange != "" {
295+
var exchangeDurable = true
296+
switch a.ExchangeDurability {
297+
case "transient":
298+
exchangeDurable = false
299+
default:
300+
exchangeDurable = true
301+
}
296302

297-
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
298-
for k, v := range a.ExchangeArguments {
299-
exchangeArgs[k] = v
303+
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
304+
for k, v := range a.ExchangeArguments {
305+
exchangeArgs[k] = v
306+
}
307+
308+
err = declareExchange(
309+
ch,
310+
a.Exchange,
311+
a.ExchangeType,
312+
a.ExchangePassive,
313+
exchangeDurable,
314+
exchangeArgs)
315+
if err != nil {
316+
return nil, err
317+
}
300318
}
301319

302-
err = declareExchange(
320+
q, err := declareQueue(
303321
ch,
304-
a.Exchange,
305-
a.ExchangeType,
306-
a.ExchangePassive,
307-
exchangeDurable,
308-
exchangeArgs)
322+
a.Queue,
323+
a.QueueDurability,
324+
a.QueuePassive)
309325
if err != nil {
310326
return nil, err
311327
}
312328

313-
var queueDurable = true
314-
switch a.QueueDurability {
315-
case "transient":
316-
queueDurable = false
317-
default:
318-
queueDurable = true
319-
}
320-
321-
q, err := ch.QueueDeclare(
322-
a.Queue, // queue
323-
queueDurable, // durable
324-
false, // delete when unused
325-
false, // exclusive
326-
false, // no-wait
327-
nil, // arguments
328-
)
329-
if err != nil {
330-
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
331-
}
332-
333-
err = ch.QueueBind(
334-
q.Name, // queue
335-
a.BindingKey, // binding-key
336-
a.Exchange, // exchange
337-
false,
338-
nil,
339-
)
340-
if err != nil {
341-
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
329+
if a.BindingKey != "" {
330+
err = ch.QueueBind(
331+
q.Name, // queue
332+
a.BindingKey, // binding-key
333+
a.Exchange, // exchange
334+
false,
335+
nil,
336+
)
337+
if err != nil {
338+
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
339+
}
342340
}
343341

344342
err = ch.Qos(
@@ -402,6 +400,48 @@ func declareExchange(
402400
return nil
403401
}
404402

403+
func declareQueue(
404+
channel *amqp.Channel,
405+
queueName string,
406+
queueDurability string,
407+
queuePassive bool,
408+
) (*amqp.Queue, error) {
409+
var queue amqp.Queue
410+
var err error
411+
412+
var queueDurable = true
413+
switch queueDurability {
414+
case "transient":
415+
queueDurable = false
416+
default:
417+
queueDurable = true
418+
}
419+
420+
if queuePassive {
421+
queue, err = channel.QueueDeclarePassive(
422+
queueName, // queue
423+
queueDurable, // durable
424+
false, // delete when unused
425+
false, // exclusive
426+
false, // no-wait
427+
nil, // arguments
428+
)
429+
} else {
430+
queue, err = channel.QueueDeclare(
431+
queueName, // queue
432+
queueDurable, // durable
433+
false, // delete when unused
434+
false, // exclusive
435+
false, // no-wait
436+
nil, // arguments
437+
)
438+
}
439+
if err != nil {
440+
return nil, fmt.Errorf("error declaring queue: %v", err)
441+
}
442+
return &queue, nil
443+
}
444+
405445
// Read messages from queue and add them to the Accumulator
406446
func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) {
407447
a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery)

plugins/outputs/amqp/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ For an introduction to AMQP see:
3333
# exchange_type = "topic"
3434

3535
## If true, exchange will be passively declared.
36-
# exchange_declare_passive = false
36+
# exchange_passive = false
3737

3838
## Exchange durability can be either "transient" or "durable".
3939
# exchange_durability = "durable"

plugins/outputs/amqp/amqp.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ var sampleConfig = `
9292
# exchange_type = "topic"
9393
9494
## If true, exchange will be passively declared.
95-
# exchange_declare_passive = false
95+
# exchange_passive = false
9696
9797
## Exchange durability can be either "transient" or "durable".
9898
# exchange_durability = "durable"

plugins/outputs/amqp/client.go

+4
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ func Connect(config *ClientConfig) (*client, error) {
7878
}
7979

8080
func (c *client) DeclareExchange() error {
81+
if c.config.exchange == "" {
82+
return nil
83+
}
84+
8185
var err error
8286
if c.config.exchangePassive {
8387
err = c.channel.ExchangeDeclarePassive(

0 commit comments

Comments
 (0)