Skip to content

Commit 713585b

Browse files
authored
[POA-3519] Report rate limited requests & configured rate limit (#108)
Count the number of rate limited requests. Report the configured rate limit during the initial telemetry report and with each stats capture report.
1 parent fafb760 commit 713585b

File tree

5 files changed

+26
-6
lines changed

5 files changed

+26
-6
lines changed

apidump/apidump.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ func (a *apidump) SendInitialTelemetry() {
244244
CLITargetArch: architecture.GetCanonicalArch(),
245245
AkitaDockerRelease: env.InDocker(),
246246
DockerDesktop: env.HasDockerInternalHostAddress(),
247+
AgentRateLimit: a.WitnessesPerMinute,
247248
}
248249

249250
if pod, present := a.Args.Tags[tags.XAkitaKubernetesPod]; present {
@@ -285,6 +286,7 @@ func (a *apidump) SendPacketTelemetry(observedDuration int) {
285286
req := &kgxapi.PostClientPacketCaptureStatsRequest{
286287
AgentResourceUsage: usage.Get(),
287288
ObservedDurationInSeconds: observedDuration,
289+
AgentRateLimit: a.WitnessesPerMinute,
288290
}
289291
if a.dumpSummary != nil {
290292
req.PacketCountSummary = a.dumpSummary.FilterSummary.Summary(topNForSummary)
@@ -766,7 +768,7 @@ func (a *apidump) Run() error {
766768
// Subsampling.
767769
collector = trace.NewSamplingCollector(args.SampleRate, collector)
768770
if rateLimit != nil {
769-
collector = rateLimit.NewCollector(collector)
771+
collector = rateLimit.NewCollector(collector, summary)
770772
}
771773

772774
// Path and host filters.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/OneOfOne/xxhash v1.2.8
88
github.com/Pallinder/go-randomdata v1.2.0
99
github.com/akitasoftware/akita-ir v0.0.0-20241213050034-057d7b6097e8
10-
github.com/akitasoftware/akita-libs v0.0.0-20250314233547-6ff5afce3bf6
10+
github.com/akitasoftware/akita-libs v0.0.0-20250428180153-cb2e977a2ee3
1111
github.com/akitasoftware/go-utils v0.0.0-20240213133309-b95d4ace8803
1212
github.com/andybalholm/brotli v1.0.1
1313
github.com/aws/aws-sdk-go-v2 v1.17.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/akitasoftware/akita-libs v0.0.0-20250207124702-a568277a6ab6 h1:HPvFcp
3232
github.com/akitasoftware/akita-libs v0.0.0-20250207124702-a568277a6ab6/go.mod h1:Fg14kX6+N7we3KdP1c11W/SzbKsgapV1hP5d4Z/Hqwc=
3333
github.com/akitasoftware/akita-libs v0.0.0-20250314233547-6ff5afce3bf6 h1:kuJ8kv8oG3h+B7MZF3x5sj4z/WTmJaNlC5FxM41fh9c=
3434
github.com/akitasoftware/akita-libs v0.0.0-20250314233547-6ff5afce3bf6/go.mod h1:Fg14kX6+N7we3KdP1c11W/SzbKsgapV1hP5d4Z/Hqwc=
35+
github.com/akitasoftware/akita-libs v0.0.0-20250428180153-cb2e977a2ee3 h1:ANLqfVeDdlJoLf1jCjWlxtKaj65N8bLJ9OLmp1h8w/o=
36+
github.com/akitasoftware/akita-libs v0.0.0-20250428180153-cb2e977a2ee3/go.mod h1:Fg14kX6+N7we3KdP1c11W/SzbKsgapV1hP5d4Z/Hqwc=
3537
github.com/akitasoftware/go-utils v0.0.0-20240213133309-b95d4ace8803 h1:ebIh/EFuaP8GczzMe8EwVID/blSv5Tej6S8NE4xyarQ=
3638
github.com/akitasoftware/go-utils v0.0.0-20240213133309-b95d4ace8803/go.mod h1:+IOXf7l/QCAQECJzjJwhTp1sBkRoJ6WciZwJezUwBa4=
3739
github.com/akitasoftware/gopacket v1.1.18-0.20240820200020-7289ae956f70 h1:VnU7QLDBwRujpQoHwShs5yu0Ahv1fSalNJa4UijwlmY=

trace/rate_limit.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/akitasoftware/akita-libs/akinet"
9+
"github.com/akitasoftware/akita-libs/client_telemetry"
910
"github.com/postmanlabs/postman-insights-agent/printer"
1011
"github.com/spf13/viper"
1112
)
@@ -90,8 +91,7 @@ func (r *SharedRateLimit) endInterval(end time.Time) {
9091
r.FirstEstimate = false
9192
} else {
9293
alpha := viper.GetFloat64(RateLimitExponentialAlpha)
93-
exponentialMovingAverage :=
94-
(1-alpha)*float64(r.EstimatedSampleInterval) + alpha*float64(intervalLength)
94+
exponentialMovingAverage := (1-alpha)*float64(r.EstimatedSampleInterval) + alpha*float64(intervalLength)
9595
printer.Debugln("New estimate:", exponentialMovingAverage)
9696
r.EstimatedSampleInterval = time.Duration(uint64(exponentialMovingAverage))
9797
}
@@ -206,14 +206,18 @@ type rateLimitCollector struct {
206206

207207
// Channel from RateLimit for epoch starts
208208
epochCh chan time.Time
209+
210+
// Packet counter
211+
packetCount PacketCountConsumer
209212
}
210213

211-
func (r *SharedRateLimit) NewCollector(next Collector) Collector {
214+
func (r *SharedRateLimit) NewCollector(next Collector, packetCounts PacketCountConsumer) Collector {
212215
c := &rateLimitCollector{
213216
RateLimit: r,
214217
NextCollector: next,
215218
RequestArrivalTimes: make(map[requestKey]time.Time),
216219
epochCh: make(chan time.Time, 1),
220+
packetCount: packetCounts,
217221
}
218222
r.lock.Lock()
219223
defer r.lock.Unlock()
@@ -240,6 +244,14 @@ func (r *rateLimitCollector) Process(pnt akinet.ParsedNetworkTraffic) error {
240244
r.NextCollector.Process(pnt)
241245
key := requestKey{c.StreamID.String(), c.Seq}
242246
r.RequestArrivalTimes[key] = pnt.ObservationTime
247+
} else {
248+
r.packetCount.Update(client_telemetry.PacketCounts{
249+
Interface: pnt.Interface,
250+
DstHost: c.Host,
251+
SrcPort: pnt.SrcPort,
252+
DstPort: pnt.DstPort,
253+
HTTPRequestsRateLimited: 1,
254+
})
243255
}
244256
case akinet.HTTPResponse:
245257
// Collect iff the request is in our map. (This means responses to calls

trace/rate_limit_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ func TestRateLimit_FirstSample(t *testing.T) {
4444
start := time.Now()
4545
cc := &countingCollector{}
4646
rl := NewRateLimit(1.0)
47-
c := rl.NewCollector(cc).(*rateLimitCollector)
47+
pc := NewPacketCounter()
48+
c := rl.NewCollector(cc, pc).(*rateLimitCollector)
4849

4950
// Sample packet from another test
5051
streamID := uuid.New()
@@ -95,6 +96,9 @@ func TestRateLimit_FirstSample(t *testing.T) {
9596
if cc.GetNumPackets() != 5 {
9697
t.Errorf("Expected 5 packets in collector, got %v", cc.GetNumPackets())
9798
}
99+
if pc.total.HTTPRequestsRateLimited != 5 {
100+
t.Errorf("Expected 5 rate limited request, got %v", pc.total.HTTPRequestsRateLimited)
101+
}
98102
if rl.FirstEstimate {
99103
t.Errorf("Expected FirstEstimate to be false")
100104
}

0 commit comments

Comments
 (0)