Skip to content

Commit 05b4358

Browse files
committed
Finish getting tests to work with shuttle
1 parent 6baca5f commit 05b4358

File tree

10 files changed

+75
-74
lines changed

10 files changed

+75
-74
lines changed

.cargo/config.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[build]
22
# Setting cfg here means our IDE and CLI both use the same values.
3-
#rustflags = "--cfg shuttle"
3+
#rustflags = "--cfg loom"

Cargo.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ tracing = { version = "0.1.40", features = ["log"] }
4343
twox-hash = { version = "1.6.3", optional = true }
4444
once_cell = "1.19.0"
4545
ctx-thread = "0.1.1"
46+
shuttle = { version = "0.7.1", optional = true }
4647

4748
[target.'cfg(windows)'.dependencies.windows]
4849
version = "0.52.0"
@@ -66,12 +67,10 @@ test-log = "0.2.14"
6667
[target.'cfg(loom)'.dev-dependencies]
6768
loom = "0.7"
6869

69-
[target.'cfg(shuttle)'.dependencies]
70-
shuttle = "0.7.1"
71-
7270
[features]
7371
default = []
7472
testing = ["dep:fdlimit", "dep:rayon", "dep:twox-hash"]
73+
shuttle = ["dep:shuttle"]
7574

7675
[[bench]]
7776
name = "possum"

src/concurrency/mod.rs

+37-18
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,54 @@
11
pub(crate) mod sync;
22

3-
#[cfg(not(shuttle))]
3+
#[cfg(not(feature = "shuttle"))]
44
pub use std::thread;
55

66
// This isn't available in loom or shuttle yet. Unfortunately for shuttle it means threads are
77
// spawned outside its control, and it doesn't work.
8-
#[cfg(shuttle)]
8+
#[cfg(feature = "shuttle")]
99
pub use shuttle::thread;
1010

11-
#[cfg(not(shuttle))]
11+
#[cfg(not(feature = "shuttle"))]
1212
pub(crate) fn run_blocking<F, R>(f: F) -> R
1313
where
14-
F: FnOnce() -> R+Send,
15-
R: Send,
14+
F: FnOnce() -> R + Send,
15+
R: Send,
1616
{
17-
// let (sender, receiver) = std::sync::mpsc::channel();
18-
// std::thread::scope(|scope|{
19-
// scope.spawn(f)
20-
// });
21-
unimplemented!()
17+
if false {
18+
let (sender, receiver) = std::sync::mpsc::channel();
19+
let tx_thread = std::thread::scope(|scope| {
20+
scope.spawn(|| {
21+
let res = f();
22+
sender.send(res).unwrap();
23+
});
24+
receiver.recv().unwrap()
25+
});
26+
tx_thread
27+
} else {
28+
f()
29+
}
2230
}
2331

24-
#[cfg(shuttle)]
32+
#[cfg(feature = "shuttle")]
2533
pub(crate) fn run_blocking<F, R>(f: F) -> R
2634
where
27-
F: FnOnce() -> R+Send,
28-
R: Send,
29-
35+
F: FnOnce() -> R + Send,
36+
R: Send,
3037
{
31-
let (sender, receiver) = shuttle::sync::mpsc::channel();
32-
std::thread::scope(|scope| {
33-
scope.spawn(f)
34-
})
38+
use std::sync::mpsc;
39+
let (sender, receiver) = mpsc::channel();
40+
let tx_thread = std::thread::scope(|scope| {
41+
scope.spawn(||{
42+
let res = f();
43+
sender.send(res).unwrap();
44+
});
45+
loop {
46+
shuttle::thread::yield_now();
47+
match receiver.try_recv() {
48+
Err(mpsc::TryRecvError::Empty) => continue,
49+
default => return default.unwrap()
50+
}
51+
}
52+
});
53+
tx_thread
3554
}

src/concurrency/sync.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::StableDeref;
22
use std::ops::{Deref, DerefMut};
33

4-
#[cfg(shuttle)]
4+
#[cfg(feature = "shuttle")]
55
use shuttle::sync;
6-
#[cfg(not(shuttle))]
6+
#[cfg(not(feature = "shuttle"))]
77
use std::sync;
88

99
use sync::Mutex as InnerMutex;

src/exclusive_file.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,6 @@ impl ExclusiveFile {
131131

132132
impl Drop for ExclusiveFile {
133133
fn drop(&mut self) {
134-
debug!("dropping exclusive file {}", self.id.deref());
134+
debug!("dropping exclusive file {}", self.id);
135135
}
136136
}

src/file_id.rs

-8
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,6 @@ impl Debug for FileId {
1515
}
1616
}
1717

18-
impl Deref for FileId {
19-
type Target = FileIdInner;
20-
21-
fn deref(&self) -> &Self::Target {
22-
&self.0
23-
}
24-
}
25-
2618
impl std::str::FromStr for FileId {
2719
type Err = std::num::ParseIntError;
2820

src/handle.rs

+14-31
Original file line numberDiff line numberDiff line change
@@ -224,25 +224,17 @@ impl Handle {
224224
&self,
225225
behaviour: TransactionBehavior,
226226
) -> rusqlite::Result<OwnedTx> {
227-
unsafe {
228-
Ok(self
229-
.start_transaction(|conn, handle| {
230-
let conn_ptr = std::ptr::from_mut(conn);
231-
let conn_void_ptr: CanSend<*mut ()> = std::mem::transmute(conn_ptr);
232-
type TxRes<'a> = rusqlite::Result<rusqlite::Transaction<'a>>;
233-
let tx_thread = std::thread::spawn(move||{
234-
eprintln!("hello from transaction thread");
235-
let conn: &mut Connection = std::mem::transmute(conn_void_ptr);
236-
let tx_res: TxRes = conn.transaction_with_behavior(behaviour);
237-
CanSend(Box::into_raw(Box::new(tx_res)))
238-
});
239-
let rtx_raw = tx_thread.join().unwrap();
240-
eprintln!("joined transaction thread");
241-
let rtx = Box::from_raw(rtx_raw.0 as *mut TxRes);
242-
Ok(Transaction::new((*rtx)?, handle))
243-
})?
244-
.into())
245-
}
227+
Ok(self
228+
.start_transaction(|conn, handle| {
229+
let tx_thread = run_blocking(|| {
230+
eprintln!("hello from transaction thread");
231+
conn.transaction_with_behavior(behaviour).map(CanSend)
232+
});
233+
eprintln!("joined transaction thread");
234+
let rtx = tx_thread?.0;
235+
Ok(Transaction::new(rtx, handle))
236+
})?
237+
.into())
246238
}
247239

248240
/// Starts a deferred transaction (the default). There is no guaranteed read-only transaction
@@ -274,15 +266,6 @@ impl Handle {
274266
Ok(reader)
275267
}
276268

277-
// pub(crate) fn associated_read<'h, H>(handle: H) -> rusqlite::Result<Reader<'h, H>> where H: WithHandle {
278-
// let reader = Reader {
279-
// owned_tx: handle.as_ref().start_deferred_transaction()?,
280-
// handle,
281-
// reads: Default::default(),
282-
// };
283-
// Ok(reader)
284-
// }
285-
286269
pub fn read_single(&self, key: &[u8]) -> Result<Option<SnapshotValue<Value>>> {
287270
let mut reader = self.read()?;
288271
let Some(value) = reader.add(key)? else {
@@ -467,9 +450,9 @@ impl Handle {
467450
Ok(())
468451
}
469452

470-
pub fn delete_prefix(&self, prefix: &[u8]) -> PubResult<()> {
453+
pub fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> PubResult<()> {
471454
let mut tx = self.start_deferred_transaction()?;
472-
for item in tx.list_items(prefix)? {
455+
for item in tx.list_items(prefix.as_ref())? {
473456
tx.delete_key(&item.key)?;
474457
}
475458
tx.commit()?.complete();
@@ -583,4 +566,4 @@ impl AsRef<Handle> for Rc<RwLockReadGuard<'_, Handle>> {
583566

584567
struct CanSend<T>(T);
585568

586-
unsafe impl<T> Send for CanSend<T> {}
569+
unsafe impl<T> Send for CanSend<T> {}

src/testing.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub mod torrent_storage;
22

33
use std::hash::Hasher;
4-
use std::io::{copy, SeekFrom, Write};
4+
use std::io::{BufReader, copy, SeekFrom, Write};
55

66
use anyhow::{ensure, Result};
77
use rand::Rng;
@@ -90,6 +90,7 @@ pub fn readable_repeated_bytes(byte: u8, limit: usize) -> Vec<u8> {
9090
pub fn condense_repeated_bytes(r: impl Read) -> (Option<u8>, u64) {
9191
let mut count = 0;
9292
let mut byte = None;
93+
let r = BufReader::new(r);
9394
for b in r.bytes() {
9495
let b = b.unwrap();
9596
match byte {
@@ -118,12 +119,12 @@ pub fn check_concurrency(
118119
loom::model(move || f().unwrap());
119120
Ok(())
120121
}
121-
#[cfg(shuttle)]
122+
#[cfg(feature = "shuttle")]
122123
{
123124
shuttle::check_random(move || f().unwrap(), iterations_hint);
124125
Ok(())
125126
}
126-
#[cfg(all(not(loom), not(shuttle)))]
127+
#[cfg(all(not(loom), not(feature = "shuttle")))]
127128
if false {
128129
for _ in 0..1000 {
129130
f()?

src/tests.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,16 @@ fn test_inc_array() {
5353
/// Show that replacing keys doesn't cause a key earlier in the same values file to be punched. This
5454
/// occurred because there were file_id values in the manifest file that had the wrong type, and so
5555
/// the query that looked for the starting offset for hole punching would punch out the whole file
56-
/// thinking it was empty.
56+
/// thinking it was empty. Note sometimes this test fails and there's extra values files floating
57+
/// around. I haven't figured out why.
5758
#[test]
5859
#[cfg(not(miri))]
5960
fn test_replace_keys() -> Result<()> {
6061
check_concurrency(
6162
|| {
6263
let tempdir = test_tempdir("test_replace_keys")?;
6364
let handle = Handle::new(tempdir.path.clone())?;
65+
handle.delete_prefix("")?;
6466
let a = "a".as_bytes().to_vec();
6567
let b = "b".as_bytes().to_vec();
6668
let block_size: usize = handle.block_size().try_into()?;
@@ -92,9 +94,13 @@ fn test_replace_keys() -> Result<()> {
9294
// There can be multiple value files if the value puncher is holding onto a file when another
9395
// write occurs.
9496
for value_file in values_files {
95-
let mut file = File::open(&value_file.path)?;
97+
let path = &value_file.path;
98+
eprintln!("{:?}", path);
99+
let mut file = File::open(path)?;
100+
// file.sync_all()?;
96101
for region in seekhole::Iter::new(&mut file) {
97102
let region = region?;
103+
eprintln!("{:?}", region);
98104
if matches!(region.region_type, seekhole::RegionType::Data) {
99105
allocated_space += region.length();
100106
}

tests/simple_tests.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ fn torrent_storage_small() -> Result<()> {
228228
}
229229
Ok(())
230230
},
231-
100,
231+
1,
232232
)
233233
}
234234

@@ -244,7 +244,7 @@ fn torrent_storage_big() -> Result<()> {
244244
view_snapshot_values: true,
245245
})
246246
},
247-
100,
247+
1,
248248
)
249249
}
250250

@@ -288,7 +288,7 @@ fn torrent_storage_inner(opts: TorrentStorageOpts) -> Result<()> {
288288
let piece_data = Arc::clone(&piece_data);
289289
let start_delay = Duration::from_micros(1000 * (index / 2) as u64);
290290
let handle = Arc::clone(&handle);
291-
join_handles.push(std::thread::spawn(move || -> Result<()> {
291+
join_handles.push(thread::spawn(move || -> Result<()> {
292292
let key = offset_key(offset);
293293
sleep(start_delay);
294294
debug!("starting block write");
@@ -481,13 +481,14 @@ fn reads_update_last_used() -> Result<()> {
481481
let uniform = UniformDuration::new(Duration::from_nanos(0), LAST_USED_RESOLUTION);
482482
for _ in 0..100 {
483483
let dither = uniform.sample(&mut rng);
484-
sleep(LAST_USED_RESOLUTION + dither);
484+
// This needs to be a real sleep or the timestamps sqlite generates don't progress.
485+
std::thread::sleep(LAST_USED_RESOLUTION + dither);
485486
let new_read_ts = handle.read_single(&key)?.unwrap().last_used();
486487
assert!(new_read_ts > read_ts);
487488
}
488489
Ok(())
489490
},
490-
100,
491+
10,
491492
)
492493
}
493494

0 commit comments

Comments
 (0)