Skip to content

Commit e8ea2cc

Browse files
authored
bugfix: thread safety issue when adding and removing instances (apache#6668)
1 parent f873475 commit e8ea2cc

File tree

12 files changed

+126
-137
lines changed

12 files changed

+126
-137
lines changed

changes/en-us/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch.
1313
- [[#6626](https://github.com/apache/incubator-seata/pull/6626)] fix hsf ConsumerModel convert error
1414
- [[#6642](https://github.com/apache/incubator-seata/pull/6642)] codecov token not found
1515
- [[#6661](https://github.com/apache/incubator-seata/pull/6661)] fix `tableMeta` cache scheduled refresh issue
16+
- [[#6668](https://github.com/apache/incubator-seata/pull/6668)] thread safety issue when adding and removing instances
1617

1718
### optimize:
1819
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses

changes/zh-cn/2.x.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
- [[#6640](https://github.com/apache/incubator-seata/pull/6640)] 优化codecov相关配置
1515
- [[#6642](https://github.com/apache/incubator-seata/pull/6642)] 修复codecov token找不到导致无法提交单测覆盖度报告
1616
- [[#6661](https://github.com/apache/incubator-seata/pull/6661)] 修复`tableMeta`缓存定时刷新失效问题
17+
- [[#6668](https://github.com/apache/incubator-seata/pull/6668)] 解决namingserver同一个集群下instance添加和删除时的线程安全问题
18+
1719

1820
### optimize:
1921
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池

common/src/main/java/org/apache/seata/common/metadata/namingserver/Unit.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,18 @@ public void removeInstance(Node node) {
4949
}
5050

5151
/**
52-
* @param node
53-
* @return true if the node has changed, false if there is no change.
52+
* @param node node
5453
*/
55-
public boolean addInstance(NamingServerNode node) {
54+
public void addInstance(NamingServerNode node) {
5655
if (nodeList.contains(node)) {
5756
Node node1 = nodeList.get(nodeList.indexOf(node));
5857
if (node.isTotalEqual(node1)) {
59-
return false;
58+
return;
6059
} else {
6160
nodeList.remove(node1);
6261
}
6362
}
6463
nodeList.add(node);
65-
return true;
6664

6765
}
6866

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.namingserver.controller;
18+
19+
import org.apache.seata.common.result.Result;
20+
import org.springframework.web.bind.annotation.GetMapping;
21+
import org.springframework.web.bind.annotation.RestController;
22+
23+
@RestController
24+
public class HealthController {
25+
26+
@GetMapping("/health")
27+
public Result<?> healthCheck() {
28+
return new Result<>();
29+
}
30+
31+
}

namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.seata.namingserver.listener.Watcher;
2525
import org.apache.seata.namingserver.manager.ClusterWatcherManager;
2626
import org.apache.seata.namingserver.manager.NamingManager;
27-
import org.apache.seata.namingserver.vo.monitor.ClusterVO;
28-
import org.apache.seata.namingserver.vo.monitor.WatcherVO;
27+
import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO;
28+
import org.apache.seata.namingserver.entity.vo.monitor.WatcherVO;
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131
import org.springframework.web.bind.annotation.PostMapping;
@@ -56,11 +56,11 @@ public class NamingController {
5656
private ClusterWatcherManager clusterWatcherManager;
5757

5858
@PostMapping("/register")
59-
public Result<?> registerInstance(@RequestParam String namespace,
59+
public Result<String> registerInstance(@RequestParam String namespace,
6060
@RequestParam String clusterName,
6161
@RequestParam String unit,
6262
@RequestBody NamingServerNode registerBody) {
63-
Result result = new Result();
63+
Result<String> result = new Result<>();
6464
boolean isSuccess = namingManager.registerInstance(registerBody, namespace, clusterName, unit);
6565
if (isSuccess) {
6666
result.setMessage("node has registered successfully!");
@@ -72,9 +72,9 @@ public Result<?> registerInstance(@RequestParam String namespace,
7272
}
7373

7474
@PostMapping("/unregister")
75-
public Result<?> unregisterInstance(@RequestParam String unit,
75+
public Result<String> unregisterInstance(@RequestParam String unit,
7676
@RequestBody Node registerBody) {
77-
Result result = new Result();
77+
Result<String> result = new Result<>();
7878
boolean isSuccess = namingManager.unregisterInstance(unit, registerBody);
7979
if (isSuccess) {
8080
result.setMessage("node has unregistered successfully!");
@@ -90,29 +90,24 @@ public List<ClusterVO> monitorCluster(String namespace) {
9090
return namingManager.monitorCluster(namespace);
9191
}
9292

93-
@GetMapping("/health")
94-
public Result<?> healthCheck() {
95-
return new Result<>();
96-
}
97-
9893
@GetMapping("/discovery")
9994
public MetaResponse discovery(@RequestParam String vGroup, @RequestParam String namespace) {
10095
return new MetaResponse(namingManager.getClusterListByVgroup(vGroup, namespace),
10196
clusterWatcherManager.getTermByvGroup(vGroup));
10297
}
10398

10499
@PostMapping("/changeGroup")
105-
public Result<?> changeGroup(@RequestParam String namespace,
100+
public Result<String> changeGroup(@RequestParam String namespace,
106101
@RequestParam String clusterName,
107102
@RequestParam String unitName,
108103
@RequestParam String vGroup) {
109104

110-
Result<?> addGroupResult = namingManager.addGroup(namespace, vGroup, clusterName, unitName);
105+
Result<String> addGroupResult = namingManager.addGroup(namespace, vGroup, clusterName, unitName);
111106
if (!addGroupResult.isSuccess()) {
112107
return addGroupResult;
113108
}
114109
// remove vGroup in old cluster
115-
Result<?> removeGroupResult = namingManager.removeGroup(namespace, vGroup, unitName);
110+
Result<String> removeGroupResult = namingManager.removeGroup(namespace, vGroup, unitName);
116111
if (!removeGroupResult.isSuccess()) {
117112
return removeGroupResult;
118113
}

namingserver/src/main/java/org/apache/seata/namingserver/pojo/ClusterData.java renamed to namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,35 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.seata.namingserver.pojo;
18-
19-
20-
import org.apache.seata.common.metadata.Cluster;
21-
import org.apache.seata.common.metadata.Node;
22-
import org.apache.seata.common.metadata.namingserver.NamingServerNode;
23-
import org.apache.seata.common.metadata.namingserver.Unit;
24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
26-
import org.springframework.stereotype.Component;
17+
package org.apache.seata.namingserver.entity.pojo;
2718

2819
import java.util.ArrayList;
2920
import java.util.List;
3021
import java.util.Map;
3122
import java.util.Objects;
3223
import java.util.Optional;
3324
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
import java.util.concurrent.locks.Lock;
27+
import java.util.concurrent.locks.ReentrantLock;
3428
import java.util.stream.Collectors;
3529

36-
@Component
37-
public class ClusterData extends AbstractClusterData {
30+
import org.apache.seata.common.metadata.Cluster;
31+
import org.apache.seata.common.metadata.Node;
32+
import org.apache.seata.common.metadata.namingserver.NamingServerNode;
33+
import org.apache.seata.common.metadata.namingserver.Unit;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
import org.springframework.util.CollectionUtils;
37+
import org.springframework.util.StringUtils;
38+
39+
public class ClusterData {
3840
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterData.class);
3941
private String clusterName;
4042
private String clusterType;
4143
private final Map<String, Unit> unitData;
44+
45+
private Lock lock = new ReentrantLock();
4246

4347

4448
public ClusterData() {
@@ -84,8 +88,14 @@ public void removeInstance(Node node, String unitName) {
8488
return;
8589
}
8690
unit.removeInstance(node);
87-
if (unit.getNamingInstanceList() == null || unit.getNamingInstanceList().size() == 0) {
88-
unitData.remove(unitName);
91+
// remove unit if unit has no instance
92+
lock.lock();
93+
try {
94+
if (CollectionUtils.isEmpty(unit.getNamingInstanceList())) {
95+
unitData.remove(unitName);
96+
}
97+
} finally {
98+
lock.unlock();
8999
}
90100
}
91101

@@ -101,7 +111,7 @@ public Cluster getClusterByUnit(String unitName) {
101111
Cluster clusterResponse = new Cluster();
102112
clusterResponse.setClusterName(clusterName);
103113
clusterResponse.setClusterType(clusterType);
104-
if (unitName == null || unitName.equals("")) {
114+
if (!StringUtils.hasLength(unitName)) {
105115
clusterResponse.setUnitData(new ArrayList<>(unitData.values()));
106116
} else {
107117
List<Unit> unitList = new ArrayList<>();
@@ -112,26 +122,28 @@ public Cluster getClusterByUnit(String unitName) {
112122
return clusterResponse;
113123
}
114124

115-
@Override
116125
public boolean registerInstance(NamingServerNode instance, String unitName) {
117126
// refresh node weight
118127
Object weightValue = instance.getMetadata().get("weight");
119128
if (weightValue != null) {
120129
instance.setWeight(Double.parseDouble(String.valueOf(weightValue)));
121130
instance.getMetadata().remove("weight");
122131
}
123-
124-
Unit unit = unitData.get(unitName);
125-
if (unit == null) {
126-
unit = new Unit();
127-
List<Node> instances = new ArrayList<>();
128-
instances.add(instance);
132+
Unit currentUnit = unitData.computeIfAbsent(unitName, value -> {
133+
Unit unit = new Unit();
134+
List<Node> instances = new CopyOnWriteArrayList<>();
129135
unit.setUnitName(unitName);
130136
unit.setNamingInstanceList(instances);
131-
unitData.put(unitName, unit);
132-
return true;
137+
return unit;
138+
});
139+
// ensure that when adding an instance, the remove side will not delete the unit.
140+
lock.lock();
141+
try {
142+
currentUnit.addInstance(instance);
143+
} finally {
144+
lock.unlock();
133145
}
134-
return unit.addInstance(instance);
146+
return true;
135147
}
136148

137149

namingserver/src/main/java/org/apache/seata/namingserver/vo/monitor/ClusterVO.java renamed to namingserver/src/main/java/org/apache/seata/namingserver/entity/vo/monitor/ClusterVO.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.seata.namingserver.vo.monitor;
17+
package org.apache.seata.namingserver.entity.vo.monitor;
1818

1919
import org.apache.seata.common.metadata.namingserver.Unit;
20-
import org.apache.seata.namingserver.pojo.ClusterData;
20+
import org.apache.seata.namingserver.entity.pojo.ClusterData;
2121

2222
import java.util.ArrayList;
2323
import java.util.List;

namingserver/src/main/java/org/apache/seata/namingserver/vo/monitor/WatcherVO.java renamed to namingserver/src/main/java/org/apache/seata/namingserver/entity/vo/monitor/WatcherVO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.seata.namingserver.vo.monitor;
17+
package org.apache.seata.namingserver.entity.vo.monitor;
1818

1919
import java.util.List;
2020

0 commit comments

Comments
 (0)