
Commit 37a662a

fix(buffers): deadlock when seeking after entire write fails to be flushed (vectordotdev#17657)
## Context

In vectordotdev#17644, a user reported disk buffers getting stuck in an infinite loop, and thus deadlocking, when restarting after a crash. They provided some very useful debug information, going so far as to investigate, add some logging, and pull out some values of the reader's internal state.

When a disk buffer is initialized -- either for the first time, or after Vector is restarted and the buffer must resume where it left off -- both the reader and writer perform a catch-up phase. The writer checks the current data file and tries to figure out whether the last record written matches where it believes it left off. The reader has to dynamically seek to where it left off within the given data file, since we can't just open the file and start from the beginning: data files are append-only.

As part of the seek logic, there's a loop where we call `Reader::next` until we read the record we supposedly left off on, at which point we know we're caught up. This loop can only exit in two ways:

- the loop condition `self.last_reader_record_id < ledger_last` becomes false, meaning we've read the record we left off on (`self.last_reader_record_id` has caught up to `ledger_last`)
- the inner check `maybe_record.is_none() && self.last_reader_record_id == 0` hits, which is meant to say that we reached EOF on the data file (no more records) while nothing was in the file at all (`last_reader_record_id` still being 0)

While the first condition is correct, the second one is not. There can exist a scenario where Vector crashes and writes that the reader had read and acknowledged never actually make it to disk. Both the reader and writer are able to outpace the data on disk because the reader can read yet-to-be-flushed records: they still exist as dirty pages in the page cache. When this happens, the reader may have indicated to the ledger that it has read up to, say, record ID 10, while the last record _on disk_ when Vector starts up is record ID 5.

When the seek logic runs, it knows the last read record ID was 10. It will do some number of reads while seeking, eventually reading record ID 5 and updating `self.last_reader_record_id` accordingly. On the next iteration of the loop, it tries to read but hits EOF: the data file indeed has nothing left. However, `self.last_reader_record_id < ledger_last` is still true, while `maybe_record.is_none() && self.last_reader_record_id == 0` is not, as `self.last_reader_record_id` is set to `5`. Alas, deadlock. (A minimal sketch of this failure mode follows this description.)

## Solution

The solution is painfully simple, and the user that originally reported the issue [said as much](vectordotdev#17644 (comment)): drop `&& self.last_reader_record_id == 0`. Given the loop's own condition, the inner check for `self.last_reader_record_id == 0` was redundant... but it was also logically incorrect in the case where writes went missing.

I'm still not entirely sure how the existing tests didn't already catch this, but it was easy enough to spot the error once I knew where to look, and the unit test I added convincingly demonstrated the broken behavior and, after making the change, showed it was indeed fixed.

## Reviewer Note(s)

I added two unit tests: one for the fix as shown, and one for what I thought was another bug. It turns out the "other bug" wasn't a bug, and that unit test isn't _explicitly_ required, but it's a simple variation of other tests with a more straightforward invariant to demonstrate, so I left it in.

Fixes vectordotdev#17644.
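To make the failure mode concrete, here is a minimal, self-contained sketch of the catch-up loop described above. This is not Vector's reader implementation: `SeekState`, `next_record_id`, and `seek_pre_fix` are simplified stand-ins invented for illustration, and the data file is reduced to a plain "last record ID on disk" counter; only the shape of the loop and the pre-fix break condition mirror the description.

```rust
/// Hypothetical stand-in for the reader's catch-up state; not Vector's real types.
struct SeekState {
    last_reader_record_id: u64,
}

impl SeekState {
    /// Simplified stand-in for `Reader::next`: the data file holds records 1..=`last_on_disk`,
    /// so reading past that yields EOF (`None`).
    fn next_record_id(&self, last_on_disk: u64) -> Option<u64> {
        (self.last_reader_record_id < last_on_disk).then(|| self.last_reader_record_id + 1)
    }

    /// The pre-fix catch-up loop. Returns `true` if it stopped seeking, `false` if it would
    /// have spun forever (bounded here only so the sketch terminates).
    fn seek_pre_fix(&mut self, ledger_last: u64, last_on_disk: u64) -> bool {
        for _ in 0..1_000 {
            if self.last_reader_record_id >= ledger_last {
                return true; // caught up to where the ledger says we left off
            }
            match self.next_record_id(last_on_disk) {
                Some(id) => self.last_reader_record_id = id,
                // Buggy break condition: EOF only ends the loop if nothing was ever read.
                // With `last_reader_record_id == 5`, EOF is silently ignored.
                None if self.last_reader_record_id == 0 => return true,
                None => continue,
            }
        }
        false // never caught up: this is the reported deadlock
    }
}

fn main() {
    // The ledger says we left off at record 10, but only records 1..=5 made it to disk.
    let mut state = SeekState { last_reader_record_id: 0 };
    assert!(!state.seek_pre_fix(10, 5), "pre-fix condition spins forever");
}
```

Replacing the guarded arm with a plain `None => return true`, the sketch's equivalent of the one-line change in this commit, lets the same scenario stop seeking as soon as record ID 5 is followed by EOF instead of spinning.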
1 parent e1b3357 commit 37a662a

File tree

4 files changed: +224, -7 lines


lib/vector-buffers/src/variants/disk_v2/reader.rs

+2, -6 lines

```diff
@@ -901,12 +901,8 @@ where
         while self.last_reader_record_id < ledger_last {
             match self.next().await {
                 Ok(maybe_record) => {
-                    if maybe_record.is_none() && self.last_reader_record_id == 0 {
-                        // We've hit a point where there's no more data to read. If our "last reader record
-                        // ID" hasn't moved at all, that means the buffer was already empty and we're caught
-                        // up, so we just pin ourselves to where the ledger says we left off, and we're good
-                        // to go.
-                        self.last_reader_record_id = ledger_last;
+                    if maybe_record.is_none() {
+                        // We've hit the end of the current data file so we've gone as far as we can.
                         break;
                     }
                 }
```

lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs

+95 lines

```diff
@@ -87,3 +87,98 @@ async fn reader_doesnt_block_from_partial_write_on_last_record() {
     let parent = trace_span!("reader_doesnt_block_from_partial_write_on_last_record");
     fut.instrument(parent.or_current()).await;
 }
+
+#[tokio::test]
+async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() {
+    // When initializing, the reader will be catching up to the last record it read, which involves
+    // reading individual records in the current reader data file until a record is returned whose
+    // record ID matches the "last record ID read" field from the ledger.
+    //
+    // If the current data file contains a valid last record when we initialize, but that last
+    // record is _behind_ the last record read as tracked by the ledger, then we need to ensure we
+    // can break out of the catch-up loop when we get to the end of the current data file.
+    //
+    // Our existing logic for corrupted event detection, and the writer's own initialization logic,
+    // will emit an error message when we realize that data is missing based on record ID gaps.
+    let _a = install_tracing_helpers();
+
+    let fut = with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+
+        async move {
+            // Create a regular buffer, no customizations required.
+            let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;
+
+            // Write two records, and then read and acknowledge both.
+            //
+            // This puts the buffer into a state where there's data in the current data file, and
+            // the ledger has a non-zero record ID for where it thinks the reader needs to be. This
+            // ensures that the reader actually does at least two calls to `Reader::next` during
+            // `Reader::seek_to_next_record`, which is necessary to ensure that the reader leaves
+            // the default state of `self.last_reader_record_id == 0`.
+            let first_bytes_written = writer
+                .write_record(SizedRecord::new(64))
+                .await
+                .expect("should not fail to write");
+            writer.flush().await.expect("flush should not fail");
+
+            let second_bytes_written = writer
+                .write_record(SizedRecord::new(68))
+                .await
+                .expect("should not fail to write");
+            writer.flush().await.expect("flush should not fail");
+
+            writer.close();
+
+            let first_read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain first record");
+            assert_eq!(SizedRecord::new(64), first_read);
+            acknowledge(first_read).await;
+
+            let second_read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain second record");
+            assert_eq!(SizedRecord::new(68), second_read);
+            acknowledge(second_read).await;
+
+            let third_read = reader.next().await.expect("should not fail to read record");
+            assert!(third_read.is_none());
+
+            ledger.flush().expect("should not fail to flush ledger");
+
+            // Grab the current writer data file path before dropping the buffer.
+            let data_file_path = ledger.get_current_writer_data_file_path();
+            drop(reader);
+            drop(writer);
+            drop(ledger);
+
+            // Open the data file and truncate the second record. This will ensure that the reader
+            // hits EOF after the first read, which we need to do in order to exercise the logic
+            // that breaks out of the loop.
+            let initial_len = first_bytes_written as u64 + second_bytes_written as u64;
+            let target_len = first_bytes_written as u64;
+            set_file_length(&data_file_path, initial_len, target_len)
+                .await
+                .expect("should not fail to truncate data file");
+
+            // Now reopen the buffer, which should complete in a timely fashion without an immediate error.
+            let reopen = timeout(
+                Duration::from_millis(500),
+                create_default_buffer_v2::<_, SizedRecord>(data_dir),
+            )
+            .await;
+            assert!(
+                reopen.is_ok(),
+                "failed to reopen buffer in a timely fashion; likely deadlock"
+            );
+        }
+    });
+
+    let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file");
+    fut.instrument(parent.or_current()).await;
+}
```

lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs

+109, -1 lines

```diff
@@ -5,7 +5,8 @@ use tracing::Instrument;
 use super::{create_buffer_v2_with_max_data_file_size, read_next, read_next_some};
 use crate::{
     assert_buffer_is_empty, assert_buffer_records, assert_buffer_size, assert_enough_bytes_written,
-    assert_reader_writer_v2_file_positions, await_timeout, set_data_file_length,
+    assert_reader_last_writer_next_positions, assert_reader_writer_v2_file_positions,
+    await_timeout, set_data_file_length,
     test::{acknowledge, install_tracing_helpers, with_temp_dir, MultiEventRecord, SizedRecord},
     variants::disk_v2::{
         common::{DEFAULT_FLUSH_INTERVAL, MAX_FILE_ID},
@@ -820,3 +821,110 @@ async fn writer_updates_ledger_when_buffered_writer_reports_implicit_flush() {
     })
     .await;
 }
+
+#[tokio::test]
+async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
+    // This test ensures that the reader/writer positions stay aligned through multiple records and
+    // data files. This is to say that, if we write 5 records, each with 10 events, and then read
+    // and acknowledge all of those events... the writer's next record ID should be 51 (the 50th
+    // event would correspond to ID 50, so next ID would be 51) and the reader's last read record ID
+    // should be 50.
+    //
+    // Testing this across multiple data files isn't super germane to the position logic, but it
+    // just ensures we're also testing that aspect.
+
+    let _a = install_tracing_helpers();
+    let fut = with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+
+        async move {
+            // Create our buffer with an arbitrarily low maximum data file size. We'll use this to
+            // control how many records make it into a given data file. Just another way to ensure
+            // we're testing the position logic with multiple writes to one data file, one write to
+            // a data file, etc.
+            let (mut writer, mut reader, ledger) =
+                create_buffer_v2_with_max_data_file_size(data_dir, 256).await;
+
+            // We'll write multi-event records with N events based on these sizes, and as we do so,
+            // we'll assert that our writer position moves as expected after the write, and that
+            // after reading and acknowledging, the reader position also moves as expected.
+            let record_sizes = &[176, 52, 91, 137, 54, 87];
+
+            let mut expected_writer_position = ledger.state().get_next_writer_record_id();
+            let mut expected_reader_position = ledger.state().get_last_reader_record_id();
+            let mut trailing_reader_position_delta = 0;
+
+            for record_size in record_sizes {
+                // Initial check before writing/reading the next record.
+                assert_reader_last_writer_next_positions!(
+                    ledger,
+                    expected_reader_position,
+                    expected_writer_position
+                );
+
+                let record = MultiEventRecord::new(*record_size);
+                assert_eq!(
+                    record.event_count(),
+                    usize::try_from(*record_size).unwrap_or(usize::MAX)
+                );
+
+                writer
+                    .write_record(record)
+                    .await
+                    .expect("write should not fail");
+                writer.flush().await.expect("flush should not fail");
+
+                expected_writer_position += u64::from(*record_size);
+
+                // Make sure the writer position advanced after flushing.
+                assert_reader_last_writer_next_positions!(
+                    ledger,
+                    expected_reader_position,
+                    expected_writer_position
+                );
+
+                let record_via_read = read_next_some(&mut reader).await;
+                assert_eq!(record_via_read, MultiEventRecord::new(*record_size));
+                acknowledge(record_via_read).await;
+
+                // Increment the expected reader position by the trailing reader position delta;
+                // now that we've done another read, we should see the reader position actually
+                // move forward.
+                expected_reader_position += trailing_reader_position_delta;
+                assert_reader_last_writer_next_positions!(
+                    ledger,
+                    expected_reader_position,
+                    expected_writer_position
+                );
+
+                // Set the trailing reader position delta to the record we just read.
+                //
+                // We do it this way because reads themselves have to drive acknowledgement logic to
+                // then drive updates to the ledger, so we will only see the change in the reader's
+                // position the _next_ time we do a read.
+                trailing_reader_position_delta = u64::from(*record_size);
+            }
+
+            // Close the writer and do a final read, thus driving the acknowledgement logic, and
+            // position update logic, before we do our final position check.
+            writer.close();
+            assert_eq!(reader.next().await, Ok(None));
+
+            // Calculate the absolute reader/writer positions we would expect based on all of the
+            // records/events written and read. This is to double check our work and make sure that
+            // the "expected" positions didn't hide any bugs from us.
+            let expected_final_reader_position =
+                record_sizes.iter().copied().map(u64::from).sum::<u64>();
+            let expected_final_writer_position = expected_final_reader_position + 1;
+
+            assert_reader_last_writer_next_positions!(
+                ledger,
+                expected_final_reader_position,
+                expected_final_writer_position
+            );
+        }
+    });
+
+    let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
+    fut.instrument(parent.or_current()).await;
+}
```

lib/vector-buffers/src/variants/disk_v2/tests/mod.rs

+18 lines

```diff
@@ -134,6 +134,24 @@ macro_rules! assert_reader_writer_v2_file_positions {
     }};
 }
 
+#[macro_export]
+macro_rules! assert_reader_last_writer_next_positions {
+    ($ledger:expr, $reader_expected:expr, $writer_expected:expr) => {{
+        let reader_actual = $ledger.state().get_last_reader_record_id();
+        let writer_actual = $ledger.state().get_next_writer_record_id();
+        assert_eq!(
+            $reader_expected, reader_actual,
+            "expected reader last read record ID of {}, got {} instead",
+            $reader_expected, reader_actual,
+        );
+        assert_eq!(
+            $writer_expected, writer_actual,
+            "expected writer next record ID of {}, got {} instead",
+            $writer_expected, writer_actual,
+        );
+    }};
+}
+
 #[macro_export]
 macro_rules! assert_enough_bytes_written {
     ($written:expr, $record_type:ty, $record_payload_size:expr) => {
```
