Skip to content

Commit 8c7344a

Browse files
authored
Add flowDirection IE when proxying records in FlowAggregator (#6979)
This can help identify if the flow was exported from the source or destination Node in a more "standard" way. Note that we use a "special" value (0xff) for intra-Node flows. Signed-off-by: Antonin Bas <[email protected]>
1 parent ce01d6a commit 8c7344a

File tree

4 files changed

+71
-27
lines changed

4 files changed

+71
-27
lines changed

pkg/flowaggregator/exporter/ipfix.go

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -255,22 +255,22 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
255255
antreaInfoElements = infoelements.AntreaInfoElementsIPv6
256256
templateID = e.templateIDv6
257257
}
258-
for _, ie := range ianaInfoElements {
259-
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.IANAEnterpriseID)
258+
for _, ieName := range ianaInfoElements {
259+
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAEnterpriseID)
260260
if err != nil {
261261
return 0, err
262262
}
263263
elements = append(elements, ie)
264264
}
265-
for _, ie := range infoelements.IANAReverseInfoElements {
266-
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.IANAReversedEnterpriseID)
265+
for _, ieName := range infoelements.IANAReverseInfoElements {
266+
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAReversedEnterpriseID)
267267
if err != nil {
268268
return 0, err
269269
}
270270
elements = append(elements, ie)
271271
}
272-
for _, ie := range antreaInfoElements {
273-
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
272+
for _, ieName := range antreaInfoElements {
273+
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
274274
if err != nil {
275275
return 0, err
276276
}
@@ -295,8 +295,8 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
295295
}
296296
elements = append(elements, ie)
297297
}
298-
for _, ie := range infoelements.AntreaFlowEndSecondsElementList {
299-
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
298+
for _, ieName := range infoelements.AntreaFlowEndSecondsElementList {
299+
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
300300
if err != nil {
301301
return 0, err
302302
}
@@ -326,8 +326,8 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
326326
elements = append(elements, ie)
327327
}
328328
}
329-
for _, ie := range infoelements.AntreaLabelsElementList {
330-
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
329+
for _, ieName := range infoelements.AntreaLabelsElementList {
330+
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
331331
if err != nil {
332332
return 0, err
333333
}
@@ -339,21 +339,13 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
339339
}
340340
elements = append(elements, ie)
341341
if e.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy {
342-
ie, err := e.createInfoElementForTemplateSet("originalObservationDomainId", ipfixregistry.IANAEnterpriseID)
343-
if err != nil {
344-
return 0, err
345-
}
346-
elements = append(elements, ie)
347-
ie, err = e.createInfoElementForTemplateSet("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID)
348-
if err != nil {
349-
return 0, err
350-
}
351-
elements = append(elements, ie)
352-
ie, err = e.createInfoElementForTemplateSet("originalExporterIPv6Address", ipfixregistry.IANAEnterpriseID)
353-
if err != nil {
354-
return 0, err
342+
for _, ieName := range infoelements.IANAProxyModeElementList {
343+
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAEnterpriseID)
344+
if err != nil {
345+
return 0, err
346+
}
347+
elements = append(elements, ie)
355348
}
356-
elements = append(elements, ie)
357349
}
358350
e.set.ResetSet()
359351
if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil {

pkg/flowaggregator/flowaggregator.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,7 @@ func (fa *flowAggregator) InitCollectingProcess() error {
275275
cpInput.NumExtraElements += len(infoelements.AntreaSourceStatsElementList) + len(infoelements.AntreaDestinationStatsElementList) +
276276
len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList)
277277
} else {
278-
// originalObservationDomainId, originalExporterIPv4Address, originalExporterIPv6Address
279-
cpInput.NumExtraElements += 3
278+
cpInput.NumExtraElements += len(infoelements.IANAProxyModeElementList)
280279
}
281280
// Tell the collector to accept IEs which are not part of the IPFIX registry (hardcoded in
282281
// the go-ipfix library). The preprocessor will take care of removing these elements.
@@ -505,7 +504,36 @@ func (fa *flowAggregator) proxyRecord(record ipfixentities.Record, obsDomainID u
505504
if err != nil {
506505
return fmt.Errorf("cannot find record start time: %w", err)
507506
}
508-
if getFlowType(record) == ipfixregistry.FlowTypeInterNode {
507+
flowType := getFlowType(record)
508+
var withSource, withDestination bool
509+
if sourcePodName, _, exist := record.GetInfoElementWithValue("sourcePodName"); exist {
510+
withSource = sourcePodName.GetStringValue() != ""
511+
}
512+
if destinationPodName, _, exist := record.GetInfoElementWithValue("destinationPodName"); exist {
513+
withDestination = destinationPodName.GetStringValue() != ""
514+
}
515+
var direction uint8
516+
switch {
517+
// !withDestination should be redundant here
518+
case flowType == ipfixregistry.FlowTypeInterNode && withSource && !withDestination:
519+
// egress
520+
direction = 0x01
521+
// !withSource should be redundant here
522+
case flowType == ipfixregistry.FlowTypeInterNode && !withSource && withDestination:
523+
// ingress
524+
direction = 0x00
525+
case flowType == ipfixregistry.FlowTypeToExternal && withSource:
526+
// egress
527+
direction = 0x01
528+
case flowType == ipfixregistry.FlowTypeFromExternal && withDestination:
529+
// ingress
530+
direction = 0x00
531+
default:
532+
// not a valid value for the IE, we use it as a reserved value (unknown)
533+
// this covers the IntraNode case
534+
direction = 0xff
535+
}
536+
if flowType == ipfixregistry.FlowTypeInterNode {
509537
// This is the only case where K8s metadata could be missing
510538
fa.fillK8sMetadata(sourceAddress, destinationAddress, record, startTime)
511539
}
@@ -523,6 +551,9 @@ func (fa *flowAggregator) proxyRecord(record ipfixentities.Record, obsDomainID u
523551
if err := fa.addOriginalExporterIPv6Address(record, originalExporterAddress); err != nil {
524552
klog.ErrorS(err, "Failed to add originalExporterIPv6Address")
525553
}
554+
if err := fa.addFlowDirection(record, direction); err != nil {
555+
klog.ErrorS(err, "Failed to add flowDirection")
556+
}
526557
return fa.sendRecord(record, isIPv6)
527558
}
528559

@@ -824,6 +855,17 @@ func (fa *flowAggregator) addOriginalExporterIPv6Address(record ipfixentities.Re
824855
return nil
825856
}
826857

858+
func (fa *flowAggregator) addFlowDirection(record ipfixentities.Record, direction uint8) error {
859+
ie, err := fa.registry.GetInfoElement("flowDirection", ipfixregistry.IANAEnterpriseID)
860+
if err != nil {
861+
return fmt.Errorf("error when getting flowDirection InfoElement: %w", err)
862+
}
863+
if err := record.AddInfoElement(ipfixentities.NewUnsigned8InfoElement(ie, direction)); err != nil {
864+
return fmt.Errorf("error when adding flowDirection InfoElement with value: %w", err)
865+
}
866+
return nil
867+
}
868+
827869
func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []map[string]interface{} {
828870
if fa.aggregationProcess != nil {
829871
return fa.aggregationProcess.GetRecords(flowKey)

pkg/flowaggregator/flowaggregator_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ func TestFlowAggregator_proxyRecord(t *testing.T) {
356356
originalExporterIPv4AddressIE := ipfixentities.NewInfoElement("originalExporterIPv4Address", 0, 0, ipfixregistry.IANAEnterpriseID, 4)
357357
mockIPFIXRegistry.EXPECT().GetInfoElement("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID).Return(originalExporterIPv4AddressIE, nil)
358358
mockRecord.EXPECT().AddInfoElement(ipfixentities.NewIPAddressInfoElement(originalExporterIPv4AddressIE, exporterAddressIPv4))
359+
flowDirectionIE := ipfixentities.NewInfoElement("flowDirection", 0, 0, ipfixregistry.IANAEnterpriseID, 1)
360+
mockIPFIXRegistry.EXPECT().GetInfoElement("flowDirection", ipfixregistry.IANAEnterpriseID).Return(flowDirectionIE, nil)
361+
mockRecord.EXPECT().AddInfoElement(ipfixentities.NewUnsigned8InfoElement(flowDirectionIE, uint8(0xff)))
359362

360363
err := fa.proxyRecord(mockRecord, obsDomainID, exporterAddress)
361364
assert.NoError(t, err, "Error when proxying flow record")

pkg/flowaggregator/infoelements/elements.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,11 @@ var (
123123
"throughputFromDestinationNode",
124124
"reverseThroughputFromDestinationNode",
125125
}
126+
127+
IANAProxyModeElementList = []string{
128+
"originalObservationDomainId",
129+
"originalExporterIPv4Address",
130+
"originalExporterIPv6Address",
131+
"flowDirection",
132+
}
126133
)

0 commit comments

Comments
 (0)