Skip to content

Commit 0c801e6

Browse files
authored
Short-circuit select if there's any error (#61)
1 parent d5d38fe commit 0c801e6

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

channels/src/main/java/com/softwaremill/jox/Select.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ public static <U> Object selectOrClosed(SelectClause<? extends U>... clauses) th
100100

101101
@SafeVarargs
102102
private static <U> Object doSelectOrClosed(SelectClause<? extends U>... clauses) throws InterruptedException {
103+
// short-circuiting if any of the channels is in error; otherwise, we might have selected a clause, for which
104+
// a value was available immediately - even though a channel for a clause appearing later was in error
105+
var anyError = getAnyChannelInError(clauses);
106+
if (anyError != null) {
107+
return anyError;
108+
}
109+
103110
// check that the clause doesn't refer to a channel that is already used in a different clause
104111
var allRendezvous = verifyChannelsUnique_getAreAllRendezvous(clauses);
105112

@@ -132,6 +139,20 @@ private static boolean verifyChannelsUnique_getAreAllRendezvous(SelectClause<?>[
132139
return allRendezvous;
133140
}
134141

142+
private static ChannelError getAnyChannelInError(SelectClause<?>[] clauses) {
143+
for (var clause : clauses) {
144+
var ch = clause.getChannel();
145+
if (ch != null) {
146+
// if a channel is in error, closedForSend() will return that information
147+
var closedForSend = clause.getChannel().closedForSend();
148+
if (closedForSend instanceof ChannelError ce) {
149+
return ce;
150+
}
151+
}
152+
}
153+
return null;
154+
}
155+
135156
public static <T> SelectClause<T> defaultClause(T value) {
136157
return defaultClause(() -> value);
137158
}

channels/src/test/java/com/softwaremill/jox/SelectReceiveTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,21 @@ void testBufferExpandedWhenSelecting() throws InterruptedException {
222222
void testSelectFromNone() throws InterruptedException {
223223
assertEquals(new ChannelDone(), selectOrClosed());
224224
}
225+
226+
@Test
227+
public void testSelect_immediate_withError() throws InterruptedException {
228+
// given
229+
Channel<String> ch1 = new Channel<>(2);
230+
ch1.send("x");
231+
232+
var e = new RuntimeException("boom!");
233+
Channel<String> ch2 = new Channel<>(2);
234+
ch2.error(e);
235+
236+
// when
237+
var result = selectOrClosed(ch1.receiveClause(), ch2.receiveClause());
238+
239+
// then
240+
assertEquals(new ChannelError(e), result);
241+
}
225242
}

0 commit comments

Comments
 (0)