Skip to content

Commit b0c2bb2

Browse files
authored
optimize: fall back to any of available cluster address when query cluster address is empty (#6797)
1 parent 525d1b9 commit b0c2bb2

File tree

12 files changed

+113
-50
lines changed

12 files changed

+113
-50
lines changed

changes/en-us/2.x.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Add changes here for all PR submitted to the 2.x branch.
2828
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term
2929
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process
3030
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer
31+
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty
3132

3233

3334
### optimize:
@@ -113,6 +114,7 @@ Thanks to these contributors for their code commits. Please report an unintended
113114
- [imashimaro](https://github.com/hmj776521114)
114115
- [lyl2008dsg](https://github.com/lyl2008dsg)
115116
- [l81893521](https://github.com/l81893521)
117+
- [laywin](https://github.com/laywin)
116118

117119

118120
Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

changes/zh-cn/2.x.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题
3030
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程
3131
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug
32+
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址
3233

3334

3435
### optimize:
@@ -73,7 +74,6 @@
7374
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题
7475
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题
7576

76-
7777
### refactor:
7878

7979

@@ -117,6 +117,7 @@
117117
- [imashimaro](https://github.com/hmj776521114)
118118
- [lyl2008dsg](https://github.com/lyl2008dsg)
119119
- [l81893521](https://github.com/l81893521)
120+
- [laywin](https://github.com/laywin)
120121

121122

122123

discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener
7575
private static final int THREAD_POOL_NUM = 1;
7676
private static final int MAP_INITIAL_CAPACITY = 8;
7777

78+
private String transactionServiceGroup;
79+
7880
/**
7981
* default tcp check interval
8082
*/
@@ -161,6 +163,7 @@ public void unsubscribe(String cluster, ConsulListener listener) throws Exceptio
161163

162164
@Override
163165
public List<InetSocketAddress> lookup(String key) throws Exception {
166+
transactionServiceGroup = key;
164167
final String cluster = getServiceGroup(key);
165168
if (cluster == null) {
166169
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List<HealthService> services) {
311314

312315
clusterAddressMap.put(cluster, addresses);
313316

314-
removeOfflineAddressesIfNecessary(cluster, addresses);
317+
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses);
315318
}
316319

317320
/**

discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
*/
1717
package org.apache.seata.discovery.registry;
1818

19+
import org.apache.seata.common.util.CollectionUtils;
20+
import org.apache.seata.config.ConfigurationFactory;
21+
1922
import java.net.InetSocketAddress;
20-
import java.util.ArrayList;
2123
import java.util.Collection;
2224
import java.util.Collections;
2325
import java.util.HashSet;
@@ -27,8 +29,6 @@
2729
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.stream.Collectors;
2931

30-
import org.apache.seata.config.ConfigurationFactory;
31-
3232
/**
3333
* The interface Registry service.
3434
*
@@ -54,7 +54,7 @@ public interface RegistryService<T> {
5454
/**
5555
* Service node health check
5656
*/
57-
Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
57+
Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
5858
/**
5959
* Register.
6060
*
@@ -119,12 +119,29 @@ default String getServiceGroup(String key) {
119119
}
120120

121121
default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
122-
return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>());
122+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
123+
k -> new ConcurrentHashMap<>());
124+
125+
String clusterName = getServiceGroup(transactionServiceGroup);
126+
List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(clusterName);
127+
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
128+
return inetSocketAddresses;
129+
}
130+
131+
// fall back to addresses of any cluster
132+
return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
133+
.findAny().orElse(Collections.emptyList());
123134
}
124135

125136
default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
126137
List<InetSocketAddress> aliveAddress) {
127-
return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);
138+
139+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
140+
key -> new ConcurrentHashMap<>());
141+
142+
String clusterName = getServiceGroup(transactionServiceGroup);
143+
144+
return clusterAddressMap.put(clusterName, aliveAddress);
128145
}
129146

130147

@@ -137,15 +154,21 @@ default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGrou
137154
* @param clusterName
138155
* @param newAddressed
139156
*/
140-
default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) {
157+
default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection<InetSocketAddress> newAddressed) {
158+
159+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService,
160+
key -> new ConcurrentHashMap<>());
141161

142-
List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
162+
List<InetSocketAddress> currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList());
143163

144164
List<InetSocketAddress> inetSocketAddresses = currentAddresses
145165
.stream().filter(newAddressed::contains).collect(
146166
Collectors.toList());
147167

148-
CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
168+
// prevent empty update
169+
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
170+
clusterAddressMap.put(clusterName, inetSocketAddresses);
171+
}
149172
}
150173

151174
}

discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener>
7575
private static final int MAP_INITIAL_CAPACITY = 8;
7676
private static final int THREAD_POOL_SIZE = 2;
7777
private ExecutorService executorService;
78+
79+
private String transactionServiceGroup;
7880
/**
7981
* TTL for lease
8082
*/
@@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio
181183

182184
@Override
183185
public List<InetSocketAddress> lookup(String key) throws Exception {
186+
transactionServiceGroup = key;
184187
final String cluster = getServiceGroup(key);
185188
if (cluster == null) {
186189
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception {
252255
}).collect(Collectors.toList());
253256
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));
254257

255-
removeOfflineAddressesIfNecessary(cluster, instanceList);
258+
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList);
256259
}
257260

258261
/**

discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
7575
private static volatile EurekaRegistryServiceImpl instance;
7676
private static volatile EurekaClient eurekaClient;
7777

78+
private String transactionServiceGroup;
79+
7880
private EurekaRegistryServiceImpl() {
7981
}
8082

@@ -130,6 +132,7 @@ public void unsubscribe(String cluster, EurekaEventListener listener) throws Exc
130132

131133
@Override
132134
public List<InetSocketAddress> lookup(String key) throws Exception {
135+
transactionServiceGroup = key;
133136
String clusterName = getServiceGroup(key);
134137
if (clusterName == null) {
135138
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) {
169172
.collect(Collectors.toList());
170173
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
171174

172-
removeOfflineAddressesIfNecessary(clusterName, newAddressList);
175+
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
173176
}
174177
}
175178

discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
8484
private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com");
8585
private static volatile Boolean useSLBWay;
8686

87+
private String transactionServiceGroup;
88+
8789
private NacosRegistryServiceImpl() {
8890
String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB());
8991
Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB)
@@ -193,7 +195,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
193195
.collect(Collectors.toList());
194196
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
195197

196-
removeOfflineAddressesIfNecessary(clusterName, newAddressList);
198+
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
197199
}
198200
});
199201
}

discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
*/
1717
package org.apache.seata.discovery.registry.namingserver;
1818

19-
2019
import java.io.IOException;
2120
import java.net.InetSocketAddress;
2221
import java.rmi.RemoteException;
23-
import java.util.ArrayList;
24-
import java.util.HashMap;
2522
import java.util.List;
23+
import java.util.HashMap;
24+
import java.util.Objects;
2625
import java.util.Map;
26+
import java.util.ArrayList;
2727
import java.util.Arrays;
28-
import java.util.Objects;
28+
import java.util.Collections;
2929
import java.util.concurrent.ConcurrentHashMap;
3030
import java.util.concurrent.ConcurrentMap;
3131
import java.util.concurrent.ScheduledExecutorService;
@@ -41,22 +41,23 @@
4141
import com.fasterxml.jackson.core.type.TypeReference;
4242
import com.fasterxml.jackson.databind.DeserializationFeature;
4343
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import org.apache.http.HttpStatus;
45+
import org.apache.http.StatusLine;
46+
import org.apache.http.client.methods.CloseableHttpResponse;
4447
import org.apache.http.entity.ContentType;
4548
import org.apache.http.protocol.HTTP;
49+
import org.apache.http.util.EntityUtils;
4650
import org.apache.seata.common.metadata.Node;
4751
import org.apache.seata.common.metadata.namingserver.Instance;
4852
import org.apache.seata.common.metadata.namingserver.MetaResponse;
4953
import org.apache.seata.common.thread.NamedThreadFactory;
54+
import org.apache.seata.common.util.CollectionUtils;
55+
import org.apache.seata.common.util.HttpClientUtil;
5056
import org.apache.seata.common.util.NetUtil;
5157
import org.apache.seata.common.util.StringUtils;
5258
import org.apache.seata.config.Configuration;
5359
import org.apache.seata.config.ConfigurationFactory;
54-
import org.apache.seata.common.util.HttpClientUtil;
5560
import org.apache.seata.discovery.registry.RegistryService;
56-
import org.apache.http.HttpStatus;
57-
import org.apache.http.StatusLine;
58-
import org.apache.http.client.methods.CloseableHttpResponse;
59-
import org.apache.http.util.EntityUtils;
6061
import org.slf4j.Logger;
6162
import org.slf4j.LoggerFactory;
6263

@@ -322,17 +323,6 @@ public void unsubscribe(String vGroup) throws Exception {
322323
isSubscribed = false;
323324
}
324325

325-
@Override
326-
public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
327-
return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
328-
}
329-
330-
@Override
331-
public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
332-
List<InetSocketAddress> aliveAddress) {
333-
return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
334-
}
335-
336326
/**
337327
* @param key vGroup name
338328
* @return List<InetSocketAddress> available instance list
@@ -413,6 +403,31 @@ public String getNamespace() {
413403
return namespace;
414404
}
415405

406+
@Override
407+
public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
408+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
409+
k -> new ConcurrentHashMap<>());
410+
411+
List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(transactionServiceGroup);
412+
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
413+
return inetSocketAddresses;
414+
}
415+
416+
// fall back to addresses of any cluster
417+
return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
418+
.findAny().orElse(Collections.emptyList());
419+
}
420+
421+
@Override
422+
public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
423+
List<InetSocketAddress> aliveAddress) {
424+
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
425+
key -> new ConcurrentHashMap<>());
426+
427+
428+
return clusterAddressMap.put(transactionServiceGroup, aliveAddress);
429+
}
430+
416431

417432
/**
418433
* get one namingserver url

discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
7474
private static final long KEY_TTL = 5L;
7575
private static final long KEY_REFRESH_PERIOD = 2000L;
7676

77+
private String transactionServiceGroup;
78+
7779
private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1,
7880
new NamedThreadFactory("RedisRegistryService-subscribe", 1));
7981
private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1,
@@ -219,6 +221,7 @@ public void unsubscribe(String cluster, RedisListener listener) {
219221

220222
@Override
221223
public List<InetSocketAddress> lookup(String key) {
224+
transactionServiceGroup = key;
222225
String clusterName = getServiceGroup(key);
223226
if (clusterName == null) {
224227
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -280,7 +283,7 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S
280283
}
281284
socketAddresses.remove(inetSocketAddress);
282285

283-
removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses);
286+
removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, socketAddresses);
284287
}
285288

286289
@Override

discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb
8181

8282
private static volatile SofaRegistryServiceImpl instance;
8383

84+
private String transactionServiceGroup;
85+
8486
private SofaRegistryServiceImpl() {
8587
}
8688

@@ -159,6 +161,7 @@ public void unsubscribe(String cluster, SubscriberDataObserver listener) throws
159161

160162
@Override
161163
public List<InetSocketAddress> lookup(String key) throws Exception {
164+
transactionServiceGroup = key;
162165
String clusterName = getServiceGroup(key);
163166
if (clusterName == null) {
164167
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
@@ -174,7 +177,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
174177
List<InetSocketAddress> newAddressList = flatData(instances);
175178
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
176179

177-
removeOfflineAddressesIfNecessary(clusterName, newAddressList);
180+
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
178181
}
179182
respondRegistries.countDown();
180183
});

0 commit comments

Comments
 (0)