Skip to content

Commit 95a61b2

Browse files
hanahmilyxydonne
authored andcommitted
fixed apache#199 Only sharding table conditions, reuse PreparedStatement object cause data routing errors
1 parent 8587315 commit 95a61b2

File tree

8 files changed

+220
-68
lines changed

8 files changed

+220
-68
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 1999-2015 dangdang.com.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
* </p>
16+
*/
17+
18+
package com.dangdang.ddframe.rdb.sharding.jdbc;
19+
20+
import java.sql.Connection;
21+
import java.sql.PreparedStatement;
22+
import java.sql.SQLException;
23+
import java.util.Objects;
24+
25+
/**
26+
* 在PreparedStatement复用的时候,用Connection对象和SQL来区分同一个库的会话.
27+
*
28+
* @author gaohongtao
29+
*/
30+
final class BackendPreparedStatementWrapper extends BackendStatementWrapper {
31+
32+
private final String sql;
33+
34+
BackendPreparedStatementWrapper(final PreparedStatement statement, final String sql) throws SQLException {
35+
super(statement);
36+
this.sql = sql;
37+
}
38+
39+
@Override
40+
boolean isBelongTo(final Connection connection, final String sql) {
41+
return super.isBelongTo(connection, sql) && Objects.equals(sql, this.sql);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 1999-2015 dangdang.com.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
* </p>
16+
*/
17+
18+
package com.dangdang.ddframe.rdb.sharding.jdbc;
19+
20+
import lombok.AccessLevel;
21+
import lombok.Getter;
22+
23+
import java.sql.Connection;
24+
import java.sql.SQLException;
25+
import java.sql.Statement;
26+
import java.util.Objects;
27+
28+
/**
29+
* Statement对象包装类.
30+
* 在Statement复用的时候,用Connection对象来区分同一个库的会话
31+
*
32+
* @author gaohongtao
33+
*/
34+
class BackendStatementWrapper {
35+
36+
private final Connection connection;
37+
38+
@Getter(AccessLevel.PACKAGE)
39+
private final Statement statement;
40+
41+
BackendStatementWrapper(final Statement statement) throws SQLException {
42+
this.statement = statement;
43+
this.connection = statement.getConnection();
44+
}
45+
46+
boolean isBelongTo(final Connection connection, final String sql) {
47+
return Objects.equals(connection, this.connection);
48+
}
49+
}

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -173,22 +173,22 @@ private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
173173
return result;
174174
}
175175

176-
protected PreparedStatement generateStatement(final Connection conn, final String shardingSql) throws SQLException {
176+
protected BackendStatementWrapper generateStatement(final Connection conn, final String shardingSql) throws SQLException {
177177
if (null != autoGeneratedKeys) {
178178
getGeneratedKeyContext().setAutoGeneratedKeys(autoGeneratedKeys);
179-
return conn.prepareStatement(shardingSql, autoGeneratedKeys);
179+
return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, autoGeneratedKeys), shardingSql);
180180
}
181181
if (null != columnIndexes) {
182182
getGeneratedKeyContext().setColumnIndexes(columnIndexes);
183-
return conn.prepareStatement(shardingSql, columnIndexes);
183+
return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, columnIndexes), shardingSql);
184184
}
185185
if (null != columnNames) {
186186
getGeneratedKeyContext().setColumnNames(columnNames);
187-
return conn.prepareStatement(shardingSql, columnNames);
187+
return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, columnNames), shardingSql);
188188
}
189189
if (0 != getResultSetHoldability()) {
190-
return conn.prepareStatement(shardingSql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
190+
return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()), shardingSql);
191191
}
192-
return conn.prepareStatement(shardingSql, getResultSetType(), getResultSetConcurrency());
192+
return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, getResultSetType(), getResultSetConcurrency()), shardingSql);
193193
}
194194
}

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java

+21-12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
2626
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
2727
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult;
28+
import com.google.common.base.Function;
2829
import com.google.common.collect.Iterators;
2930
import com.google.common.collect.Lists;
3031
import com.google.common.collect.Table;
@@ -53,6 +54,13 @@
5354
*/
5455
public class ShardingStatement extends AbstractStatementAdapter {
5556

57+
private static final Function<BackendStatementWrapper, Statement> TRANSFORM_FUNCTION = new Function<BackendStatementWrapper, Statement>() {
58+
@Override
59+
public Statement apply(final BackendStatementWrapper input) {
60+
return input.getStatement();
61+
}
62+
};
63+
5664
@Getter(AccessLevel.PROTECTED)
5765
private final ShardingConnection shardingConnection;
5866

@@ -65,7 +73,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
6573
@Getter
6674
private final int resultSetHoldability;
6775

68-
private final Deque<List<Statement>> cachedRoutedStatements = Lists.newLinkedList();
76+
private final Deque<List<BackendStatementWrapper>> cachedRoutedStatements = Lists.newLinkedList();
6977

7078
@Getter(AccessLevel.PROTECTED)
7179
@Setter(AccessLevel.PROTECTED)
@@ -94,8 +102,8 @@ public ShardingStatement(final ShardingConnection shardingConnection, final int
94102
this.resultSetType = resultSetType;
95103
this.resultSetConcurrency = resultSetConcurrency;
96104
this.resultSetHoldability = resultSetHoldability;
97-
cachedRoutedStatements.add(new LinkedList<Statement>());
98-
cachedRoutedStatements.add(new LinkedList<Statement>());
105+
cachedRoutedStatements.add(new LinkedList<BackendStatementWrapper>());
106+
cachedRoutedStatements.add(new LinkedList<BackendStatementWrapper>());
99107
}
100108

101109
@Override
@@ -258,7 +266,7 @@ private Table<Integer, Integer, Object> subTable(final int[] columnIndexes) {
258266

259267
protected void clearRouteContext() throws SQLException {
260268
setCurrentResultSet(null);
261-
List<Statement> firstList = cachedRoutedStatements.pollFirst();
269+
List<BackendStatementWrapper> firstList = cachedRoutedStatements.pollFirst();
262270
cachedRoutedStatements.getFirst().addAll(firstList);
263271
firstList.clear();
264272
cachedRoutedStatements.addLast(firstList);
@@ -279,10 +287,10 @@ private StatementExecutor generateExecutor(final String sql) throws SQLException
279287
}
280288

281289
protected Statement getStatement(final Connection connection, final String sql) throws SQLException {
282-
Statement statement = null;
283-
for (Iterator<Statement> iterator = cachedRoutedStatements.getFirst().iterator(); iterator.hasNext();) {
284-
Statement each = iterator.next();
285-
if (each.getConnection() == connection) {
290+
BackendStatementWrapper statement = null;
291+
for (Iterator<BackendStatementWrapper> iterator = cachedRoutedStatements.getFirst().iterator(); iterator.hasNext();) {
292+
BackendStatementWrapper each = iterator.next();
293+
if (each.isBelongTo(connection, sql)) {
286294
statement = each;
287295
iterator.remove();
288296
}
@@ -291,17 +299,17 @@ protected Statement getStatement(final Connection connection, final String sql)
291299
statement = generateStatement(connection, sql);
292300
}
293301
cachedRoutedStatements.getLast().add(statement);
294-
return statement;
302+
return statement.getStatement();
295303
}
296304

297-
protected Statement generateStatement(final Connection connection, final String sql) throws SQLException {
305+
protected BackendStatementWrapper generateStatement(final Connection connection, final String sql) throws SQLException {
298306
Statement result;
299307
if (0 == resultSetHoldability) {
300308
result = connection.createStatement(resultSetType, resultSetConcurrency);
301309
} else {
302310
result = connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
303311
}
304-
return result;
312+
return new BackendStatementWrapper(result);
305313
}
306314

307315
@Override
@@ -329,6 +337,7 @@ protected void clearRouteStatements() {
329337

330338
@Override
331339
public Collection<? extends Statement> getRoutedStatements() {
332-
return Lists.newArrayList(Iterators.concat(cachedRoutedStatements.getFirst().iterator(), cachedRoutedStatements.getLast().iterator()));
340+
return Lists.newArrayList(Iterators.concat(Iterators.transform(cachedRoutedStatements.getFirst().iterator(), TRANSFORM_FUNCTION),
341+
Iterators.transform(cachedRoutedStatements.getLast().iterator(), TRANSFORM_FUNCTION)));
333342
}
334343
}

sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/integrate/tbl/pstatement/ShardingTablesOnlyForPStatementWithDMLTest.java

+35-36
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,19 @@
1717

1818
package com.dangdang.ddframe.rdb.integrate.tbl.pstatement;
1919

20-
import static org.hamcrest.CoreMatchers.is;
21-
import static org.junit.Assert.assertThat;
22-
23-
import java.sql.Connection;
24-
import java.sql.PreparedStatement;
25-
import java.sql.SQLException;
26-
2720
import com.dangdang.ddframe.rdb.integrate.tbl.AbstractShardingTablesOnlyDBUnitTest;
21+
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource;
2822
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
2923
import org.dbunit.DatabaseUnitException;
3024
import org.junit.Before;
3125
import org.junit.Test;
3226

33-
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource;
27+
import java.sql.Connection;
28+
import java.sql.PreparedStatement;
29+
import java.sql.SQLException;
30+
31+
import static org.hamcrest.CoreMatchers.is;
32+
import static org.junit.Assert.assertThat;
3433

3534
public final class ShardingTablesOnlyForPStatementWithDMLTest extends AbstractShardingTablesOnlyDBUnitTest {
3635

@@ -44,9 +43,9 @@ public void init() throws SQLException {
4443
@Test
4544
public void assertInsert() throws SQLException, DatabaseUnitException {
4645
String sql = "INSERT INTO `t_order` (`order_id`, `user_id`, `status`) VALUES (?, ?, ?)";
47-
for (int i = 1; i <= 10; i++) {
48-
try (Connection connection = shardingDataSource.getConnection()) {
49-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
46+
try (Connection connection = shardingDataSource.getConnection();
47+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
48+
for (int i = 1; i <= 10; i++) {
5049
preparedStatement.setInt(1, i);
5150
preparedStatement.setInt(2, i);
5251
preparedStatement.setString(3, "insert");
@@ -59,9 +58,9 @@ public void assertInsert() throws SQLException, DatabaseUnitException {
5958
@Test
6059
public void assertInsertWithAllPlaceholders() throws SQLException, DatabaseUnitException {
6160
String sql = "INSERT INTO `t_order` (`order_id`, `user_id`, `status`) VALUES (?, ?, ?)";
62-
for (int i = 1; i <= 10; i++) {
63-
try (Connection connection = shardingDataSource.getConnection()) {
64-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
61+
try (Connection connection = shardingDataSource.getConnection();
62+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
63+
for (int i = 1; i <= 10; i++) {
6564
preparedStatement.setInt(1, i);
6665
preparedStatement.setInt(2, i);
6766
preparedStatement.setString(3, "insert");
@@ -75,8 +74,8 @@ public void assertInsertWithAllPlaceholders() throws SQLException, DatabaseUnitE
7574
public void assertInsertWithoutPlaceholder() throws SQLException, DatabaseUnitException {
7675
String sql = "INSERT INTO `t_order` (`order_id`, `user_id`, `status`) VALUES (%s, %s, 'insert')";
7776
for (int i = 1; i <= 10; i++) {
78-
try (Connection connection = shardingDataSource.getConnection()) {
79-
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i));
77+
try (Connection connection = shardingDataSource.getConnection();
78+
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i))) {
8079
preparedStatement.executeUpdate();
8180
}
8281
}
@@ -87,8 +86,8 @@ public void assertInsertWithoutPlaceholder() throws SQLException, DatabaseUnitEx
8786
public void assertInsertWithPlaceholdersForShardingKeys() throws SQLException, DatabaseUnitException {
8887
String sql = "INSERT INTO `t_order` (`order_id`, `user_id`, `status`) VALUES (%s, %s, ?)";
8988
for (int i = 1; i <= 10; i++) {
90-
try (Connection connection = shardingDataSource.getConnection()) {
91-
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i));
89+
try (Connection connection = shardingDataSource.getConnection();
90+
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i))) {
9291
preparedStatement.setString(1, "insert");
9392
preparedStatement.executeUpdate();
9493
}
@@ -100,8 +99,8 @@ public void assertInsertWithPlaceholdersForShardingKeys() throws SQLException, D
10099
public void assertInsertWithPlaceholdersForNotShardingKeys() throws SQLException, DatabaseUnitException {
101100
String sql = "INSERT INTO `t_order` (`order_id`, `user_id`, `status`) VALUES (%s, %s, ?)";
102101
for (int i = 1; i <= 10; i++) {
103-
try (Connection connection = shardingDataSource.getConnection()) {
104-
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i));
102+
try (Connection connection = shardingDataSource.getConnection();
103+
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i))) {
105104
preparedStatement.setString(1, "insert");
106105
preparedStatement.executeUpdate();
107106
}
@@ -113,10 +112,10 @@ public void assertInsertWithPlaceholdersForNotShardingKeys() throws SQLException
113112
public void assertUpdateWithoutAlias() throws SQLException, DatabaseUnitException {
114113
ShardingDataSource shardingDataSource = getShardingDataSource();
115114
String sql = "UPDATE `t_order` SET `status` = ? WHERE `order_id` = ? AND `user_id` = ?";
116-
for (int i = 10; i < 12; i++) {
117-
for (int j = 0; j < 10; j++) {
118-
try (Connection connection = shardingDataSource.getConnection()) {
119-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
115+
try (Connection connection = shardingDataSource.getConnection();
116+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
117+
for (int i = 10; i < 12; i++) {
118+
for (int j = 0; j < 10; j++) {
120119
preparedStatement.setString(1, "updated");
121120
preparedStatement.setInt(2, i * 100 + j);
122121
preparedStatement.setInt(3, i);
@@ -131,10 +130,10 @@ public void assertUpdateWithoutAlias() throws SQLException, DatabaseUnitExceptio
131130
public void assertUpdateWithAlias() throws SQLException, DatabaseUnitException {
132131
ShardingDataSource shardingDataSource = getShardingDataSource();
133132
String sql = "UPDATE `t_order` as o SET o.`status` = ? WHERE o.`order_id` = ? AND o.`user_id` = ?";
134-
for (int i = 10; i < 12; i++) {
135-
for (int j = 0; j < 10; j++) {
136-
try (Connection connection = shardingDataSource.getConnection()) {
137-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
133+
try (Connection connection = shardingDataSource.getConnection();
134+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
135+
for (int i = 10; i < 12; i++) {
136+
for (int j = 0; j < 10; j++) {
138137
preparedStatement.setString(1, "updated");
139138
preparedStatement.setInt(2, i * 100 + j);
140139
preparedStatement.setInt(3, i);
@@ -149,8 +148,8 @@ public void assertUpdateWithAlias() throws SQLException, DatabaseUnitException {
149148
public void assertUpdateWithoutShardingValue() throws SQLException, DatabaseUnitException {
150149
ShardingDataSource shardingDataSource = getShardingDataSource();
151150
String sql = "UPDATE `t_order` SET `status` = ? WHERE `status` = ?";
152-
try (Connection connection = shardingDataSource.getConnection()) {
153-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
151+
try (Connection connection = shardingDataSource.getConnection();
152+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
154153
preparedStatement.setString(1, "updated");
155154
preparedStatement.setString(2, "init");
156155
assertThat(preparedStatement.executeUpdate(), is(20));
@@ -162,10 +161,10 @@ public void assertUpdateWithoutShardingValue() throws SQLException, DatabaseUnit
162161
public void assertDeleteWithoutAlias() throws SQLException, DatabaseUnitException {
163162
ShardingDataSource shardingDataSource = getShardingDataSource();
164163
String sql = "DELETE `t_order` WHERE `order_id` = ? AND `user_id` = ? AND `status` = ?";
165-
for (int i = 10; i < 12; i++) {
166-
for (int j = 0; j < 10; j++) {
167-
try (Connection connection = shardingDataSource.getConnection()) {
168-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
164+
try (Connection connection = shardingDataSource.getConnection();
165+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
166+
for (int i = 10; i < 12; i++) {
167+
for (int j = 0; j < 10; j++) {
169168
preparedStatement.setInt(1, i * 100 + j);
170169
preparedStatement.setInt(2, i);
171170
preparedStatement.setString(3, "init");
@@ -180,8 +179,8 @@ public void assertDeleteWithoutAlias() throws SQLException, DatabaseUnitExceptio
180179
public void assertDeleteWithoutShardingValue() throws SQLException, DatabaseUnitException {
181180
ShardingDataSource shardingDataSource = getShardingDataSource();
182181
String sql = "DELETE `t_order` WHERE `status` = ?";
183-
try (Connection connection = shardingDataSource.getConnection()) {
184-
PreparedStatement preparedStatement = connection.prepareStatement(sql);
182+
try (Connection connection = shardingDataSource.getConnection();
183+
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
185184
preparedStatement.setString(1, "init");
186185
assertThat(preparedStatement.executeUpdate(), is(20));
187186
}

0 commit comments

Comments
 (0)