Improvement of RabbitMQ plugin #3025 #3252 #4308

Merged: 1 commit, Feb 27, 2019
25 changes: 25 additions & 0 deletions plugins/inputs/rabbitmq/README.md
@@ -68,20 +68,42 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- queues (int, queues)
- clustering_listeners (int, cluster nodes)
- amqp_listeners (int, amqp nodes up)
- return_unroutable (int, number of unroutable messages)
- return_unroutable_rate (float, unroutable messages per second)

- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- disk_free_alarm (int, disk alarm)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- mem_alarm (int, memory alarm)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- running (int, node up)
- uptime (int, milliseconds)
- health_check_status (int, 1 or 0)
- mnesia_disk_tx_count (int, number of disk transactions)
- mnesia_ram_tx_count (int, number of ram transactions)
- mnesia_disk_tx_count_rate (float, disk transactions per second)
- mnesia_ram_tx_count_rate (float, ram transactions per second)
- gc_num (int, number of garbage collections)
- gc_bytes_reclaimed (int, bytes)
- gc_num_rate (float, garbage collections per second)
- gc_bytes_reclaimed_rate (float, bytes per second)
- io_read_avg_time (float, milliseconds)
- io_read_avg_time_rate (float, milliseconds per second)
- io_read_bytes (int, bytes)
- io_read_bytes_rate (float, bytes per second)
- io_write_avg_time (int, milliseconds)
- io_write_avg_time_rate (float, milliseconds per second)
- io_write_bytes (int, bytes)
- io_write_bytes_rate (float, bytes per second)

- rabbitmq_queue
- consumer_utilisation (float, percent)
@@ -109,7 +131,9 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd

- rabbitmq_exchange
- messages_publish_in (int, count)
- messages_publish_in_rate (float, messages per second)
- messages_publish_out (int, count)
- messages_publish_out_rate (float, messages per second)

### Tags:

@@ -121,6 +145,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd

- rabbitmq_node
- node
- url

- rabbitmq_queue
- url
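
For orientation, the fields documented above surface in Telegraf measurements shaped roughly like the illustrative line-protocol sample below; the tag sets, field subsets, values, and timestamp are invented for the example.

```
rabbitmq_overview,url=http://localhost:15672 messages_published=3056i,return_unroutable=12i,return_unroutable_rate=0.4,clustering_listeners=2i,amqp_listeners=2i 1551200000000000000
rabbitmq_node,url=http://localhost:15672,node=rabbit@node-1 running=1i,health_check_status=1i,disk_free_alarm=0i,mem_alarm=0i,uptime=4150152129i,gc_num=1043i,gc_num_rate=2.2 1551200000000000000
rabbitmq_exchange,url=http://localhost:15672 messages_publish_in=120i,messages_publish_in_rate=0.8,messages_publish_out=120i,messages_publish_out_rate=0.8 1551200000000000000
```
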
209 changes: 153 additions & 56 deletions plugins/inputs/rabbitmq/rabbitmq.go
@@ -72,23 +72,27 @@ type Listeners struct {

// Details ...
type Details struct {
Rate float64
Rate float64 `json:"rate"`
}

// MessageStats ...
type MessageStats struct {
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishOut int64 `json:"publish_out"`
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishInDetails Details `json:"publish_in_details"`
PublishOut int64 `json:"publish_out"`
PublishOutDetails Details `json:"publish_out_details"`
ReturnUnroutable int64 `json:"return_unroutable"`
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
}
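
As an aside, the point of the new json-tagged `*Details` fields is that the management API reports each counter alongside a `<name>_details` object carrying its per-second `rate`. Below is a minimal, standalone sketch (not the plugin's code; the payload is made up) of how such a fragment decodes:

```go
package main

import (
    "encoding/json"
    "fmt"
)

type details struct {
    Rate float64 `json:"rate"`
}

// Trimmed-down stand-in for the plugin's MessageStats struct.
type messageStats struct {
    PublishIn               int64   `json:"publish_in"`
    PublishInDetails        details `json:"publish_in_details"`
    ReturnUnroutable        int64   `json:"return_unroutable"`
    ReturnUnroutableDetails details `json:"return_unroutable_details"`
}

func main() {
    // Invented payload fragment in the shape the management API uses.
    payload := []byte(`{
        "publish_in": 120,
        "publish_in_details": {"rate": 0.8},
        "return_unroutable": 2,
        "return_unroutable_details": {"rate": 0.1}
    }`)

    var stats messageStats
    if err := json.Unmarshal(payload, &stats); err != nil {
        panic(err)
    }

    // Both the absolute counter and its per-second rate are now available,
    // which is what the new *_rate fields are built from.
    fmt.Println(stats.PublishIn, stats.PublishInDetails.Rate)
    fmt.Println(stats.ReturnUnroutable, stats.ReturnUnroutableDetails.Rate)
}
```
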

// ObjectTotals ...
@@ -131,18 +135,37 @@ type Queue struct {
type Node struct {
Name string

DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
MemAlarm bool `json:"mem_alarm"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
Uptime int64 `json:"uptime"`
MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
GcNum int64 `json:"gc_num"`
GcNumDetails Details `json:"gc_num_details"`
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
IoReadAvgTime int64 `json:"io_read_avg_time"`
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
IoReadBytes int64 `json:"io_read_bytes"`
IoReadBytesDetails Details `json:"io_read_bytes_details"`
IoWriteAvgTime int64 `json:"io_write_avg_time"`
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
IoWriteBytes int64 `json:"io_write_bytes"`
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
}

type Exchange struct {
@@ -155,6 +178,10 @@ type Exchange struct {
AutoDelete bool `json:"auto_delete"`
}

type HealthCheck struct {
Status string `json:"status"`
}

// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)

@@ -204,6 +231,13 @@ var sampleConfig = `
queue_name_exclude = []
`

func boolToInt(b bool) int64 {
if b {
return 1
}
return 0
}

// SampleConfig ...
func (r *RabbitMQ) SampleConfig() string {
return sampleConfig
@@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}

var clustering_listeners, amqp_listeners int64 = 0, 0
var clusteringListeners, amqpListeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
clusteringListeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
amqpListeners++
}
}

@@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
"clustering_listeners": clusteringListeners,
"amqp_listeners": amqpListeners,
"return_unroutable": overview.MessageStats.ReturnUnroutable,
"return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}

func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
nodes := make([]Node, 0)
allNodes := make([]Node, 0)
// Gather information about nodes
err := r.requestJSON("/api/nodes", &nodes)
err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}
now := time.Now()

nodes := make(map[string]Node)
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
}
}

numberNodes := len(nodes)
if numberNodes == 0 {
return
}

type NodeHealthCheck struct {
NodeName string
HealthCheck HealthCheck
Error error
}

healthChecksChannel := make(chan NodeHealthCheck, numberNodes)

for _, node := range nodes {
if !r.shouldGatherNode(node) {
continue
go func(nodeName string, healthChecksChannel chan NodeHealthCheck) {
var healthCheck HealthCheck

err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeHealthCheck := NodeHealthCheck{
NodeName: nodeName,
Error: err,
HealthCheck: healthCheck,
}

healthChecksChannel <- nodeHealthCheck
}(node.Name, healthChecksChannel)
}

now := time.Now()

for i := 0; i < len(nodes); i++ {
nodeHealthCheck := <-healthChecksChannel

var healthCheckStatus int64 = 0

if nodeHealthCheck.Error != nil {
acc.AddError(nodeHealthCheck.Error)
} else if nodeHealthCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}

node := nodes[nodeHealthCheck.NodeName]

tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

var running int64 = 0
if node.Running {
running = 1
}

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
@@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
acc.AddFields(
"rabbitmq_exchange",
map[string]interface{}{
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
},
tags,
)
@@ -487,11 +584,11 @@ func (r *RabbitMQ) createQueueFilter() error {
r.QueueInclude = append(r.QueueInclude, r.Queues...)
}

filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
queueFilter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
if err != nil {
return err
}
r.queueFilter = filter
r.queueFilter = queueFilter

for _, q := range r.QueueExclude {
if q == "*" {
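
The health checks in `gatherNodes` are issued concurrently: one goroutine per node writes its result into a buffered channel, and the collection loop drains exactly one result per node. The following is a minimal standalone sketch of that fan-out/fan-in pattern; `fetchHealth` and the node names are hypothetical stand-ins for `requestJSON` against `/api/healthchecks/node/<name>`.

```go
package main

import (
    "fmt"
)

type healthCheck struct {
    Status string `json:"status"`
}

type nodeHealthCheck struct {
    NodeName    string
    HealthCheck healthCheck
    Err         error
}

// fetchHealth stands in for the plugin's HTTP call to the management API.
func fetchHealth(name string) (healthCheck, error) {
    // Pretend every node reports "ok"; a real call would hit /api/healthchecks/node/<name>.
    return healthCheck{Status: "ok"}, nil
}

func main() {
    nodes := []string{"rabbit@node-1", "rabbit@node-2"}

    // Buffered channel sized to the node count so the goroutines never block.
    results := make(chan nodeHealthCheck, len(nodes))

    // Fan-out: one goroutine per node.
    for _, name := range nodes {
        go func(name string) {
            hc, err := fetchHealth(name)
            results <- nodeHealthCheck{NodeName: name, HealthCheck: hc, Err: err}
        }(name)
    }

    // Fan-in: read exactly one result per node, mapping "ok" to 1 as the plugin does.
    for range nodes {
        res := <-results
        var status int64
        if res.Err == nil && res.HealthCheck.Status == "ok" {
            status = 1
        }
        fmt.Printf("%s health_check_status=%d\n", res.NodeName, status)
    }
}
```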