@@ -72,23 +72,27 @@ type Listeners struct {
72
72
73
73
// Details ...
74
74
type Details struct {
75
- Rate float64
75
+ Rate float64 `json:"rate"`
76
76
}
77
77
78
78
// MessageStats ...
79
79
type MessageStats struct {
80
- Ack int64
81
- AckDetails Details `json:"ack_details"`
82
- Deliver int64
83
- DeliverDetails Details `json:"deliver_details"`
84
- DeliverGet int64 `json:"deliver_get"`
85
- DeliverGetDetails Details `json:"deliver_get_details"`
86
- Publish int64
87
- PublishDetails Details `json:"publish_details"`
88
- Redeliver int64
89
- RedeliverDetails Details `json:"redeliver_details"`
90
- PublishIn int64 `json:"publish_in"`
91
- PublishOut int64 `json:"publish_out"`
80
+ Ack int64
81
+ AckDetails Details `json:"ack_details"`
82
+ Deliver int64
83
+ DeliverDetails Details `json:"deliver_details"`
84
+ DeliverGet int64 `json:"deliver_get"`
85
+ DeliverGetDetails Details `json:"deliver_get_details"`
86
+ Publish int64
87
+ PublishDetails Details `json:"publish_details"`
88
+ Redeliver int64
89
+ RedeliverDetails Details `json:"redeliver_details"`
90
+ PublishIn int64 `json:"publish_in"`
91
+ PublishInDetails Details `json:"publish_in_details"`
92
+ PublishOut int64 `json:"publish_out"`
93
+ PublishOutDetails Details `json:"publish_out_details"`
94
+ ReturnUnroutable int64 `json:"return_unroutable"`
95
+ ReturnUnroutableDetails Details `json:"return_unroutable_details"`
92
96
}
93
97
94
98
// ObjectTotals ...
@@ -131,18 +135,37 @@ type Queue struct {
131
135
type Node struct {
132
136
Name string
133
137
134
- DiskFree int64 `json:"disk_free"`
135
- DiskFreeLimit int64 `json:"disk_free_limit"`
136
- FdTotal int64 `json:"fd_total"`
137
- FdUsed int64 `json:"fd_used"`
138
- MemLimit int64 `json:"mem_limit"`
139
- MemUsed int64 `json:"mem_used"`
140
- ProcTotal int64 `json:"proc_total"`
141
- ProcUsed int64 `json:"proc_used"`
142
- RunQueue int64 `json:"run_queue"`
143
- SocketsTotal int64 `json:"sockets_total"`
144
- SocketsUsed int64 `json:"sockets_used"`
145
- Running bool `json:"running"`
138
+ DiskFree int64 `json:"disk_free"`
139
+ DiskFreeLimit int64 `json:"disk_free_limit"`
140
+ DiskFreeAlarm bool `json:"disk_free_alarm"`
141
+ FdTotal int64 `json:"fd_total"`
142
+ FdUsed int64 `json:"fd_used"`
143
+ MemLimit int64 `json:"mem_limit"`
144
+ MemUsed int64 `json:"mem_used"`
145
+ MemAlarm bool `json:"mem_alarm"`
146
+ ProcTotal int64 `json:"proc_total"`
147
+ ProcUsed int64 `json:"proc_used"`
148
+ RunQueue int64 `json:"run_queue"`
149
+ SocketsTotal int64 `json:"sockets_total"`
150
+ SocketsUsed int64 `json:"sockets_used"`
151
+ Running bool `json:"running"`
152
+ Uptime int64 `json:"uptime"`
153
+ MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
154
+ MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
155
+ MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
156
+ MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
157
+ GcNum int64 `json:"gc_num"`
158
+ GcNumDetails Details `json:"gc_num_details"`
159
+ GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
160
+ GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
161
+ IoReadAvgTime int64 `json:"io_read_avg_time"`
162
+ IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
163
+ IoReadBytes int64 `json:"io_read_bytes"`
164
+ IoReadBytesDetails Details `json:"io_read_bytes_details"`
165
+ IoWriteAvgTime int64 `json:"io_write_avg_time"`
166
+ IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
167
+ IoWriteBytes int64 `json:"io_write_bytes"`
168
+ IoWriteBytesDetails Details `json:"io_write_bytes_details"`
146
169
}
147
170
148
171
type Exchange struct {
@@ -155,6 +178,10 @@ type Exchange struct {
155
178
AutoDelete bool `json:"auto_delete"`
156
179
}
157
180
181
+ type HealthCheck struct {
182
+ Status string `json:"status"`
183
+ }
184
+
158
185
// gatherFunc ...
159
186
type gatherFunc func (r * RabbitMQ , acc telegraf.Accumulator )
160
187
@@ -204,6 +231,13 @@ var sampleConfig = `
204
231
queue_name_exclude = []
205
232
`
206
233
234
+ func boolToInt (b bool ) int64 {
235
+ if b {
236
+ return 1
237
+ }
238
+ return 0
239
+ }
240
+
207
241
// SampleConfig ...
208
242
func (r * RabbitMQ ) SampleConfig () string {
209
243
return sampleConfig
@@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
302
336
return
303
337
}
304
338
305
- var clustering_listeners , amqp_listeners int64 = 0 , 0
339
+ var clusteringListeners , amqpListeners int64 = 0 , 0
306
340
for _ , listener := range overview .Listeners {
307
341
if listener .Protocol == "clustering" {
308
- clustering_listeners ++
342
+ clusteringListeners ++
309
343
} else if listener .Protocol == "amqp" {
310
- amqp_listeners ++
344
+ amqpListeners ++
311
345
}
312
346
}
313
347
@@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
328
362
"messages_delivered" : overview .MessageStats .Deliver ,
329
363
"messages_delivered_get" : overview .MessageStats .DeliverGet ,
330
364
"messages_published" : overview .MessageStats .Publish ,
331
- "clustering_listeners" : clustering_listeners ,
332
- "amqp_listeners" : amqp_listeners ,
365
+ "clustering_listeners" : clusteringListeners ,
366
+ "amqp_listeners" : amqpListeners ,
367
+ "return_unroutable" : overview .MessageStats .ReturnUnroutable ,
368
+ "return_unroutable_rate" : overview .MessageStats .ReturnUnroutableDetails .Rate ,
333
369
}
334
370
acc .AddFields ("rabbitmq_overview" , fields , tags )
335
371
}
336
372
337
373
func gatherNodes (r * RabbitMQ , acc telegraf.Accumulator ) {
338
- nodes := make ([]Node , 0 )
374
+ allNodes := make ([]Node , 0 )
339
375
// Gather information about nodes
340
- err := r .requestJSON ("/api/nodes" , & nodes )
376
+ err := r .requestJSON ("/api/nodes" , & allNodes )
341
377
if err != nil {
342
378
acc .AddError (err )
343
379
return
344
380
}
345
- now := time .Now ()
381
+
382
+ nodes := make (map [string ]Node )
383
+ for _ , node := range allNodes {
384
+ if r .shouldGatherNode (node ) {
385
+ nodes [node .Name ] = node
386
+ }
387
+ }
388
+
389
+ numberNodes := len (nodes )
390
+ if numberNodes == 0 {
391
+ return
392
+ }
393
+
394
+ type NodeHealthCheck struct {
395
+ NodeName string
396
+ HealthCheck HealthCheck
397
+ Error error
398
+ }
399
+
400
+ healthChecksChannel := make (chan NodeHealthCheck , numberNodes )
346
401
347
402
for _ , node := range nodes {
348
- if ! r .shouldGatherNode (node ) {
349
- continue
403
+ go func (nodeName string , healthChecksChannel chan NodeHealthCheck ) {
404
+ var healthCheck HealthCheck
405
+
406
+ err := r .requestJSON ("/api/healthchecks/node/" + nodeName , & healthCheck )
407
+ nodeHealthCheck := NodeHealthCheck {
408
+ NodeName : nodeName ,
409
+ Error : err ,
410
+ HealthCheck : healthCheck ,
411
+ }
412
+
413
+ healthChecksChannel <- nodeHealthCheck
414
+ }(node .Name , healthChecksChannel )
415
+ }
416
+
417
+ now := time .Now ()
418
+
419
+ for i := 0 ; i < len (nodes ); i ++ {
420
+ nodeHealthCheck := <- healthChecksChannel
421
+
422
+ var healthCheckStatus int64 = 0
423
+
424
+ if nodeHealthCheck .Error != nil {
425
+ acc .AddError (nodeHealthCheck .Error )
426
+ } else if nodeHealthCheck .HealthCheck .Status == "ok" {
427
+ healthCheckStatus = 1
350
428
}
351
429
430
+ node := nodes [nodeHealthCheck .NodeName ]
431
+
352
432
tags := map [string ]string {"url" : r .URL }
353
433
tags ["node" ] = node .Name
354
434
355
- var running int64 = 0
356
- if node .Running {
357
- running = 1
358
- }
359
-
360
435
fields := map [string ]interface {}{
361
- "disk_free" : node .DiskFree ,
362
- "disk_free_limit" : node .DiskFreeLimit ,
363
- "fd_total" : node .FdTotal ,
364
- "fd_used" : node .FdUsed ,
365
- "mem_limit" : node .MemLimit ,
366
- "mem_used" : node .MemUsed ,
367
- "proc_total" : node .ProcTotal ,
368
- "proc_used" : node .ProcUsed ,
369
- "run_queue" : node .RunQueue ,
370
- "sockets_total" : node .SocketsTotal ,
371
- "sockets_used" : node .SocketsUsed ,
372
- "running" : running ,
436
+ "disk_free" : node .DiskFree ,
437
+ "disk_free_limit" : node .DiskFreeLimit ,
438
+ "disk_free_alarm" : boolToInt (node .DiskFreeAlarm ),
439
+ "fd_total" : node .FdTotal ,
440
+ "fd_used" : node .FdUsed ,
441
+ "mem_limit" : node .MemLimit ,
442
+ "mem_used" : node .MemUsed ,
443
+ "mem_alarm" : boolToInt (node .MemAlarm ),
444
+ "proc_total" : node .ProcTotal ,
445
+ "proc_used" : node .ProcUsed ,
446
+ "run_queue" : node .RunQueue ,
447
+ "sockets_total" : node .SocketsTotal ,
448
+ "sockets_used" : node .SocketsUsed ,
449
+ "uptime" : node .Uptime ,
450
+ "mnesia_disk_tx_count" : node .MnesiaDiskTxCount ,
451
+ "mnesia_disk_tx_count_rate" : node .MnesiaDiskTxCountDetails .Rate ,
452
+ "mnesia_ram_tx_count" : node .MnesiaRamTxCount ,
453
+ "mnesia_ram_tx_count_rate" : node .MnesiaRamTxCountDetails .Rate ,
454
+ "gc_num" : node .GcNum ,
455
+ "gc_num_rate" : node .GcNumDetails .Rate ,
456
+ "gc_bytes_reclaimed" : node .GcBytesReclaimed ,
457
+ "gc_bytes_reclaimed_rate" : node .GcBytesReclaimedDetails .Rate ,
458
+ "io_read_avg_time" : node .IoReadAvgTime ,
459
+ "io_read_avg_time_rate" : node .IoReadAvgTimeDetails .Rate ,
460
+ "io_read_bytes" : node .IoReadBytes ,
461
+ "io_read_bytes_rate" : node .IoReadBytesDetails .Rate ,
462
+ "io_write_avg_time" : node .IoWriteAvgTime ,
463
+ "io_write_avg_time_rate" : node .IoWriteAvgTimeDetails .Rate ,
464
+ "io_write_bytes" : node .IoWriteBytes ,
465
+ "io_write_bytes_rate" : node .IoWriteBytesDetails .Rate ,
466
+ "running" : boolToInt (node .Running ),
467
+ "health_check_status" : healthCheckStatus ,
373
468
}
374
469
acc .AddFields ("rabbitmq_node" , fields , tags , now )
375
470
}
@@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
459
554
acc .AddFields (
460
555
"rabbitmq_exchange" ,
461
556
map [string ]interface {}{
462
- "messages_publish_in" : exchange .MessageStats .PublishIn ,
463
- "messages_publish_out" : exchange .MessageStats .PublishOut ,
557
+ "messages_publish_in" : exchange .MessageStats .PublishIn ,
558
+ "messages_publish_in_rate" : exchange .MessageStats .PublishInDetails .Rate ,
559
+ "messages_publish_out" : exchange .MessageStats .PublishOut ,
560
+ "messages_publish_out_rate" : exchange .MessageStats .PublishOutDetails .Rate ,
464
561
},
465
562
tags ,
466
563
)
@@ -487,11 +584,11 @@ func (r *RabbitMQ) createQueueFilter() error {
487
584
r .QueueInclude = append (r .QueueInclude , r .Queues ... )
488
585
}
489
586
490
- filter , err := filter .NewIncludeExcludeFilter (r .QueueInclude , r .QueueExclude )
587
+ queueFilter , err := filter .NewIncludeExcludeFilter (r .QueueInclude , r .QueueExclude )
491
588
if err != nil {
492
589
return err
493
590
}
494
- r .queueFilter = filter
591
+ r .queueFilter = queueFilter
495
592
496
593
for _ , q := range r .QueueExclude {
497
594
if q == "*" {
0 commit comments