Skip to content

Commit 2e3c87e

Browse files
authored
Merge pull request #20149 from nwnt/nwnt/add-k8s-traffic-type
Add Kubernetes Traffic Type
2 parents f6d6111 + 6aaf32b commit 2e3c87e

File tree

2 files changed

+28
-15
lines changed

2 files changed

+28
-15
lines changed

tests/antithesis/test-template/robustness/finally/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828

2929
"go.etcd.io/etcd/tests/v3/antithesis/test-template/robustness/common"
3030
"go.etcd.io/etcd/tests/v3/robustness/report"
31-
"go.etcd.io/etcd/tests/v3/robustness/traffic"
3231
"go.etcd.io/etcd/tests/v3/robustness/validate"
3332
)
3433

@@ -61,7 +60,7 @@ func validateReports(lg *zap.Logger, serversDataPath map[string]string, reports
6160
persistedRequests, err := report.PersistedRequests(lg, slices.Collect(maps.Values(serversDataPath)))
6261
assertResult(validate.ResultFromError(err), "Loaded persisted requests")
6362

64-
validateConfig := validate.Config{ExpectRevisionUnique: traffic.EtcdPutDeleteLease.ExpectUniqueRevision()}
63+
validateConfig := validate.Config{ExpectRevisionUnique: false}
6564
result := validate.ValidateAndReturnVisualize(lg, validateConfig, reports, persistedRequests, 5*time.Minute)
6665
assertResult(result.Assumptions, "Validation assumptions fulfilled")
6766
if result.Linearization.Timeout {

tests/antithesis/test-template/robustness/traffic/main.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package main
1919
import (
2020
"context"
2121
"flag"
22+
"math/rand/v2"
2223
"os"
2324
"slices"
2425
"sync"
@@ -37,13 +38,23 @@ import (
3738
"go.etcd.io/etcd/tests/v3/robustness/traffic"
3839
)
3940

40-
var profile = traffic.Profile{
41-
MinimalQPS: 100,
42-
MaximalQPS: 1000,
43-
BurstableQPS: 1000,
44-
ClientCount: 3,
45-
MaxNonUniqueRequestConcurrency: 3,
46-
}
41+
var (
42+
profile = traffic.Profile{
43+
MinimalQPS: 100,
44+
MaximalQPS: 1000,
45+
BurstableQPS: 1000,
46+
ClientCount: 3,
47+
MaxNonUniqueRequestConcurrency: 3,
48+
}
49+
trafficNames = []string{
50+
"etcd",
51+
"kubernetes",
52+
}
53+
traffics = []traffic.Traffic{
54+
traffic.EtcdPutDeleteLease,
55+
traffic.Kubernetes,
56+
}
57+
)
4758

4859
func main() {
4960
local := flag.Bool("local", false, "run tests locally and connect to etcd instances via localhost")
@@ -62,6 +73,9 @@ func main() {
6273
if err != nil {
6374
panic(err)
6475
}
76+
choice := rand.IntN(len(traffics))
77+
tf := traffics[choice]
78+
lg.Info("Traffic", zap.String("Type", trafficNames[choice]))
6579
r := report.TestReport{Logger: lg, ServersDataPath: etcdetcdDataPaths}
6680
defer func() {
6781
if err = r.Report(reportPath); err != nil {
@@ -70,14 +84,14 @@ func main() {
7084
}()
7185

7286
lg.Info("Start traffic generation", zap.Duration("duration", duration))
73-
r.Client, err = runTraffic(ctx, lg, hosts, baseTime, duration)
87+
r.Client, err = runTraffic(ctx, lg, tf, hosts, baseTime, duration)
7488
if err != nil {
7589
lg.Error("Failed to generate traffic")
7690
panic(err)
7791
}
7892
}
7993

80-
func runTraffic(ctx context.Context, lg *zap.Logger, hosts []string, baseTime time.Time, duration time.Duration) ([]report.ClientReport, error) {
94+
func runTraffic(ctx context.Context, lg *zap.Logger, tf traffic.Traffic, hosts []string, baseTime time.Time, duration time.Duration) ([]report.ClientReport, error) {
8195
ids := identity.NewIDProvider()
8296
r, err := traffic.CheckEmptyDatabaseAtStart(ctx, lg, hosts, ids, baseTime)
8397
if err != nil {
@@ -93,7 +107,7 @@ func runTraffic(ctx context.Context, lg *zap.Logger, hosts []string, baseTime ti
93107
startTime := time.Since(baseTime)
94108
g.Go(func() error {
95109
defer close(maxRevisionChan)
96-
trafficReports = slices.Concat(trafficReports, simulateTraffic(ctx, hosts, ids, baseTime, duration))
110+
trafficReports = slices.Concat(trafficReports, simulateTraffic(ctx, tf, hosts, ids, baseTime, duration))
97111
maxRevision := report.OperationsMaxRevision(trafficReports)
98112
maxRevisionChan <- maxRevision
99113
lg.Info("Finished simulating Traffic", zap.Int64("max-revision", maxRevision))
@@ -120,7 +134,7 @@ func runTraffic(ctx context.Context, lg *zap.Logger, hosts []string, baseTime ti
120134
return reports, nil
121135
}
122136

123-
func simulateTraffic(ctx context.Context, hosts []string, ids identity.Provider, baseTime time.Time, duration time.Duration) []report.ClientReport {
137+
func simulateTraffic(ctx context.Context, tf traffic.Traffic, hosts []string, ids identity.Provider, baseTime time.Time, duration time.Duration) []report.ClientReport {
124138
var mux sync.Mutex
125139
var wg sync.WaitGroup
126140
storage := identity.NewLeaseIDStorage()
@@ -136,7 +150,7 @@ func simulateTraffic(ctx context.Context, hosts []string, ids identity.Provider,
136150
defer wg.Done()
137151
defer c.Close()
138152

139-
traffic.EtcdPutDeleteLease.RunTrafficLoop(ctx, c, limiter,
153+
tf.RunTrafficLoop(ctx, c, limiter,
140154
ids,
141155
storage,
142156
concurrencyLimiter,
@@ -153,7 +167,7 @@ func simulateTraffic(ctx context.Context, hosts []string, ids identity.Provider,
153167
go func(c *client.RecordingClient) {
154168
defer wg.Done()
155169
defer c.Close()
156-
traffic.EtcdPutDeleteLease.RunCompactLoop(ctx, c, traffic.DefaultCompactionPeriod, finish)
170+
tf.RunCompactLoop(ctx, c, traffic.DefaultCompactionPeriod, finish)
157171
mux.Lock()
158172
reports = append(reports, c.Report())
159173
mux.Unlock()

0 commit comments

Comments
 (0)