Skip to content

Commit 47098c0

Browse files
jacqueshdanielnelson
authored andcommitted
Add gathering of RabbitMQ federation link metrics (influxdata#6283)
1 parent 83cc953 commit 47098c0

File tree

4 files changed

+228
-7
lines changed

4 files changed

+228
-7
lines changed

plugins/inputs/rabbitmq/README.md

+25
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
4848
## specified, metrics for all exchanges are gathered.
4949
# exchanges = ["telegraf"]
5050

51+
## A list of federation upstreams to gather as the rabbitmq_federation measurement.
52+
## If not specified, metrics for all federation upstreams are gathered.
53+
## Federation link metrics will only be gathered for queues and exchanges
54+
## whose non-federation metrics will be collected (e.g a queue excluded
55+
## by the 'queue_name_exclude' option will also be excluded from federation).
56+
# federation_upstreams = ["dataCentre2"]
57+
5158
## Queues to include and exclude. Globs accepted.
5259
## Note that an empty array for both will include all queues
5360
# queue_name_include = []
@@ -158,6 +165,16 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
158165
- messages_publish_out (int, count)
159166
- messages_publish_out_rate (int, messages per second)
160167

168+
- rabbitmq_federation
169+
- acks_uncommitted (int, count)
170+
- consumers (int, count)
171+
- messages_unacknowledged (int, count)
172+
- messages_uncommitted (int, count)
173+
- messages_unconfirmed (int, count)
174+
- messages_confirm (int, count)
175+
- messages_publish (int, count)
176+
- messages_return_unroutable (int, count)
177+
161178
### Tags:
162179

163180
- All measurements have the following tags:
@@ -187,6 +204,14 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
187204
- durable
188205
- auto_delete
189206

207+
- rabbitmq_federation
208+
- url
209+
- vhost
210+
- type
211+
- upstream
212+
- local_entity
213+
- upstream_entity
214+
190215
### Sample Queries:
191216

192217
Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:

plugins/inputs/rabbitmq/rabbitmq.go

+126-7
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,17 @@ type RabbitMQ struct {
4747
Queues []string
4848
Exchanges []string
4949

50-
QueueInclude []string `toml:"queue_name_include"`
51-
QueueExclude []string `toml:"queue_name_exclude"`
50+
QueueInclude []string `toml:"queue_name_include"`
51+
QueueExclude []string `toml:"queue_name_exclude"`
52+
FederationUpstreamInclude []string `toml:"federation_upstream_include"`
53+
FederationUpstreamExclude []string `toml:"federation_upstream_exclude"`
5254

5355
Client *http.Client
5456

5557
filterCreated bool
5658
excludeEveryQueue bool
5759
queueFilter filter.Filter
60+
upstreamFilter filter.Filter
5861
}
5962

6063
// OverviewResponse ...
@@ -178,6 +181,38 @@ type Exchange struct {
178181
AutoDelete bool `json:"auto_delete"`
179182
}
180183

184+
// FederationLinkChannelMessageStats ...
185+
type FederationLinkChannelMessageStats struct {
186+
Confirm int64 `json:"confirm"`
187+
ConfirmDetails Details `json:"confirm_details"`
188+
Publish int64 `json:"publish"`
189+
PublishDetails Details `json:"publish_details"`
190+
ReturnUnroutable int64 `json:"return_unroutable"`
191+
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
192+
}
193+
194+
// FederationLinkChannel ...
195+
type FederationLinkChannel struct {
196+
AcksUncommitted int64 `json:"acks_uncommitted"`
197+
ConsumerCount int64 `json:"consumer_count"`
198+
MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
199+
MessagesUncommitted int64 `json:"messages_uncommitted"`
200+
MessagesUnconfirmed int64 `json:"messages_unconfirmed"`
201+
MessageStats FederationLinkChannelMessageStats `json:"message_stats"`
202+
}
203+
204+
// FederationLink ...
205+
type FederationLink struct {
206+
Type string `json:"type"`
207+
Queue string `json:"queue"`
208+
UpstreamQueue string `json:"upstream_queue"`
209+
Exchange string `json:"exchange"`
210+
UpstreamExchange string `json:"upstream_exchange"`
211+
Vhost string `json:"vhost"`
212+
Upstream string `json:"upstream"`
213+
LocalChannel FederationLinkChannel `json:"local_channel"`
214+
}
215+
181216
type HealthCheck struct {
182217
Status string `json:"status"`
183218
}
@@ -214,7 +249,7 @@ type Memory struct {
214249
// gatherFunc ...
215250
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
216251

217-
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}
252+
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges, gatherFederationLinks}
218253

219254
var sampleConfig = `
220255
## Management Plugin url. (default: http://localhost:15672)
@@ -258,6 +293,15 @@ var sampleConfig = `
258293
## Note that an empty array for both will include all queues
259294
queue_name_include = []
260295
queue_name_exclude = []
296+
297+
## Federation upstreams include and exclude when gathering the rabbitmq_federation measurement.
298+
## If neither are specified, metrics for all federation upstreams are gathered.
299+
## Federation link metrics will only be gathered for queues and exchanges
300+
## whose non-federation metrics will be collected (e.g a queue excluded
301+
## by the 'queue_name_exclude' option will also be excluded from federation).
302+
## Globs accepted.
303+
# federation_upstream_include = ["dataCentre-*"]
304+
# federation_upstream_exclude = []
261305
`
262306

263307
func boolToInt(b bool) int64 {
@@ -294,12 +338,16 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
294338
}
295339
}
296340

297-
// Create queue filter if not already created
341+
// Create gather filters if not already created
298342
if !r.filterCreated {
299343
err := r.createQueueFilter()
300344
if err != nil {
301345
return err
302346
}
347+
err = r.createUpstreamFilter()
348+
if err != nil {
349+
return err
350+
}
303351
r.filterCreated = true
304352
}
305353

@@ -598,7 +646,7 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
598646
}
599647

600648
for _, exchange := range exchanges {
601-
if !r.shouldGatherExchange(exchange) {
649+
if !r.shouldGatherExchange(exchange.Name) {
602650
continue
603651
}
604652
tags := map[string]string{
@@ -624,6 +672,52 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
624672
}
625673
}
626674

675+
func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
676+
// Gather information about federation links
677+
federationLinks := make([]FederationLink, 0)
678+
err := r.requestJSON("/api/federation-links", &federationLinks)
679+
if err != nil {
680+
acc.AddError(err)
681+
return
682+
}
683+
684+
for _, link := range federationLinks {
685+
if !r.shouldGatherFederationLink(link) {
686+
continue
687+
}
688+
689+
tags := map[string]string{
690+
"url": r.URL,
691+
"type": link.Type,
692+
"vhost": link.Vhost,
693+
"upstream": link.Upstream,
694+
}
695+
696+
if link.Type == "exchange" {
697+
tags["exchange"] = link.Exchange
698+
tags["upstream_exchange"] = link.UpstreamExchange
699+
} else {
700+
tags["queue"] = link.Queue
701+
tags["upstream_queue"] = link.UpstreamQueue
702+
}
703+
704+
acc.AddFields(
705+
"rabbitmq_federation",
706+
map[string]interface{}{
707+
"acks_uncommitted": link.LocalChannel.AcksUncommitted,
708+
"consumers": link.LocalChannel.ConsumerCount,
709+
"messages_unacknowledged": link.LocalChannel.MessagesUnacknowledged,
710+
"messages_uncommitted": link.LocalChannel.MessagesUncommitted,
711+
"messages_unconfirmed": link.LocalChannel.MessagesUnconfirmed,
712+
"messages_confirm": link.LocalChannel.MessageStats.Confirm,
713+
"messages_publish": link.LocalChannel.MessageStats.Publish,
714+
"messages_return_unroutable": link.LocalChannel.MessageStats.ReturnUnroutable,
715+
},
716+
tags,
717+
)
718+
}
719+
}
720+
627721
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
628722
if len(r.Nodes) == 0 {
629723
return true
@@ -659,20 +753,45 @@ func (r *RabbitMQ) createQueueFilter() error {
659753
return nil
660754
}
661755

662-
func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
756+
func (r *RabbitMQ) createUpstreamFilter() error {
757+
upstreamFilter, err := filter.NewIncludeExcludeFilter(r.FederationUpstreamInclude, r.FederationUpstreamExclude)
758+
if err != nil {
759+
return err
760+
}
761+
r.upstreamFilter = upstreamFilter
762+
763+
return nil
764+
}
765+
766+
func (r *RabbitMQ) shouldGatherExchange(exchangeName string) bool {
663767
if len(r.Exchanges) == 0 {
664768
return true
665769
}
666770

667771
for _, name := range r.Exchanges {
668-
if name == exchange.Name {
772+
if name == exchangeName {
669773
return true
670774
}
671775
}
672776

673777
return false
674778
}
675779

780+
func (r *RabbitMQ) shouldGatherFederationLink(link FederationLink) bool {
781+
if !r.upstreamFilter.Match(link.Upstream) {
782+
return false
783+
}
784+
785+
switch link.Type {
786+
case "exchange":
787+
return r.shouldGatherExchange(link.Exchange)
788+
case "queue":
789+
return r.queueFilter.Match(link.Queue)
790+
default:
791+
return false
792+
}
793+
}
794+
676795
func init() {
677796
inputs.Add("rabbitmq", func() telegraf.Input {
678797
return &RabbitMQ{

plugins/inputs/rabbitmq/rabbitmq_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
2828
jsonFilePath = "testdata/exchanges.json"
2929
case "/api/healthchecks/node/rabbit@vagrant-ubuntu-trusty-64":
3030
jsonFilePath = "testdata/healthchecks.json"
31+
case "/api/federation-links":
32+
jsonFilePath = "testdata/federation-links.json"
3133
case "/api/nodes/rabbit@vagrant-ubuntu-trusty-64/memory":
3234
jsonFilePath = "testdata/memory.json"
3335
default:
@@ -162,6 +164,18 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
162164
"messages_publish_out_rate": 5.1,
163165
}
164166
compareMetrics(t, exchangeMetrics, acc, "rabbitmq_exchange")
167+
168+
federationLinkMetrics := map[string]interface{}{
169+
"acks_uncommitted": 1,
170+
"consumers": 2,
171+
"messages_unacknowledged": 3,
172+
"messages_uncommitted": 4,
173+
"messages_unconfirmed": 5,
174+
"messages_confirm": 67,
175+
"messages_publish": 890,
176+
"messages_return_unroutable": 1,
177+
}
178+
compareMetrics(t, federationLinkMetrics, acc, "rabbitmq_federation")
165179
}
166180

167181
func compareMetrics(t *testing.T, expectedMetrics map[string]interface{},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
[
2+
{
3+
"node": "rabbit@rmqlocal",
4+
"queue": "exampleLocalQueue",
5+
"upstream_queue": "exampleUpstreamQueue",
6+
"type": "queue",
7+
"vhost": "/",
8+
"upstream": "ExampleFederationUpstream",
9+
"id": "8ba5218f",
10+
"status": "running",
11+
"local_connection": "<rabbit@somehost>",
12+
"uri": "amqp://appsv03",
13+
"timestamp": "2019-08-19 15:34:15",
14+
"local_channel": {
15+
"acks_uncommitted": 1,
16+
"confirm": true,
17+
"connection_details": {
18+
"name": "<rabbit@somehost>",
19+
"peer_host": "undefined",
20+
"peer_port": "undefined"
21+
},
22+
"consumer_count": 2,
23+
"garbage_collection": {
24+
"fullsweep_after": 65535,
25+
"max_heap_size": 0,
26+
"min_bin_vheap_size": 46422,
27+
"min_heap_size": 233,
28+
"minor_gcs": 203
29+
},
30+
"global_prefetch_count": 0,
31+
"message_stats": {
32+
"confirm": 67,
33+
"confirm_details": {
34+
"rate": 2
35+
},
36+
"publish": 890,
37+
"publish_details": {
38+
"rate": 2
39+
},
40+
"return_unroutable": 1,
41+
"return_unroutable_details": {
42+
"rate": 0.1
43+
}
44+
},
45+
"messages_unacknowledged": 3,
46+
"messages_uncommitted": 4,
47+
"messages_unconfirmed": 5,
48+
"name": "<rabbit@somehost>",
49+
"node": "rabbit@rmqlocal",
50+
"number": 1,
51+
"prefetch_count": 0,
52+
"reductions": 1926653,
53+
"reductions_details": {
54+
"rate": 1068
55+
},
56+
"state": "running",
57+
"transactional": false,
58+
"user": "none",
59+
"user_who_performed_action": "none",
60+
"vhost": "sorandomsorandom"
61+
}
62+
}
63+
]

0 commit comments

Comments
 (0)