Skip to content

Commit 91edb88

Browse files
ryoquntaiki-e
authored andcommitted
Add select_biased! macro (#1040)
1 parent d1ab079 commit 91edb88

File tree

4 files changed

+178
-25
lines changed

4 files changed

+178
-25
lines changed

crossbeam-channel/src/select.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ enum Timeout {
177177
fn run_select(
178178
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
179179
timeout: Timeout,
180+
is_biased: bool,
180181
) -> Option<(Token, usize, *const u8)> {
181182
if handles.is_empty() {
182183
// Wait until the timeout and return.
@@ -193,8 +194,10 @@ fn run_select(
193194
}
194195
}
195196

196-
// Shuffle the operations for fairness.
197-
utils::shuffle(handles);
197+
if !is_biased {
198+
// Shuffle the operations for fairness.
199+
utils::shuffle(handles);
200+
}
198201

199202
// Create a token, which serves as a temporary variable that gets initialized in this function
200203
// and is later used by a call to `channel::read()` or `channel::write()` that completes the
@@ -325,6 +328,7 @@ fn run_select(
325328
fn run_ready(
326329
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
327330
timeout: Timeout,
331+
is_biased: bool,
328332
) -> Option<usize> {
329333
if handles.is_empty() {
330334
// Wait until the timeout and return.
@@ -341,8 +345,10 @@ fn run_ready(
341345
}
342346
}
343347

344-
// Shuffle the operations for fairness.
345-
utils::shuffle(handles);
348+
if !is_biased {
349+
// Shuffle the operations for fairness.
350+
utils::shuffle(handles);
351+
}
346352

347353
loop {
348354
let backoff = Backoff::new();
@@ -450,8 +456,9 @@ fn run_ready(
450456
#[inline]
451457
pub fn try_select<'a>(
452458
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459+
is_biased: bool,
453460
) -> Result<SelectedOperation<'a>, TrySelectError> {
454-
match run_select(handles, Timeout::Now) {
461+
match run_select(handles, Timeout::Now, is_biased) {
455462
None => Err(TrySelectError),
456463
Some((token, index, ptr)) => Ok(SelectedOperation {
457464
token,
@@ -467,12 +474,13 @@ pub fn try_select<'a>(
467474
#[inline]
468475
pub fn select<'a>(
469476
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477+
is_biased: bool,
470478
) -> SelectedOperation<'a> {
471479
if handles.is_empty() {
472480
panic!("no operations have been added to `Select`");
473481
}
474482

475-
let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
483+
let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
476484
SelectedOperation {
477485
token,
478486
index,
@@ -487,10 +495,11 @@ pub fn select<'a>(
487495
pub fn select_timeout<'a>(
488496
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
489497
timeout: Duration,
498+
is_biased: bool,
490499
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
491500
match Instant::now().checked_add(timeout) {
492-
Some(deadline) => select_deadline(handles, deadline),
493-
None => Ok(select(handles)),
501+
Some(deadline) => select_deadline(handles, deadline, is_biased),
502+
None => Ok(select(handles, is_biased)),
494503
}
495504
}
496505

@@ -499,8 +508,9 @@ pub fn select_timeout<'a>(
499508
pub(crate) fn select_deadline<'a>(
500509
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
501510
deadline: Instant,
511+
is_biased: bool,
502512
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
503-
match run_select(handles, Timeout::At(deadline)) {
513+
match run_select(handles, Timeout::At(deadline), is_biased) {
504514
None => Err(SelectTimeoutError),
505515
Some((token, index, ptr)) => Ok(SelectedOperation {
506516
token,
@@ -764,7 +774,7 @@ impl<'a> Select<'a> {
764774
/// }
765775
/// ```
766776
pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
767-
try_select(&mut self.handles)
777+
try_select(&mut self.handles, false)
768778
}
769779

770780
/// Blocks until one of the operations becomes ready and selects it.
@@ -811,7 +821,7 @@ impl<'a> Select<'a> {
811821
/// }
812822
/// ```
813823
pub fn select(&mut self) -> SelectedOperation<'a> {
814-
select(&mut self.handles)
824+
select(&mut self.handles, false)
815825
}
816826

817827
/// Blocks for a limited time until one of the operations becomes ready and selects it.
@@ -861,7 +871,7 @@ impl<'a> Select<'a> {
861871
&mut self,
862872
timeout: Duration,
863873
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
864-
select_timeout(&mut self.handles, timeout)
874+
select_timeout(&mut self.handles, timeout, false)
865875
}
866876

867877
/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
@@ -913,7 +923,7 @@ impl<'a> Select<'a> {
913923
&mut self,
914924
deadline: Instant,
915925
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
916-
select_deadline(&mut self.handles, deadline)
926+
select_deadline(&mut self.handles, deadline, false)
917927
}
918928

919929
/// Attempts to find a ready operation without blocking.
@@ -952,7 +962,7 @@ impl<'a> Select<'a> {
952962
/// }
953963
/// ```
954964
pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
955-
match run_ready(&mut self.handles, Timeout::Now) {
965+
match run_ready(&mut self.handles, Timeout::Now, false) {
956966
None => Err(TryReadyError),
957967
Some(index) => Ok(index),
958968
}
@@ -1005,7 +1015,7 @@ impl<'a> Select<'a> {
10051015
panic!("no operations have been added to `Select`");
10061016
}
10071017

1008-
run_ready(&mut self.handles, Timeout::Never).unwrap()
1018+
run_ready(&mut self.handles, Timeout::Never, false).unwrap()
10091019
}
10101020

10111021
/// Blocks for a limited time until one of the operations becomes ready.
@@ -1098,7 +1108,7 @@ impl<'a> Select<'a> {
10981108
/// }
10991109
/// ```
11001110
pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1101-
match run_ready(&mut self.handles, Timeout::At(deadline)) {
1111+
match run_ready(&mut self.handles, Timeout::At(deadline), false) {
11021112
None => Err(ReadyTimeoutError),
11031113
Some(index) => Ok(index),
11041114
}

crossbeam-channel/src/select_macro.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ macro_rules! crossbeam_channel_internal {
750750
$cases:tt
751751
) => {{
752752
let _oper: $crate::SelectedOperation<'_> = {
753-
let _oper = $crate::internal::select(&mut $sel);
753+
let _oper = $crate::internal::select(&mut $sel, _IS_BIASED);
754754

755755
// Erase the lifetime so that `sel` can be dropped early even without NLL.
756756
unsafe { ::std::mem::transmute(_oper) }
@@ -772,7 +772,7 @@ macro_rules! crossbeam_channel_internal {
772772
$cases:tt
773773
) => {{
774774
let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = {
775-
let _oper = $crate::internal::try_select(&mut $sel);
775+
let _oper = $crate::internal::try_select(&mut $sel, _IS_BIASED);
776776

777777
// Erase the lifetime so that `sel` can be dropped early even without NLL.
778778
unsafe { ::std::mem::transmute(_oper) }
@@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal {
802802
$cases:tt
803803
) => {{
804804
let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = {
805-
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout);
805+
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, _IS_BIASED);
806806

807807
// Erase the lifetime so that `sel` can be dropped early even without NLL.
808808
unsafe { ::std::mem::transmute(_oper) }
@@ -985,7 +985,8 @@ macro_rules! crossbeam_channel_internal {
985985
///
986986
/// This macro allows you to define a set of channel operations, wait until any one of them becomes
987987
/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
988-
/// among them is selected.
988+
/// among them is selected (i.e. the unbiased selection). Use `select_biased!` for the biased
989+
/// selection.
989990
///
990991
/// It is also possible to define a `default` case that gets executed if none of the operations are
991992
/// ready, either right away or for a certain duration of time.
@@ -1109,8 +1110,33 @@ macro_rules! crossbeam_channel_internal {
11091110
#[macro_export]
11101111
macro_rules! select {
11111112
($($tokens:tt)*) => {
1112-
$crate::crossbeam_channel_internal!(
1113-
$($tokens)*
1114-
)
1113+
{
1114+
const _IS_BIASED: bool = false;
1115+
1116+
$crate::crossbeam_channel_internal!(
1117+
$($tokens)*
1118+
)
1119+
}
1120+
};
1121+
}
1122+
1123+
/// Selects from a set of channel operations.
1124+
///
1125+
/// This macro allows you to define a list of channel operations, wait until any one of them
1126+
/// becomes ready, and finally execute it. If multiple operations are ready at the same time, the
1127+
/// operation nearest to the front of the list is always selected (i.e. the biased selection). Use
1128+
/// [`select!`] for the unbiased selection.
1129+
///
1130+
/// Otherwise, this macro's functionality is identical to [`select!`]. Refer to it for the syntax.
1131+
#[macro_export]
1132+
macro_rules! select_biased {
1133+
($($tokens:tt)*) => {
1134+
{
1135+
const _IS_BIASED: bool = true;
1136+
1137+
$crate::crossbeam_channel_internal!(
1138+
$($tokens)*
1139+
)
1140+
}
11151141
};
11161142
}

crossbeam-channel/tests/mpsc.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ macro_rules! select {
176176
(
177177
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
178178
) => ({
179+
const _IS_BIASED: bool = false;
180+
179181
cc::crossbeam_channel_internal! {
180182
$(
181183
$meth(($rx).inner) -> res => {

crossbeam-channel/tests/select_macro.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::ops::Deref;
99
use std::thread;
1010
use std::time::{Duration, Instant};
1111

12-
use crossbeam_channel::{after, bounded, never, select, tick, unbounded};
12+
use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded};
1313
use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
1414
use crossbeam_utils::thread::scope;
1515

@@ -943,7 +943,122 @@ fn fairness_send() {
943943
assert!(hits.iter().all(|x| *x >= COUNT / 4));
944944
}
945945

946-
#[allow(clippy::or_fun_call)] // This is intentional.
946+
#[test]
947+
fn unfairness() {
948+
#[cfg(miri)]
949+
const COUNT: usize = 100;
950+
#[cfg(not(miri))]
951+
const COUNT: usize = 10_000;
952+
953+
let (s1, r1) = unbounded::<()>();
954+
let (s2, r2) = unbounded::<()>();
955+
let (s3, r3) = unbounded::<()>();
956+
957+
for _ in 0..COUNT {
958+
s1.send(()).unwrap();
959+
s2.send(()).unwrap();
960+
}
961+
s3.send(()).unwrap();
962+
963+
let mut hits = [0usize; 3];
964+
for _ in 0..COUNT {
965+
select_biased! {
966+
recv(r1) -> _ => hits[0] += 1,
967+
recv(r2) -> _ => hits[1] += 1,
968+
recv(r3) -> _ => hits[2] += 1,
969+
}
970+
}
971+
assert_eq!(hits, [COUNT, 0, 0]);
972+
973+
for _ in 0..COUNT {
974+
select_biased! {
975+
recv(r1) -> _ => hits[0] += 1,
976+
recv(r2) -> _ => hits[1] += 1,
977+
recv(r3) -> _ => hits[2] += 1,
978+
}
979+
}
980+
assert_eq!(hits, [COUNT, COUNT, 0]);
981+
}
982+
983+
#[test]
984+
fn unfairness_timeout() {
985+
#[cfg(miri)]
986+
const COUNT: usize = 100;
987+
#[cfg(not(miri))]
988+
const COUNT: usize = 10_000;
989+
990+
let (s1, r1) = unbounded::<()>();
991+
let (s2, r2) = unbounded::<()>();
992+
let (s3, r3) = unbounded::<()>();
993+
994+
for _ in 0..COUNT {
995+
s1.send(()).unwrap();
996+
s2.send(()).unwrap();
997+
}
998+
s3.send(()).unwrap();
999+
1000+
let mut hits = [0usize; 3];
1001+
for _ in 0..COUNT {
1002+
select_biased! {
1003+
recv(r1) -> _ => hits[0] += 1,
1004+
recv(r2) -> _ => hits[1] += 1,
1005+
recv(r3) -> _ => hits[2] += 1,
1006+
default(ms(1000)) => unreachable!(),
1007+
}
1008+
}
1009+
assert_eq!(hits, [COUNT, 0, 0]);
1010+
1011+
for _ in 0..COUNT {
1012+
select_biased! {
1013+
recv(r1) -> _ => hits[0] += 1,
1014+
recv(r2) -> _ => hits[1] += 1,
1015+
recv(r3) -> _ => hits[2] += 1,
1016+
default(ms(1000)) => unreachable!(),
1017+
}
1018+
}
1019+
assert_eq!(hits, [COUNT, COUNT, 0]);
1020+
}
1021+
1022+
#[test]
1023+
fn unfairness_try() {
1024+
#[cfg(miri)]
1025+
const COUNT: usize = 100;
1026+
#[cfg(not(miri))]
1027+
const COUNT: usize = 10_000;
1028+
1029+
let (s1, r1) = unbounded::<()>();
1030+
let (s2, r2) = unbounded::<()>();
1031+
let (s3, r3) = unbounded::<()>();
1032+
1033+
for _ in 0..COUNT {
1034+
s1.send(()).unwrap();
1035+
s2.send(()).unwrap();
1036+
}
1037+
s3.send(()).unwrap();
1038+
1039+
let mut hits = [0usize; 3];
1040+
for _ in 0..COUNT {
1041+
select_biased! {
1042+
recv(r1) -> _ => hits[0] += 1,
1043+
recv(r2) -> _ => hits[1] += 1,
1044+
recv(r3) -> _ => hits[2] += 1,
1045+
default() => unreachable!(),
1046+
}
1047+
}
1048+
assert_eq!(hits, [COUNT, 0, 0]);
1049+
1050+
for _ in 0..COUNT {
1051+
select_biased! {
1052+
recv(r1) -> _ => hits[0] += 1,
1053+
recv(r2) -> _ => hits[1] += 1,
1054+
recv(r3) -> _ => hits[2] += 1,
1055+
default() => unreachable!(),
1056+
}
1057+
}
1058+
assert_eq!(hits, [COUNT, COUNT, 0]);
1059+
}
1060+
1061+
#[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.
9471062
#[test]
9481063
fn references() {
9491064
let (s, r) = unbounded::<i32>();

0 commit comments

Comments
 (0)