Skip to content

Commit 2b0af72

Browse files
authored
Merge pull request #24 from sharding-sphere/dev
update from origin
2 parents 548fc0c + 81abe77 commit 2b0af72

File tree

45 files changed

+694
-1049
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+694
-1049
lines changed

sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteEngine.java

+98-4
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import com.google.common.util.concurrent.ListenableFuture;
2222
import com.google.common.util.concurrent.ListeningExecutorService;
2323
import com.google.common.util.concurrent.MoreExecutors;
24-
import lombok.Getter;
2524

2625
import java.util.ArrayList;
2726
import java.util.Collection;
2827
import java.util.Collections;
2928
import java.util.Iterator;
3029
import java.util.LinkedList;
3130
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Map.Entry;
3233
import java.util.concurrent.Callable;
3334
import java.util.concurrent.ExecutionException;
3435
import java.util.concurrent.ExecutorService;
@@ -44,7 +45,6 @@ public final class ShardingExecuteEngine implements AutoCloseable {
4445

4546
private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ShardingThreadFactoryBuilder.build("Executor-Engine-Closer"));
4647

47-
@Getter
4848
private final ListeningExecutorService executorService;
4949

5050
public ShardingExecuteEngine(final int executorSize) {
@@ -56,7 +56,7 @@ public ShardingExecuteEngine(final int executorSize) {
5656
/**
5757
* Execute all callbacks.
5858
*
59-
* @param inputs sharding execute callbacks
59+
* @param inputs input values
6060
* @param callback sharding execute callback
6161
* @param <I> type of input value
6262
* @param <O> type of return value
@@ -70,7 +70,28 @@ public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteC
7070
Iterator<I> inputIterator = inputs.iterator();
7171
I firstInput = inputIterator.next();
7272
Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
73-
return getResults(callback.execute(firstInput), restFutures);
73+
return getResults(syncExecute(firstInput, callback), restFutures);
74+
}
75+
76+
/**
77+
* Execute all callbacks.
78+
*
79+
* @param inputs input values
80+
* @param firstCallback first sharding execute callback
81+
* @param callback sharding execute callback
82+
* @param <I> type of input value
83+
* @param <O> type of return value
84+
* @return execute result
85+
* @throws Exception throw if execute failure
86+
*/
87+
public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> firstCallback, final ShardingExecuteCallback<I, O> callback) throws Exception {
88+
if (inputs.isEmpty()) {
89+
return Collections.emptyList();
90+
}
91+
Iterator<I> inputIterator = inputs.iterator();
92+
I firstInput = inputIterator.next();
93+
Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
94+
return getResults(syncExecute(firstInput, firstCallback), restFutures);
7495
}
7596

7697
private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
@@ -87,6 +108,10 @@ public O call() throws Exception {
87108
return result;
88109
}
89110

111+
private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws Exception {
112+
return callback.execute(input);
113+
}
114+
90115
private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws ExecutionException, InterruptedException {
91116
List<O> result = new LinkedList<>();
92117
result.add(firstResult);
@@ -96,6 +121,75 @@ private <O> List<O> getResults(final O firstResult, final Collection<ListenableF
96121
return result;
97122
}
98123

124+
/**
125+
* execute all callbacks for group.
126+
*
127+
* @param inputs input value's map
128+
* @param callback sharding execute callback
129+
* @param <I> type of input value
130+
* @param <O> type of return value
131+
* @return execute result
132+
* @throws Exception throw if execute failure
133+
*/
134+
public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
135+
if (inputs.isEmpty()) {
136+
return Collections.emptyList();
137+
}
138+
String firstKey = inputs.keySet().iterator().next();
139+
Collection<I> firstInputs = inputs.remove(firstKey);
140+
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
141+
return getGroupResults(syncGroupExecute(firstKey, firstInputs, callback), restResultFutures);
142+
}
143+
144+
/**
145+
* execute all callbacks for group.
146+
*
147+
* @param inputs input value's map
148+
* @param callback sharding execute callback
149+
* @param firstCallback first sharding execute callback
150+
* @param <I> type of input value
151+
* @param <O> type of return value
152+
* @return execute result
153+
* @throws Exception throw if execute failure
154+
*/
155+
public <I, O> List<O> groupExecute(
156+
final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
157+
if (inputs.isEmpty()) {
158+
return Collections.emptyList();
159+
}
160+
String firstKey = inputs.keySet().iterator().next();
161+
Collection<I> firstInputs = inputs.remove(firstKey);
162+
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
163+
return getGroupResults(syncGroupExecute(firstKey, firstInputs, firstCallback), restResultFutures);
164+
}
165+
166+
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
167+
Collection<ListenableFuture<Collection<O>>> result = new ArrayList<>(inputs.size());
168+
for (final Entry<String, Collection<I>> entry : inputs.entrySet()) {
169+
result.add(executorService.submit(new Callable<Collection<O>>() {
170+
171+
@Override
172+
public Collection<O> call() throws Exception {
173+
return callback.execute(entry.getKey(), entry.getValue());
174+
}
175+
}));
176+
}
177+
return result;
178+
}
179+
180+
private <I, O> Collection<O> syncGroupExecute(final String dataSourceName, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
181+
return callback.execute(dataSourceName, inputs);
182+
}
183+
184+
private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws ExecutionException, InterruptedException {
185+
List<O> result = new LinkedList<>();
186+
result.addAll(firstResults);
187+
for (ListenableFuture<Collection<O>> each : restFutures) {
188+
result.addAll(each.get());
189+
}
190+
return result;
191+
}
192+
99193
@Override
100194
public void close() {
101195
SHUTDOWN_EXECUTOR.execute(new Runnable() {

sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecuteCallback.java renamed to sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingGroupExecuteCallback.java

+12-25
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,25 @@
1717

1818
package io.shardingsphere.core.executor;
1919

20-
import io.shardingsphere.core.constant.SQLType;
21-
22-
import java.util.Map;
20+
import java.util.Collection;
2321

2422
/**
25-
* Statement execute callback interface.
26-
*
27-
* @author gaohongtao
23+
* Sharding group execute callback.
24+
*
2825
* @author zhangliang
2926
*
30-
* @param <T> class type of return value
27+
* @param <I> type of input value
28+
* @param <O> type of output value
3129
*/
32-
public interface ExecuteCallback<T> extends ShardingExecuteCallback<BaseStatementUnit, T> {
33-
34-
/**
35-
* Get SQL type.
36-
*
37-
* @return SQL type
38-
*/
39-
SQLType getSQLType();
40-
41-
/**
42-
* Judge is exception thrown or not.
43-
*
44-
* @return is exception thrown or not
45-
*/
46-
boolean isExceptionThrown();
30+
public interface ShardingGroupExecuteCallback<I, O> {
4731

4832
/**
49-
* Get data map.
33+
* Execute callback.
5034
*
51-
* @return data map
35+
* @param key input key
36+
* @param values input values
37+
* @return execute result
38+
* @throws Exception throw when execute failure
5239
*/
53-
Map<String, Object> getDataMap();
40+
Collection<O> execute(String key, Collection<I> values) throws Exception;
5441
}

sharding-core/src/main/java/io/shardingsphere/core/metadata/ShardingMetaData.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package io.shardingsphere.core.metadata;
1919

20-
import com.google.common.util.concurrent.MoreExecutors;
2120
import io.shardingsphere.core.constant.DatabaseType;
21+
import io.shardingsphere.core.executor.ShardingExecuteEngine;
2222
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
2323
import io.shardingsphere.core.metadata.table.ShardingTableMetaData;
2424
import io.shardingsphere.core.metadata.table.executor.TableMetaDataConnectionManager;
@@ -27,7 +27,6 @@
2727
import lombok.Getter;
2828

2929
import java.util.Map;
30-
import java.util.concurrent.ExecutorService;
3130

3231
/**
3332
* Sharding meta data.
@@ -41,9 +40,9 @@ public final class ShardingMetaData {
4140

4241
private final ShardingTableMetaData table;
4342

44-
public ShardingMetaData(final Map<String, String> dataSourceURLs, final ShardingRule shardingRule,
45-
final DatabaseType databaseType, final ExecutorService executorService, final TableMetaDataConnectionManager connectionManager) {
43+
public ShardingMetaData(final Map<String, String> dataSourceURLs, final ShardingRule shardingRule,
44+
final DatabaseType databaseType, final ShardingExecuteEngine shardingExecuteEngine, final TableMetaDataConnectionManager connectionManager) {
4645
dataSource = new ShardingDataSourceMetaData(dataSourceURLs, shardingRule, databaseType);
47-
table = new ShardingTableMetaData(new TableMetaDataInitializer(dataSource, MoreExecutors.listeningDecorator(executorService), connectionManager).load(shardingRule));
46+
table = new ShardingTableMetaData(new TableMetaDataInitializer(dataSource, shardingExecuteEngine, connectionManager).load(shardingRule));
4847
}
4948
}

sharding-core/src/main/java/io/shardingsphere/core/metadata/table/executor/TableMetaDataInitializer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package io.shardingsphere.core.metadata.table.executor;
1919

2020
import com.google.common.base.Optional;
21-
import com.google.common.util.concurrent.ListeningExecutorService;
2221
import io.shardingsphere.core.exception.ShardingException;
22+
import io.shardingsphere.core.executor.ShardingExecuteEngine;
2323
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
2424
import io.shardingsphere.core.metadata.table.TableMetaData;
2525
import io.shardingsphere.core.rule.ShardingRule;
@@ -45,9 +45,9 @@ public final class TableMetaDataInitializer {
4545
private final TableMetaDataLoader tableMetaDataLoader;
4646

4747
public TableMetaDataInitializer(
48-
final ShardingDataSourceMetaData shardingDataSourceMetaData, final ListeningExecutorService executorService, final TableMetaDataConnectionManager connectionManager) {
48+
final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine shardingExecuteEngine, final TableMetaDataConnectionManager connectionManager) {
4949
this.connectionManager = connectionManager;
50-
tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executorService, connectionManager);
50+
tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, shardingExecuteEngine, connectionManager);
5151
}
5252

5353
/**

sharding-core/src/main/java/io/shardingsphere/core/metadata/table/executor/TableMetaDataLoader.java

+11-25
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package io.shardingsphere.core.metadata.table.executor;
1919

20-
import com.google.common.util.concurrent.Futures;
21-
import com.google.common.util.concurrent.ListenableFuture;
22-
import com.google.common.util.concurrent.ListeningExecutorService;
2320
import io.shardingsphere.core.exception.ShardingException;
21+
import io.shardingsphere.core.executor.ShardingExecuteEngine;
22+
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
2423
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
2524
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
2625
import io.shardingsphere.core.metadata.table.ColumnMetaData;
@@ -38,9 +37,6 @@
3837
import java.util.LinkedList;
3938
import java.util.List;
4039
import java.util.Map;
41-
import java.util.Map.Entry;
42-
import java.util.concurrent.Callable;
43-
import java.util.concurrent.ExecutionException;
4440

4541
/**
4642
* Table meta data loader.
@@ -52,7 +48,7 @@ public final class TableMetaDataLoader {
5248

5349
private final ShardingDataSourceMetaData shardingDataSourceMetaData;
5450

55-
private final ListeningExecutorService executorService;
51+
private final ShardingExecuteEngine shardingExecuteEngine;
5652

5753
private final TableMetaDataConnectionManager connectionManager;
5854

@@ -70,27 +66,17 @@ public TableMetaData load(final String logicTableName, final ShardingRule shardi
7066
}
7167

7268
private List<TableMetaData> load(final Map<String, Collection<String>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) {
73-
List<ListenableFuture<Collection<TableMetaData>>> futures = new LinkedList<>();
74-
for (Entry<String, Collection<String>> entry : dataNodeGroups.entrySet()) {
75-
final String dataSourceName = shardingDataSourceNames.getRawMasterDataSourceName(entry.getKey());
76-
DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(entry.getKey());
77-
final String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName();
78-
final Collection<String> actualTableNames = entry.getValue();
79-
futures.add(executorService.submit(new Callable<Collection<TableMetaData>>() {
69+
try {
70+
return shardingExecuteEngine.groupExecute(dataNodeGroups, new ShardingGroupExecuteCallback<String, TableMetaData>() {
8071

8172
@Override
82-
public Collection<TableMetaData> call() throws SQLException {
83-
return load(dataSourceName, catalog, actualTableNames);
73+
public Collection<TableMetaData> execute(final String dataSourceName, final Collection<String> actualTableNames) throws SQLException {
74+
DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
75+
final String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName();
76+
return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, actualTableNames);
8477
}
85-
}));
86-
}
87-
List<TableMetaData> result = new LinkedList<>();
88-
try {
89-
for (Collection<TableMetaData> each : Futures.allAsList(futures).get()) {
90-
result.addAll(each);
91-
}
92-
return result;
93-
} catch (final InterruptedException | ExecutionException ex) {
78+
});
79+
} catch (final Exception ex) {
9480
throw new ShardingException(ex);
9581
}
9682
}

0 commit comments

Comments
 (0)