Skip to content

Commit f00b037

Browse files
authored
Merge pull request #22 from sharding-sphere/dev
update from origin
2 parents 0603664 + 71af502 commit f00b037

File tree

11 files changed

+278
-57
lines changed

11 files changed

+278
-57
lines changed
+12-11
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,24 @@
1515
* </p>
1616
*/
1717

18-
package io.shardingsphere.opentracing.config;
19-
20-
import com.google.common.base.Optional;
18+
package io.shardingsphere.core.executor;
2119

2220
/**
23-
* Parse config.
21+
* Sharding execute callback.
22+
*
23+
* @author zhangliang
2424
*
25-
* @author gaohongtao
26-
* @author wangkai
25+
* @param <I> type of input value
26+
* @param <O> type of output value
2727
*/
28-
interface ConfigurationParser {
28+
public interface ShardingExecuteCallback<I, O> {
2929

3030
/**
31-
* Parse config item to config value.
31+
* Execute callback.
3232
*
33-
* @param configItem config item
34-
* @return config value
33+
* @param input input value
34+
* @return execute result
35+
* @throws Exception throw when execute failure
3536
*/
36-
Optional<String> parse(String configItem);
37+
O execute(I input) throws Exception;
3738
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2016-2018 shardingsphere.io.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
* </p>
16+
*/
17+
18+
package io.shardingsphere.core.executor;
19+
20+
import com.google.common.collect.Lists;
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import com.google.common.util.concurrent.ListeningExecutorService;
23+
import com.google.common.util.concurrent.MoreExecutors;
24+
import lombok.Getter;
25+
26+
import java.util.ArrayList;
27+
import java.util.Collection;
28+
import java.util.Collections;
29+
import java.util.Iterator;
30+
import java.util.LinkedList;
31+
import java.util.List;
32+
import java.util.concurrent.Callable;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.TimeUnit;
37+
38+
/**
39+
* Sharding execute engine.
40+
*
41+
* @author zhangliang
42+
*/
43+
public final class ShardingExecuteEngine implements AutoCloseable {
44+
45+
private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ShardingThreadFactoryBuilder.build("Executor-Engine-Closer"));
46+
47+
@Getter
48+
private final ListeningExecutorService executorService;
49+
50+
public ShardingExecuteEngine(final int executorSize) {
51+
executorService = MoreExecutors.listeningDecorator(
52+
0 == executorSize ? Executors.newCachedThreadPool(ShardingThreadFactoryBuilder.build()) : Executors.newFixedThreadPool(executorSize, ShardingThreadFactoryBuilder.build()));
53+
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
54+
}
55+
56+
/**
57+
* Execute all callbacks.
58+
*
59+
* @param inputs sharding execute callbacks
60+
* @param callback sharding execute callback
61+
* @param <I> type of input value
62+
* @param <O> type of return value
63+
* @return execute result
64+
* @throws Exception throw if execute failure
65+
*/
66+
public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws Exception {
67+
if (inputs.isEmpty()) {
68+
return Collections.emptyList();
69+
}
70+
Iterator<I> inputIterator = inputs.iterator();
71+
I firstInput = inputIterator.next();
72+
Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
73+
return getResults(callback.execute(firstInput), restFutures);
74+
}
75+
76+
private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
77+
Collection<ListenableFuture<O>> result = new ArrayList<>(inputs.size());
78+
for (final I each : inputs) {
79+
result.add(executorService.submit(new Callable<O>() {
80+
81+
@Override
82+
public O call() throws Exception {
83+
return callback.execute(each);
84+
}
85+
}));
86+
}
87+
return result;
88+
}
89+
90+
private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws ExecutionException, InterruptedException {
91+
List<O> result = new LinkedList<>();
92+
result.add(firstResult);
93+
for (ListenableFuture<O> each : restFutures) {
94+
result.add(each.get());
95+
}
96+
return result;
97+
}
98+
99+
@Override
100+
public void close() {
101+
SHUTDOWN_EXECUTOR.execute(new Runnable() {
102+
103+
@Override
104+
public void run() {
105+
try {
106+
executorService.shutdown();
107+
while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
108+
executorService.shutdownNow();
109+
}
110+
} catch (final InterruptedException ex) {
111+
Thread.currentThread().interrupt();
112+
}
113+
}
114+
});
115+
}
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2016-2018 shardingsphere.io.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
* </p>
16+
*/
17+
18+
package io.shardingsphere.core.executor;
19+
20+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
21+
import lombok.AccessLevel;
22+
import lombok.NoArgsConstructor;
23+
24+
import java.util.concurrent.ThreadFactory;
25+
26+
/**
27+
* Sharding thread factory builder.
28+
*
29+
* @author zhangliang
30+
*/
31+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
32+
public final class ShardingThreadFactoryBuilder {
33+
34+
private static final String NAME_FORMAT_PREFIX = "Sharding-Sphere-";
35+
36+
private static final String DEFAULT_EXECUTOR_NAME_FORMAT = NAME_FORMAT_PREFIX + "%d";
37+
38+
/**
39+
* Build default sharding thread factory.
40+
*
41+
* @return default sharding thread factory
42+
*/
43+
public static ThreadFactory build() {
44+
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(DEFAULT_EXECUTOR_NAME_FORMAT).build();
45+
}
46+
47+
/**
48+
* Build sharding thread factory.
49+
*
50+
* @param nameFormat thread name format
51+
* @return sharding thread factory
52+
*/
53+
public static ThreadFactory build(final String nameFormat) {
54+
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(NAME_FORMAT_PREFIX + nameFormat).build();
55+
}
56+
}

sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecuteCallback.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package io.shardingsphere.core.executor;
1919

20+
import io.shardingsphere.core.constant.SQLType;
21+
2022
/**
2123
* Statement execute callback interface.
2224
*
@@ -25,14 +27,12 @@
2527
*
2628
* @param <T> class type of return value
2729
*/
28-
public interface ExecuteCallback<T> {
30+
public interface ExecuteCallback<T> extends ShardingExecuteCallback<BaseStatementUnit, T> {
2931

3032
/**
31-
* Execute task.
33+
* Get SQL type.
3234
*
33-
* @param baseStatementUnit statement execute unit
34-
* @return execute result
35-
* @throws Exception execute exception
35+
* @return SQL type
3636
*/
37-
T execute(BaseStatementUnit baseStatementUnit) throws Exception;
37+
SQLType getSQLType();
3838
}

sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecutorEngine.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.common.eventbus.EventBus;
2121
import com.google.common.util.concurrent.ListeningExecutorService;
2222
import com.google.common.util.concurrent.MoreExecutors;
23-
import io.shardingsphere.core.constant.SQLType;
2423
import io.shardingsphere.core.event.ShardingEventBusInstance;
2524
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
2625
import io.shardingsphere.core.executor.event.sql.SQLExecutionEvent;
@@ -68,22 +67,20 @@ public ExecutorEngine(final int executorSize) {
6867
/**
6968
* Execute.
7069
*
71-
* @param sqlType SQL type
7270
* @param baseStatementUnits statement execute unitS
7371
* @param executeCallback prepared statement execute callback
7472
* @param <T> class type of return value
7573
* @return execute result
7674
* @throws SQLException SQL exception
7775
*/
78-
public <T> List<T> execute(
79-
final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
76+
public <T> List<T> execute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
8077
if (baseStatementUnits.isEmpty()) {
8178
return Collections.emptyList();
8279
}
83-
OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size() > 1);
80+
OverallExecutionEvent event = new OverallExecutionEvent(executeCallback.getSQLType(), baseStatementUnits.size() > 1);
8481
shardingEventBus.post(event);
8582
try {
86-
List<T> result = getExecuteResults(sqlType, baseStatementUnits, executeCallback);
83+
List<T> result = getExecuteResults(baseStatementUnits, executeCallback);
8784
event.setExecuteSuccess();
8885
return result;
8986
// CHECKSTYLE:OFF
@@ -97,16 +94,16 @@ public <T> List<T> execute(
9794
}
9895
}
9996

100-
protected abstract <T> List<T> getExecuteResults(SQLType sqlType, Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
97+
protected abstract <T> List<T> getExecuteResults(Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
10198

102-
protected final <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit,
103-
final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
99+
protected final <T> T executeInternal(
100+
final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
104101
T result;
105102
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
106103
ExecutorDataMap.setDataMap(dataMap);
107104
List<SQLExecutionEvent> events = new LinkedList<>();
108105
for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
109-
SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(sqlType, baseStatementUnit, each);
106+
SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(executeCallback.getSQLType(), baseStatementUnit, each);
110107
events.add(event);
111108
shardingEventBus.post(event);
112109
}

sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/batch/BatchPreparedStatementExecutor.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,17 @@ public final class BatchPreparedStatementExecutor {
5555
* @throws SQLException SQL exception
5656
*/
5757
public int[] executeBatch() throws SQLException {
58-
return accumulate(executorEngine.execute(sqlType, batchPreparedStatementUnits, new ExecuteCallback<int[]>() {
58+
return accumulate(executorEngine.execute(batchPreparedStatementUnits, new ExecuteCallback<int[]>() {
5959

6060
@Override
6161
public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
6262
return baseStatementUnit.getStatement().executeBatch();
6363
}
64+
65+
@Override
66+
public SQLType getSQLType() {
67+
return sqlType;
68+
}
6469
}));
6570
}
6671

sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/connection/ConnectionStrictlyExecutorEngine.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package io.shardingsphere.core.executor.type.connection;
1919

2020
import com.google.common.util.concurrent.ListenableFuture;
21-
import io.shardingsphere.core.constant.SQLType;
2221
import io.shardingsphere.core.executor.BaseStatementUnit;
2322
import io.shardingsphere.core.executor.ExecuteCallback;
2423
import io.shardingsphere.core.executor.ExecutorEngine;
@@ -46,10 +45,10 @@ public ConnectionStrictlyExecutorEngine(final int executorSize) {
4645
}
4746

4847
@Override
49-
protected <T> List<T> getExecuteResults(final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
48+
protected <T> List<T> getExecuteResults(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
5049
Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups = getBaseStatementUnitGroups(baseStatementUnits);
51-
Collection<T> firstOutputs = syncExecute(sqlType, baseStatementUnitGroups.remove(baseStatementUnitGroups.keySet().iterator().next()), executeCallback);
52-
Collection<ListenableFuture<Collection<T>>> restResultFutures = asyncExecute(sqlType, baseStatementUnitGroups, executeCallback);
50+
Collection<T> firstOutputs = syncExecute(baseStatementUnitGroups.remove(baseStatementUnitGroups.keySet().iterator().next()), executeCallback);
51+
Collection<ListenableFuture<Collection<T>>> restResultFutures = asyncExecute(baseStatementUnitGroups, executeCallback);
5352
return getResultList(firstOutputs, restResultFutures);
5453
}
5554

@@ -65,8 +64,7 @@ private Map<String, Collection<BaseStatementUnit>> getBaseStatementUnitGroups(fi
6564
return result;
6665
}
6766

68-
private <T> Collection<ListenableFuture<Collection<T>>> asyncExecute(
69-
final SQLType sqlType, final Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups, final ExecuteCallback<T> executeCallback) {
67+
private <T> Collection<ListenableFuture<Collection<T>>> asyncExecute(final Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups, final ExecuteCallback<T> executeCallback) {
7068
Collection<ListenableFuture<Collection<T>>> result = new ArrayList<>(baseStatementUnitGroups.size());
7169
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
7270
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
@@ -77,7 +75,7 @@ private <T> Collection<ListenableFuture<Collection<T>>> asyncExecute(
7775
public Collection<T> call() throws Exception {
7876
Collection<T> result = new LinkedList<>();
7977
for (BaseStatementUnit each : baseStatementUnits) {
80-
result.add(executeInternal(sqlType, each, executeCallback, isExceptionThrown, dataMap));
78+
result.add(executeInternal(each, executeCallback, isExceptionThrown, dataMap));
8179
}
8280
return result;
8381
}
@@ -86,10 +84,10 @@ public Collection<T> call() throws Exception {
8684
return result;
8785
}
8886

89-
private <T> Collection<T> syncExecute(final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
87+
private <T> Collection<T> syncExecute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
9088
Collection<T> result = new LinkedList<>();
9189
for (BaseStatementUnit each : baseStatementUnits) {
92-
result.add(executeInternal(sqlType, each, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap()));
90+
result.add(executeInternal(each, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap()));
9391
}
9492
return result;
9593
}

0 commit comments

Comments
 (0)