
Commit d60b02f

added creation of offsets during parallel compression + fmt + clippy
1 parent 4d83236 commit d60b02f

File tree

src/graphs/bvgraph/codecs/factories.rs
src/graphs/bvgraph/comp/impls.rs
src/utils/sort_pairs.rs
tests/test_par_bvcomp.rs

4 files changed: +131 -40 lines changed

4 files changed

+131
-40
lines changed
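At a glance, the change makes each compression thread write, next to its temporary graph chunk, an offsets chunk in which every node's compressed size in bits is gamma-coded as a gap; the merge step writes an initial gamma-coded 0 and then concatenates the chunks, so prefix-summing the gaps recovers each node's absolute bit offset in the final graph bitstream. Below is a minimal, self-contained sketch of that gap/prefix-sum bookkeeping (toy helpers, not the crate's write_gamma/read_gamma API).

// Toy sketch, not repository code: the offsets stream conceptually stores
// one gap per node (preceded by a 0), where each gap is the number of bits
// that node's successor list occupies in the graph bitstream.
fn lengths_to_gaps(node_bits: &[u64]) -> Vec<u64> {
    let mut gaps = Vec::with_capacity(node_bits.len() + 1);
    gaps.push(0); // offset of node 0
    gaps.extend_from_slice(node_bits);
    gaps
}

// Prefix-summing the gaps recovers the absolute bit offsets, which is what
// the new test does by accumulating read_gamma() results.
fn gaps_to_offsets(gaps: &[u64]) -> Vec<u64> {
    let mut acc = 0;
    gaps.iter()
        .map(|&g| {
            acc += g;
            acc
        })
        .collect()
}

fn main() {
    let node_bits = [13, 7, 42, 5]; // hypothetical per-node sizes in bits
    let offsets = gaps_to_offsets(&lengths_to_gaps(&node_bits));
    assert_eq!(offsets, vec![0, 13, 20, 62, 67]);
}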

src/graphs/bvgraph/codecs/factories.rs

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,7 @@ impl<E: Endianness> MemoryFactory<E, Box<[u32]>> {
         bytes[file_len..].fill(0);
         Ok(Self {
             // Safety: the length is a multiple of 16.
-            data: unsafe { std::mem::transmute(bytes.into_boxed_slice()) },
+            data: unsafe { std::mem::transmute::<Box<[u8]>, Box<[u32]>>(bytes.into_boxed_slice()) },
             _marker: core::marker::PhantomData,
         })
     }
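This one-line change (and the matching ones in src/utils/sort_pairs.rs below) is the clippy/fmt part of the commit message: spelling out the source and target types of mem::transmute with a turbofish means a later change to the surrounding types becomes a compile error rather than a silently different reinterpretation. A small illustration of the idea, not taken from the repository:

// Not repository code: a tiny example of why annotating transmute helps.
// With explicit types, changing either side later fails to compile instead
// of silently reinterpreting something else.
fn u32_from_ne_bytes_via_transmute(bytes: [u8; 4]) -> u32 {
    // Both types have the same size, so the reinterpretation is well defined;
    // in ordinary code the safe u32::from_ne_bytes is preferable.
    unsafe { std::mem::transmute::<[u8; 4], u32>(bytes) }
}

fn main() {
    assert_eq!(u32_from_ne_bytes_via_transmute(42u32.to_ne_bytes()), 42);
}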

src/graphs/bvgraph/comp/impls.rs

Lines changed: 98 additions & 37 deletions
@@ -12,7 +12,7 @@ use dsi_progress_logger::prelude::*;
 use lender::prelude::*;
 use std::fs::File;
 use std::io::{BufReader, BufWriter};
-use std::path::Path;
+use std::path::{Path, PathBuf};

 /// A queue that pulls jobs with ids in a contiguous initial segment of the
 /// natural numbers from an iterator out of order and implement an iterator in
@@ -42,15 +42,17 @@ impl<I: Iterator> TaskQueue<I> {

 impl<I: Iterator> Iterator for TaskQueue<I>
 where
-    I::Item: JobId + Copy,
+    I::Item: JobId,
 {
     type Item = I::Item;

     fn next(&mut self) -> Option<Self::Item> {
         loop {
-            if let Some(Some(item)) = self.jobs.get(self.next_id) {
-                self.next_id += 1;
-                return Some(*item);
+            if let Some(item) = self.jobs.get_mut(self.next_id) {
+                if item.is_some() {
+                    self.next_id += 1;
+                    return item.take();
+                }
             }
             if let Some(item) = self.iter.next() {
                 let id = item.id();
@@ -66,12 +68,15 @@ where
 }

 /// A compression job.
-#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone, Copy)]
+#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
 struct Job {
     job_id: usize,
     first_node: usize,
     last_node: usize,
+    chunk_graph_path: PathBuf,
     written_bits: u64,
+    chunk_offsets_path: PathBuf,
+    offsets_written_bits: u64,
     num_arcs: u64,
 }

@@ -284,6 +289,7 @@ impl BVComp<()> {
         let basename = basename.as_ref();

         let graph_path = basename.with_extension(GRAPH_EXTENSION);
+        let offsets_path = basename.with_extension(OFFSETS_EXTENSION);

         let (tx, rx) = std::sync::mpsc::channel();

@@ -293,51 +299,70 @@ impl BVComp<()> {
         let cp_flags = &compression_flags;

         for (thread_id, mut thread_lender) in iter.enumerate() {
-            let file_path = thread_path(thread_id);
+            let tmp_path = thread_path(thread_id);
+            let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
+            let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
             let tx = tx.clone();
             // Spawn the thread
             s.spawn(move |_| {
                 log::info!("Thread {} started", thread_id);
                 let first_node;
-
-                let (mut bvcomp, mut written_bits) =
-                    if let Some((node_id, successors)) = thread_lender.next() {
+                let mut bvcomp;
+                let mut offsets_writer;
+                let mut written_bits;
+                let mut offsets_written_bits;
+
+                match thread_lender.next() {
+                    None => return,
+                    Some((node_id, successors)) => {
                         first_node = node_id;
+
+                        offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
+                            BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
+                        ));
+
                         let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
-                            BufWriter::new(File::create(&file_path).unwrap()),
+                            BufWriter::new(File::create(&chunk_graph_path).unwrap()),
                         ));
                         let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags);

-                        let mut bvcomp = BVComp::new(
+                        bvcomp = BVComp::new(
                             codes_encoder,
                             cp_flags.compression_window,
                             cp_flags.max_ref_count,
                             cp_flags.min_interval_length,
                             node_id,
                         );
-                        let written_bits = bvcomp.push(successors).unwrap();
-                        (bvcomp, written_bits)
-                    } else {
-                        return;
-                    };
+                        written_bits = bvcomp.push(successors).unwrap();
+                        offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
+                    }
+                };

                 let mut last_node = first_node;
-                written_bits += bvcomp
-                    .extend(thread_lender.inspect(|(x, _)| last_node = *x))
-                    .unwrap();
+                let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
+                for_! ( (_, succ) in iter_nodes {
+                    let node_bits = bvcomp.push(succ.into_iter()).unwrap();
+                    written_bits += node_bits;
+                    offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
+                });
+
                 let num_arcs = bvcomp.arcs;
                 bvcomp.flush().unwrap();
-                // TODO written_bits += bvcomp.flush().unwrap();
+
                 log::info!(
-                    "Finished Compression thread {} and wrote {} bits",
+                    "Finished Compression thread {} and wrote {} bits for the graph and {} bits for the offsets",
                     thread_id,
-                    written_bits
+                    written_bits,
+                    offsets_written_bits,
                 );
                 tx.send(Job {
                     job_id: thread_id,
                     first_node,
                     last_node,
+                    chunk_graph_path,
                     written_bits,
+                    chunk_offsets_path,
+                    offsets_written_bits,
                     num_arcs,
                 })
                 .unwrap()
@@ -346,15 +371,19 @@ impl BVComp<()> {

         drop(tx);

-        // setup the final bitstream from the end, because the first thread
-        // already wrote the first chunk
         let file = File::create(&graph_path)
             .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
-
-        let mut result_writer =
+        let mut graph_writer =
             <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));

+        let file = File::create(&offsets_path)
+            .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
+        let mut offsets_writer =
+            <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
+        offsets_writer.write_gamma(0)?;
+
         let mut total_written_bits: u64 = 0;
+        let mut total_offsets_written_bits: u64 = 0;
         let mut total_arcs: u64 = 0;

         let mut next_node = 0;
@@ -364,7 +393,10 @@ impl BVComp<()> {
             job_id,
             first_node,
             last_node,
+            chunk_graph_path,
             written_bits,
+            chunk_offsets_path,
+            offsets_written_bits,
             num_arcs,
         } in TaskQueue::new(rx.iter())
         {
@@ -378,36 +410,60 @@ impl BVComp<()> {

             next_node = last_node + 1;
             total_arcs += num_arcs;
-            // compute the path of the bitstream created by this thread
-            let file_path = thread_path(job_id);
             log::info!(
                 "Copying {} [{}..{}) bits from {} to {}",
                 written_bits,
                 total_written_bits,
                 total_written_bits + written_bits,
-                file_path.display(),
-                basename.display()
+                chunk_graph_path.display(),
+                graph_path.display()
             );
             total_written_bits += written_bits;

             let mut reader =
                 <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
-                    File::open(&file_path)
-                        .with_context(|| format!("Could not open {}", file_path.display()))?,
+                    File::open(&chunk_graph_path)
+                        .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
                 )));
-            result_writer
+            graph_writer
                 .copy_from(&mut reader, written_bits)
                 .with_context(|| {
                     format!(
                         "Could not copy from {} to {}",
-                        file_path.display(),
+                        chunk_graph_path.display(),
                         graph_path.display()
                     )
                 })?;
+
+            log::info!(
+                "Copying offsets {} [{}..{}) bits from {} to {}",
+                written_bits,
+                total_offsets_written_bits,
+                total_offsets_written_bits + offsets_written_bits,
+                chunk_offsets_path.display(),
+                offsets_path.display()
+            );
+            total_offsets_written_bits += offsets_written_bits;
+
+            let mut reader =
+                <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
+                    File::open(&chunk_offsets_path)
+                        .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
+                )));
+            offsets_writer
+                .copy_from(&mut reader, offsets_written_bits)
+                .with_context(|| {
+                    format!(
+                        "Could not copy from {} to {}",
+                        chunk_offsets_path.display(),
+                        offsets_path.display()
+                    )
+                })?;
         }

-        log::info!("Flushing the merged Compression bitstream");
-        result_writer.flush()?;
+        log::info!("Flushing the merged bitstreams");
+        graph_writer.flush()?;
+        offsets_writer.flush()?;

         log::info!("Writing the .properties file");
         let properties = compression_flags
@@ -427,6 +483,11 @@ impl BVComp<()> {
             total_written_bits,
             total_written_bits as f64 / total_arcs as f64
         );
+        log::info!(
+            "Created offsets file with {} bits for {:.4} bits/node",
+            total_offsets_written_bits,
+            total_offsets_written_bits as f64 / num_nodes as f64
+        );

         // cleanup the temp files
         std::fs::remove_dir_all(tmp_dir).with_context(|| {
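Besides the offsets work, two details above are easy to miss: Job now owns PathBufs, so it can no longer derive Copy, and TaskQueue accordingly stops copying buffered items and instead moves them out with Option::take. A self-contained sketch of that pattern (hypothetical InOrder type, not the crate's TaskQueue) that yields results in job-id order while requiring items only to be movable:

// Hypothetical sketch, not the crate's TaskQueue: buffer out-of-order results
// in a Vec<Option<T>> indexed by job id and yield them in id order, moving
// each item out with Option::take so no Copy/Clone bound is needed.
struct InOrder<I: Iterator<Item = (usize, String)>> {
    iter: I,
    slots: Vec<Option<(usize, String)>>,
    next_id: usize,
}

impl<I: Iterator<Item = (usize, String)>> InOrder<I> {
    fn new(iter: I) -> Self {
        Self { iter, slots: Vec::new(), next_id: 0 }
    }
}

impl<I: Iterator<Item = (usize, String)>> Iterator for InOrder<I> {
    type Item = (usize, String);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // If the next id in line is already buffered, move it out.
            if let Some(slot) = self.slots.get_mut(self.next_id) {
                if slot.is_some() {
                    self.next_id += 1;
                    return slot.take();
                }
            }
            // Otherwise pull one more (possibly out-of-order) item and file
            // it under its id, growing the buffer as needed.
            let (id, payload) = self.iter.next()?;
            if id >= self.slots.len() {
                self.slots.resize_with(id + 1, || None);
            }
            self.slots[id] = Some((id, payload));
        }
    }
}

fn main() {
    let done = vec![(2, "c".to_owned()), (0, "a".to_owned()), (1, "b".to_owned())];
    let ids: Vec<usize> = InOrder::new(done.into_iter()).map(|(id, _)| id).collect();
    assert_eq!(ids, vec![0, 1, 2]);
}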

src/utils/sort_pairs.rs

Lines changed: 7 additions & 2 deletions
@@ -254,7 +254,12 @@ impl BatchIterator<()> {
         file_path: P,
         batch: &mut [(usize, usize)],
     ) -> anyhow::Result<Self> {
-        Self::new_from_vec_labeled(file_path, unsafe { core::mem::transmute(batch) }, &(), ())
+        Self::new_from_vec_labeled(
+            file_path,
+            unsafe { core::mem::transmute::<&mut [(usize, usize)], &mut [Triple<()>]>(batch) },
+            &(),
+            (),
+        )
     }
     /// Dumps the given triples in `file_path` and returns an iterator over
     /// them, assuming they are already sorted.
@@ -264,7 +269,7 @@
     ) -> anyhow::Result<Self> {
         Self::new_from_vec_sorted_labeled(
             file_path,
-            unsafe { core::mem::transmute(batch) },
+            unsafe { core::mem::transmute::<&[(usize, usize)], &[Triple<()>]>(batch) },
             &(),
             (),
         )

tests/test_par_bvcomp.rs

Lines changed: 25 additions & 0 deletions
@@ -5,6 +5,7 @@
  * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
  */

+use std::io::BufReader;
 use std::path::PathBuf;

 use anyhow::Result;
@@ -88,8 +89,32 @@ fn _test_par_bvcomp(basename: &str) -> Result<()> {
         }

         pr.done();
+
+        let offsets_path = tmp_basename.with_extension(OFFSETS_EXTENSION);
+        let mut offsets_reader =
+            <BufBitReader<BE, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
+                std::fs::File::open(&offsets_path)
+                    .expect(&format!("Could not open {}", offsets_path.display())),
+            )));
+
+        let mut pr = ProgressLogger::default();
+        pr.display_memory(true)
+            .item_name("node")
+            .expected_updates(Some(graph.num_nodes()));
+        pr.start("Checking that the generated offsets are correct...");
+
+        let mut offset = 0;
+        for (real_offset, _degree) in comp_graph.offset_deg_iter().by_ref() {
+            let gap_offset = offsets_reader.read_gamma().unwrap();
+            offset += gap_offset;
+            assert_eq!(offset, real_offset);
+            pr.light_update();
+        }
+        pr.done();
+
         // cancel the file at the end
         std::fs::remove_file(tmp_basename.with_extension(GRAPH_EXTENSION))?;
+        std::fs::remove_file(tmp_basename.with_extension(OFFSETS_EXTENSION))?;
         std::fs::remove_file(tmp_basename.with_extension(PROPERTIES_EXTENSION))?;
         log::info!("\n");
     }
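The test reconstructs absolute offsets by prefix-summing the gamma-coded gaps and compares them with offset_deg_iter(). The reason the offsets are produced at all is random access: once reconstructed, consecutive offsets bracket a node's record in the graph bitstream. A toy illustration of that arithmetic (plain arrays, not the crate's reader types):

// Toy sketch, not repository code: given absolute bit offsets recovered from
// the offsets file (num_nodes + 1 entries, starting at 0), locate a node's
// successor-list record in the graph bitstream.
fn record_span(offsets: &[u64], node: usize) -> (u64, u64) {
    let start = offsets[node];
    let len = offsets[node + 1] - start;
    (start, len) // starting bit position and length in bits
}

fn main() {
    // Offsets corresponding to the gap sequence 0, 13, 7, 42, 5.
    let offsets = [0u64, 13, 20, 62, 67];
    assert_eq!(record_span(&offsets, 2), (20, 42));
}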
