Skip to content

Commit 2481cd8

Browse files
committed
[fix][broker] Fix ResourceGroup report local usage (#22340)
Signed-off-by: Zixuan Liu <[email protected]> (cherry picked from commit 0b2b6d5)
1 parent 99eb49a commit 2481cd8

File tree

3 files changed

+130
-17
lines changed

3 files changed

+130
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.resourcegroup;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import io.prometheus.client.Counter;
2223
import java.util.HashMap;
2324
import java.util.Set;
@@ -216,24 +217,28 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
216217
resourceUsage.setOwner(this.getID());
217218

218219
p = resourceUsage.setPublish();
219-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p);
220+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p)) {
221+
resourceUsage.clearPublish();
222+
}
220223

221224
p = resourceUsage.setDispatch();
222-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);
225+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p)) {
226+
resourceUsage.clearDispatch();
227+
}
223228

224229
// Punt storage for now.
225230
}
226231

227232
// Transport manager mandated op.
228233
public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) {
229-
NetworkUsage p;
230-
231-
p = resourceUsage.getPublish();
232-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, p, broker);
233-
234-
p = resourceUsage.getDispatch();
235-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);
234+
if (resourceUsage.hasPublish()) {
235+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, resourceUsage.getPublish(), broker);
236+
}
236237

238+
if (resourceUsage.hasDispatch()) {
239+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, resourceUsage.getDispatch(),
240+
broker);
241+
}
237242
// Punt storage for now.
238243
}
239244

@@ -449,19 +454,17 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas
449454

450455
bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
451456
messagesUsed = monEntity.usedLocallySinceLastReport.messages;
452-
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
453-
454-
monEntity.totalUsedLocally.bytes += bytesUsed;
455-
monEntity.totalUsedLocally.messages += messagesUsed;
456-
457-
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
458457

459458
if (sendReport) {
460459
p.setBytesPerPeriod(bytesUsed);
461460
p.setMessagesPerPeriod(messagesUsed);
462461
monEntity.lastReportedValues.bytes = bytesUsed;
463462
monEntity.lastReportedValues.messages = messagesUsed;
464463
monEntity.numSuppressedUsageReports = 0;
464+
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
465+
monEntity.totalUsedLocally.bytes += bytesUsed;
466+
monEntity.totalUsedLocally.messages += messagesUsed;
467+
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
465468
} else {
466469
numSuppressions = monEntity.numSuppressedUsageReports++;
467470
}
@@ -594,6 +597,11 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
594597
};
595598
}
596599

600+
@VisibleForTesting
601+
PerMonitoringClassFields getMonitoredEntity(ResourceGroupMonitoringClass monClass) {
602+
return this.monitoringClassFields[monClass.ordinal()];
603+
}
604+
597605
public final String resourceGroupName;
598606

599607
public PerMonitoringClassFields[] monitoringClassFields =

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ protected void calculateQuotaForAllResourceGroups() {
687687
timeUnitScale);
688688
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
689689
maxIntervalForSuppressingReportsMSecs =
690-
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
690+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
691691
}
692692
}
693693

@@ -706,7 +706,7 @@ private void initialize() {
706706
periodInSecs,
707707
this.timeUnitScale);
708708
maxIntervalForSuppressingReportsMSecs =
709-
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
709+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
710710

711711
}
712712

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.resourcegroup;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertTrue;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
27+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
28+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields;
29+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
30+
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
31+
import org.apache.pulsar.common.policies.data.ResourceGroup;
32+
import org.testng.annotations.AfterClass;
33+
import org.testng.annotations.BeforeClass;
34+
import org.testng.annotations.Test;
35+
36+
public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTest {
37+
38+
@BeforeClass
39+
@Override
40+
protected void setup() throws Exception {
41+
super.internalSetup();
42+
}
43+
44+
@AfterClass(alwaysRun = true)
45+
@Override
46+
protected void cleanup() throws Exception {
47+
super.internalCleanup();
48+
}
49+
50+
51+
@Test
52+
public void testRgFillResourceUsage() throws Exception {
53+
pulsar.getResourceGroupServiceManager().close();
54+
AtomicBoolean needReport = new AtomicBoolean(false);
55+
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.HOURS, null,
56+
new ResourceQuotaCalculator() {
57+
@Override
58+
public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
59+
long currentMessagesUsed, long lastReportedMessages,
60+
long lastReportTimeMSecsSinceEpoch) {
61+
return needReport.get();
62+
}
63+
64+
@Override
65+
public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
66+
return 0;
67+
}
68+
});
69+
String rgName = "rg-1";
70+
ResourceGroup rgConfig = new ResourceGroup();
71+
rgConfig.setPublishRateInBytes(1000L);
72+
rgConfig.setPublishRateInMsgs(2000);
73+
service.resourceGroupCreate(rgName, rgConfig);
74+
75+
org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
76+
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
77+
bytesAndMessagesCount.bytes = 20;
78+
bytesAndMessagesCount.messages = 10;
79+
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);
80+
ResourceUsage resourceUsage = new ResourceUsage();
81+
resourceGroup.rgFillResourceUsage(resourceUsage);
82+
assertFalse(resourceUsage.hasDispatch());
83+
assertFalse(resourceUsage.hasPublish());
84+
85+
PerMonitoringClassFields publishMonitoredEntity =
86+
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
87+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
88+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
89+
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
90+
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
91+
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
92+
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);
93+
94+
needReport.set(true);
95+
resourceGroup.rgFillResourceUsage(resourceUsage);
96+
assertTrue(resourceUsage.hasDispatch());
97+
assertTrue(resourceUsage.hasPublish());
98+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
99+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
100+
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
101+
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
102+
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
103+
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
104+
}
105+
}

0 commit comments

Comments
 (0)