Commit d3087fd

fix: batch probing in e2e NP tests (#7068)
On older machines, the `AntreaPolicyExtendedNamespaces` tests would fail, even though they passed on CI. The failure was caused by the 1k+ simultaneous probes. By limiting the number of concurrent probes, older machines can handle the load and CI speed is not compromised.

Fixes #7067

Signed-off-by: Peter Tran <[email protected]>
1 parent 30f0523 commit d3087fd
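For context, the fix replaces the previous one-goroutine-per-probe fan-out with a fixed pool of worker goroutines draining a job channel. Below is a minimal, self-contained sketch of that bounded worker-pool pattern; the probeJob and runBounded names are hypothetical stand-ins for illustration, not the test helpers introduced in the diff.

package main

import (
	"fmt"
	"sync"
)

// probeJob is a hypothetical stand-in for one source/destination/port
// combination to check (the diff models this as probeVector).
type probeJob struct {
	from, to string
	port     int32
}

// runBounded fans the jobs out to at most maxWorkers goroutines instead of
// starting one goroutine per job, which is the essence of the batching fix.
func runBounded(jobs []probeJob, maxWorkers int) {
	jobCh := make(chan probeJob, len(jobs))
	for _, j := range jobs {
		jobCh <- j
	}
	close(jobCh)

	workers := maxWorkers
	if len(jobs) < workers {
		workers = len(jobs)
	}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls jobs until the channel is drained, so at most
			// maxWorkers probes are in flight at any time.
			for j := range jobCh {
				fmt.Printf("probing %s -> %s:%d\n", j.from, j.to, j.port)
			}
		}()
	}
	wg.Wait()
}

func main() {
	var jobs []probeJob
	for i := 0; i < 10; i++ {
		for j := 0; j < 10; j++ {
			jobs = append(jobs, probeJob{from: fmt.Sprintf("pod-%d", i), to: fmt.Sprintf("pod-%d", j), port: 80})
		}
	}
	// 100 jobs, but never more than 4 concurrent probes (the diff caps the e2e suite at 150).
	runBounded(jobs, 4)
}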

File tree

1 file changed: +114 −66 lines

test/e2e/k8s_util.go

+114 −66
@@ -1032,55 +1032,116 @@ func (k *KubernetesUtils) waitForPodInNamespace(ns string, pod string) ([]string
 	}
 }
 
+type httpServerReadiness struct {
+	*KubernetesUtils
+	pods              []Pod
+	reachability      *Reachability
+	remoteCluster     *KubernetesUtils
+	protocolPortPairs map[utils.AntreaPolicyProtocol][]int32
+}
+
+func (hsr *httpServerReadiness) isReady() bool {
+	hsr.reachability = NewReachability(hsr.pods, Connected)
+	hsr.validate()
+	if _, wrong, _ := hsr.reachability.Summary(); wrong != 0 {
+		return false
+	}
+
+	return true
+}
+
 func (k *KubernetesUtils) waitForHTTPServers(allPods []Pod) error {
-	const maxTries = 10
 	log.Infof("waiting for HTTP servers (ports 80, 81 and 8080:8085) to become ready")
 
-	serversAreReady := func() bool {
-		reachability := NewReachability(allPods, Connected)
-		k.Validate(allPods, reachability, []int32{80, 81, 8080, 8081, 8082, 8083, 8084, 8085}, utils.ProtocolTCP)
-		if _, wrong, _ := reachability.Summary(); wrong != 0 {
-			return false
-		}
+	httpServerReadiness := httpServerReadiness{
+		pods:            allPods,
+		KubernetesUtils: k,
+		protocolPortPairs: map[utils.AntreaPolicyProtocol][]int32{
+			utils.ProtocolTCP:  {80, 81, 8080, 8081, 8082, 8083, 8084, 8085},
+			utils.ProtocolUDP:  {80, 81},
+			utils.ProtocolSCTP: {80, 81},
+		},
+	}
 
-		k.Validate(allPods, reachability, []int32{80, 81}, utils.ProtocolUDP)
-		if _, wrong, _ := reachability.Summary(); wrong != 0 {
-			return false
-		}
+	if httpServerReadiness.isReady() {
+		log.Infof("All HTTP servers are ready")
+		return nil
+	} else {
+		return fmt.Errorf("HTTP servers are not ready")
+	}
+}
+
+// Encapsulate the data needed to perform a probe between pods
+type probeVector struct {
+	fromPod  Pod
+	toPod    Pod
+	port     int32
+	protocol utils.AntreaPolicyProtocol
+}
 
-		k.Validate(allPods, reachability, []int32{80, 81}, utils.ProtocolSCTP)
-		if _, wrong, _ := reachability.Summary(); wrong != 0 {
-			return false
+// Populate the channel with all combinations of probes based on the required ports and protocols
+func (hsr *httpServerReadiness) buildProbeVectors(probes chan<- probeVector) {
+	for protocol, ports := range hsr.protocolPortPairs {
+		for _, fromPod := range hsr.pods {
+			for _, podTo := range hsr.pods {
+				for _, port := range ports {
+					probes <- probeVector{fromPod, podTo, port, protocol}
+				}
+			}
 		}
-		return true
 	}
+	close(probes)
+}
 
-	for i := 0; i < maxTries; i++ {
-		if serversAreReady() {
-			log.Infof("All HTTP servers are ready")
-			return nil
-		}
-		time.Sleep(defaultInterval)
+// Calculate the number of probes created across all port protocol permutations
+func (hsr *httpServerReadiness) numProbes() int {
+	probeCount := 0
+	podCount := len(hsr.pods)
+	podCountSquared := podCount * podCount
+	for protocol := range hsr.protocolPortPairs {
+		ports := hsr.protocolPortPairs[protocol]
+		probeCount += podCountSquared * len(ports)
 	}
-	return fmt.Errorf("after %d tries, HTTP servers are not ready", maxTries)
+	return probeCount
 }
 
-func (k *KubernetesUtils) validateOnePort(allPods []Pod, reachability *Reachability, port int32, protocol utils.AntreaPolicyProtocol) {
-	numProbes := len(allPods) * len(allPods)
-	resultsCh := make(chan *probeResult, numProbes)
-	// TODO: find better metrics, this is only for POC.
-	oneProbe := func(podFrom, podTo Pod, port int32) {
+// Spawn a fixed set of workers to complete probing of the servers
+func (hsr *httpServerReadiness) spawnProberPool(resultsCh chan *probeResult) {
+	numProbes := hsr.numProbes()
+	probes := make(chan probeVector, numProbes)
+	hsr.buildProbeVectors(probes)
+
+	probe := func(vector probeVector) {
+		podFrom := vector.fromPod
+		podTo := vector.toPod
+		port := vector.port
+		protocol := vector.protocol
 		log.Tracef("Probing: %s -> %s", podFrom, podTo)
-		expectedResult := reachability.Expected.Get(podFrom.String(), podTo.String())
-		connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, nil, &expectedResult)
+		expectedResult := hsr.reachability.Expected.Get(podFrom.String(), podTo.String())
+		connectivity, err := hsr.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, hsr.remoteCluster, &expectedResult)
 		resultsCh <- &probeResult{podFrom, podTo, connectivity, err}
 	}
-	for _, pod1 := range allPods {
-		for _, pod2 := range allPods {
-			go oneProbe(pod1, pod2, port)
+
+	startProber := func() {
+		for vector := range probes {
+			probe(vector)
 		}
 	}
-	for i := 0; i < numProbes; i++ {
+
+	// Tested value as the upper limit for running locally with minimal impacts to CI speeds
+	proberRateLimit := 150
+	for range min(proberRateLimit, numProbes) {
+		go startProber()
+	}
+}
+
+// Validates two way connectivity between all pods across all protocol and port permutations
+func (hsr *httpServerReadiness) validate() {
+	numProbes := hsr.numProbes()
+	resultsCh := make(chan *probeResult, numProbes)
+	hsr.spawnProberPool(resultsCh)
+
+	for range numProbes {
 		r := <-resultsCh
 		if r.err != nil {
 			log.Errorf("unable to perform probe %s -> %s: %v", r.podFrom, r.podTo, r.err)
@@ -1093,11 +1154,11 @@ func (k *KubernetesUtils) validateOnePort(allPods []Pod, reachability *Reachabil
 		// If the connectivity from podFrom to podTo has been observed and is different
 		// from the connectivity we received, store Error connectivity in reachability
 		// matrix.
-		prevConn := reachability.Observed.Get(r.podFrom.String(), r.podTo.String())
+		prevConn := hsr.reachability.Observed.Get(r.podFrom.String(), r.podTo.String())
 		if prevConn == Unknown {
-			reachability.Observe(r.podFrom, r.podTo, r.connectivity)
+			hsr.reachability.Observe(r.podFrom, r.podTo, r.connectivity)
 		} else if prevConn != r.connectivity {
-			reachability.Observe(r.podFrom, r.podTo, Error)
+			hsr.reachability.Observe(r.podFrom, r.podTo, Error)
 		}
 	}
 }
@@ -1107,41 +1168,28 @@ func (k *KubernetesUtils) validateOnePort(allPods []Pod, reachability *Reachabil
 // be consistent across all provided ports. Otherwise, this connectivity will be
 // treated as Error.
 func (k *KubernetesUtils) Validate(allPods []Pod, reachability *Reachability, ports []int32, protocol utils.AntreaPolicyProtocol) {
-	for _, port := range ports {
-		// we do not run all the probes in parallel as we have experienced that on some
-		// machines, this can cause a fraction of the probes to always fail, despite the
-		// built-in retry (3x) mechanism. Probably because of the large number of probes,
-		// each one being executed in its own goroutine. For example, with 9 Pods and for
-		// ports 80, 81, 8080, 8081, 8082, 8083, 8084 and 8085, we would end up with
-		// potentially 9*9*8 = 648 simultaneous probes.
-		k.validateOnePort(allPods, reachability, port, protocol)
+	httpServerReadiness := httpServerReadiness{
+		pods:            allPods,
+		KubernetesUtils: k,
+		protocolPortPairs: map[utils.AntreaPolicyProtocol][]int32{
+			protocol: ports,
+		},
+		reachability: reachability,
 	}
+	httpServerReadiness.validate()
 }
 
 func (k *KubernetesUtils) ValidateRemoteCluster(remoteCluster *KubernetesUtils, allPods []Pod, reachability *Reachability, port int32, protocol utils.AntreaPolicyProtocol) {
-	numProbes := len(allPods) * len(allPods)
-	resultsCh := make(chan *probeResult, numProbes)
-	oneProbe := func(podFrom, podTo Pod, port int32) {
-		log.Tracef("Probing: %s -> %s", podFrom, podTo)
-		expectedResult := reachability.Expected.Get(podFrom.String(), podTo.String())
-		connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, remoteCluster, &expectedResult)
-		resultsCh <- &probeResult{podFrom, podTo, connectivity, err}
-	}
-	for _, pod1 := range allPods {
-		for _, pod2 := range allPods {
-			go oneProbe(pod1, pod2, port)
-		}
-	}
-	for i := 0; i < numProbes; i++ {
-		r := <-resultsCh
-		if r.err != nil {
-			log.Errorf("unable to perform probe %s -> %s in %s: %v", r.podFrom, r.podTo, k.ClusterName, r.err)
-		}
-		prevConn := reachability.Observed.Get(r.podFrom.String(), r.podTo.String())
-		if prevConn == Unknown {
-			reachability.Observe(r.podFrom, r.podTo, r.connectivity)
-		}
+	httpServerReadiness := httpServerReadiness{
+		pods:            allPods,
+		KubernetesUtils: k,
+		protocolPortPairs: map[utils.AntreaPolicyProtocol][]int32{
+			protocol: {port},
+		},
+		reachability:  reachability,
+		remoteCluster: remoteCluster,
 	}
+	httpServerReadiness.validate()
 }
 
 func (k *KubernetesUtils) Bootstrap(namespaces map[string]TestNamespaceMeta, podsPerNamespace []string, createNamespaces bool, nodeNames map[string]string, hostNetworks map[string]bool) (map[string][]string, error) {
