Skip to content

Commit e37b4cc

Browse files
danielnelsonAlain ROMEYER
authored and
Alain ROMEYER
committed
Add option to amqp output to publish persistent messages (influxdata#3528)
1 parent 1dfbe1c commit e37b4cc

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

plugins/outputs/amqp/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ For an introduction to AMQP see:
2929
## Telegraf tag to use as a routing key
3030
## ie, if this tag exists, its value will be used as the routing key
3131
routing_tag = "host"
32+
## Delivery Mode controls if a published message is persistent
33+
## Valid options are "transient" and "persistent". default: "transient"
34+
# delivery_mode = "transient"
3235
3336
## InfluxDB retention policy
3437
# retention_policy = "default"

plugins/outputs/amqp/amqp.go

+24-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type AMQP struct {
3939
Precision string
4040
// Connection timeout
4141
Timeout internal.Duration
42+
// Delivery Mode controls if a published message is persistent
43+
// Valid options are "transient" and "persistent". default: "transient"
44+
DeliveryMode string
4245

4346
// Path to CA file
4447
SSLCA string `toml:"ssl_ca"`
@@ -52,7 +55,8 @@ type AMQP struct {
5255
sync.Mutex
5356
c *client
5457

55-
serializer serializers.Serializer
58+
deliveryMode uint8
59+
serializer serializers.Serializer
5660
}
5761

5862
type externalAuth struct{}
@@ -82,6 +86,9 @@ var sampleConfig = `
8286
## Telegraf tag to use as a routing key
8387
## ie, if this tag exists, its value will be used as the routing key
8488
routing_tag = "host"
89+
## Delivery Mode controls if a published message is persistent
90+
## Valid options are "transient" and "persistent". default: "transient"
91+
delivery_mode = "transient"
8592
8693
## InfluxDB retention policy
8794
# retention_policy = "default"
@@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
111118
}
112119

113120
func (q *AMQP) Connect() error {
121+
switch q.DeliveryMode {
122+
case "transient":
123+
q.deliveryMode = amqp.Transient
124+
break
125+
case "persistent":
126+
q.deliveryMode = amqp.Persistent
127+
break
128+
default:
129+
q.deliveryMode = amqp.Transient
130+
break
131+
}
132+
114133
headers := amqp.Table{
115134
"database": q.Database,
116135
"retention_policy": q.RetentionPolicy,
@@ -245,9 +264,10 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
245264
false, // mandatory
246265
false, // immediate
247266
amqp.Publishing{
248-
Headers: c.headers,
249-
ContentType: "text/plain",
250-
Body: buf,
267+
Headers: c.headers,
268+
ContentType: "text/plain",
269+
Body: buf,
270+
DeliveryMode: q.deliveryMode,
251271
})
252272
if err != nil {
253273
return fmt.Errorf("Failed to send AMQP message: %s", err)

0 commit comments

Comments
 (0)