Skip to content

Commit 0b2b6d5

Browse files
authored
[fix][broker] Fix ResourceGroup report local usage (apache#22340)
Signed-off-by: Zixuan Liu <[email protected]>
1 parent a52945b commit 0b2b6d5

File tree

3 files changed

+130
-17
lines changed

3 files changed

+130
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.resourcegroup;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import io.prometheus.client.Counter;
2223
import java.util.HashMap;
2324
import java.util.Set;
@@ -218,24 +219,28 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
218219
resourceUsage.setOwner(this.getID());
219220

220221
p = resourceUsage.setPublish();
221-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p);
222+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p)) {
223+
resourceUsage.clearPublish();
224+
}
222225

223226
p = resourceUsage.setDispatch();
224-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);
227+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p)) {
228+
resourceUsage.clearDispatch();
229+
}
225230

226231
// Punt storage for now.
227232
}
228233

229234
// Transport manager mandated op.
230235
public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) {
231-
NetworkUsage p;
232-
233-
p = resourceUsage.getPublish();
234-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, p, broker);
235-
236-
p = resourceUsage.getDispatch();
237-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);
236+
if (resourceUsage.hasPublish()) {
237+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, resourceUsage.getPublish(), broker);
238+
}
238239

240+
if (resourceUsage.hasDispatch()) {
241+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, resourceUsage.getDispatch(),
242+
broker);
243+
}
239244
// Punt storage for now.
240245
}
241246

@@ -453,19 +458,17 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas
453458

454459
bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
455460
messagesUsed = monEntity.usedLocallySinceLastReport.messages;
456-
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
457-
458-
monEntity.totalUsedLocally.bytes += bytesUsed;
459-
monEntity.totalUsedLocally.messages += messagesUsed;
460-
461-
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
462461

463462
if (sendReport) {
464463
p.setBytesPerPeriod(bytesUsed);
465464
p.setMessagesPerPeriod(messagesUsed);
466465
monEntity.lastReportedValues.bytes = bytesUsed;
467466
monEntity.lastReportedValues.messages = messagesUsed;
468467
monEntity.numSuppressedUsageReports = 0;
468+
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
469+
monEntity.totalUsedLocally.bytes += bytesUsed;
470+
monEntity.totalUsedLocally.messages += messagesUsed;
471+
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
469472
} else {
470473
numSuppressions = monEntity.numSuppressedUsageReports++;
471474
}
@@ -598,6 +601,11 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
598601
};
599602
}
600603

604+
@VisibleForTesting
605+
PerMonitoringClassFields getMonitoredEntity(ResourceGroupMonitoringClass monClass) {
606+
return this.monitoringClassFields[monClass.ordinal()];
607+
}
608+
601609
public final String resourceGroupName;
602610

603611
public PerMonitoringClassFields[] monitoringClassFields =

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ protected void calculateQuotaForAllResourceGroups() {
686686
timeUnitScale);
687687
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
688688
maxIntervalForSuppressingReportsMSecs =
689-
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
689+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
690690
}
691691
}
692692

@@ -705,7 +705,7 @@ private void initialize() {
705705
periodInSecs,
706706
this.timeUnitScale);
707707
maxIntervalForSuppressingReportsMSecs =
708-
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
708+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
709709

710710
}
711711

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.resourcegroup;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertTrue;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
27+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
28+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields;
29+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
30+
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
31+
import org.apache.pulsar.common.policies.data.ResourceGroup;
32+
import org.testng.annotations.AfterClass;
33+
import org.testng.annotations.BeforeClass;
34+
import org.testng.annotations.Test;
35+
36+
public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTest {
37+
38+
@BeforeClass
39+
@Override
40+
protected void setup() throws Exception {
41+
super.internalSetup();
42+
}
43+
44+
@AfterClass(alwaysRun = true)
45+
@Override
46+
protected void cleanup() throws Exception {
47+
super.internalCleanup();
48+
}
49+
50+
51+
@Test
52+
public void testRgFillResourceUsage() throws Exception {
53+
pulsar.getResourceGroupServiceManager().close();
54+
AtomicBoolean needReport = new AtomicBoolean(false);
55+
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.HOURS, null,
56+
new ResourceQuotaCalculator() {
57+
@Override
58+
public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
59+
long currentMessagesUsed, long lastReportedMessages,
60+
long lastReportTimeMSecsSinceEpoch) {
61+
return needReport.get();
62+
}
63+
64+
@Override
65+
public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
66+
return 0;
67+
}
68+
});
69+
String rgName = "rg-1";
70+
ResourceGroup rgConfig = new ResourceGroup();
71+
rgConfig.setPublishRateInBytes(1000L);
72+
rgConfig.setPublishRateInMsgs(2000);
73+
service.resourceGroupCreate(rgName, rgConfig);
74+
75+
org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
76+
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
77+
bytesAndMessagesCount.bytes = 20;
78+
bytesAndMessagesCount.messages = 10;
79+
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);
80+
ResourceUsage resourceUsage = new ResourceUsage();
81+
resourceGroup.rgFillResourceUsage(resourceUsage);
82+
assertFalse(resourceUsage.hasDispatch());
83+
assertFalse(resourceUsage.hasPublish());
84+
85+
PerMonitoringClassFields publishMonitoredEntity =
86+
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
87+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
88+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
89+
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
90+
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
91+
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
92+
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);
93+
94+
needReport.set(true);
95+
resourceGroup.rgFillResourceUsage(resourceUsage);
96+
assertTrue(resourceUsage.hasDispatch());
97+
assertTrue(resourceUsage.hasPublish());
98+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
99+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
100+
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
101+
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
102+
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
103+
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
104+
}
105+
}

0 commit comments

Comments
 (0)