Skip to content

Commit 0efb397

Browse files
authored
Support version skew between Antrea Agent and Flow Aggregator (#6912)
When a new IPFIX Information Element (IE) is introduced, a version mismatch between the Agent and the Flow Aggregator can be problematic. A "new" Agent can send an IE which is unknown to the "old" Flow Aggregator, or the "new" Flow Aggregator may expect an IE which is not sent by an "old" Agent. Prior to this change, we required the list of IEs sent by the Agent to be the same as the list of IEs expected by the Flow Aggregator. This is impossible to ensure during upgrade, as it may take a long time for all Agents in the cluster to be upgraded. After this change, Agents and the Flow Aggregator can be upgraded in any order (although we recommend upgrading the Flow Aggregator last). To achieve this, we introduce a new "process" between IPFIX collection and aggregation in the Flow Aggregator: the "preprocessor". The preprocessor is in charge of processing messages received from the IPFIX collector, prior to handing records over to the aggregation process. At the moment, its only task is to ensure that all records have the expected fields. If a record has extra fields, they will be discarded. If some fields are missing, they will be "appended" to the record with a "zero" value. For example, we will use 0 for integral types, "" for strings, 0.0.0.0 for IPv4 addresses, etc. Note that we are able to keep the implementation simple by assuming that a record either has missing fields or extra fields (not a combination of both), and that such fields are always at the tail of the field list. This assumption is based on implementation knowledge of the FlowExporter and the FlowAggregator. When we introduce a new IE, it always comes after all existing IEs, and we never deprecate / remove an existing IE across versions. Note that when the preprocessor adds a missing field, it is no longer possible to determine whether the field was originally missing, or was sent by the Agent with a zero value. 
This is why we recommend upgrading the Flow Aggregator last (to avoid this situation altogether). However, we do not believe that it is a significant drawback based on current usage. Fixes #6777 Signed-off-by: Antonin Bas <[email protected]>
1 parent afb9dfc commit 0efb397

File tree

6 files changed

+374
-15
lines changed

6 files changed

+374
-15
lines changed

pkg/flowaggregator/clickhouseclient/clickhouseclient.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ const (
9090
egressIP,
9191
appProtocolName,
9292
httpVals,
93-
egressNodeName)
93+
egressNodeName)
9494
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
9595
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
9696
?, ?, ?, ?, ?)`

pkg/flowaggregator/exporter/ipfix.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ func (e *IPFIXExporter) sendRecord(record ipfixentities.Record, isRecordIPv6 boo
175175
if err != nil {
176176
return err
177177
}
178-
klog.V(4).InfoS("Data set sent successfully", "bytes sent", sentBytes)
178+
if klog.V(7).Enabled() {
179+
klog.InfoS("Data set sent successfully", "bytes sent", sentBytes)
180+
}
179181
return nil
180182
}
181183

pkg/flowaggregator/flowaggregator.go

+76-8
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ type flowAggregator struct {
104104
clusterUUID uuid.UUID
105105
aggregatorTransportProtocol flowaggregatorconfig.AggregatorTransportProtocol
106106
collectingProcess ipfix.IPFIXCollectingProcess
107+
preprocessor *preprocessor
107108
aggregationProcess ipfix.IPFIXAggregationProcess
108109
activeFlowRecordTimeout time.Duration
109110
inactiveFlowRecordTimeout time.Duration
@@ -175,13 +176,18 @@ func NewFlowAggregator(
175176
APIServer: opt.Config.APIServer,
176177
logTickerDuration: time.Minute,
177178
}
178-
err = fa.InitCollectingProcess()
179-
if err != nil {
180-
return nil, fmt.Errorf("error when creating collecting process: %v", err)
179+
if err := fa.InitCollectingProcess(); err != nil {
180+
return nil, fmt.Errorf("error when creating collecting process: %w", err)
181181
}
182-
err = fa.InitAggregationProcess()
183-
if err != nil {
184-
return nil, fmt.Errorf("error when creating aggregation process: %v", err)
182+
// Use a buffered channel which ideally should be large enough to accommodate all the records
183+
// included in a given IPFIX message. It would be unusual to have more than 128 records in
184+
// an IPFIX message.
185+
recordCh := make(chan ipfixentities.Record, 128)
186+
if err := fa.InitPreprocessor(recordCh); err != nil {
187+
return nil, fmt.Errorf("error when creating preprocessor: %w", err)
188+
}
189+
if err := fa.InitAggregationProcess(recordCh); err != nil {
190+
return nil, fmt.Errorf("error when creating aggregation process: %w", err)
185191
}
186192
if opt.Config.ClickHouse.Enable {
187193
var err error
@@ -261,15 +267,72 @@ func (fa *flowAggregator) InitCollectingProcess() error {
261267
len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList)
262268
// clusterId
263269
cpInput.NumExtraElements += 1
270+
// Tell the collector to accept IEs which are not part of the IPFIX registry (hardcoded in
271+
// the go-ipfix library). The preprocessor will take care of removing these elements.
272+
cpInput.DecodingMode = collector.DecodingModeLenientKeepUnknown
264273
var err error
265274
fa.collectingProcess, err = collector.InitCollectingProcess(cpInput)
266275
return err
267276
}
268277

269-
func (fa *flowAggregator) InitAggregationProcess() error {
278+
func (fa *flowAggregator) InitPreprocessor(recordCh chan<- ipfixentities.Record) error {
279+
getInfoElementFromRegistry := func(ieName string, enterpriseID uint32) (*ipfixentities.InfoElement, error) {
280+
ie, err := fa.registry.GetInfoElement(ieName, enterpriseID)
281+
if err != nil {
282+
return nil, fmt.Errorf("error when looking up IE %q in registry: %w", ieName, err)
283+
}
284+
return ie, err
285+
}
286+
287+
getInfoElements := func(isIPv4 bool) ([]*ipfixentities.InfoElement, error) {
288+
ianaInfoElements := infoelements.IANAInfoElementsIPv4
289+
ianaReverseInfoElements := infoelements.IANAReverseInfoElements
290+
antreaInfoElements := infoelements.AntreaInfoElementsIPv4
291+
if !isIPv4 {
292+
ianaInfoElements = infoelements.IANAInfoElementsIPv6
293+
antreaInfoElements = infoelements.AntreaInfoElementsIPv6
294+
}
295+
infoElements := make([]*ipfixentities.InfoElement, 0)
296+
for _, ieName := range ianaInfoElements {
297+
ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.IANAEnterpriseID)
298+
if err != nil {
299+
return nil, err
300+
}
301+
infoElements = append(infoElements, ie)
302+
}
303+
for _, ieName := range ianaReverseInfoElements {
304+
ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.IANAReversedEnterpriseID)
305+
if err != nil {
306+
return nil, err
307+
}
308+
infoElements = append(infoElements, ie)
309+
}
310+
for _, ieName := range antreaInfoElements {
311+
ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.AntreaEnterpriseID)
312+
if err != nil {
313+
return nil, err
314+
}
315+
infoElements = append(infoElements, ie)
316+
}
317+
return infoElements, nil
318+
}
319+
320+
infoElementsIPv4, err := getInfoElements(true)
321+
if err != nil {
322+
return err
323+
}
324+
infoElementsIPv6, err := getInfoElements(false)
325+
if err != nil {
326+
return err
327+
}
328+
fa.preprocessor, err = newPreprocessor(infoElementsIPv4, infoElementsIPv6, fa.collectingProcess.GetMsgChan(), recordCh)
329+
return err
330+
}
331+
332+
func (fa *flowAggregator) InitAggregationProcess(recordCh <-chan ipfixentities.Record) error {
270333
var err error
271334
apInput := ipfixintermediate.AggregationInput{
272-
MessageChan: fa.collectingProcess.GetMsgChan(),
335+
RecordChan: recordCh,
273336
WorkerNum: aggregationWorkerNum,
274337
CorrelateFields: correlateFields,
275338
ActiveExpiryTimeout: fa.activeFlowRecordTimeout,
@@ -293,6 +356,11 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) {
293356
fa.collectingProcess.Start()
294357
}()
295358
ipfixProcessesWg.Add(1)
359+
go func() {
360+
defer ipfixProcessesWg.Done()
361+
fa.preprocessor.Run(stopCh)
362+
}()
363+
ipfixProcessesWg.Add(1)
296364
go func() {
297365
// Same comment as above.
298366
defer ipfixProcessesWg.Done()

pkg/flowaggregator/flowaggregator_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,7 @@ func TestFlowAggregator_Run(t *testing.T) {
548548
activeFlowRecordTimeout: 1 * time.Hour,
549549
logTickerDuration: 1 * time.Hour,
550550
collectingProcess: mockCollectingProcess,
551+
preprocessor: &preprocessor{},
551552
aggregationProcess: mockAggregationProcess,
552553
ipfixExporter: mockIPFIXExporter,
553554
configWatcher: configWatcher,
@@ -858,12 +859,12 @@ func TestFlowAggregator_InitAggregationProcess(t *testing.T) {
858859
activeFlowRecordTimeout: testActiveTimeout,
859860
inactiveFlowRecordTimeout: testInactiveTimeout,
860861
aggregatorTransportProtocol: flowaggregatorconfig.AggregatorTransportProtocolTCP,
862+
registry: ipfix.NewIPFIXRegistry(),
861863
}
862-
err := fa.InitCollectingProcess()
863-
require.NoError(t, err)
864-
865-
err = fa.InitAggregationProcess()
866-
require.NoError(t, err)
864+
require.NoError(t, fa.InitCollectingProcess())
865+
recordCh := make(chan ipfixentities.Record)
866+
require.NoError(t, fa.InitPreprocessor(recordCh))
867+
require.NoError(t, fa.InitAggregationProcess(recordCh))
867868
}
868869

869870
func TestFlowAggregator_fillK8sMetadata(t *testing.T) {

pkg/flowaggregator/preprocessor.go

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright 2025 Antrea Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package flowaggregator
16+
17+
import (
18+
"fmt"
19+
"net"
20+
21+
"github.com/vmware/go-ipfix/pkg/entities"
22+
"k8s.io/klog/v2"
23+
)
24+
25+
// preprocessor is in charge of processing messages received from the IPFIX collector, prior to
26+
// handing records over to the aggregation process. At the moment, its only task is to ensure that
27+
// all records have the expected fields. If a record has extra fields, they will be discarded. If
28+
// some fields are missing, they will be "appended" to the record with a "zero" value. For example,
29+
// we will use 0 for integral types, "" for strings, 0.0.0.0 for IPv4 addresses, etc. Note that we are
30+
// able to keep the implementation simple by assuming that a record either has missing fields or
31+
// extra fields (not a combination of both), and that such fields are always at the tail of the
32+
// field list. This assumption is based on implementation knowledge of the FlowExporter and the
33+
// FlowAggregator.
34+
type preprocessor struct {
35+
inCh <-chan *entities.Message
36+
outCh chan<- entities.Record
37+
38+
expectedElementsV4 int
39+
expectedElementsV6 int
40+
41+
defaultElementsWithValueV4 []entities.InfoElementWithValue
42+
defaultElementsWithValueV6 []entities.InfoElementWithValue
43+
}
44+
45+
func makeDefaultElementWithValue(ie *entities.InfoElement) (entities.InfoElementWithValue, error) {
46+
switch ie.DataType {
47+
case entities.OctetArray:
48+
var val []byte
49+
if ie.Len < entities.VariableLength {
50+
val = make([]byte, ie.Len)
51+
}
52+
return entities.NewOctetArrayInfoElement(ie, val), nil
53+
case entities.Unsigned8:
54+
return entities.NewUnsigned8InfoElement(ie, 0), nil
55+
case entities.Unsigned16:
56+
return entities.NewUnsigned16InfoElement(ie, 0), nil
57+
case entities.Unsigned32:
58+
return entities.NewUnsigned32InfoElement(ie, 0), nil
59+
case entities.Unsigned64:
60+
return entities.NewUnsigned64InfoElement(ie, 0), nil
61+
case entities.Signed8:
62+
return entities.NewSigned8InfoElement(ie, 0), nil
63+
case entities.Signed16:
64+
return entities.NewSigned16InfoElement(ie, 0), nil
65+
case entities.Signed32:
66+
return entities.NewSigned32InfoElement(ie, 0), nil
67+
case entities.Signed64:
68+
return entities.NewSigned64InfoElement(ie, 0), nil
69+
case entities.Float32:
70+
return entities.NewFloat32InfoElement(ie, 0), nil
71+
case entities.Float64:
72+
return entities.NewFloat64InfoElement(ie, 0), nil
73+
case entities.Boolean:
74+
return entities.NewBoolInfoElement(ie, false), nil
75+
case entities.DateTimeSeconds:
76+
return entities.NewDateTimeSecondsInfoElement(ie, 0), nil
77+
case entities.DateTimeMilliseconds:
78+
return entities.NewDateTimeMillisecondsInfoElement(ie, 0), nil
79+
case entities.MacAddress:
80+
return entities.NewMacAddressInfoElement(ie, make([]byte, 6)), nil
81+
case entities.Ipv4Address:
82+
return entities.NewIPAddressInfoElement(ie, net.IPv4zero), nil
83+
case entities.Ipv6Address:
84+
return entities.NewIPAddressInfoElement(ie, net.IPv6zero), nil
85+
case entities.String:
86+
return entities.NewStringInfoElement(ie, ""), nil
87+
default:
88+
return nil, fmt.Errorf("unexpected Information Element data type: %d", ie.DataType)
89+
}
90+
}
91+
92+
func makeDefaultElementsWithValue(infoElements []*entities.InfoElement) ([]entities.InfoElementWithValue, error) {
93+
elementsWithValue := make([]entities.InfoElementWithValue, len(infoElements))
94+
for idx := range infoElements {
95+
var err error
96+
if elementsWithValue[idx], err = makeDefaultElementWithValue(infoElements[idx]); err != nil {
97+
return nil, err
98+
}
99+
}
100+
return elementsWithValue, nil
101+
}
102+
103+
func newPreprocessor(infoElementsV4, infoElementsV6 []*entities.InfoElement, inCh <-chan *entities.Message, outCh chan<- entities.Record) (*preprocessor, error) {
104+
defaultElementsWithValueV4, err := makeDefaultElementsWithValue(infoElementsV4)
105+
if err != nil {
106+
return nil, fmt.Errorf("error when generating default values for IPv4 Information Elements expected from exporter: %w", err)
107+
}
108+
defaultElementsWithValueV6, err := makeDefaultElementsWithValue(infoElementsV6)
109+
if err != nil {
110+
return nil, fmt.Errorf("error when generating default values for IPv6 Information Elements expected from exporter: %w", err)
111+
}
112+
return &preprocessor{
113+
inCh: inCh,
114+
outCh: outCh,
115+
expectedElementsV4: len(infoElementsV4),
116+
expectedElementsV6: len(infoElementsV6),
117+
defaultElementsWithValueV4: defaultElementsWithValueV4,
118+
defaultElementsWithValueV6: defaultElementsWithValueV6,
119+
}, nil
120+
}
121+
122+
func (p *preprocessor) Run(stopCh <-chan struct{}) {
123+
for {
124+
select {
125+
case <-stopCh:
126+
return
127+
case msg, ok := <-p.inCh:
128+
if !ok {
129+
return
130+
}
131+
p.processMsg(msg)
132+
}
133+
}
134+
}
135+
136+
func isRecordIPv4(record entities.Record) bool {
137+
_, _, exist := record.GetInfoElementWithValue("sourceIPv4Address")
138+
return exist
139+
}
140+
141+
func (p *preprocessor) processMsg(msg *entities.Message) {
142+
set := msg.GetSet()
143+
if set.GetSetType() != entities.Data {
144+
return
145+
}
146+
records := set.GetRecords()
147+
for _, record := range records {
148+
elementList := record.GetOrderedElementList()
149+
numElements := len(elementList)
150+
isIPv4 := isRecordIPv4(record)
151+
expectedElements := p.expectedElementsV4
152+
if !isIPv4 {
153+
expectedElements = p.expectedElementsV6
154+
}
155+
if numElements == expectedElements {
156+
p.outCh <- record
157+
} else if numElements > expectedElements {
158+
if klog.V(5).Enabled() {
159+
klog.InfoS("Record received from exporter includes unexpected elements, truncating", "expectedElements", expectedElements, "receivedElements", numElements)
160+
}
161+
// Creating a new Record seems like the best option here. By using
162+
// NewDataRecordFromElements, we should minimize the number of allocations
163+
// required.
164+
p.outCh <- entities.NewDataRecordFromElements(0, elementList[:expectedElements], true)
165+
} else {
166+
if klog.V(5).Enabled() {
167+
klog.InfoS("Record received from exporter is missing information elements, adding fields with zero values", "expectedElements", expectedElements, "receivedElements", numElements)
168+
}
169+
if isIPv4 {
170+
elementList = append(elementList, p.defaultElementsWithValueV4[numElements:]...)
171+
} else {
172+
elementList = append(elementList, p.defaultElementsWithValueV6[numElements:]...)
173+
}
174+
p.outCh <- entities.NewDataRecordFromElements(0, elementList, true)
175+
}
176+
}
177+
178+
}

0 commit comments

Comments
 (0)