Skip to content

Commit 522d795

Browse files
authored
Merge pull request #30 from SphereEx/dev-5.5.1
Improve DataSourceStateManager
2 parents 49342e8 + 4bd2f0a commit 522d795

File tree

21 files changed

+162
-66
lines changed

21 files changed

+162
-66
lines changed

features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,9 @@ private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardi
165165
: new ShardingTableBroadcastRoutingEngine(database, sqlStatementContext, shardingRuleTableNames);
166166
}
167167
if (!shardingRuleTableNames.isEmpty()) {
168-
return new ShardingUnicastRoutingEngine(sqlStatementContext, shardingRuleTableNames, connectionContext);
168+
// SPEX CHANGED: BEGIN
169+
return new ShardingUnicastRoutingEngine(sqlStatementContext, database.getName(), shardingRuleTableNames, connectionContext);
170+
// SPEX CHANGED: END
169171
}
170172
return new ShardingDataSourceGroupBroadcastRoutingEngine();
171173
}
@@ -198,7 +200,9 @@ private static ShardingRouteEngine getDQLRoutingEngine(final ShardingRule shardi
198200
final ConnectionContext connectionContext) {
199201
Collection<String> tableNames = sqlStatementContext instanceof TableAvailable ? ((TableAvailable) sqlStatementContext).getTablesContext().getTableNames() : Collections.emptyList();
200202
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty()) {
201-
return new ShardingUnicastRoutingEngine(sqlStatementContext, tableNames, connectionContext);
203+
// SPEX CHANGED: BEGIN
204+
return new ShardingUnicastRoutingEngine(sqlStatementContext, database.getName(), tableNames, connectionContext);
205+
// SPEX CHANGED: END
202206
}
203207
Collection<String> shardingLogicTableNames = shardingRule.getShardingLogicTableNames(tableNames);
204208
if (shardingLogicTableNames.isEmpty()) {

features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/unicast/ShardingUnicastRoutingEngine.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818
package org.apache.shardingsphere.sharding.route.engine.type.unicast;
1919

2020
import com.google.common.collect.Sets;
21+
import com.sphereex.dbplusengine.SphereEx;
22+
import com.sphereex.dbplusengine.infra.state.datasource.DataSourceStateManager;
2123
import lombok.RequiredArgsConstructor;
2224
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
2325
import org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterViewStatementContext;
2426
import org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateViewStatementContext;
2527
import org.apache.shardingsphere.infra.binder.context.statement.ddl.DropViewStatementContext;
2628
import org.apache.shardingsphere.infra.binder.context.type.CursorAvailable;
27-
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
2829
import org.apache.shardingsphere.infra.datanode.DataNode;
2930
import org.apache.shardingsphere.infra.route.context.RouteContext;
3031
import org.apache.shardingsphere.infra.route.context.RouteMapper;
3132
import org.apache.shardingsphere.infra.route.context.RouteUnit;
33+
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
3234
import org.apache.shardingsphere.sharding.exception.syntax.DataSourceIntersectionNotFoundException;
3335
import org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngine;
3436
import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -51,6 +53,9 @@ public final class ShardingUnicastRoutingEngine implements ShardingRouteEngine {
5153

5254
private final SQLStatementContext sqlStatementContext;
5355

56+
@SphereEx
57+
private final String logicDatabaseName;
58+
5459
private final Collection<String> logicTables;
5560

5661
private final ConnectionContext connectionContext;
@@ -112,6 +117,9 @@ private void routeWithMultipleTables(final RouteContext routeContext, final Shar
112117
private String getRandomDataSourceName(final Collection<String> dataSourceNames) {
113118
Collection<String> usedDataSourceNames = connectionContext.getUsedDataSourceNames();
114119
List<String> availableDataSourceNames = new ArrayList<>(usedDataSourceNames.isEmpty() ? dataSourceNames : usedDataSourceNames);
115-
return availableDataSourceNames.get(ThreadLocalRandom.current().nextInt(availableDataSourceNames.size()));
120+
// SPEX CHANGED: BEGIN
121+
List<String> enabledDataSourceNames = DataSourceStateManager.getInstance().getEnabledDataSourceNames(logicDatabaseName, availableDataSourceNames);
122+
return enabledDataSourceNames.get(ThreadLocalRandom.current().nextInt(enabledDataSourceNames.size()));
123+
// SPEX CHANGED: END
116124
}
117125
}

features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/route/engine/type/unicast/ShardingUnicastRoutingEngineTest.java

+20-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.shardingsphere.sharding.route.engine.type.unicast;
1919

20+
import com.sphereex.dbplusengine.SphereEx;
21+
import com.sphereex.dbplusengine.SphereEx.Type;
2022
import org.apache.groovy.util.Maps;
2123
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
2224
import org.apache.shardingsphere.infra.binder.context.statement.ddl.CursorStatementContext;
@@ -55,47 +57,58 @@ void setUp() {
5557

5658
@Test
5759
void assertRoutingForShardingTable() {
58-
RouteContext actual = new ShardingUnicastRoutingEngine(mock(SQLStatementContext.class), Collections.singleton("t_order"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
60+
@SphereEx(Type.MODIFY)
61+
RouteContext actual = new ShardingUnicastRoutingEngine(
62+
mock(SQLStatementContext.class), "sharding_db", Collections.singleton("t_order"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
5963
assertThat(actual.getRouteUnits().size(), is(1));
6064
assertFalse("ds_2".equalsIgnoreCase(actual.getRouteUnits().iterator().next().getDataSourceMapper().getLogicName()));
6165
}
6266

6367
@Test
6468
void assertRoutingForBroadcastTable() {
65-
RouteContext actual = new ShardingUnicastRoutingEngine(mock(SQLStatementContext.class), Collections.singleton("t_config"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
69+
@SphereEx(Type.MODIFY)
70+
RouteContext actual = new ShardingUnicastRoutingEngine(
71+
mock(SQLStatementContext.class), "sharding_db", Collections.singleton("t_config"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
6672
assertThat(actual.getRouteUnits().size(), is(1));
6773
}
6874

6975
@Test
7076
void assertRoutingForNoTable() {
71-
RouteContext actual = new ShardingUnicastRoutingEngine(mock(SQLStatementContext.class), Collections.emptyList(), new ConnectionContext(Collections::emptySet)).route(shardingRule);
77+
@SphereEx(Type.MODIFY)
78+
RouteContext actual = new ShardingUnicastRoutingEngine(
79+
mock(SQLStatementContext.class), "sharding_db", Collections.emptyList(), new ConnectionContext(Collections::emptySet)).route(shardingRule);
7280
assertThat(actual.getRouteUnits().size(), is(1));
7381
}
7482

83+
@SphereEx(Type.MODIFY)
7584
@Test
7685
void assertRouteForWithNoIntersection() {
7786
assertThrows(ShardingTableRuleNotFoundException.class, () -> new ShardingUnicastRoutingEngine(
78-
mock(SQLStatementContext.class), Arrays.asList("t_order", "t_config", "t_product"), new ConnectionContext(Collections::emptySet)).route(shardingRule));
87+
mock(SQLStatementContext.class), "sharding_db", Arrays.asList("t_order", "t_config", "t_product"), new ConnectionContext(Collections::emptySet)).route(shardingRule));
7988
}
8089

8190
@Test
8291
void assertRoutingForTableWithoutTableRule() {
83-
RouteContext actual = new ShardingUnicastRoutingEngine(mock(SQLStatementContext.class), Collections.singleton("t_other"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
92+
@SphereEx(Type.MODIFY)
93+
RouteContext actual = new ShardingUnicastRoutingEngine(
94+
mock(SQLStatementContext.class), "sharding_db", Collections.singleton("t_other"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
8495
assertThat(actual.getRouteUnits().size(), is(1));
8596
}
8697

8798
@Test
8899
void assertRoutingForBroadcastTableWithCursorStatement() {
100+
@SphereEx(Type.MODIFY)
89101
RouteContext actual = new ShardingUnicastRoutingEngine(
90-
mock(CursorStatementContext.class), Collections.singleton("t_config"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
102+
mock(CursorStatementContext.class), "sharding_db", Collections.singleton("t_config"), new ConnectionContext(Collections::emptySet)).route(shardingRule);
91103
assertThat(actual.getRouteUnits().size(), is(1));
92104
assertThat(actual.getRouteUnits().iterator().next().getDataSourceMapper().getActualName(), is("ds_0"));
93105
}
94106

95107
@Test
96108
void assertRoutingForBroadcastTableWithPreferredDataSource() {
97109
ConnectionContext connectionContext = new ConnectionContext(() -> Collections.singleton("ds_1"));
98-
RouteContext actual = new ShardingUnicastRoutingEngine(mock(SelectStatementContext.class), Collections.singleton("t_config"), connectionContext).route(shardingRule);
110+
@SphereEx(Type.MODIFY)
111+
RouteContext actual = new ShardingUnicastRoutingEngine(mock(SelectStatementContext.class), "sharding_db", Collections.singleton("t_config"), connectionContext).route(shardingRule);
99112
assertThat(actual.getRouteUnits().size(), is(1));
100113
assertThat(actual.getRouteUnits().iterator().next().getDataSourceMapper().getActualName(), is("ds_1"));
101114
}

infra/common/src/main/java/com/sphereex/dbplusengine/infra/state/datasource/DataSourceStateManager.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.sphereex.dbplusengine.infra.state.datasource;
1919

2020
import com.sphereex.dbplusengine.SphereEx;
21+
import com.sphereex.dbplusengine.SphereEx.Type;
2122
import com.sphereex.dbplusengine.infra.state.datasource.exception.UnavailableDataSourceException;
2223
import lombok.AccessLevel;
2324
import lombok.NoArgsConstructor;
@@ -54,7 +55,8 @@ public final class DataSourceStateManager {
5455

5556
private volatile boolean forceStart;
5657

57-
private final AtomicBoolean initialized = new AtomicBoolean(false);
58+
@SphereEx
59+
private final Map<String, AtomicBoolean> databaseInitializedFlags = new ConcurrentHashMap<>();
5860

5961
/**
6062
* Get data source state manager.
@@ -73,10 +75,13 @@ public static DataSourceStateManager getInstance() {
7375
* @param storageDataSourceStates storage node data source state
7476
* @param forceStart whether to force start
7577
*/
78+
@SphereEx(Type.MODIFY)
7679
public void initStates(final String databaseName, final Map<String, StorageUnit> storageUnits, final Map<String, DataSourceState> storageDataSourceStates, final boolean forceStart) {
7780
this.forceStart = forceStart;
81+
AtomicBoolean initialized = databaseInitializedFlags.getOrDefault(databaseName, new AtomicBoolean(false));
7882
if (initialized.compareAndSet(false, true)) {
7983
storageUnits.forEach((key, value) -> initState(databaseName, storageDataSourceStates, key, value.getDataSource()));
84+
databaseInitializedFlags.putIfAbsent(databaseName, initialized);
8085
}
8186
}
8287

@@ -89,12 +94,16 @@ private void initState(final String databaseName, final Map<String, DataSourceSt
8994
}
9095
}
9196

97+
@SphereEx(Type.MODIFY)
9298
private void checkState(final String databaseName, final String actualDataSourceName, final DataSource dataSource) {
9399
try (Connection ignored = dataSource.getConnection()) {
94100
dataSourceStates.put(getCacheKey(databaseName, actualDataSourceName), DataSourceState.ENABLED);
95-
} catch (final SQLException ex) {
96-
ShardingSpherePreconditions.checkState(forceStart, () -> new UnavailableDataSourceException(actualDataSourceName, ex));
97-
log.error("Data source unavailable, ignored with the -f parameter.", ex);
101+
// CHECKSTYLE:OFF
102+
} catch (final Exception ex) {
103+
// CHECKSTYLE:ON
104+
ShardingSpherePreconditions.checkState(forceStart, () -> new UnavailableDataSourceException(databaseName, actualDataSourceName, ex));
105+
log.error("Data source `{}` in `{}` is unavailable, ignored with the -f parameter.", actualDataSourceName, databaseName, ex);
106+
dataSourceStates.put(getCacheKey(databaseName, actualDataSourceName), DataSourceState.DISABLED);
98107
}
99108
}
100109

@@ -119,12 +128,10 @@ public Map<String, DataSource> getEnabledDataSources(final String databaseName,
119128
* @return enabled data sources
120129
*/
121130
public Map<String, DataSource> getEnabledDataSources(final String databaseName, final Map<String, DataSource> dataSources) {
122-
if (dataSources.isEmpty() || !initialized.get()) {
131+
if (dataSources.isEmpty() || !databaseInitializedFlags.getOrDefault(databaseName, new AtomicBoolean(false)).get()) {
123132
return dataSources;
124133
}
125-
Map<String, DataSource> result = filterDisabledDataSources(databaseName, dataSources);
126-
checkForceConnection(result);
127-
return result;
134+
return filterDisabledDataSources(databaseName, dataSources);
128135
}
129136

130137
private Map<String, DataSource> filterDisabledDataSources(final String databaseName, final Map<String, DataSource> dataSources) {

infra/common/src/main/java/com/sphereex/dbplusengine/infra/state/datasource/exception/UnavailableDataSourceException.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717

1818
package com.sphereex.dbplusengine.infra.state.datasource.exception;
1919

20-
import com.sphereex.dbplusengine.SphereEx;
2120
import org.apache.shardingsphere.infra.exception.core.external.server.ShardingSphereServerException;
2221

23-
import java.sql.SQLException;
24-
2522
/**
2623
* Data source state exception.
2724
*/
@@ -33,12 +30,7 @@ public final class UnavailableDataSourceException extends ShardingSphereServerEx
3330

3431
private static final int ERROR_CODE = 1;
3532

36-
@SphereEx
37-
public UnavailableDataSourceException(final String dataSourceName) {
38-
super(ERROR_CATEGORY, ERROR_CODE, String.format("Data source '%s' is unavailable.", dataSourceName));
39-
}
40-
41-
public UnavailableDataSourceException(final String dataSourceName, final SQLException cause) {
42-
super(ERROR_CATEGORY, ERROR_CODE, String.format("Data source '%s' is unavailable.", dataSourceName), cause);
33+
public UnavailableDataSourceException(final String databaseName, final String dataSourceName, final Exception cause) {
34+
super(ERROR_CATEGORY, ERROR_CODE, String.format("Data source '%s' in database '%s' is unavailable.", dataSourceName, databaseName), cause);
4335
}
4436
}

0 commit comments

Comments
 (0)