fix(buffers): deadlock when seeking after entire write fails to be flushed #17657

Merged 3 commits on Jun 13, 2023
8 changes: 2 additions & 6 deletions lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -901,12 +901,8 @@ where
while self.last_reader_record_id < ledger_last {
match self.next().await {
Ok(maybe_record) => {
if maybe_record.is_none() && self.last_reader_record_id == 0 {
// We've hit a point where there's no more data to read. If our "last reader record
// ID" hasn't moved at all, that means the buffer was already empty and we're caught
// up, so we just pin ourselves to where the ledger says we left off, and we're good
// to go.
self.last_reader_record_id = ledger_last;
if maybe_record.is_none() {
// We've hit the end of the current data file, so we've gone as far as we can.
break;
}
}
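For context, here is a minimal, self-contained model of the catch-up loop this hunk changes (an assumed simplification, not the real `Reader` API): `next()` yields `None` once the current data file is exhausted. With the old guard (`is_none() && last_reader_record_id == 0`), a reader that had already made progress but whose final record was truncated would spin forever; breaking on any `None` lets initialization finish.

fn main() {
    // Ledger says two records were read and acknowledged before the restart.
    let ledger_last: u64 = 2;
    // Reader state being rebuilt during initialization.
    let mut last_reader_record_id: u64 = 0;
    // Only record ID 1 survived in the data file (record 2 was truncated away).
    let mut surviving_records = vec![1u64].into_iter();

    while last_reader_record_id < ledger_last {
        match surviving_records.next() {
            Some(id) => last_reader_record_id = id,
            // End of the current data file: break instead of looping forever.
            None => break,
        }
    }

    assert_eq!(last_reader_record_id, 1);
}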
95 changes: 95 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs
@@ -87,3 +87,98 @@ async fn reader_doesnt_block_from_partial_write_on_last_record() {
let parent = trace_span!("reader_doesnt_block_from_partial_write_on_last_record");
fut.instrument(parent.or_current()).await;
}

#[tokio::test]
async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() {
// When initializing, the reader will be catching up to the last record it read, which involves
// reading individual records in the current reader data file until a record is returned whose
// record ID matches the "last record ID read" field from the ledger.
//
// If the current data file contains a valid last record when we initialize, but that last
// record is _behind_ the last record read as tracked by the ledger, then we need to ensure we
// can break out of the catch-up loop when we get to the end of the current data file.
//
// Our existing logic for corrupted event detection, and the writer's own initialization logic,
// will emit an error message when we realize that data is missing based on record ID gaps.
let _a = install_tracing_helpers();

let fut = with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
// Create a regular buffer, no customizations required.
let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;

// Write two records, and then read and acknowledge both.
//
// This puts the buffer into a state where there's data in the current data file, and
// the ledger has a non-zero record ID for where it thinks the reader needs to be. This
// ensures that the reader actually does at least two calls to `Reader::next` during
// `Reader::seek_to_next_record`, which is necessary to ensure that the reader leaves
// the default state of `self.last_reader_record_id == 0`.
let first_bytes_written = writer
.write_record(SizedRecord::new(64))
.await
.expect("should not fail to write");
writer.flush().await.expect("flush should not fail");

let second_bytes_written = writer
.write_record(SizedRecord::new(68))
.await
.expect("should not fail to write");
writer.flush().await.expect("flush should not fail");

writer.close();

let first_read = reader
.next()
.await
.expect("should not fail to read record")
.expect("should contain first record");
assert_eq!(SizedRecord::new(64), first_read);
acknowledge(first_read).await;

let second_read = reader
.next()
.await
.expect("should not fail to read record")
.expect("should contain first record");
assert_eq!(SizedRecord::new(68), second_read);
acknowledge(second_read).await;

let third_read = reader.next().await.expect("should not fail to read record");
assert!(third_read.is_none());

ledger.flush().expect("should not fail to flush ledger");

// Grab the current writer data file path before dropping the buffer.
let data_file_path = ledger.get_current_writer_data_file_path();
drop(reader);
drop(writer);
drop(ledger);

// Open the data file and truncate the second record. This will ensure that the reader
// hits EOF after the first read, which we need to do in order to exercise the logic
// that breaks out of the loop.
let initial_len = first_bytes_written as u64 + second_bytes_written as u64;
let target_len = first_bytes_written as u64;
set_file_length(&data_file_path, initial_len, target_len)
.await
.expect("should not fail to truncate data file");

// Now reopen the buffer, which should complete in a timely fashion without an immediate error.
let reopen = timeout(
Duration::from_millis(500),
create_default_buffer_v2::<_, SizedRecord>(data_dir),
)
.await;
assert!(
reopen.is_ok(),
"failed to reopen buffer in a timely fashion; likely deadlock"
);
}
});

let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file");
fut.instrument(parent.or_current()).await;
}
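The `set_file_length` helper used above lives in the shared test support code; a hedged sketch of what such a truncation helper might look like (hypothetical implementation using tokio's filesystem API, not the real helper):

use std::{io, path::Path};

// Hypothetical stand-in for the test helper: truncate `path` down to `target_len` bytes,
// dropping everything written past that offset (e.g. the second record in the test above).
async fn truncate_to(path: &Path, target_len: u64) -> io::Result<()> {
    let file = tokio::fs::OpenOptions::new().write(true).open(path).await?;
    file.set_len(target_len).await?;
    Ok(())
}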
107 changes: 106 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
@@ -5,7 +5,8 @@ use tracing::Instrument;
use super::{create_buffer_v2_with_max_data_file_size, read_next, read_next_some};
use crate::{
assert_buffer_is_empty, assert_buffer_records, assert_buffer_size, assert_enough_bytes_written,
assert_reader_writer_v2_file_positions, await_timeout, set_data_file_length,
assert_reader_last_writer_next_positions, assert_reader_writer_v2_file_positions,
await_timeout, set_data_file_length,
test::{acknowledge, install_tracing_helpers, with_temp_dir, MultiEventRecord, SizedRecord},
variants::disk_v2::{
common::{DEFAULT_FLUSH_INTERVAL, MAX_FILE_ID},
@@ -820,3 +821,107 @@ async fn writer_updates_ledger_when_buffered_writer_reports_implicit_flush() {
})
.await;
}

#[tokio::test]
async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
// This test ensures that the reader/writer positions stay aligned through multiple records and
// data files. That is, if we write 5 records, each with 10 events, and then read and
// acknowledge all of those events, the writer's next record ID should be 51 (the 50th event
// corresponds to ID 50, so the next ID is 51) and the reader's last read record ID should
// be 50.
//
// Testing this across multiple data files isn't super germane to the position logic, but it
// just ensures we're also testing that aspect.

let _a = install_tracing_helpers();
let fut = with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
// Create our buffer with an arbitrarily low maximum data file size. We'll use this to
// control how many records make it into a given data file. Just another way to ensure
// we're testing the position logic with multiple writes to one data file, one write to
// a data file, etc.
let (mut writer, mut reader, ledger) =
create_buffer_v2_with_max_data_file_size(data_dir, 256).await;

// We'll write multi-event records with N events based on these sizes, and as we do so,
// we'll assert that our writer position moves as expected after the write, and that
// after reading and acknowledging, the reader position also moves as expected.
let record_sizes = &[176, 52, 91, 137, 54, 87];

let mut expected_writer_position = ledger.state().get_next_writer_record_id();
let mut expected_reader_position = ledger.state().get_last_reader_record_id();
let mut trailing_reader_position_delta = 0;

for record_size in record_sizes {
// Initial check before writing/reading the next record.
assert_reader_last_writer_next_positions!(
ledger,
expected_reader_position,
expected_writer_position
);

let record = MultiEventRecord::new(*record_size);
assert_eq!(record.event_count() as u32, *record_size);

writer
.write_record(record)
.await
.expect("write should not fail");
writer.flush().await.expect("flush should not fail");

expected_writer_position += u64::from(*record_size);

// Make sure the writer position advanced after flushing.
assert_reader_last_writer_next_positions!(
ledger,
expected_reader_position,
expected_writer_position
);

let record_via_read = read_next_some(&mut reader).await;
assert_eq!(record_via_read, MultiEventRecord::new(*record_size));
acknowledge(record_via_read).await;

// Increment the expected reader position by the trailing reader position delta: now
// that we've done another read, the reader's position should actually have moved
// forward by that amount.
expected_reader_position += trailing_reader_position_delta;
assert_reader_last_writer_next_positions!(
ledger,
expected_reader_position,
expected_writer_position
);

// Set the trailing reader position delta to the record we just read.
//
// We do it this way because reads themselves have to drive acknowledgement logic to
// then drive updates to the ledger, so we will only see the change in the reader's
// position the _next_ time we do a read.
trailing_reader_position_delta = u64::from(*record_size);
}

// Close the writer and do a final read, thus driving the acknowledgement logic, and
// position update logic, before we do our final position check.
writer.close();
assert_eq!(reader.next().await, Ok(None));

// Calculate the absolute reader/writer positions we would expect based on all of the
// records/events written and read. This is to double check our work and make sure that
// the "expected" positions didn't hide any bugs from us.
let expected_final_reader_position =
record_sizes.iter().copied().map(u64::from).sum::<u64>();
let expected_final_writer_position = expected_final_reader_position + 1;

assert_reader_last_writer_next_positions!(
ledger,
expected_final_reader_position,
expected_final_writer_position
);
}
});

let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
fut.instrument(parent.or_current()).await;
}
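As a quick sanity check of the final numbers this test asserts, summing `record_sizes` the same way the test does (a standalone arithmetic sketch, not part of the PR):

fn main() {
    let record_sizes: [u32; 6] = [176, 52, 91, 137, 54, 87];
    // The reader's last read record ID ends at the total number of events read...
    let expected_final_reader_position: u64 =
        record_sizes.iter().copied().map(u64::from).sum();
    // ...and the writer's next record ID is one past that.
    let expected_final_writer_position = expected_final_reader_position + 1;
    assert_eq!(expected_final_reader_position, 597);
    assert_eq!(expected_final_writer_position, 598);
}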
18 changes: 18 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
@@ -134,6 +134,24 @@ macro_rules! assert_reader_writer_v2_file_positions {
}};
}

#[macro_export]
macro_rules! assert_reader_last_writer_next_positions {
($ledger:expr, $reader_expected:expr, $writer_expected:expr) => {{
let reader_actual = $ledger.state().get_last_reader_record_id();
let writer_actual = $ledger.state().get_next_writer_record_id();
assert_eq!(
$reader_expected, reader_actual,
"expected reader last read record ID of {}, got {} instead",
$reader_expected, reader_actual,
);
assert_eq!(
$writer_expected, writer_actual,
"expected writer next record ID of {}, got {} instead",
$writer_expected, writer_actual,
);
}};
}
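
For reference, the invariants test above invokes this macro like so (a usage sketch; `ledger` is the handle returned by the buffer constructors in the test helpers):

assert_reader_last_writer_next_positions!(
    ledger,
    expected_reader_position, // last record ID the reader has read and acknowledged
    expected_writer_position  // next record ID the writer will assign
);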

#[macro_export]
macro_rules! assert_enough_bytes_written {
($written:expr, $record_type:ty, $record_payload_size:expr) => {