Skip to content

Commit 135f273

Browse files
author
chengliefeng
committed
feature: add TCC three-phase hooks (#6731)
1 parent 5601c36 commit 135f273

File tree

4 files changed

+256
-1
lines changed

4 files changed

+256
-1
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.fence.hook;
18+
19+
20+
public interface TccHook {
21+
22+
/**
23+
* before tcc prepare
24+
*/
25+
void beforeTccPrepare(String xid, Long branchId, String actionName);
26+
27+
/**
28+
* after tcc prepare
29+
*/
30+
void afterTccPrepare(String xid, Long branchId, String actionName);
31+
32+
/**
33+
* before tcc commit
34+
*/
35+
void beforeTccCommit(String xid, Long branchId, String actionName);
36+
37+
/**
38+
* after tcc commit
39+
*/
40+
void afterTccCommit(String xid, Long branchId, String actionName);
41+
42+
/**
43+
* before tcc rollback
44+
*/
45+
void beforeTccRollback(String xid, Long branchId, String actionName);
46+
47+
/**
48+
* after tcc rollback
49+
*/
50+
void afterTccRollback(String xid, Long branchId, String actionName);
51+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.fence.hook;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.concurrent.CopyOnWriteArrayList;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
public final class TccHookManager {
27+
private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class);
28+
29+
private TccHookManager() {
30+
31+
}
32+
33+
private static final List<TccHook> TCC_HOOKS = new CopyOnWriteArrayList<>();
34+
// Cache unmodifiable lists
35+
private volatile static List<TccHook> CACHED_UNMODIFIABLE_HOOKS = null;
36+
37+
/**
38+
* get the hooks
39+
* @return tccHook list
40+
*/
41+
public static List<TccHook> getHooks() {
42+
if (CACHED_UNMODIFIABLE_HOOKS == null) {
43+
synchronized (TccHookManager.class) {
44+
if (CACHED_UNMODIFIABLE_HOOKS == null) {
45+
CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS);
46+
}
47+
}
48+
}
49+
return CACHED_UNMODIFIABLE_HOOKS;
50+
}
51+
52+
/**
53+
* add new hook
54+
* @param tccHook tccHook
55+
*/
56+
public static void registerHook(TccHook tccHook) {
57+
if (tccHook == null) {
58+
throw new NullPointerException("tccHook must not be null");
59+
}
60+
TCC_HOOKS.add(tccHook);
61+
CACHED_UNMODIFIABLE_HOOKS = null;
62+
LOGGER.info("TccHook registered succeeded! TccHooks size: {}", TCC_HOOKS.size());
63+
}
64+
65+
/**
66+
* clear hooks
67+
*/
68+
public static void clear() {
69+
TCC_HOOKS.clear();
70+
CACHED_UNMODIFIABLE_HOOKS = null;
71+
LOGGER.info("All TccHooks have been cleared.");
72+
}
73+
}

integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.reflect.UndeclaredThrowableException;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.List;
2526
import java.util.Map;
2627

2728
import org.apache.seata.common.Constants;
@@ -32,6 +33,8 @@
3233
import org.apache.seata.common.util.NetUtil;
3334
import org.apache.seata.core.context.RootContext;
3435
import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler;
36+
import org.apache.seata.integration.tx.api.fence.hook.TccHook;
37+
import org.apache.seata.integration.tx.api.fence.hook.TccHookManager;
3538
import org.apache.seata.integration.tx.api.util.JsonUtil;
3639
import org.apache.seata.rm.DefaultResourceManager;
3740
import org.apache.seata.rm.tcc.api.BusinessActionContext;
@@ -87,7 +90,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
8790
try {
8891
//share actionContext implicitly
8992
BusinessActionContextUtil.setContext(actionContext);
90-
93+
doBeforeTccPrepare(xid, branchId, actionName);
9194
if (businessActionParam.getUseCommonFence()) {
9295
try {
9396
// Use common Fence, and return the business result
@@ -105,6 +108,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
105108
}
106109
} finally {
107110
try {
111+
doAfterTccPrepare(xid, branchId, actionName);
108112
//to report business action context finally if the actionContext.getUpdated() is true
109113
BusinessActionContextUtil.reportContext(actionContext);
110114
} finally {
@@ -119,6 +123,46 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
119123
}
120124
}
121125

126+
/**
127+
* to do some business operations before tcc prepare
128+
* @param xid the xid
129+
* @param branchId the branchId
130+
* @param actionName the actionName
131+
*/
132+
private void doBeforeTccPrepare(String xid, String branchId, String actionName) {
133+
List<TccHook> hooks = TccHookManager.getHooks();
134+
if (hooks.isEmpty()) {
135+
return;
136+
}
137+
for (TccHook hook : hooks) {
138+
try {
139+
hook.beforeTccPrepare(xid, Long.valueOf(branchId), actionName);
140+
} catch (Exception e) {
141+
LOGGER.error("Failed execute beforeTccPrepare in hook {}", e.getMessage(), e);
142+
}
143+
}
144+
}
145+
146+
/**
147+
* to do some business operations after tcc prepare
148+
* @param xid the xid
149+
* @param branchId the branchId
150+
* @param actionName the actionName
151+
*/
152+
private void doAfterTccPrepare(String xid, String branchId, String actionName) {
153+
List<TccHook> hooks = TccHookManager.getHooks();
154+
if (hooks.isEmpty()) {
155+
return;
156+
}
157+
for (TccHook hook : hooks) {
158+
try {
159+
hook.afterTccPrepare(xid, Long.valueOf(branchId), actionName);
160+
} catch (Exception e) {
161+
LOGGER.error("Failed execute afterTccPrepare in hook {}", e.getMessage(), e);
162+
}
163+
}
164+
}
165+
122166
/**
123167
* Get or create action context, and reset to arguments
124168
*

tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.lang.reflect.Method;
2020
import java.lang.reflect.UndeclaredThrowableException;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324

@@ -31,6 +32,8 @@
3132
import org.apache.seata.core.model.BranchType;
3233
import org.apache.seata.core.model.Resource;
3334
import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler;
35+
import org.apache.seata.integration.tx.api.fence.hook.TccHook;
36+
import org.apache.seata.integration.tx.api.fence.hook.TccHookManager;
3437
import org.apache.seata.integration.tx.api.remoting.TwoPhaseResult;
3538
import org.apache.seata.rm.AbstractResourceManager;
3639
import org.apache.seata.rm.tcc.api.BusinessActionContext;
@@ -121,6 +124,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI
121124
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
122125
//share actionContext implicitly
123126
BusinessActionContextUtil.setContext(businessActionContext);
127+
doBeforeTccCommit(xid, branchId, tccResource.getActionName());
124128
Object ret;
125129
boolean result;
126130
// add idempotent and anti hanging
@@ -149,6 +153,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI
149153
LOGGER.error(msg, ExceptionUtil.unwrap(t));
150154
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
151155
} finally {
156+
doAfterTccCommit(xid, branchId, tccResource.getActionName());
152157
// clear the action context
153158
BusinessActionContextUtil.clear();
154159
}
@@ -184,6 +189,7 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc
184189
Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
185190
//share actionContext implicitly
186191
BusinessActionContextUtil.setContext(businessActionContext);
192+
doBeforeTccRollback(xid, branchId, tccResource.getActionName());
187193
Object ret;
188194
boolean result;
189195
// add idempotent and anti hanging
@@ -213,11 +219,92 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc
213219
LOGGER.error(msg, ExceptionUtil.unwrap(t));
214220
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
215221
} finally {
222+
doAfterTccRollback(xid, branchId, tccResource.getActionName());
216223
// clear the action context
217224
BusinessActionContextUtil.clear();
218225
}
219226
}
220227

228+
/**
229+
* to do some business operations before tcc rollback
230+
* @param xid the xid
231+
* @param branchId the branchId
232+
* @param actionName the actionName
233+
*/
234+
private void doBeforeTccRollback(String xid, long branchId, String actionName) {
235+
List<TccHook> hooks = TccHookManager.getHooks();
236+
if (hooks.isEmpty()) {
237+
return;
238+
}
239+
for (TccHook hook : hooks) {
240+
try {
241+
hook.beforeTccRollback(xid, branchId, actionName);
242+
} catch (Exception e) {
243+
LOGGER.error("Failed execute beforeTccRollback in hook {}", e.getMessage(), e);
244+
}
245+
}
246+
}
247+
248+
/**
249+
* to do some business operations after tcc rollback
250+
* @param xid the xid
251+
* @param branchId the branchId
252+
* @param actionName the actionName
253+
*/
254+
private void doAfterTccRollback(String xid, long branchId, String actionName) {
255+
List<TccHook> hooks = TccHookManager.getHooks();
256+
if (hooks.isEmpty()) {
257+
return;
258+
}
259+
for (TccHook hook : hooks) {
260+
try {
261+
hook.afterTccRollback(xid, branchId, actionName);
262+
} catch (Exception e) {
263+
LOGGER.error("Failed execute afterTccRollback in hook {}", e.getMessage(), e);
264+
}
265+
}
266+
}
267+
268+
/**
269+
* to do some business operations before tcc commit
270+
* @param xid the xid
271+
* @param branchId the branchId
272+
* @param actionName the actionName
273+
*/
274+
private void doBeforeTccCommit(String xid, long branchId, String actionName) {
275+
List<TccHook> hooks = TccHookManager.getHooks();
276+
if (hooks.isEmpty()) {
277+
return;
278+
}
279+
for (TccHook hook : hooks) {
280+
try {
281+
hook.beforeTccCommit(xid, branchId, actionName);
282+
} catch (Exception e) {
283+
LOGGER.error("Failed execute beforeTccCommit in hook {}", e.getMessage(), e);
284+
}
285+
}
286+
}
287+
288+
/**
289+
* to do some business operations after tcc commit
290+
* @param xid the xid
291+
* @param branchId the branchId
292+
* @param actionName the actionName
293+
*/
294+
private void doAfterTccCommit(String xid, long branchId, String actionName) {
295+
List<TccHook> hooks = TccHookManager.getHooks();
296+
if (hooks.isEmpty()) {
297+
return;
298+
}
299+
for (TccHook hook : hooks) {
300+
try {
301+
hook.afterTccCommit(xid, branchId, actionName);
302+
} catch (Exception e) {
303+
LOGGER.error("Failed execute afterTccCommit in hook {}", e.getMessage(), e);
304+
}
305+
}
306+
}
307+
221308
/**
222309
* get phase two commit method's args
223310
* @param tccResource tccResource

0 commit comments

Comments
 (0)