@@ -72,23 +72,27 @@ type Listeners struct {
72
72
73
73
// Details holds the rate value that is reported alongside a counter in the
// decoded JSON statistics.
type Details struct {
	// Rate is decoded from the "rate" key.
	Rate float64 `json:"rate"`
}
77
77
78
78
// MessageStats ...
79
79
type MessageStats struct {
80
- Ack int64
81
- AckDetails Details `json:"ack_details"`
82
- Deliver int64
83
- DeliverDetails Details `json:"deliver_details"`
84
- DeliverGet int64 `json:"deliver_get"`
85
- DeliverGetDetails Details `json:"deliver_get_details"`
86
- Publish int64
87
- PublishDetails Details `json:"publish_details"`
88
- Redeliver int64
89
- RedeliverDetails Details `json:"redeliver_details"`
90
- PublishIn int64 `json:"publish_in"`
91
- PublishOut int64 `json:"publish_out"`
80
+ Ack int64
81
+ AckDetails Details `json:"ack_details"`
82
+ Deliver int64
83
+ DeliverDetails Details `json:"deliver_details"`
84
+ DeliverGet int64 `json:"deliver_get"`
85
+ DeliverGetDetails Details `json:"deliver_get_details"`
86
+ Publish int64
87
+ PublishDetails Details `json:"publish_details"`
88
+ Redeliver int64
89
+ RedeliverDetails Details `json:"redeliver_details"`
90
+ PublishIn int64 `json:"publish_in"`
91
+ PublishInDetails Details `json:"publish_in_details"`
92
+ PublishOut int64 `json:"publish_out"`
93
+ PublishOutDetails Details `json:"publish_out_details"`
94
+ ReturnUnroutable int64 `json:"return_unroutable"`
95
+ ReturnUnroutableDetails Details `json:"return_unroutable_details"`
92
96
}
93
97
94
98
// ObjectTotals ...
@@ -114,45 +118,68 @@ type QueueTotals struct {
114
118
115
119
// Queue ...
116
120
type Queue struct {
117
- QueueTotals // just to not repeat the same code
118
- MessageStats `json:"message_stats"`
121
+ QueueTotals // just to not repeat the same code
122
+ MessageStats `json:"message_stats"`
119
123
Memory int64
120
124
Consumers int64
121
125
ConsumerUtilisation float64 `json:"consumer_utilisation"`
122
126
Name string
123
127
Node string
124
128
Vhost string
125
129
Durable bool
126
- AutoDelete bool `json:"auto_delete"`
127
- IdleSince string `json:"idle_since"`
130
+ AutoDelete bool `json:"auto_delete"`
131
+ IdleSince string `json:"idle_since"`
128
132
}
129
133
130
134
// Node ...
131
135
type Node struct {
132
136
Name string
133
137
134
- DiskFree int64 `json:"disk_free"`
135
- DiskFreeLimit int64 `json:"disk_free_limit"`
136
- FdTotal int64 `json:"fd_total"`
137
- FdUsed int64 `json:"fd_used"`
138
- MemLimit int64 `json:"mem_limit"`
139
- MemUsed int64 `json:"mem_used"`
140
- ProcTotal int64 `json:"proc_total"`
141
- ProcUsed int64 `json:"proc_used"`
142
- RunQueue int64 `json:"run_queue"`
143
- SocketsTotal int64 `json:"sockets_total"`
144
- SocketsUsed int64 `json:"sockets_used"`
145
- Running bool `json:"running"`
138
+ DiskFree int64 `json:"disk_free"`
139
+ DiskFreeLimit int64 `json:"disk_free_limit"`
140
+ DiskFreeAlarm bool `json:"disk_free_alarm"`
141
+ FdTotal int64 `json:"fd_total"`
142
+ FdUsed int64 `json:"fd_used"`
143
+ MemLimit int64 `json:"mem_limit"`
144
+ MemUsed int64 `json:"mem_used"`
145
+ MemAlarm bool `json:"mem_alarm"`
146
+ ProcTotal int64 `json:"proc_total"`
147
+ ProcUsed int64 `json:"proc_used"`
148
+ RunQueue int64 `json:"run_queue"`
149
+ SocketsTotal int64 `json:"sockets_total"`
150
+ SocketsUsed int64 `json:"sockets_used"`
151
+ Running bool `json:"running"`
152
+ Uptime int64 `json:"uptime"`
153
+ MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
154
+ MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
155
+ MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
156
+ MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
157
+ GcNum int64 `json:"gc_num"`
158
+ GcNumDetails Details `json:"gc_num_details"`
159
+ GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
160
+ GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
161
+ IoReadAvgTime int64 `json:"io_read_avg_time"`
162
+ IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
163
+ IoReadBytes int64 `json:"io_read_bytes"`
164
+ IoReadBytesDetails Details `json:"io_read_bytes_details"`
165
+ IoWriteAvgTime int64 `json:"io_write_avg_time"`
166
+ IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
167
+ IoWriteBytes int64 `json:"io_write_bytes"`
168
+ IoWriteBytesDetails Details `json:"io_write_bytes_details"`
146
169
}
147
170
148
171
type Exchange struct {
149
- Name string
150
- MessageStats `json:"message_stats"`
151
- Type string
152
- Internal bool
153
- Vhost string
154
- Durable bool
155
- AutoDelete bool `json:"auto_delete"`
172
+ Name string
173
+ MessageStats `json:"message_stats"`
174
+ Type string
175
+ Internal bool
176
+ Vhost string
177
+ Durable bool
178
+ AutoDelete bool `json:"auto_delete"`
179
+ }
180
+
181
// HealthCheck is the decoded body of a node health-check response
// ("/api/healthchecks/node/<name>").
type HealthCheck struct {
	// Status is decoded from the "status" key; the value "ok" is treated as
	// healthy by the node gatherer.
	Status string `json:"status"`
}
157
184
158
185
// gatherFunc ...
@@ -204,6 +231,13 @@ var sampleConfig = `
204
231
queue_name_exclude = []
205
232
`
206
233
234
// boolToInt converts b to an int64 so a boolean can be reported as a numeric
// field: true yields 1 and false yields 0.
func boolToInt(b bool) int64 {
	if b {
		return 1
	}
	return 0
}
240
+
207
241
// SampleConfig ...
208
242
func (r * RabbitMQ ) SampleConfig () string {
209
243
return sampleConfig
@@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
302
336
return
303
337
}
304
338
305
- var clustering_listeners , amqp_listeners int64 = 0 , 0
339
+ var clusteringListeners , amqpListeners int64 = 0 , 0
306
340
for _ , listener := range overview .Listeners {
307
341
if listener .Protocol == "clustering" {
308
- clustering_listeners ++
342
+ clusteringListeners ++
309
343
} else if listener .Protocol == "amqp" {
310
- amqp_listeners ++
344
+ amqpListeners ++
311
345
}
312
346
}
313
347
@@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
328
362
"messages_delivered" : overview .MessageStats .Deliver ,
329
363
"messages_delivered_get" : overview .MessageStats .DeliverGet ,
330
364
"messages_published" : overview .MessageStats .Publish ,
331
- "clustering_listeners" : clustering_listeners ,
332
- "amqp_listeners" : amqp_listeners ,
365
+ "clustering_listeners" : clusteringListeners ,
366
+ "amqp_listeners" : amqpListeners ,
367
+ "return_unroutable" : overview .MessageStats .ReturnUnroutable ,
368
+ "return_unroutable_rate" : overview .MessageStats .ReturnUnroutableDetails .Rate ,
333
369
}
334
370
acc .AddFields ("rabbitmq_overview" , fields , tags )
335
371
}
336
372
337
373
func gatherNodes (r * RabbitMQ , acc telegraf.Accumulator ) {
338
- nodes := make ([]Node , 0 )
374
+ allNodes := make ([]Node , 0 )
339
375
// Gather information about nodes
340
- err := r .requestJSON ("/api/nodes" , & nodes )
376
+ err := r .requestJSON ("/api/nodes" , & allNodes )
341
377
if err != nil {
342
378
acc .AddError (err )
343
379
return
344
380
}
345
- now := time .Now ()
381
+
382
+ nodes := make (map [string ]Node )
383
+ for _ , node := range allNodes {
384
+ if r .shouldGatherNode (node ) {
385
+ nodes [node .Name ] = node
386
+ }
387
+ }
388
+
389
+ numberNodes := len (nodes )
390
+ if numberNodes == 0 {
391
+ return
392
+ }
393
+
394
+ type NodeHealthCheck struct {
395
+ NodeName string
396
+ HealthCheck HealthCheck
397
+ Error error
398
+ }
399
+
400
+ healthChecksChannel := make (chan NodeHealthCheck , numberNodes )
346
401
347
402
for _ , node := range nodes {
348
- if ! r .shouldGatherNode (node ) {
349
- continue
403
+ go func (nodeName string , healthChecksChannel chan NodeHealthCheck ) {
404
+ var healthCheck HealthCheck
405
+
406
+ err := r .requestJSON ("/api/healthchecks/node/" + nodeName , & healthCheck )
407
+ nodeHealthCheck := NodeHealthCheck {
408
+ NodeName : nodeName ,
409
+ Error : err ,
410
+ HealthCheck : healthCheck ,
411
+ }
412
+
413
+ healthChecksChannel <- nodeHealthCheck
414
+ }(node .Name , healthChecksChannel )
415
+ }
416
+
417
+ now := time .Now ()
418
+
419
+ for i := 0 ; i < len (nodes ); i ++ {
420
+ nodeHealthCheck := <- healthChecksChannel
421
+
422
+ var healthCheckStatus int64 = 0
423
+
424
+ if nodeHealthCheck .Error != nil {
425
+ acc .AddError (nodeHealthCheck .Error )
426
+ } else if nodeHealthCheck .HealthCheck .Status == "ok" {
427
+ healthCheckStatus = 1
350
428
}
351
429
430
+ node := nodes [nodeHealthCheck .NodeName ]
431
+
352
432
tags := map [string ]string {"url" : r .URL }
353
433
tags ["node" ] = node .Name
354
434
355
- var running int64 = 0
356
- if node .Running {
357
- running = 1
358
- }
359
-
360
435
fields := map [string ]interface {}{
361
- "disk_free" : node .DiskFree ,
362
- "disk_free_limit" : node .DiskFreeLimit ,
363
- "fd_total" : node .FdTotal ,
364
- "fd_used" : node .FdUsed ,
365
- "mem_limit" : node .MemLimit ,
366
- "mem_used" : node .MemUsed ,
367
- "proc_total" : node .ProcTotal ,
368
- "proc_used" : node .ProcUsed ,
369
- "run_queue" : node .RunQueue ,
370
- "sockets_total" : node .SocketsTotal ,
371
- "sockets_used" : node .SocketsUsed ,
372
- "running" : running ,
436
+ "disk_free" : node .DiskFree ,
437
+ "disk_free_limit" : node .DiskFreeLimit ,
438
+ "disk_free_alarm" : boolToInt (node .DiskFreeAlarm ),
439
+ "fd_total" : node .FdTotal ,
440
+ "fd_used" : node .FdUsed ,
441
+ "mem_limit" : node .MemLimit ,
442
+ "mem_used" : node .MemUsed ,
443
+ "mem_alarm" : boolToInt (node .MemAlarm ),
444
+ "proc_total" : node .ProcTotal ,
445
+ "proc_used" : node .ProcUsed ,
446
+ "run_queue" : node .RunQueue ,
447
+ "sockets_total" : node .SocketsTotal ,
448
+ "sockets_used" : node .SocketsUsed ,
449
+ "uptime" : node .Uptime ,
450
+ "mnesia_disk_tx_count" : node .MnesiaDiskTxCount ,
451
+ "mnesia_disk_tx_count_rate" : node .MnesiaDiskTxCountDetails .Rate ,
452
+ "mnesia_ram_tx_count" : node .MnesiaRamTxCount ,
453
+ "mnesia_ram_tx_count_rate" : node .MnesiaRamTxCountDetails .Rate ,
454
+ "gc_num" : node .GcNum ,
455
+ "gc_num_rate" : node .GcNumDetails .Rate ,
456
+ "gc_bytes_reclaimed" : node .GcBytesReclaimed ,
457
+ "gc_bytes_reclaimed_rate" : node .GcBytesReclaimedDetails .Rate ,
458
+ "io_read_avg_time" : node .IoReadAvgTime ,
459
+ "io_read_avg_time_rate" : node .IoReadAvgTimeDetails .Rate ,
460
+ "io_read_bytes" : node .IoReadBytes ,
461
+ "io_read_bytes_rate" : node .IoReadBytesDetails .Rate ,
462
+ "io_write_avg_time" : node .IoWriteAvgTime ,
463
+ "io_write_avg_time_rate" : node .IoWriteAvgTimeDetails .Rate ,
464
+ "io_write_bytes" : node .IoWriteBytes ,
465
+ "io_write_bytes_rate" : node .IoWriteBytesDetails .Rate ,
466
+ "running" : boolToInt (node .Running ),
467
+ "health_check_status" : healthCheckStatus ,
373
468
}
374
469
acc .AddFields ("rabbitmq_node" , fields , tags , now )
375
470
}
@@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
459
554
acc .AddFields (
460
555
"rabbitmq_exchange" ,
461
556
map [string ]interface {}{
462
- "messages_publish_in" : exchange .MessageStats .PublishIn ,
463
- "messages_publish_out" : exchange .MessageStats .PublishOut ,
557
+ "messages_publish_in" : exchange .MessageStats .PublishIn ,
558
+ "messages_publish_in_rate" : exchange .MessageStats .PublishInDetails .Rate ,
559
+ "messages_publish_out" : exchange .MessageStats .PublishOut ,
560
+ "messages_publish_out_rate" : exchange .MessageStats .PublishOutDetails .Rate ,
464
561
},
465
562
tags ,
466
563
)
0 commit comments