Skip to content

Commit f87dfcd

Browse files
committed
Polishing
1 parent d4f7972 commit f87dfcd

File tree

3 files changed

+124
-108
lines changed

3 files changed

+124
-108
lines changed

src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,52 @@ public abstract class ConnectionPoolSupport {
6969
private ConnectionPoolSupport() {
7070
}
7171

72+
/**
73+
* Creates a new {@link GenericObjectPool} using the {@link Supplier}. Allocated instances are wrapped and must not be
74+
* returned with {@link ObjectPool#returnObject(Object)}. By default, connections are validated by checking their
75+
* {@link StatefulConnection#isOpen()} method.
76+
*
77+
* @param connectionSupplier must not be {@code null}.
78+
* @param config must not be {@code null}.
79+
* @param <T> connection type.
80+
* @return the connection pool.
81+
*/
82+
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
83+
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config) {
84+
return createGenericObjectPool(connectionSupplier, config, true, (c) -> c.isOpen());
85+
}
86+
7287
/**
7388
* Creates a new {@link GenericObjectPool} using the {@link Supplier}. Allocated instances are wrapped and must not be
7489
* returned with {@link ObjectPool#returnObject(Object)}.
7590
*
7691
* @param connectionSupplier must not be {@code null}.
7792
* @param config must not be {@code null}.
93+
* @param validationPredicate a {@link Predicate} to help validate connections
7894
* @param <T> connection type.
7995
* @return the connection pool.
8096
*/
8197
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
82-
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, Predicate<T> connectionValidator) {
83-
return createGenericObjectPool(connectionSupplier, config, true, connectionValidator);
98+
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, Predicate<T> validationPredicate) {
99+
return createGenericObjectPool(connectionSupplier, config, true, validationPredicate);
100+
}
101+
102+
/**
103+
* Creates a new {@link GenericObjectPool} using the {@link Supplier}. By default, connections are validated by checking
104+
* their {@link StatefulConnection#isOpen()} method.
105+
*
106+
* @param connectionSupplier must not be {@code null}.
107+
* @param config must not be {@code null}.
108+
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
109+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
110+
* when invoking {@link StatefulConnection#close()}.
111+
* @param <T> connection type.
112+
* @return the connection pool.
113+
*/
114+
@SuppressWarnings("unchecked")
115+
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
116+
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections) {
117+
return createGenericObjectPool(connectionSupplier, config, wrapConnections, (c) -> c.isOpen());
84118
}
85119

86120
/**
@@ -89,24 +123,25 @@ private ConnectionPoolSupport() {
89123
* @param connectionSupplier must not be {@code null}.
90124
* @param config must not be {@code null}.
91125
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
92-
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connection that are returned to the pool
126+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
93127
* when invoking {@link StatefulConnection#close()}.
128+
* @param validationPredicate a {@link Predicate} to help validate connections
94129
* @param <T> connection type.
95130
* @return the connection pool.
96131
*/
97132
@SuppressWarnings("unchecked")
98133
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
99134
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections,
100-
Predicate<T> connectionValidator) {
135+
Predicate<T> validationPredicate) {
101136

102137
LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
103138
LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");
104-
LettuceAssert.notNull(connectionValidator, "Connection validator must not be null");
139+
LettuceAssert.notNull(validationPredicate, "Connection validator must not be null");
105140

106141
AtomicReference<Origin<T>> poolRef = new AtomicReference<>();
107142

108143
GenericObjectPool<T> pool = new GenericObjectPool<T>(
109-
new EnhancedRedisPooledObjectFactory<T>(connectionSupplier, connectionValidator), config) {
144+
new RedisPooledObjectFactory<>(connectionSupplier, validationPredicate), config) {
110145

111146
@Override
112147
public T borrowObject() throws Exception {
@@ -149,20 +184,38 @@ public void returnObject(T obj) {
149184
*
150185
* @param connectionSupplier must not be {@code null}.
151186
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
152-
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connection that are returned to the pool
187+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
153188
* when invoking {@link StatefulConnection#close()}.
154189
* @param <T> connection type.
155190
* @return the connection pool.
156191
*/
157192
@SuppressWarnings("unchecked")
158193
public static <T extends StatefulConnection<?, ?>> SoftReferenceObjectPool<T> createSoftReferenceObjectPool(
159194
Supplier<T> connectionSupplier, boolean wrapConnections) {
195+
return createSoftReferenceObjectPool(connectionSupplier, wrapConnections, (c) -> c.isOpen());
196+
}
197+
198+
/**
199+
* Creates a new {@link SoftReferenceObjectPool} using the {@link Supplier}.
200+
*
201+
* @param connectionSupplier must not be {@code null}.
202+
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
203+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
204+
* when invoking {@link StatefulConnection#close()}.
205+
* @param validationPredicate a {@link Predicate} to help validate connections
206+
* @param <T> connection type.
207+
* @return the connection pool.
208+
*/
209+
@SuppressWarnings("unchecked")
210+
public static <T extends StatefulConnection<?, ?>> SoftReferenceObjectPool<T> createSoftReferenceObjectPool(
211+
Supplier<T> connectionSupplier, boolean wrapConnections, Predicate<T> validationPredicate) {
160212

161213
LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
162214

163215
AtomicReference<Origin<T>> poolRef = new AtomicReference<>();
164216

165-
SoftReferenceObjectPool<T> pool = new SoftReferenceObjectPool<T>(new RedisPooledObjectFactory<>(connectionSupplier)) {
217+
SoftReferenceObjectPool<T> pool = new SoftReferenceObjectPool<T>(
218+
new RedisPooledObjectFactory<>(connectionSupplier, validationPredicate)) {
166219

167220
private final Lock lock = new ReentrantLock();
168221

@@ -205,8 +258,11 @@ private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>
205258

206259
private final Supplier<T> connectionSupplier;
207260

208-
RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
261+
private final Predicate<T> validationPredicate;
262+
263+
RedisPooledObjectFactory(Supplier<T> connectionSupplier, Predicate<T> validationPredicate) {
209264
this.connectionSupplier = connectionSupplier;
265+
this.validationPredicate = validationPredicate;
210266
}
211267

212268
@Override
@@ -226,7 +282,7 @@ public PooledObject<T> wrap(T obj) {
226282

227283
@Override
228284
public boolean validateObject(PooledObject<T> p) {
229-
return p.getObject().isOpen();
285+
return this.validationPredicate.test(p.getObject());
230286
}
231287

232288
}
@@ -254,43 +310,4 @@ public CompletableFuture<Void> returnObjectAsync(T o) throws Exception {
254310

255311
}
256312

257-
private static class EnhancedRedisPooledObjectFactory<T extends StatefulConnection<?, ?>>
258-
extends BasePooledObjectFactory<T> {
259-
260-
private final Supplier<T> connectionSupplier;
261-
262-
private final Predicate<T> connectionValidator;
263-
264-
EnhancedRedisPooledObjectFactory(Supplier<T> connectionSupplier, Predicate<T> connectionValidator) {
265-
this.connectionSupplier = connectionSupplier;
266-
this.connectionValidator = connectionValidator;
267-
}
268-
269-
@Override
270-
public T create() throws Exception {
271-
return connectionSupplier.get();
272-
}
273-
274-
@Override
275-
public PooledObject<T> wrap(T obj) {
276-
return new DefaultPooledObject<>(obj);
277-
}
278-
279-
@Override
280-
public boolean validateObject(PooledObject<T> p) {
281-
T connection = p.getObject();
282-
return connection.isOpen() && connectionValidator.test(connection);
283-
}
284-
285-
@Override
286-
public void destroyObject(PooledObject<T> p) throws Exception {
287-
try {
288-
p.getObject().close();
289-
} catch (Exception e) {
290-
e.printStackTrace();
291-
}
292-
}
293-
294-
}
295-
296313
}

src/test/java/io/lettuce/core/dynamic/RedisCommandsIntegrationTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,22 @@ void verifierShouldCatchTooFewParametersDeclarations() {
9999
@Test
100100
void shouldWorkWithPooledConnection() throws Exception {
101101

102+
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
103+
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>());
104+
105+
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
106+
107+
RedisCommandFactory factory = new RedisCommandFactory(connection);
108+
SimpleCommands commands = factory.getCommands(SimpleCommands.class);
109+
commands.get("foo");
110+
}
111+
112+
pool.close();
113+
}
114+
115+
@Test
116+
void shouldWorkWithPooledConnectionAndCustomValidation() throws Exception {
117+
102118
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
103119
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>(), connection -> {
104120
try {

src/test/java/io/lettuce/core/support/ConnectionPoolSupportIntegrationTests.java

Lines changed: 42 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,7 @@ static void afterClass() {
6666
void genericPoolShouldWorkWithWrappedConnections() throws Exception {
6767

6868
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
69-
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
70-
try {
71-
return "PONG".equals(connection.sync().ping());
72-
} catch (Exception e) {
73-
return false;
74-
}
75-
});
69+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
7670

7771
borrowAndReturn(pool);
7872
borrowAndClose(pool);
@@ -97,13 +91,7 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception {
9791
poolConfig.setMaxIdle(2);
9892

9993
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
100-
.createGenericObjectPool(() -> client.connect(), poolConfig, connection -> {
101-
try {
102-
return "PONG".equals(connection.sync().ping());
103-
} catch (Exception e) {
104-
return false;
105-
}
106-
});
94+
.createGenericObjectPool(() -> client.connect(), poolConfig);
10795

10896
borrowAndReturn(pool);
10997
borrowAndClose(pool);
@@ -131,6 +119,21 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception {
131119
@Test
132120
void genericPoolShouldWorkWithPlainConnections() throws Exception {
133121

122+
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
123+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false);
124+
125+
borrowAndReturn(pool);
126+
127+
StatefulRedisConnection<String, String> connection = pool.borrowObject();
128+
assertThat(Proxy.isProxyClass(connection.getClass())).isFalse();
129+
pool.returnObject(connection);
130+
131+
pool.close();
132+
}
133+
134+
@Test
135+
void genericPoolShouldWorkWithValidationPredicate() throws Exception {
136+
134137
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
135138
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
136139
try {
@@ -166,17 +169,33 @@ void softReferencePoolShouldWorkWithPlainConnections() throws Exception {
166169
}
167170

168171
@Test
169-
void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Exception {
172+
void softReferencePoolShouldWorkWithValidationPredicate() throws Exception {
170173

171-
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
172-
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
174+
SoftReferenceObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
175+
.createSoftReferenceObjectPool(() -> client.connect(), false, connection -> {
173176
try {
174177
return "PONG".equals(connection.sync().ping());
175178
} catch (Exception e) {
176179
return false;
177180
}
178181
});
179182

183+
borrowAndReturn(pool);
184+
185+
StatefulRedisConnection<String, String> connection = pool.borrowObject();
186+
assertThat(Proxy.isProxyClass(connection.getClass())).isFalse();
187+
pool.returnObject(connection);
188+
189+
connection.close();
190+
pool.close();
191+
}
192+
193+
@Test
194+
void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Exception {
195+
196+
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
197+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
198+
180199
StatefulRedisConnection<String, String> connection = pool.borrowObject();
181200
RedisCommands<String, String> sync = connection.sync();
182201
sync.set(key, value);
@@ -196,13 +215,7 @@ void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Excepti
196215
void wrappedConnectionShouldUseWrappers() throws Exception {
197216

198217
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
199-
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
200-
try {
201-
return "PONG".equals(connection.sync().ping());
202-
} catch (Exception e) {
203-
return false;
204-
}
205-
});
218+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
206219

207220
StatefulRedisConnection<String, String> connection = pool.borrowObject();
208221
RedisCommands<String, String> sync = connection.sync();
@@ -227,13 +240,7 @@ void wrappedMasterSlaveConnectionShouldUseWrappers() throws Exception {
227240

228241
GenericObjectPool<StatefulRedisMasterReplicaConnection<String, String>> pool = ConnectionPoolSupport
229242
.createGenericObjectPool(() -> MasterReplica.connect(client, new StringCodec(), RedisURI.create(host, port)),
230-
new GenericObjectPoolConfig<>(), connection -> {
231-
try {
232-
return "PONG".equals(connection.sync().ping());
233-
} catch (Exception e) {
234-
return false;
235-
}
236-
});
243+
new GenericObjectPoolConfig<>());
237244

238245
StatefulRedisMasterReplicaConnection<String, String> connection = pool.borrowObject();
239246
RedisCommands<String, String> sync = connection.sync();
@@ -259,13 +266,7 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception {
259266
RedisURI.create(TestSettings.host(), 7379));
260267

261268
GenericObjectPool<StatefulRedisClusterConnection<String, String>> pool = ConnectionPoolSupport
262-
.createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>(), connection -> {
263-
try {
264-
return "PONG".equals(connection.sync().ping());
265-
} catch (Exception e) {
266-
return false;
267-
}
268-
});
269+
.createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>());
269270

270271
StatefulRedisClusterConnection<String, String> connection = pool.borrowObject();
271272
RedisAdvancedClusterCommands<String, String> sync = connection.sync();
@@ -292,13 +293,7 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception {
292293
void plainConnectionShouldNotUseWrappers() throws Exception {
293294

294295
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
295-
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
296-
try {
297-
return "PONG".equals(connection.sync().ping());
298-
} catch (Exception e) {
299-
return false;
300-
}
301-
});
296+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false);
302297

303298
StatefulRedisConnection<String, String> connection = pool.borrowObject();
304299
RedisCommands<String, String> sync = connection.sync();
@@ -343,13 +338,7 @@ void softRefPoolShouldWorkWithWrappedConnections() throws Exception {
343338
void wrappedObjectClosedAfterReturn() throws Exception {
344339

345340
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
346-
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true, connection -> {
347-
try {
348-
return "PONG".equals(connection.sync().ping());
349-
} catch (Exception e) {
350-
return false;
351-
}
352-
});
341+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true);
353342

354343
StatefulRedisConnection<String, String> connection = pool.borrowObject();
355344
RedisCommands<String, String> sync = connection.sync();
@@ -371,13 +360,7 @@ void wrappedObjectClosedAfterReturn() throws Exception {
371360
void tryWithResourcesReturnsConnectionToPool() throws Exception {
372361

373362
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
374-
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
375-
try {
376-
return "PONG".equals(connection.sync().ping());
377-
} catch (Exception e) {
378-
return false;
379-
}
380-
});
363+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
381364

382365
StatefulRedisConnection<String, String> usedConnection = null;
383366
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {

0 commit comments

Comments
 (0)