Skip to content

Commit c1c8d10

Browse files
committed
Merge remote-tracking branch 'origin/tokio-1.38.x' into forward-port-1.38.x
2 parents 5f3296d + aa303bc commit c1c8d10

File tree

8 files changed

+84
-54
lines changed

8 files changed

+84
-54
lines changed

.cirrus.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
only_if: $CIRRUS_TAG == '' && ($CIRRUS_PR != '' || $CIRRUS_BRANCH == 'master' || $CIRRUS_BRANCH =~ 'tokio-.*')
22
auto_cancellation: $CIRRUS_BRANCH != 'master' && $CIRRUS_BRANCH !=~ 'tokio-.*'
33
freebsd_instance:
4-
image_family: freebsd-14-1
4+
image_family: freebsd-14-2
55
env:
66
RUST_STABLE: stable
77
RUST_NIGHTLY: nightly-2024-05-05

.github/workflows/ci.yml

+23-5
Original file line numberDiff line numberDiff line change
@@ -475,10 +475,18 @@ jobs:
475475
runs-on: ubuntu-latest
476476
steps:
477477
- uses: actions/checkout@v4
478-
- name: Check semver
478+
- name: Check `tokio` semver
479479
uses: obi1kenobi/cargo-semver-checks-action@v2
480480
with:
481481
rust-toolchain: ${{ env.rust_stable }}
482+
package: tokio
483+
release-type: minor
484+
- name: Check semver for rest of the workspace
485+
if: ${{ !startsWith(github.event.pull_request.base.ref, 'tokio-1.') }}
486+
uses: obi1kenobi/cargo-semver-checks-action@v2
487+
with:
488+
rust-toolchain: ${{ env.rust_stable }}
489+
exclude: tokio
482490
release-type: minor
483491

484492
cross-check:
@@ -710,7 +718,14 @@ jobs:
710718
toolchain: ${{ env.rust_min }}
711719
- uses: Swatinem/rust-cache@v2
712720
- name: "check --workspace --all-features"
713-
run: cargo check --workspace --all-features
721+
run: |
722+
if [[ "${{ github.event.pull_request.base.ref }}" =~ ^tokio-1\..* ]]; then
723+
# Only check `tokio` crate as the PR is backporting to an earlier tokio release.
724+
cargo check -p tokio --all-features
725+
else
726+
# Check all crates in the workspace
727+
cargo check --workspace --all-features
728+
fi
714729
env:
715730
RUSTFLAGS: "" # remove -Dwarnings
716731

@@ -1006,10 +1021,10 @@ jobs:
10061021
targets: ${{ matrix.target }}
10071022

10081023
# Install dependencies
1009-
- name: Install cargo-hack, wasmtime, and cargo-wasi
1024+
- name: Install cargo-hack, wasmtime
10101025
uses: taiki-e/install-action@v2
10111026
with:
1012-
tool: cargo-hack,wasmtime,cargo-wasi
1027+
tool: cargo-hack,wasmtime
10131028

10141029
- uses: Swatinem/rust-cache@v2
10151030
- name: WASI test tokio full
@@ -1035,9 +1050,12 @@ jobs:
10351050

10361051
- name: test tests-integration --features wasi-rt
10371052
# TODO: this should become: `cargo hack wasi test --each-feature`
1038-
run: cargo wasi test --test rt_yield --features wasi-rt
1053+
run: cargo test --target ${{ matrix.target }} --test rt_yield --features wasi-rt
10391054
if: matrix.target == 'wasm32-wasip1'
10401055
working-directory: tests-integration
1056+
env:
1057+
CARGO_TARGET_WASM32_WASIP1_RUNNER: "wasmtime run --"
1058+
RUSTFLAGS: -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864
10411059

10421060
- name: test tests-integration --features wasi-threads-rt
10431061
run: cargo test --target ${{ matrix.target }} --features wasi-threads-rt

Cargo.toml

+13
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,16 @@ members = [
1717

1818
[workspace.metadata.spellcheck]
1919
config = "spellcheck.toml"
20+
21+
[workspace.lints.rust]
22+
unexpected_cfgs = { level = "warn", check-cfg = [
23+
'cfg(fuzzing)',
24+
'cfg(loom)',
25+
'cfg(mio_unsupported_force_poll_poll)',
26+
'cfg(tokio_allow_from_blocking_fd)',
27+
'cfg(tokio_internal_mt_counters)',
28+
'cfg(tokio_no_parking_lot)',
29+
'cfg(tokio_no_tuning_tests)',
30+
'cfg(tokio_taskdump)',
31+
'cfg(tokio_unstable)',
32+
] }

examples/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,6 @@ path = "named-pipe-multi-client.rs"
9595
[[example]]
9696
name = "dump"
9797
path = "dump.rs"
98+
99+
[lints]
100+
workspace = true

tokio/CHANGELOG.md

+14
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,20 @@ Yanked. Please use 1.39.1 instead.
303303
[#6709]: https://github.com/tokio-rs/tokio/pull/6709
304304
[#6710]: https://github.com/tokio-rs/tokio/pull/6710
305305

306+
# 1.38.2 (April 2nd, 2025)
307+
308+
This release fixes a soundness issue in the broadcast channel. The channel
309+
accepts values that are `Send` but `!Sync`. Previously, the channel called
310+
`clone()` on these values without synchronizing. This release fixes the channel
311+
by synchronizing calls to `.clone()` (Thanks Austin Bonander for finding and
312+
reporting the issue).
313+
314+
### Fixed
315+
316+
- sync: synchronize `clone()` call in broadcast channel ([#7232])
317+
318+
[#7232]: https://github.com/tokio-rs/tokio/pull/7232
319+
306320
# 1.38.1 (July 16th, 2024)
307321

308322
This release fixes the bug identified as ([#6682]), which caused timers not

tokio/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,6 @@ allowed_external_types = [
173173
"bytes::buf::buf_mut::BufMut",
174174
"tokio_macros::*",
175175
]
176+
177+
[lints]
178+
workspace = true

tokio/src/runtime/tests/queue.rs

-20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::runtime::scheduler::multi_thread::{queue, Stats};
2-
use crate::runtime::task::{self, Schedule, Task, TaskHarnessScheduleHooks};
32

43
use std::cell::RefCell;
54
use std::thread;
@@ -272,22 +271,3 @@ fn stress2() {
272271
assert_eq!(num_pop, NUM_TASKS);
273272
}
274273
}
275-
276-
#[allow(dead_code)]
277-
struct Runtime;
278-
279-
impl Schedule for Runtime {
280-
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
281-
None
282-
}
283-
284-
fn schedule(&self, _task: task::Notified<Self>) {
285-
unreachable!();
286-
}
287-
288-
fn hooks(&self) -> TaskHarnessScheduleHooks {
289-
TaskHarnessScheduleHooks {
290-
task_terminate_callback: None,
291-
}
292-
}
293-
}

tokio/src/sync/broadcast.rs

+27-28
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@
118118
119119
use crate::loom::cell::UnsafeCell;
120120
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121-
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
121+
use crate::loom::sync::{Arc, Mutex, MutexGuard};
122122
use crate::runtime::coop::cooperative;
123123
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124124
use crate::util::WakeList;
@@ -304,7 +304,7 @@ use self::error::{RecvError, SendError, TryRecvError};
304304
/// Data shared between senders and receivers.
305305
struct Shared<T> {
306306
/// slots in the channel.
307-
buffer: Box<[RwLock<Slot<T>>]>,
307+
buffer: Box<[Mutex<Slot<T>>]>,
308308

309309
/// Mask a position -> index.
310310
mask: usize,
@@ -348,7 +348,7 @@ struct Slot<T> {
348348
///
349349
/// The value is set by `send` when the write lock is held. When a reader
350350
/// drops, `rem` is decremented. When it hits zero, the value is dropped.
351-
val: UnsafeCell<Option<T>>,
351+
val: Option<T>,
352352
}
353353

354354
/// An entry in the wait queue.
@@ -386,7 +386,7 @@ generate_addr_of_methods! {
386386
}
387387

388388
struct RecvGuard<'a, T> {
389-
slot: RwLockReadGuard<'a, Slot<T>>,
389+
slot: MutexGuard<'a, Slot<T>>,
390390
}
391391

392392
/// Receive a value future.
@@ -395,11 +395,15 @@ struct Recv<'a, T> {
395395
receiver: &'a mut Receiver<T>,
396396

397397
/// Entry in the waiter `LinkedList`.
398-
waiter: UnsafeCell<Waiter>,
398+
waiter: WaiterCell,
399399
}
400400

401-
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
402-
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
401+
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
402+
// from `Recv`.
403+
struct WaiterCell(UnsafeCell<Waiter>);
404+
405+
unsafe impl Send for WaiterCell {}
406+
unsafe impl Sync for WaiterCell {}
403407

404408
/// Max number of receivers. Reserve space to lock.
405409
const MAX_RECEIVERS: usize = usize::MAX >> 2;
@@ -467,12 +471,6 @@ pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
467471
(tx, rx)
468472
}
469473

470-
unsafe impl<T: Send> Send for Sender<T> {}
471-
unsafe impl<T: Send> Sync for Sender<T> {}
472-
473-
unsafe impl<T: Send> Send for Receiver<T> {}
474-
unsafe impl<T: Send> Sync for Receiver<T> {}
475-
476474
impl<T> Sender<T> {
477475
/// Creates the sending-half of the [`broadcast`] channel.
478476
///
@@ -511,10 +509,10 @@ impl<T> Sender<T> {
511509
let mut buffer = Vec::with_capacity(capacity);
512510

513511
for i in 0..capacity {
514-
buffer.push(RwLock::new(Slot {
512+
buffer.push(Mutex::new(Slot {
515513
rem: AtomicUsize::new(0),
516514
pos: (i as u64).wrapping_sub(capacity as u64),
517-
val: UnsafeCell::new(None),
515+
val: None,
518516
}));
519517
}
520518

@@ -600,7 +598,7 @@ impl<T> Sender<T> {
600598
tail.pos = tail.pos.wrapping_add(1);
601599

602600
// Get the slot
603-
let mut slot = self.shared.buffer[idx].write();
601+
let mut slot = self.shared.buffer[idx].lock();
604602

605603
// Track the position
606604
slot.pos = pos;
@@ -609,7 +607,7 @@ impl<T> Sender<T> {
609607
slot.rem.with_mut(|v| *v = rem);
610608

611609
// Write the value
612-
slot.val = UnsafeCell::new(Some(value));
610+
slot.val = Some(value);
613611

614612
// Release the slot lock before notifying the receivers.
615613
drop(slot);
@@ -696,7 +694,7 @@ impl<T> Sender<T> {
696694
while low < high {
697695
let mid = low + (high - low) / 2;
698696
let idx = base_idx.wrapping_add(mid) & self.shared.mask;
699-
if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
697+
if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
700698
low = mid + 1;
701699
} else {
702700
high = mid;
@@ -738,7 +736,7 @@ impl<T> Sender<T> {
738736
let tail = self.shared.tail.lock();
739737

740738
let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
741-
self.shared.buffer[idx].read().rem.load(SeqCst) == 0
739+
self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
742740
}
743741

744742
/// Returns the number of active receivers.
@@ -1058,7 +1056,7 @@ impl<T> Receiver<T> {
10581056
let idx = (self.next & self.shared.mask as u64) as usize;
10591057

10601058
// The slot holding the next value to read
1061-
let mut slot = self.shared.buffer[idx].read();
1059+
let mut slot = self.shared.buffer[idx].lock();
10621060

10631061
if slot.pos != self.next {
10641062
// Release the `slot` lock before attempting to acquire the `tail`
@@ -1075,7 +1073,7 @@ impl<T> Receiver<T> {
10751073
let mut tail = self.shared.tail.lock();
10761074

10771075
// Acquire slot lock again
1078-
slot = self.shared.buffer[idx].read();
1076+
slot = self.shared.buffer[idx].lock();
10791077

10801078
// Make sure the position did not change. This could happen in the
10811079
// unlikely event that the buffer is wrapped between dropping the
@@ -1367,12 +1365,12 @@ impl<'a, T> Recv<'a, T> {
13671365
fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
13681366
Recv {
13691367
receiver,
1370-
waiter: UnsafeCell::new(Waiter {
1368+
waiter: WaiterCell(UnsafeCell::new(Waiter {
13711369
queued: AtomicBool::new(false),
13721370
waker: None,
13731371
pointers: linked_list::Pointers::new(),
13741372
_p: PhantomPinned,
1375-
}),
1373+
})),
13761374
}
13771375
}
13781376

@@ -1384,7 +1382,7 @@ impl<'a, T> Recv<'a, T> {
13841382
is_unpin::<&mut Receiver<T>>();
13851383

13861384
let me = self.get_unchecked_mut();
1387-
(me.receiver, &me.waiter)
1385+
(me.receiver, &me.waiter.0)
13881386
}
13891387
}
13901388
}
@@ -1418,6 +1416,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14181416
// `Shared::notify_rx` before we drop the object.
14191417
let queued = self
14201418
.waiter
1419+
.0
14211420
.with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
14221421

14231422
// If the waiter is queued, we need to unlink it from the waiters list.
@@ -1432,6 +1431,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14321431
// `Relaxed` order suffices because we hold the tail lock.
14331432
let queued = self
14341433
.waiter
1434+
.0
14351435
.with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
14361436

14371437
if queued {
@@ -1440,7 +1440,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14401440
// safety: tail lock is held and the wait node is verified to be in
14411441
// the list.
14421442
unsafe {
1443-
self.waiter.with_mut(|ptr| {
1443+
self.waiter.0.with_mut(|ptr| {
14441444
tail.waiters.remove((&mut *ptr).into());
14451445
});
14461446
}
@@ -1486,16 +1486,15 @@ impl<'a, T> RecvGuard<'a, T> {
14861486
where
14871487
T: Clone,
14881488
{
1489-
self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1489+
self.slot.val.clone()
14901490
}
14911491
}
14921492

14931493
impl<'a, T> Drop for RecvGuard<'a, T> {
14941494
fn drop(&mut self) {
14951495
// Decrement the remaining counter
14961496
if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497-
// Safety: Last receiver, drop the value
1498-
self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1497+
self.slot.val = None;
14991498
}
15001499
}
15011500
}

0 commit comments

Comments
 (0)