Skip to content

Commit 6f67ec1

Browse files
authored
Issue 340: Fix event writer implementation and add flush (#341)
Added flush for event writer Updated implementation of Event Writer Signed-off-by: dellThejas <[email protected]>
1 parent 45aa5dc commit 6f67ec1

File tree

6 files changed

+116
-15
lines changed

6 files changed

+116
-15
lines changed

integration_test/src/event_writer_tests.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,7 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory:
271271
i += 1;
272272
}
273273
// the data should write successfully.
274-
for rx in receivers {
275-
let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot");
276-
assert!(reply.is_ok());
277-
}
274+
assert!(writer.flush().await.is_ok());
278275

279276
let segment_name = ScopedSegment {
280277
scope: scope_name.clone(),
@@ -364,6 +361,9 @@ async fn test_write_correctness_with_routing_key(writer: &mut EventWriter, facto
364361
}
365362
i += 1;
366363
}
364+
365+
assert!(writer.flush().await.is_ok());
366+
367367
let first_segment = ScopedSegment {
368368
scope: scope_name.clone(),
369369
stream: stream_name.clone(),

integration_test/src/reader_group_tests.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,12 @@ fn test_read_offline_with_offset(client_factory: &ClientFactoryAsync) {
165165
event.value.as_slice(),
166166
"Corrupted event read"
167167
);
168-
let offset_map = HashMap::from([(
168+
let mut offset_map = HashMap::new();
169+
170+
offset_map.insert(
169171
slice.meta.scoped_segment.clone(),
170172
event.offset_in_segment + EVENT_SIZE as i64 + 8,
171-
)]);
173+
);
172174

173175
// Segment slice is dropped here and it will update the RG state with the offsets.
174176
// Now mark the reader offline

src/event/writer.rs

+60-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use crate::util::get_random_u128;
1717
use pravega_client_channel::{create_channel, ChannelSender};
1818
use pravega_client_shared::{ScopedStream, WriterId};
1919

20+
use std::collections::VecDeque;
2021
use tokio::sync::oneshot;
22+
use tokio::sync::oneshot::error::TryRecvError;
2123
use tracing::info_span;
2224
use tracing_futures::Instrument;
2325

@@ -75,6 +77,7 @@ use tracing_futures::Instrument;
7577
pub struct EventWriter {
7678
writer_id: WriterId,
7779
sender: ChannelSender<Incoming>,
80+
event_handles: VecDeque<oneshot::Receiver<Result<(), Error>>>,
7881
}
7982

8083
impl EventWriter {
@@ -93,6 +96,7 @@ impl EventWriter {
9396
EventWriter {
9497
writer_id,
9598
sender: tx,
99+
event_handles: VecDeque::new(),
96100
}
97101
}
98102

@@ -118,10 +122,13 @@ impl EventWriter {
118122
pub async fn write_event(&mut self, event: Vec<u8>) -> oneshot::Receiver<Result<(), Error>> {
119123
let size = event.len();
120124
let (tx, rx) = oneshot::channel();
125+
let (tx_flush, rx_flush) = oneshot::channel();
121126
let routing_info = RoutingInfo::RoutingKey(None);
122-
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) {
127+
if let Some(pending_event) =
128+
PendingEvent::with_header_flush(routing_info, event, None, tx, Some(tx_flush))
129+
{
123130
let append_event = Incoming::AppendEvent(pending_event);
124-
self.writer_event_internal(append_event, size, rx).await
131+
self.writer_event_internal(append_event, size, rx, rx_flush).await
125132
} else {
126133
rx
127134
}
@@ -137,10 +144,13 @@ impl EventWriter {
137144
) -> oneshot::Receiver<Result<(), Error>> {
138145
let size = event.len();
139146
let (tx, rx) = oneshot::channel();
147+
let (tx_flush, rx_flush) = oneshot::channel();
140148
let routing_info = RoutingInfo::RoutingKey(Some(routing_key));
141-
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) {
149+
if let Some(pending_event) =
150+
PendingEvent::with_header_flush(routing_info, event, None, tx, Some(tx_flush))
151+
{
142152
let append_event = Incoming::AppendEvent(pending_event);
143-
self.writer_event_internal(append_event, size, rx).await
153+
self.writer_event_internal(append_event, size, rx, rx_flush).await
144154
} else {
145155
rx
146156
}
@@ -151,8 +161,14 @@ impl EventWriter {
151161
append_event: Incoming,
152162
size: usize,
153163
rx: oneshot::Receiver<Result<(), Error>>,
164+
rx_flush: oneshot::Receiver<Result<(), Error>>,
154165
) -> oneshot::Receiver<Result<(), Error>> {
155-
if let Err(_e) = self.sender.send((append_event, size)).await {
166+
if let Err(err) = self.clear_initial_complete_events() {
167+
// fail fast upon checking previous write events
168+
let (tx_error, rx_error) = oneshot::channel();
169+
tx_error.send(Err(err)).expect("send error");
170+
rx_error
171+
} else if let Err(_e) = self.sender.send((append_event, size)).await {
156172
let (tx_error, rx_error) = oneshot::channel();
157173
tx_error
158174
.send(Err(Error::InternalFailure {
@@ -161,9 +177,48 @@ impl EventWriter {
161177
.expect("send error");
162178
rx_error
163179
} else {
180+
self.event_handles.push_back(rx_flush);
164181
rx
165182
}
166183
}
184+
185+
/// Flush data.
186+
///
187+
/// It will wait until all pending appends have acknowledgment.
188+
pub async fn flush(&mut self) -> Result<(), Error> {
189+
while let Some(receiver) = self.event_handles.pop_front() {
190+
let recv = receiver.await.map_err(|e| Error::InternalFailure {
191+
msg: format!("oneshot error {:?}", e),
192+
})?;
193+
194+
recv?;
195+
}
196+
Ok(())
197+
}
198+
199+
/// Clear initial completed events from flush queue.
200+
fn clear_initial_complete_events(&mut self) -> Result<(), Error> {
201+
while let Some(mut receiver) = self.event_handles.pop_front() {
202+
let try_recv = receiver.try_recv();
203+
204+
match try_recv {
205+
Err(TryRecvError::Empty) => {
206+
self.event_handles.push_front(receiver);
207+
break;
208+
}
209+
Err(TryRecvError::Closed) => {
210+
let res = try_recv.map_err(|e| Error::InternalFailure {
211+
msg: format!("Trying to flush a closed channel {:?}", e),
212+
})?;
213+
214+
return res;
215+
}
216+
Ok(_) => {}
217+
}
218+
}
219+
220+
Ok(())
221+
}
167222
}
168223

169224
impl Drop for EventWriter {

src/segment/event.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub(crate) struct PendingEvent {
5050
pub(crate) data: Vec<u8>,
5151
pub(crate) conditional_offset: Option<i64>,
5252
pub(crate) oneshot_sender: oneshot::Sender<Result<(), Error>>,
53+
pub(crate) flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
5354
}
5455

5556
impl PendingEvent {
@@ -59,6 +60,7 @@ impl PendingEvent {
5960
data: Vec<u8>,
6061
conditional_offset: Option<i64>,
6162
oneshot_sender: oneshot::Sender<Result<(), Error>>,
63+
flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
6264
) -> Option<Self> {
6365
if data.len() > PendingEvent::MAX_WRITE_SIZE {
6466
warn!(
@@ -82,19 +84,27 @@ impl PendingEvent {
8284
data,
8385
conditional_offset,
8486
oneshot_sender,
87+
flush_oneshot_sender,
8588
})
8689
}
8790
}
8891

89-
pub(crate) fn with_header(
92+
pub(crate) fn with_header_flush(
9093
routing_info: RoutingInfo,
9194
data: Vec<u8>,
9295
conditional_offset: Option<i64>,
9396
oneshot_sender: oneshot::Sender<Result<(), Error>>,
97+
flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
9498
) -> Option<PendingEvent> {
9599
let cmd = EventCommand { data };
96100
match cmd.write_fields() {
97-
Ok(data) => PendingEvent::new(routing_info, data, conditional_offset, oneshot_sender),
101+
Ok(data) => PendingEvent::new(
102+
routing_info,
103+
data,
104+
conditional_offset,
105+
oneshot_sender,
106+
flush_oneshot_sender,
107+
),
98108
Err(e) => {
99109
warn!("failed to serialize event to event command, sending this error back to caller");
100110
oneshot_sender
@@ -107,13 +117,38 @@ impl PendingEvent {
107117
}
108118
}
109119

120+
pub(crate) fn with_header(
121+
routing_info: RoutingInfo,
122+
data: Vec<u8>,
123+
conditional_offset: Option<i64>,
124+
oneshot_sender: oneshot::Sender<Result<(), Error>>,
125+
) -> Option<PendingEvent> {
126+
PendingEvent::with_header_flush(routing_info, data, conditional_offset, oneshot_sender, None)
127+
}
128+
129+
pub(crate) fn without_header_flush(
130+
routing_info: RoutingInfo,
131+
data: Vec<u8>,
132+
conditional_offset: Option<i64>,
133+
oneshot_sender: oneshot::Sender<Result<(), Error>>,
134+
flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
135+
) -> Option<PendingEvent> {
136+
PendingEvent::new(
137+
routing_info,
138+
data,
139+
conditional_offset,
140+
oneshot_sender,
141+
flush_oneshot_sender,
142+
)
143+
}
144+
110145
pub(crate) fn without_header(
111146
routing_info: RoutingInfo,
112147
data: Vec<u8>,
113148
conditional_offset: Option<i64>,
114149
oneshot_sender: oneshot::Sender<Result<(), Error>>,
115150
) -> Option<PendingEvent> {
116-
PendingEvent::new(routing_info, data, conditional_offset, oneshot_sender)
151+
PendingEvent::without_header_flush(routing_info, data, conditional_offset, oneshot_sender, None)
117152
}
118153

119154
pub(crate) fn is_empty(&self) -> bool {

src/segment/reactor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ pub(crate) mod test {
298298
) -> oneshot::Receiver<Result<(), Error>> {
299299
let (oneshot_sender, oneshot_receiver) = tokio::sync::oneshot::channel();
300300
let routing_info = RoutingInfo::RoutingKey(Some("routing_key".to_string()));
301-
let event = PendingEvent::new(routing_info, vec![1; size], None, oneshot_sender)
301+
let event = PendingEvent::new(routing_info, vec![1; size], None, oneshot_sender, None)
302302
.expect("create pending event");
303303
sender.send((Incoming::AppendEvent(event), size)).await.unwrap();
304304
oneshot_receiver

src/segment/writer.rs

+9
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,14 @@ impl SegmentWriter {
377377
acked.event_id
378378
);
379379
}
380+
if let Some(flush_sender) = acked.event.flush_oneshot_sender {
381+
if flush_sender.send(Result::Ok(())).is_err() {
382+
info!(
383+
"failed to send ack back to caller using oneshot due to Receiver dropped: event id {:?}",
384+
acked.event_id
385+
);
386+
}
387+
}
380388

381389
// ack up to event id
382390
if acked.event_id == event_id {
@@ -784,6 +792,7 @@ pub(crate) mod test {
784792
vec![1; size],
785793
offset,
786794
oneshot_sender,
795+
None,
787796
)
788797
.expect("create pending event");
789798
sender.send((Incoming::AppendEvent(event), size)).await.unwrap();

0 commit comments

Comments
 (0)