Skip to content

Commit d56bc76

Browse files
authored
Fix deadlock when an invalid URI is presented to DefaultClusterTopologyRefresh (#3243)
* Add failing unit test for deadlock issue * Fix underlying deadlock issue
1 parent 0d48e88 commit d56bc76

File tree

2 files changed

+26
-5
lines changed

2 files changed

+26
-5
lines changed

src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -307,19 +307,21 @@ private void openConnections(ConnectionTracker tracker, Iterable<RedisURI> redis
307307

308308
for (RedisURI redisURI : redisURIs) {
309309

310-
if (redisURI.getHost() == null || tracker.contains(redisURI) || !isEventLoopActive()) {
311-
continue;
312-
}
310+
CompletableFuture<StatefulRedisConnection<String, String>> sync = new CompletableFuture<>();
313311

314312
try {
313+
314+
if (redisURI.getHost() == null || tracker.contains(redisURI) || !isEventLoopActive()) {
315+
continue;
316+
}
317+
315318
SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(redisURI);
316319

317320
ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture = nodeConnectionFactory
318321
.connectToNodeAsync(StringCodec.UTF8, socketAddress);
319322

320323
// Note: timeout skew due to potential socket address resolution and connection work possible.
321324

322-
CompletableFuture<StatefulRedisConnection<String, String>> sync = new CompletableFuture<>();
323325
Timeout cancelTimeout = clientResources.timer().newTimeout(it -> {
324326

325327
String message = String.format("Unable to connect to [%s]: Timeout after %s", socketAddress,
@@ -360,7 +362,10 @@ private void openConnections(ConnectionTracker tracker, Iterable<RedisURI> redis
360362

361363
tracker.addConnection(redisURI, sync);
362364
} catch (RuntimeException e) {
363-
logger.warn(String.format("Unable to connect to [%s]", redisURI), e);
365+
String message = String.format("Unable to connect to [%s]", redisURI);
366+
logger.warn(message, e);
367+
sync.completeExceptionally(new RedisConnectionException(message, e));
368+
tracker.addConnection(redisURI, sync);
364369
}
365370
}
366371
}

src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import static io.lettuce.TestTags.UNIT_TEST;
2323
import static org.assertj.core.api.Assertions.*;
24+
import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD;
2425
import static org.mockito.ArgumentMatchers.*;
2526
import static org.mockito.Mockito.*;
2627
import static org.mockito.Mockito.anyLong;
@@ -40,6 +41,7 @@
4041
import java.util.concurrent.CompletionStage;
4142
import java.util.concurrent.TimeUnit;
4243

44+
import org.junit.jupiter.api.Assertions;
4345
import org.junit.jupiter.api.BeforeEach;
4446
import org.junit.jupiter.api.Tag;
4547
import org.junit.jupiter.api.Test;
@@ -455,6 +457,20 @@ void shouldCloseConnections() {
455457
verify(connection2).closeAsync();
456458
}
457459

460+
/**
461+
* @see <a href="https://github.com/redis/lettuce/issues/3240">Issue link</a>
462+
*/
463+
@Test
464+
@org.junit.jupiter.api.Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD)
465+
void shouldHandleInvalidUrisWithoutDeadlock() {
466+
List<RedisURI> seed = Arrays.asList(RedisURI.create("redis://localhost:$(INVALID_DATA):CONFIG"),
467+
RedisURI.create("redis://localhost:$(INVALID_DATA):CONFIG"));
468+
CompletionException completionException = Assertions.assertThrows(CompletionException.class,
469+
() -> sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join());
470+
assertThat(completionException)
471+
.hasRootCauseInstanceOf(DefaultClusterTopologyRefresh.CannotRetrieveClusterPartitions.class);
472+
}
473+
458474
@Test
459475
void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() {
460476

0 commit comments

Comments
 (0)