@@ -19,6 +19,9 @@ import (
19
19
"github.com/influxdata/telegraf/plugins/inputs"
20
20
"google.golang.org/grpc"
21
21
"google.golang.org/grpc/credentials"
22
+
23
+ // Register GRPC gzip decoder to support compressed telemetry
24
+ _ "google.golang.org/grpc/encoding/gzip"
22
25
"google.golang.org/grpc/peer"
23
26
)
24
27
@@ -32,6 +35,7 @@ type CiscoTelemetryMDT struct {
32
35
// Common configuration
33
36
Transport string
34
37
ServiceAddress string `toml:"service_address"`
38
+ DecodeNXOS bool `toml:"decode_nxos"`
35
39
MaxMsgSize int `toml:"max_msg_size"`
36
40
Aliases map [string ]string `toml:"aliases"`
37
41
@@ -76,6 +80,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
76
80
var opts []grpc.ServerOption
77
81
tlsConfig , err := c .ServerConfig .TLSConfig ()
78
82
if err != nil {
83
+ c .listener .Close ()
79
84
return err
80
85
} else if tlsConfig != nil {
81
86
opts = append (opts , grpc .Creds (credentials .NewTLS (tlsConfig )))
@@ -198,6 +203,8 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS
198
203
log .Printf ("D! [inputs.cisco_telemetry_mdt]: Accepted Cisco MDT GRPC dialout connection from %s" , peer .Addr )
199
204
}
200
205
206
+ var chunkBuffer bytes.Buffer
207
+
201
208
for {
202
209
packet , err := stream .Recv ()
203
210
if err != nil {
@@ -212,7 +219,18 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS
212
219
break
213
220
}
214
221
215
- c .handleTelemetry (packet .Data )
222
+ // Reassemble chunked telemetry data received from NX-OS
223
+ if packet .TotalSize == 0 {
224
+ c .handleTelemetry (packet .Data )
225
+ } else if int (packet .TotalSize ) <= c .MaxMsgSize {
226
+ chunkBuffer .Write (packet .Data )
227
+ if chunkBuffer .Len () >= int (packet .TotalSize ) {
228
+ c .handleTelemetry (chunkBuffer .Bytes ())
229
+ chunkBuffer .Reset ()
230
+ }
231
+ } else {
232
+ c .acc .AddError (fmt .Errorf ("dropped too large packet: %dB > %dB" , packet .TotalSize , c .MaxMsgSize ))
233
+ }
216
234
}
217
235
218
236
if peerOK {
@@ -224,7 +242,6 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS
224
242
225
243
// Handle telemetry packet from any transport, decode and add as measurement
226
244
func (c * CiscoTelemetryMDT ) handleTelemetry (data []byte ) {
227
- var namebuf bytes.Buffer
228
245
telemetry := & telemetry.Telemetry {}
229
246
err := proto .Unmarshal (data , telemetry )
230
247
if err != nil {
@@ -254,12 +271,12 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
254
271
tags ["source" ] = telemetry .GetNodeIdStr ()
255
272
tags ["subscription" ] = telemetry .GetSubscriptionIdStr ()
256
273
for _ , subfield := range field .Fields {
257
- c .parseGPBKVField (subfield , & namebuf , telemetry .EncodingPath , timestamp , tags , nil )
274
+ c .parseGPBKVField (subfield , "" , telemetry .EncodingPath , timestamp , tags , nil )
258
275
}
259
276
case "content" :
260
277
fields = make (map [string ]interface {}, len (field .Fields ))
261
278
for _ , subfield := range field .Fields {
262
- c .parseGPBKVField (subfield , & namebuf , telemetry .EncodingPath , timestamp , tags , fields )
279
+ c .parseGPBKVField (subfield , "" , telemetry .EncodingPath , timestamp , tags , fields )
263
280
}
264
281
default :
265
282
log .Printf ("I! [inputs.cisco_telemetry_mdt]: Unexpected top-level MDT field: %s" , field .Name )
@@ -268,29 +285,36 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
268
285
269
286
// Find best alias for encoding path and emit measurement
270
287
if len (fields ) > 0 && len (tags ) > 0 && len (telemetry .EncodingPath ) > 0 {
271
- name := telemetry .EncodingPath
272
- if alias , ok := c .aliases [name ]; ok {
273
- tags ["path" ] = name
274
- name = alias
275
- } else {
276
- log .Printf ("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s" , name )
277
- }
278
- c .acc .AddFields (name , fields , tags , timestamp )
279
- } else {
288
+ c .addFieldsWithAlias (telemetry .EncodingPath , fields , tags , timestamp )
289
+ } else if ! c .DecodeNXOS {
280
290
c .acc .AddError (fmt .Errorf ("empty encoding path or measurement" ))
281
291
}
282
292
}
283
293
}
284
294
295
+ // Add fields doing alias replacement
296
+ func (c * CiscoTelemetryMDT ) addFieldsWithAlias (path string , fields map [string ]interface {},
297
+ tags map [string ]string , timestamp time.Time ) {
298
+ name := path
299
+ if alias , ok := c .aliases [name ]; ok {
300
+ tags ["path" ] = name
301
+ name = alias
302
+ } else {
303
+ log .Printf ("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s" , name )
304
+ }
305
+ c .acc .AddFields (name , fields , tags , timestamp )
306
+ }
307
+
285
308
// Recursively parse GPBKV field structure into fields or tags
286
- func (c * CiscoTelemetryMDT ) parseGPBKVField (field * telemetry.TelemetryField , namebuf * bytes. Buffer ,
309
+ func (c * CiscoTelemetryMDT ) parseGPBKVField (field * telemetry.TelemetryField , prefix string ,
287
310
path string , timestamp time.Time , tags map [string ]string , fields map [string ]interface {}) {
288
-
289
- namelen := namebuf .Len ()
290
- if namelen > 0 {
291
- namebuf .WriteRune ('/' )
311
+ localname := strings .Replace (field .Name , "-" , "_" , - 1 )
312
+ name := localname
313
+ if len (name ) == 0 {
314
+ name = prefix
315
+ } else if len (prefix ) > 0 {
316
+ name = prefix + "/" + localname
292
317
}
293
- namebuf .WriteString (strings .Replace (field .Name , "-" , "_" , - 1 ))
294
318
295
319
// Decode Telemetry field value if set
296
320
var value interface {}
@@ -318,21 +342,91 @@ func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, nam
318
342
if value != nil {
319
343
// Distinguish between tags (keys) and fields (data) to write to
320
344
if fields != nil {
321
- fields [namebuf . String () ] = value
345
+ fields [name ] = value
322
346
} else {
323
- if _ , exists := tags [field . Name ]; ! exists { // Use short keys whenever possible
324
- tags [field . Name ] = fmt .Sprint (value )
347
+ if _ , exists := tags [localname ]; ! exists { // Use short keys whenever possible
348
+ tags [localname ] = fmt .Sprint (value )
325
349
} else {
326
- tags [namebuf . String () ] = fmt .Sprint (value )
350
+ tags [name ] = fmt .Sprint (value )
327
351
}
328
352
}
329
353
}
354
+ if fields == nil || ! c .DecodeNXOS {
355
+ for _ , subfield := range field .Fields {
356
+ c .parseGPBKVField (subfield , name , path , timestamp , tags , fields )
357
+ }
358
+ } else if c .DecodeNXOS && len (field .Fields ) > 0 { // NX-OS extended decoding logic
359
+ c .parseNXOSTelemetryStructure (field , prefix , name , path , timestamp , tags , fields )
360
+ }
361
+ }
330
362
363
+ // Parse extended structure of NX-OS platform telemetry
364
+ func (c * CiscoTelemetryMDT ) parseNXOSTelemetryStructure (field * telemetry.TelemetryField , prefix string ,
365
+ name string , path string , timestamp time.Time , tags map [string ]string , fields map [string ]interface {}) {
366
+ var attributes , children , rows * telemetry.TelemetryField
367
+
368
+ // NX-OS uses certain fieldnames to indicate the structure following
331
369
for _ , subfield := range field .Fields {
332
- c .parseGPBKVField (subfield , namebuf , path , timestamp , tags , fields )
370
+ if subfield .Name == "attributes" && len (subfield .Fields ) > 0 {
371
+ attributes = subfield
372
+ } else if subfield .Name == "children" && len (subfield .Fields ) > 0 {
373
+ children = subfield
374
+ } else if strings .HasPrefix (subfield .Name , "ROW_" ) {
375
+ rows = subfield
376
+ } else { // Fallback to regular telemetry decoding
377
+ c .parseGPBKVField (subfield , name , path , timestamp , tags , fields )
378
+ }
333
379
}
334
380
335
- namebuf .Truncate (namelen )
381
+ if attributes != nil {
382
+ // DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/
383
+ values := make (map [string ]interface {})
384
+ for _ , subfield := range attributes .Fields {
385
+ c .parseGPBKVField (subfield , "" , path , timestamp , tags , values )
386
+ }
387
+ if rn , hasRN := values ["rn" ]; hasRN {
388
+ // Promote the relative name of the entry from a value to a key
389
+ tags [prefix ] = fmt .Sprint (rn )
390
+ delete (values , "rn" )
391
+ for key , value := range values {
392
+ // Work around an issue where a field is returned of type string when empty
393
+ // and as a number otherwise causing type confusion, thus remove empty strings
394
+ if str , isStr := value .(string ); isStr && len (str ) == 0 {
395
+ delete (values , key )
396
+ }
397
+ }
398
+ c .addFieldsWithAlias (path + "/" + prefix , values , tags , timestamp )
399
+ } else if _ , hasDN := values ["dn" ]; ! hasDN { // Check for distinguished name being present
400
+ c .acc .AddError (fmt .Errorf ("NX-OS decoding failed: missing dn field" ))
401
+ }
402
+ if children != nil {
403
+ // This is a nested structure, children will inherit relative name keys of parent
404
+ for _ , subfield := range children .Fields {
405
+ c .parseGPBKVField (subfield , prefix , path , timestamp , tags , fields )
406
+ }
407
+ }
408
+ delete (tags , prefix )
409
+ } else if rows != nil {
410
+ // NXAPI structure: https://developer.cisco.com/docs/cisco-nexus-9000-series-nx-api-cli-reference-release-9-2x/
411
+ for _ , row := range rows .Fields {
412
+ values := make (map [string ]interface {})
413
+ for i , subfield := range row .Fields {
414
+ c .parseGPBKVField (subfield , "" , path , timestamp , tags , values )
415
+ if i == 0 { // First subfield contains the index, promote it from value to tag
416
+ tags [prefix ] = fmt .Sprint (values [subfield .Name ])
417
+ delete (values , subfield .Name )
418
+ }
419
+ }
420
+ for key , value := range values {
421
+ // Work around an issue where a field is returned of type string when empty
422
+ // and as a number otherwise causing type confusion, thus remove empty strings
423
+ if str , isStr := value .(string ); isStr && len (str ) == 0 {
424
+ delete (values , key )
425
+ }
426
+ }
427
+ c .addFieldsWithAlias (path + "/" + prefix , values , tags , timestamp )
428
+ }
429
+ }
336
430
}
337
431
338
432
// Stop listener and cleanup
@@ -355,6 +449,9 @@ const sampleConfig = `
355
449
## Address and port to host telemetry listener
356
450
service_address = ":57000"
357
451
452
+ ## Enable support for decoding NX-OS platform-specific telemetry extensions (disable for IOS XR and IOS XE)
453
+ # decode_nxos = true
454
+
358
455
## Enable TLS; grpc transport only.
359
456
# tls_cert = "/etc/telegraf/cert.pem"
360
457
# tls_key = "/etc/telegraf/key.pem"
0 commit comments