18
18
*/
19
19
package org .apache .pulsar .broker .admin ;
20
20
21
+ import static org .mockito .Mockito .doReturn ;
21
22
import io .jsonwebtoken .Jwts ;
23
+ import java .util .UUID ;
24
+ import java .util .concurrent .atomic .AtomicBoolean ;
25
+ import java .util .function .Consumer ;
22
26
import org .apache .commons .lang3 .reflect .FieldUtils ;
27
+ import org .apache .pulsar .broker .BrokerTestUtil ;
23
28
import org .apache .pulsar .broker .authorization .AuthorizationService ;
24
29
import org .apache .pulsar .client .admin .PulsarAdmin ;
25
30
import org .apache .pulsar .common .policies .data .NamespaceOperation ;
26
31
import org .apache .pulsar .common .policies .data .TopicOperation ;
27
32
import org .apache .pulsar .security .MockedPulsarStandalone ;
28
33
import org .mockito .Mockito ;
34
+ import org .mockito .invocation .InvocationOnMock ;
29
35
import org .testng .Assert ;
30
36
import org .testng .annotations .AfterMethod ;
31
37
import org .testng .annotations .BeforeMethod ;
32
- import java .util .UUID ;
33
- import java .util .concurrent .atomic .AtomicBoolean ;
34
- import static org .mockito .Mockito .doReturn ;
35
38
36
- public class AuthZTest extends MockedPulsarStandalone {
39
+ public abstract class AuthZTest extends MockedPulsarStandalone {
37
40
38
41
protected PulsarAdmin superUserAdmin ;
39
42
@@ -47,6 +50,9 @@ public class AuthZTest extends MockedPulsarStandalone {
47
50
protected static final String TENANT_ADMIN_TOKEN = Jwts .builder ()
48
51
.claim ("sub" , TENANT_ADMIN_SUBJECT ).signWith (SECRET_KEY ).compact ();
49
52
53
+ private volatile Consumer <InvocationOnMock > allowTopicOperationAsyncHandler ;
54
+ private volatile Consumer <InvocationOnMock > allowNamespaceOperationAsyncHandler ;
55
+
50
56
@ Override
51
57
public void close () throws Exception {
52
58
if (superUserAdmin != null ) {
@@ -65,48 +71,62 @@ public void close() throws Exception {
65
71
@ BeforeMethod (alwaysRun = true )
66
72
public void before () throws IllegalAccessException {
67
73
orignalAuthorizationService = getPulsarService ().getBrokerService ().getAuthorizationService ();
68
- authorizationService = Mockito . spy (orignalAuthorizationService );
74
+ authorizationService = BrokerTestUtil . spyWithoutRecordingInvocations (orignalAuthorizationService );
69
75
FieldUtils .writeField (getPulsarService ().getBrokerService (), "authorizationService" ,
70
76
authorizationService , true );
77
+ Mockito .doAnswer (invocationOnMock -> {
78
+ Consumer <InvocationOnMock > localAllowTopicOperationAsyncHandler =
79
+ allowTopicOperationAsyncHandler ;
80
+ if (localAllowTopicOperationAsyncHandler != null ) {
81
+ localAllowTopicOperationAsyncHandler .accept (invocationOnMock );
82
+ }
83
+ return invocationOnMock .callRealMethod ();
84
+ }).when (authorizationService ).allowTopicOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
85
+ Mockito .any (), Mockito .any ());
86
+ doReturn (true )
87
+ .when (authorizationService ).isValidOriginalPrincipal (Mockito .any (), Mockito .any (), Mockito .any ());
88
+ Mockito .doAnswer (invocationOnMock -> {
89
+ Consumer <InvocationOnMock > localAllowNamespaceOperationAsyncHandler =
90
+ allowNamespaceOperationAsyncHandler ;
91
+ if (localAllowNamespaceOperationAsyncHandler != null ) {
92
+ localAllowNamespaceOperationAsyncHandler .accept (invocationOnMock );
93
+ }
94
+ return invocationOnMock .callRealMethod ();
95
+ }).when (authorizationService ).allowNamespaceOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
96
+ Mockito .any (), Mockito .any ());
71
97
}
72
98
73
99
@ AfterMethod (alwaysRun = true )
74
100
public void after () throws IllegalAccessException {
75
101
FieldUtils .writeField (getPulsarService ().getBrokerService (), "authorizationService" ,
76
102
orignalAuthorizationService , true );
103
+ allowNamespaceOperationAsyncHandler = null ;
104
+ allowTopicOperationAsyncHandler = null ;
77
105
}
78
106
79
107
protected AtomicBoolean setAuthorizationTopicOperationChecker (String role , Object operation ) {
80
108
AtomicBoolean execFlag = new AtomicBoolean (false );
81
109
if (operation instanceof TopicOperation ) {
82
- Mockito . doAnswer ( invocationOnMock -> {
110
+ allowTopicOperationAsyncHandler = invocationOnMock -> {
83
111
String role_ = invocationOnMock .getArgument (2 );
84
112
if (role .equals (role_ )) {
85
113
TopicOperation operation_ = invocationOnMock .getArgument (1 );
86
114
Assert .assertEquals (operation_ , operation );
87
115
}
88
116
execFlag .set (true );
89
- return invocationOnMock .callRealMethod ();
90
- }).when (authorizationService ).allowTopicOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
91
- Mockito .any (), Mockito .any ());
117
+ };
92
118
} else if (operation instanceof NamespaceOperation ) {
93
- doReturn (true )
94
- .when (authorizationService ).isValidOriginalPrincipal (Mockito .any (), Mockito .any (), Mockito .any ());
95
- Mockito .doAnswer (invocationOnMock -> {
119
+ allowNamespaceOperationAsyncHandler = invocationOnMock -> {
96
120
String role_ = invocationOnMock .getArgument (2 );
97
121
if (role .equals (role_ )) {
98
122
TopicOperation operation_ = invocationOnMock .getArgument (1 );
99
123
Assert .assertEquals (operation_ , operation );
100
124
}
101
125
execFlag .set (true );
102
- return invocationOnMock .callRealMethod ();
103
- }).when (authorizationService ).allowNamespaceOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
104
- Mockito .any (), Mockito .any ());
126
+ };
105
127
} else {
106
128
throw new IllegalArgumentException ("" );
107
129
}
108
-
109
-
110
130
return execFlag ;
111
131
}
112
132
0 commit comments