Skip to content

Commit acd769d

Browse files
committed
Improvement of RabbitMQ plugin influxdata#3025 influxdata#3252
* new metrics: * unroutable messages * node uptime * gc metrics * mnesia metrics * node healthcheck * IO metrics * refactoring tests: * moved the json examples to a separate files * check metric values Signed-off-by: Vitalii Solodilov <[email protected]>
1 parent 7781507 commit acd769d

File tree

7 files changed

+564
-590
lines changed

7 files changed

+564
-590
lines changed

plugins/inputs/rabbitmq/rabbitmq.go

+162-65
Original file line numberDiff line numberDiff line change
@@ -72,23 +72,27 @@ type Listeners struct {
7272

7373
// Details ...
7474
type Details struct {
75-
Rate float64
75+
Rate float64 `json:"rate"`
7676
}
7777

7878
// MessageStats ...
7979
type MessageStats struct {
80-
Ack int64
81-
AckDetails Details `json:"ack_details"`
82-
Deliver int64
83-
DeliverDetails Details `json:"deliver_details"`
84-
DeliverGet int64 `json:"deliver_get"`
85-
DeliverGetDetails Details `json:"deliver_get_details"`
86-
Publish int64
87-
PublishDetails Details `json:"publish_details"`
88-
Redeliver int64
89-
RedeliverDetails Details `json:"redeliver_details"`
90-
PublishIn int64 `json:"publish_in"`
91-
PublishOut int64 `json:"publish_out"`
80+
Ack int64
81+
AckDetails Details `json:"ack_details"`
82+
Deliver int64
83+
DeliverDetails Details `json:"deliver_details"`
84+
DeliverGet int64 `json:"deliver_get"`
85+
DeliverGetDetails Details `json:"deliver_get_details"`
86+
Publish int64
87+
PublishDetails Details `json:"publish_details"`
88+
Redeliver int64
89+
RedeliverDetails Details `json:"redeliver_details"`
90+
PublishIn int64 `json:"publish_in"`
91+
PublishInDetails Details `json:"publish_in_details"`
92+
PublishOut int64 `json:"publish_out"`
93+
PublishOutDetails Details `json:"publish_out_details"`
94+
ReturnUnroutable int64 `json:"return_unroutable"`
95+
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
9296
}
9397

9498
// ObjectTotals ...
@@ -114,45 +118,68 @@ type QueueTotals struct {
114118

115119
// Queue ...
116120
type Queue struct {
117-
QueueTotals // just to not repeat the same code
118-
MessageStats `json:"message_stats"`
121+
QueueTotals // just to not repeat the same code
122+
MessageStats `json:"message_stats"`
119123
Memory int64
120124
Consumers int64
121125
ConsumerUtilisation float64 `json:"consumer_utilisation"`
122126
Name string
123127
Node string
124128
Vhost string
125129
Durable bool
126-
AutoDelete bool `json:"auto_delete"`
127-
IdleSince string `json:"idle_since"`
130+
AutoDelete bool `json:"auto_delete"`
131+
IdleSince string `json:"idle_since"`
128132
}
129133

130134
// Node ...
131135
type Node struct {
132136
Name string
133137

134-
DiskFree int64 `json:"disk_free"`
135-
DiskFreeLimit int64 `json:"disk_free_limit"`
136-
FdTotal int64 `json:"fd_total"`
137-
FdUsed int64 `json:"fd_used"`
138-
MemLimit int64 `json:"mem_limit"`
139-
MemUsed int64 `json:"mem_used"`
140-
ProcTotal int64 `json:"proc_total"`
141-
ProcUsed int64 `json:"proc_used"`
142-
RunQueue int64 `json:"run_queue"`
143-
SocketsTotal int64 `json:"sockets_total"`
144-
SocketsUsed int64 `json:"sockets_used"`
145-
Running bool `json:"running"`
138+
DiskFree int64 `json:"disk_free"`
139+
DiskFreeLimit int64 `json:"disk_free_limit"`
140+
DiskFreeAlarm bool `json:"disk_free_alarm"`
141+
FdTotal int64 `json:"fd_total"`
142+
FdUsed int64 `json:"fd_used"`
143+
MemLimit int64 `json:"mem_limit"`
144+
MemUsed int64 `json:"mem_used"`
145+
MemAlarm bool `json:"mem_alarm"`
146+
ProcTotal int64 `json:"proc_total"`
147+
ProcUsed int64 `json:"proc_used"`
148+
RunQueue int64 `json:"run_queue"`
149+
SocketsTotal int64 `json:"sockets_total"`
150+
SocketsUsed int64 `json:"sockets_used"`
151+
Running bool `json:"running"`
152+
Uptime int64 `json:"uptime"`
153+
MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
154+
MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
155+
MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
156+
MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
157+
GcNum int64 `json:"gc_num"`
158+
GcNumDetails Details `json:"gc_num_details"`
159+
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
160+
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
161+
IoReadAvgTime int64 `json:"io_read_avg_time"`
162+
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
163+
IoReadBytes int64 `json:"io_read_bytes"`
164+
IoReadBytesDetails Details `json:"io_read_bytes_details"`
165+
IoWriteAvgTime int64 `json:"io_write_avg_time"`
166+
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
167+
IoWriteBytes int64 `json:"io_write_bytes"`
168+
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
146169
}
147170

148171
type Exchange struct {
149-
Name string
150-
MessageStats `json:"message_stats"`
151-
Type string
152-
Internal bool
153-
Vhost string
154-
Durable bool
155-
AutoDelete bool `json:"auto_delete"`
172+
Name string
173+
MessageStats `json:"message_stats"`
174+
Type string
175+
Internal bool
176+
Vhost string
177+
Durable bool
178+
AutoDelete bool `json:"auto_delete"`
179+
}
180+
181+
type HealthCheck struct {
182+
Status string `json:"status"`
156183
}
157184

158185
// gatherFunc ...
@@ -204,6 +231,13 @@ var sampleConfig = `
204231
queue_name_exclude = []
205232
`
206233

234+
func boolToInt(b bool) int64 {
235+
if b {
236+
return 1
237+
}
238+
return 0
239+
}
240+
207241
// SampleConfig ...
208242
func (r *RabbitMQ) SampleConfig() string {
209243
return sampleConfig
@@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
302336
return
303337
}
304338

305-
var clustering_listeners, amqp_listeners int64 = 0, 0
339+
var clusteringListeners, amqpListeners int64 = 0, 0
306340
for _, listener := range overview.Listeners {
307341
if listener.Protocol == "clustering" {
308-
clustering_listeners++
342+
clusteringListeners++
309343
} else if listener.Protocol == "amqp" {
310-
amqp_listeners++
344+
amqpListeners++
311345
}
312346
}
313347

@@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
328362
"messages_delivered": overview.MessageStats.Deliver,
329363
"messages_delivered_get": overview.MessageStats.DeliverGet,
330364
"messages_published": overview.MessageStats.Publish,
331-
"clustering_listeners": clustering_listeners,
332-
"amqp_listeners": amqp_listeners,
365+
"clustering_listeners": clusteringListeners,
366+
"amqp_listeners": amqpListeners,
367+
"return_unroutable": overview.MessageStats.ReturnUnroutable,
368+
"return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
333369
}
334370
acc.AddFields("rabbitmq_overview", fields, tags)
335371
}
336372

337373
func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
338-
nodes := make([]Node, 0)
374+
allNodes := make([]Node, 0)
339375
// Gather information about nodes
340-
err := r.requestJSON("/api/nodes", &nodes)
376+
err := r.requestJSON("/api/nodes", &allNodes)
341377
if err != nil {
342378
acc.AddError(err)
343379
return
344380
}
345-
now := time.Now()
381+
382+
nodes := make(map[string]Node)
383+
for _, node := range allNodes {
384+
if r.shouldGatherNode(node) {
385+
nodes[node.Name] = node
386+
}
387+
}
388+
389+
numberNodes := len(nodes)
390+
if numberNodes == 0 {
391+
return
392+
}
393+
394+
type NodeHealthCheck struct {
395+
NodeName string
396+
HealthCheck HealthCheck
397+
Error error
398+
}
399+
400+
healthChecksChannel := make(chan NodeHealthCheck, numberNodes)
346401

347402
for _, node := range nodes {
348-
if !r.shouldGatherNode(node) {
349-
continue
403+
go func(nodeName string, healthChecksChannel chan NodeHealthCheck) {
404+
var healthCheck HealthCheck
405+
406+
err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
407+
nodeHealthCheck := NodeHealthCheck{
408+
NodeName: nodeName,
409+
Error: err,
410+
HealthCheck: healthCheck,
411+
}
412+
413+
healthChecksChannel <- nodeHealthCheck
414+
}(node.Name, healthChecksChannel)
415+
}
416+
417+
now := time.Now()
418+
419+
for i := 0; i < len(nodes); i++ {
420+
nodeHealthCheck := <-healthChecksChannel
421+
422+
var healthCheckStatus int64 = 0
423+
424+
if nodeHealthCheck.Error != nil {
425+
acc.AddError(nodeHealthCheck.Error)
426+
} else if nodeHealthCheck.HealthCheck.Status == "ok" {
427+
healthCheckStatus = 1
350428
}
351429

430+
node := nodes[nodeHealthCheck.NodeName]
431+
352432
tags := map[string]string{"url": r.URL}
353433
tags["node"] = node.Name
354434

355-
var running int64 = 0
356-
if node.Running {
357-
running = 1
358-
}
359-
360435
fields := map[string]interface{}{
361-
"disk_free": node.DiskFree,
362-
"disk_free_limit": node.DiskFreeLimit,
363-
"fd_total": node.FdTotal,
364-
"fd_used": node.FdUsed,
365-
"mem_limit": node.MemLimit,
366-
"mem_used": node.MemUsed,
367-
"proc_total": node.ProcTotal,
368-
"proc_used": node.ProcUsed,
369-
"run_queue": node.RunQueue,
370-
"sockets_total": node.SocketsTotal,
371-
"sockets_used": node.SocketsUsed,
372-
"running": running,
436+
"disk_free": node.DiskFree,
437+
"disk_free_limit": node.DiskFreeLimit,
438+
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
439+
"fd_total": node.FdTotal,
440+
"fd_used": node.FdUsed,
441+
"mem_limit": node.MemLimit,
442+
"mem_used": node.MemUsed,
443+
"mem_alarm": boolToInt(node.MemAlarm),
444+
"proc_total": node.ProcTotal,
445+
"proc_used": node.ProcUsed,
446+
"run_queue": node.RunQueue,
447+
"sockets_total": node.SocketsTotal,
448+
"sockets_used": node.SocketsUsed,
449+
"uptime": node.Uptime,
450+
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
451+
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
452+
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
453+
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
454+
"gc_num": node.GcNum,
455+
"gc_num_rate": node.GcNumDetails.Rate,
456+
"gc_bytes_reclaimed": node.GcBytesReclaimed,
457+
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
458+
"io_read_avg_time": node.IoReadAvgTime,
459+
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
460+
"io_read_bytes": node.IoReadBytes,
461+
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
462+
"io_write_avg_time": node.IoWriteAvgTime,
463+
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
464+
"io_write_bytes": node.IoWriteBytes,
465+
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
466+
"running": boolToInt(node.Running),
467+
"health_check_status": healthCheckStatus,
373468
}
374469
acc.AddFields("rabbitmq_node", fields, tags, now)
375470
}
@@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
459554
acc.AddFields(
460555
"rabbitmq_exchange",
461556
map[string]interface{}{
462-
"messages_publish_in": exchange.MessageStats.PublishIn,
463-
"messages_publish_out": exchange.MessageStats.PublishOut,
557+
"messages_publish_in": exchange.MessageStats.PublishIn,
558+
"messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate,
559+
"messages_publish_out": exchange.MessageStats.PublishOut,
560+
"messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
464561
},
465562
tags,
466563
)

0 commit comments

Comments
 (0)