1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one
3
+ * or more contributor license agreements. See the NOTICE file
4
+ * distributed with this work for additional information
5
+ * regarding copyright ownership. The ASF licenses this file
6
+ * to you under the Apache License, Version 2.0 (the
7
+ * "License"); you may not use this file except in compliance
8
+ * with the License. You may obtain a copy of the License at
9
+ *
10
+ * http://www.apache.org/licenses/LICENSE-2.0
11
+ *
12
+ * Unless required by applicable law or agreed to in writing,
13
+ * software distributed under the License is distributed on an
14
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
+ * KIND, either express or implied. See the License for the
16
+ * specific language governing permissions and limitations
17
+ * under the License.
18
+ */
19
+ package org .apache .pulsar .broker .resourcegroup ;
20
+
21
+ import static org .testng .Assert .assertEquals ;
22
+ import static org .testng .Assert .assertFalse ;
23
+ import static org .testng .Assert .assertTrue ;
24
+ import java .util .concurrent .TimeUnit ;
25
+ import java .util .concurrent .atomic .AtomicBoolean ;
26
+ import org .apache .pulsar .broker .auth .MockedPulsarServiceBaseTest ;
27
+ import org .apache .pulsar .broker .resourcegroup .ResourceGroup .BytesAndMessagesCount ;
28
+ import org .apache .pulsar .broker .resourcegroup .ResourceGroup .PerMonitoringClassFields ;
29
+ import org .apache .pulsar .broker .resourcegroup .ResourceGroup .ResourceGroupMonitoringClass ;
30
+ import org .apache .pulsar .broker .service .resource .usage .ResourceUsage ;
31
+ import org .apache .pulsar .common .policies .data .ResourceGroup ;
32
+ import org .testng .annotations .AfterClass ;
33
+ import org .testng .annotations .BeforeClass ;
34
+ import org .testng .annotations .Test ;
35
+
36
+ public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTest {
37
+
38
+ @ BeforeClass
39
+ @ Override
40
+ protected void setup () throws Exception {
41
+ super .internalSetup ();
42
+ }
43
+
44
+ @ AfterClass (alwaysRun = true )
45
+ @ Override
46
+ protected void cleanup () throws Exception {
47
+ super .internalCleanup ();
48
+ }
49
+
50
+
51
+ @ Test
52
+ public void testRgFillResourceUsage () throws Exception {
53
+ pulsar .getResourceGroupServiceManager ().close ();
54
+ AtomicBoolean needReport = new AtomicBoolean (false );
55
+ ResourceGroupService service = new ResourceGroupService (pulsar , TimeUnit .HOURS , null ,
56
+ new ResourceQuotaCalculator () {
57
+ @ Override
58
+ public boolean needToReportLocalUsage (long currentBytesUsed , long lastReportedBytes ,
59
+ long currentMessagesUsed , long lastReportedMessages ,
60
+ long lastReportTimeMSecsSinceEpoch ) {
61
+ return needReport .get ();
62
+ }
63
+
64
+ @ Override
65
+ public long computeLocalQuota (long confUsage , long myUsage , long [] allUsages ) {
66
+ return 0 ;
67
+ }
68
+ });
69
+ String rgName = "rg-1" ;
70
+ ResourceGroup rgConfig = new ResourceGroup ();
71
+ rgConfig .setPublishRateInBytes (1000L );
72
+ rgConfig .setPublishRateInMsgs (2000 );
73
+ service .resourceGroupCreate (rgName , rgConfig );
74
+
75
+ org .apache .pulsar .broker .resourcegroup .ResourceGroup resourceGroup = service .resourceGroupGet (rgName );
76
+ BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount ();
77
+ bytesAndMessagesCount .bytes = 20 ;
78
+ bytesAndMessagesCount .messages = 10 ;
79
+ resourceGroup .incrementLocalUsageStats (ResourceGroupMonitoringClass .Publish , bytesAndMessagesCount );
80
+ ResourceUsage resourceUsage = new ResourceUsage ();
81
+ resourceGroup .rgFillResourceUsage (resourceUsage );
82
+ assertFalse (resourceUsage .hasDispatch ());
83
+ assertFalse (resourceUsage .hasPublish ());
84
+
85
+ PerMonitoringClassFields publishMonitoredEntity =
86
+ resourceGroup .getMonitoredEntity (ResourceGroupMonitoringClass .Publish );
87
+ assertEquals (publishMonitoredEntity .usedLocallySinceLastReport .messages , bytesAndMessagesCount .messages );
88
+ assertEquals (publishMonitoredEntity .usedLocallySinceLastReport .bytes , bytesAndMessagesCount .bytes );
89
+ assertEquals (publishMonitoredEntity .totalUsedLocally .messages , 0 );
90
+ assertEquals (publishMonitoredEntity .totalUsedLocally .bytes , 0 );
91
+ assertEquals (publishMonitoredEntity .lastReportedValues .messages , 0 );
92
+ assertEquals (publishMonitoredEntity .lastReportedValues .bytes , 0 );
93
+
94
+ needReport .set (true );
95
+ resourceGroup .rgFillResourceUsage (resourceUsage );
96
+ assertTrue (resourceUsage .hasDispatch ());
97
+ assertTrue (resourceUsage .hasPublish ());
98
+ assertEquals (publishMonitoredEntity .usedLocallySinceLastReport .messages , 0 );
99
+ assertEquals (publishMonitoredEntity .usedLocallySinceLastReport .bytes , 0 );
100
+ assertEquals (publishMonitoredEntity .totalUsedLocally .messages , bytesAndMessagesCount .messages );
101
+ assertEquals (publishMonitoredEntity .totalUsedLocally .bytes , bytesAndMessagesCount .bytes );
102
+ assertEquals (publishMonitoredEntity .lastReportedValues .messages , bytesAndMessagesCount .messages );
103
+ assertEquals (publishMonitoredEntity .lastReportedValues .bytes , bytesAndMessagesCount .bytes );
104
+ }
105
+ }
0 commit comments