Skip to content

Commit 2d920e2

Browse files
author
chengliefeng
committed
feature: add TCC three-phase hooks (apache#6731)
2 parents 2637ac3 + 2186ceb commit 2d920e2

File tree

15 files changed

+127
-63
lines changed

15 files changed

+127
-63
lines changed

changes/en-us/2.x.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Add changes here for all PR submitted to the 2.x branch.
2929
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term
3030
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process
3131
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer
32+
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty
3233

3334

3435
### optimize:
@@ -71,6 +72,7 @@ Add changes here for all PR submitted to the 2.x branch.
7172
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7
7273
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1
7374
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts
75+
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] optimize NacosMockTest UT case
7476

7577
### refactor:
7678

@@ -113,6 +115,7 @@ Thanks to these contributors for their code commits. Please report an unintended
113115
- [imashimaro](https://github.com/hmj776521114)
114116
- [lyl2008dsg](https://github.com/lyl2008dsg)
115117
- [l81893521](https://github.com/l81893521)
118+
- [laywin](https://github.com/laywin)
116119

117120

118121
Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

changes/zh-cn/2.x.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题
3131
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程
3232
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug
33+
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址
3334

3435

3536
### optimize:
@@ -72,7 +73,7 @@
7273
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] 升级 elliptic 至 6.5.7 版本
7374
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1
7475
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题
75-
76+
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题
7677

7778
### refactor:
7879

@@ -117,6 +118,7 @@
117118
- [imashimaro](https://github.com/hmj776521114)
118119
- [lyl2008dsg](https://github.com/lyl2008dsg)
119120
- [l81893521](https://github.com/l81893521)
121+
- [laywin](https://github.com/laywin)
120122

121123

122124

config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ public void getConfig() {
7575
Configuration configuration = ConfigurationFactory.getInstance();
7676
String configStrValue = configuration.getConfig(SUB_NACOS_DATAID);
7777
Assertions.assertNull(configStrValue);
78-
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, 5000);
78+
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, 1000);
7979
Assertions.assertNull(configStrValue);
80-
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 5000);
80+
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 1000);
8181
Assertions.assertEquals("TEST", configStrValue);
8282
ConfigurationCache.clear();
8383
System.setProperty(SUB_NACOS_DATAID, "SYS-TEST");
84-
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 5000);
84+
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 1000);
8585
Assertions.assertEquals("SYS-TEST", configStrValue);
8686
ConfigurationCache.clear();
8787
System.clearProperty(SUB_NACOS_DATAID);
@@ -91,45 +91,45 @@ public void getConfig() {
9191
Assertions.assertEquals(0, configIntValue);
9292
configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100);
9393
Assertions.assertEquals(100, configIntValue);
94-
configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100, 5000);
94+
configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100, 1000);
9595
Assertions.assertEquals(100, configIntValue);
9696

9797
ConfigurationCache.clear();
9898
boolean configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID);
9999
Assertions.assertEquals(false, configBoolValue);
100100
configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true);
101101
Assertions.assertEquals(true, configBoolValue);
102-
configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true, 5000);
102+
configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true, 1000);
103103
Assertions.assertEquals(true, configBoolValue);
104104

105105
ConfigurationCache.clear();
106106
short configShortValue = configuration.getShort(SUB_NACOS_DATAID);
107107
Assertions.assertEquals(0, configShortValue);
108108
configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)64);
109109
Assertions.assertEquals(64, configShortValue);
110-
configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)127, 5000);
110+
configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)127, 1000);
111111
Assertions.assertEquals(127, configShortValue);
112112

113113
ConfigurationCache.clear();
114114
long configLongValue = configuration.getShort(SUB_NACOS_DATAID);
115115
Assertions.assertEquals(0L, configLongValue);
116116
configLongValue = configuration.getLong(SUB_NACOS_DATAID, 12345678L);
117117
Assertions.assertEquals(12345678L, configLongValue);
118-
configLongValue = configuration.getLong(SUB_NACOS_DATAID, 65535L, 5000);
118+
configLongValue = configuration.getLong(SUB_NACOS_DATAID, 65535L, 1000);
119119
Assertions.assertEquals(65535L, configLongValue);
120120

121121
ConfigurationCache.clear();
122122
Duration configDurValue = configuration.getDuration(SUB_NACOS_DATAID);
123123
Assertions.assertEquals(Duration.ZERO, configDurValue);
124-
Duration defaultDuration = Duration.ofMillis(5000);
124+
Duration defaultDuration = Duration.ofMillis(1000);
125125
configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration);
126126
Assertions.assertEquals(defaultDuration, configDurValue);
127127
defaultDuration = Duration.ofMillis(1000);
128-
configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration, 5000);
128+
configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration, 1000);
129129
Assertions.assertEquals(defaultDuration, configDurValue);
130130

131131
ConfigurationCache.clear();
132-
configStrValue = configuration.getLatestConfig(SUB_NACOS_DATAID, "DEFAULT", 5000);
132+
configStrValue = configuration.getLatestConfig(SUB_NACOS_DATAID, "DEFAULT", 1000);
133133
Assertions.assertEquals("DEFAULT", configStrValue);
134134

135135
}
@@ -145,7 +145,7 @@ public void putConfigIfAbsent() {
145145
@Test
146146
public void removeConfig() {
147147
Configuration configuration = ConfigurationFactory.getInstance();
148-
boolean removed = configuration.removeConfig(SUB_NACOS_DATAID);
148+
boolean removed = configuration.removeConfig(NACOS_DATAID);
149149
Assertions.assertTrue(removed);
150150
}
151151

@@ -175,7 +175,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) {
175175
configuration.addConfigListener(SUB_NACOS_DATAID, listener);
176176
Thread.sleep(1000);
177177
configuration.putConfig(NACOS_DATAID, "KEY=VALUE");
178-
latch.await(3000, TimeUnit.MILLISECONDS);
178+
latch.await(1000, TimeUnit.MILLISECONDS);
179179
Set<ConfigurationChangeListener> listeners = configuration.getConfigListeners(SUB_NACOS_DATAID);
180180
//configcache listener + user listener
181181
Assertions.assertEquals(2, listeners.size());

discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener
7575
private static final int THREAD_POOL_NUM = 1;
7676
private static final int MAP_INITIAL_CAPACITY = 8;
7777

78+
private String transactionServiceGroup;
79+
7880
/**
7981
* default tcp check interval
8082
*/
@@ -161,6 +163,7 @@ public void unsubscribe(String cluster, ConsulListener listener) throws Exceptio
161163

162164
@Override
163165
public List<InetSocketAddress> lookup(String key) throws Exception {
166+
transactionServiceGroup = key;
164167
final String cluster = getServiceGroup(key);
165168
if (cluster == null) {
166169
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List<HealthService> services) {
311314

312315
clusterAddressMap.put(cluster, addresses);
313316

314-
removeOfflineAddressesIfNecessary(cluster, addresses);
317+
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses);
315318
}
316319

317320
/**

discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
*/
1717
package org.apache.seata.discovery.registry;
1818

19+
import org.apache.seata.common.util.CollectionUtils;
20+
import org.apache.seata.config.ConfigurationFactory;
21+
1922
import java.net.InetSocketAddress;
20-
import java.util.ArrayList;
2123
import java.util.Collection;
2224
import java.util.Collections;
2325
import java.util.HashSet;
@@ -27,8 +29,6 @@
2729
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.stream.Collectors;
2931

30-
import org.apache.seata.config.ConfigurationFactory;
31-
3232
/**
3333
* The interface Registry service.
3434
*
@@ -54,7 +54,7 @@ public interface RegistryService<T> {
5454
/**
5555
* Service node health check
5656
*/
57-
Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
57+
Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
5858
/**
5959
* Register.
6060
*
@@ -119,12 +119,29 @@ default String getServiceGroup(String key) {
119119
}
120120

121121
default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
122-
return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>());
122+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
123+
k -> new ConcurrentHashMap<>());
124+
125+
String clusterName = getServiceGroup(transactionServiceGroup);
126+
List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(clusterName);
127+
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
128+
return inetSocketAddresses;
129+
}
130+
131+
// fall back to addresses of any cluster
132+
return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
133+
.findAny().orElse(Collections.emptyList());
123134
}
124135

125136
default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
126137
List<InetSocketAddress> aliveAddress) {
127-
return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);
138+
139+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
140+
key -> new ConcurrentHashMap<>());
141+
142+
String clusterName = getServiceGroup(transactionServiceGroup);
143+
144+
return clusterAddressMap.put(clusterName, aliveAddress);
128145
}
129146

130147

@@ -137,15 +154,21 @@ default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGrou
137154
* @param clusterName
138155
* @param newAddressed
139156
*/
140-
default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) {
157+
default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection<InetSocketAddress> newAddressed) {
158+
159+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService,
160+
key -> new ConcurrentHashMap<>());
141161

142-
List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
162+
List<InetSocketAddress> currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList());
143163

144164
List<InetSocketAddress> inetSocketAddresses = currentAddresses
145165
.stream().filter(newAddressed::contains).collect(
146166
Collectors.toList());
147167

148-
CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
168+
// prevent empty update
169+
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
170+
clusterAddressMap.put(clusterName, inetSocketAddresses);
171+
}
149172
}
150173

151174
}

discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener>
7575
private static final int MAP_INITIAL_CAPACITY = 8;
7676
private static final int THREAD_POOL_SIZE = 2;
7777
private ExecutorService executorService;
78+
79+
private String transactionServiceGroup;
7880
/**
7981
* TTL for lease
8082
*/
@@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio
181183

182184
@Override
183185
public List<InetSocketAddress> lookup(String key) throws Exception {
186+
transactionServiceGroup = key;
184187
final String cluster = getServiceGroup(key);
185188
if (cluster == null) {
186189
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception {
252255
}).collect(Collectors.toList());
253256
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));
254257

255-
removeOfflineAddressesIfNecessary(cluster, instanceList);
258+
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList);
256259
}
257260

258261
/**

discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
7575
private static volatile EurekaRegistryServiceImpl instance;
7676
private static volatile EurekaClient eurekaClient;
7777

78+
private String transactionServiceGroup;
79+
7880
private EurekaRegistryServiceImpl() {
7981
}
8082

@@ -130,6 +132,7 @@ public void unsubscribe(String cluster, EurekaEventListener listener) throws Exc
130132

131133
@Override
132134
public List<InetSocketAddress> lookup(String key) throws Exception {
135+
transactionServiceGroup = key;
133136
String clusterName = getServiceGroup(key);
134137
if (clusterName == null) {
135138
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) {
169172
.collect(Collectors.toList());
170173
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
171174

172-
removeOfflineAddressesIfNecessary(clusterName, newAddressList);
175+
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
173176
}
174177
}
175178

discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
8484
private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com");
8585
private static volatile Boolean useSLBWay;
8686

87+
private String transactionServiceGroup;
88+
8789
private NacosRegistryServiceImpl() {
8890
String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB());
8991
Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB)
@@ -193,7 +195,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
193195
.collect(Collectors.toList());
194196
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
195197

196-
removeOfflineAddressesIfNecessary(clusterName, newAddressList);
198+
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
197199
}
198200
});
199201
}

0 commit comments

Comments
 (0)