Skip to content

Commit 5ecd040

Browse files
committed
[fix][broker] Fix ResourceGroup report local usage (apache#22340)
Signed-off-by: Zixuan Liu <[email protected]> (cherry picked from commit 0b2b6d5) Signed-off-by: Zixuan Liu <[email protected]>
1 parent 590633d commit 5ecd040

File tree

3 files changed

+137
-20
lines changed

3 files changed

+137
-20
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.resourcegroup;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import io.prometheus.client.Counter;
2223
import java.util.HashMap;
2324
import java.util.Set;
@@ -243,30 +244,38 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
243244
resourceUsage.setOwner(this.getID());
244245

245246
p = resourceUsage.setPublish();
246-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p);
247+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p)) {
248+
resourceUsage.clearPublish();
249+
}
247250

248251
p = resourceUsage.setDispatch();
249-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);
252+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p)) {
253+
resourceUsage.clearDispatch();
254+
}
250255

251256
p = resourceUsage.setReplicationDispatch();
252-
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p);
257+
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p)) {
258+
resourceUsage.clearReplicationDispatch();
259+
}
253260

254261
// Punt storage for now.
255262
}
256263

257264
// Transport manager mandated op.
258265
public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) {
259-
NetworkUsage p;
260-
261-
p = resourceUsage.getPublish();
262-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, p, broker);
263-
264-
p = resourceUsage.getDispatch();
265-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);
266+
if (resourceUsage.hasPublish()) {
267+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, resourceUsage.getPublish(), broker);
268+
}
266269

267-
p = resourceUsage.getReplicationDispatch();
268-
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p, broker);
270+
if (resourceUsage.hasDispatch()) {
271+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, resourceUsage.getDispatch(),
272+
broker);
273+
}
269274

275+
if (resourceUsage.hasReplicationDispatch()) {
276+
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch,
277+
resourceUsage.getReplicationDispatch(), broker);
278+
}
270279
// Punt storage for now.
271280
}
272281

@@ -496,19 +505,17 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas
496505

497506
bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
498507
messagesUsed = monEntity.usedLocallySinceLastReport.messages;
499-
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
500-
501-
monEntity.totalUsedLocally.bytes += bytesUsed;
502-
monEntity.totalUsedLocally.messages += messagesUsed;
503-
504-
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
505508

506509
if (sendReport) {
507510
p.setBytesPerPeriod(bytesUsed);
508511
p.setMessagesPerPeriod(messagesUsed);
509512
monEntity.lastReportedValues.bytes = bytesUsed;
510513
monEntity.lastReportedValues.messages = messagesUsed;
511514
monEntity.numSuppressedUsageReports = 0;
515+
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
516+
monEntity.totalUsedLocally.bytes += bytesUsed;
517+
monEntity.totalUsedLocally.messages += messagesUsed;
518+
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
512519
} else {
513520
numSuppressions = monEntity.numSuppressedUsageReports++;
514521
}
@@ -649,6 +656,11 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
649656
};
650657
}
651658

659+
@VisibleForTesting
660+
PerMonitoringClassFields getMonitoredEntity(ResourceGroupMonitoringClass monClass) {
661+
return this.monitoringClassFields[monClass.ordinal()];
662+
}
663+
652664
public final String resourceGroupName;
653665

654666
public PerMonitoringClassFields[] monitoringClassFields =

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,7 @@ protected void calculateQuotaForAllResourceGroups() {
780780
timeUnitScale);
781781
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
782782
maxIntervalForSuppressingReportsMSecs =
783-
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
783+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
784784
}
785785
}
786786

@@ -799,7 +799,7 @@ private void initialize() {
799799
periodInSecs,
800800
this.timeUnitScale);
801801
maxIntervalForSuppressingReportsMSecs =
802-
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
802+
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
803803

804804
}
805805

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.resourcegroup;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertTrue;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
27+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
28+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields;
29+
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
30+
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
31+
import org.apache.pulsar.common.policies.data.ResourceGroup;
32+
import org.testng.annotations.AfterClass;
33+
import org.testng.annotations.BeforeClass;
34+
import org.testng.annotations.Test;
35+
36+
public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTest {
37+
38+
@BeforeClass
39+
@Override
40+
protected void setup() throws Exception {
41+
super.internalSetup();
42+
}
43+
44+
@AfterClass(alwaysRun = true)
45+
@Override
46+
protected void cleanup() throws Exception {
47+
super.internalCleanup();
48+
}
49+
50+
51+
@Test
52+
public void testRgFillResourceUsage() throws Exception {
53+
pulsar.getResourceGroupServiceManager().close();
54+
AtomicBoolean needReport = new AtomicBoolean(false);
55+
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.HOURS, null,
56+
new ResourceQuotaCalculator() {
57+
@Override
58+
public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
59+
long currentMessagesUsed, long lastReportedMessages,
60+
long lastReportTimeMSecsSinceEpoch) {
61+
return needReport.get();
62+
}
63+
64+
@Override
65+
public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
66+
return 0;
67+
}
68+
});
69+
String rgName = "rg-1";
70+
ResourceGroup rgConfig = new ResourceGroup();
71+
rgConfig.setPublishRateInBytes(1000L);
72+
rgConfig.setPublishRateInMsgs(2000);
73+
service.resourceGroupCreate(rgName, rgConfig);
74+
75+
org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
76+
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
77+
bytesAndMessagesCount.bytes = 20;
78+
bytesAndMessagesCount.messages = 10;
79+
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);
80+
ResourceUsage resourceUsage = new ResourceUsage();
81+
resourceGroup.rgFillResourceUsage(resourceUsage);
82+
assertFalse(resourceUsage.hasDispatch());
83+
assertFalse(resourceUsage.hasPublish());
84+
85+
PerMonitoringClassFields publishMonitoredEntity =
86+
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
87+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
88+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
89+
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
90+
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
91+
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
92+
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);
93+
94+
needReport.set(true);
95+
resourceGroup.rgFillResourceUsage(resourceUsage);
96+
assertTrue(resourceUsage.hasDispatch());
97+
assertTrue(resourceUsage.hasPublish());
98+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
99+
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
100+
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
101+
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
102+
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
103+
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
104+
}
105+
}

0 commit comments

Comments
 (0)