Skip to content

Commit b57b2ea

Browse files
shridsvangorktkaitchuck
authored
Issue 319: Enable delete readergroup (#342)
Enable delete ReaderGroup on the RUST and Python clients. Signed-off-by: Luis Liu <[email protected]> Co-authored-by: Luis Liu <[email protected]> Co-authored-by: Sandeep <[email protected]> Co-authored-by: Tom Kaitchuck <[email protected]> Co-authored-by: Luis Liu <[email protected]> Co-authored-by: Luis Liu <[email protected]>
1 parent 6f67ec1 commit b57b2ea

16 files changed

+303
-96
lines changed

config/src/credentials.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl Credentials {
5959

6060
pub fn keycloak(path: &str, disable_cert_verification: bool) -> Self {
6161
// read keycloak json
62-
let file = File::open(path.to_string()).expect("open keycloak.json");
62+
let file = File::open(path).expect("open keycloak.json");
6363
let mut buf_reader = BufReader::new(file);
6464
let mut buffer = Vec::new();
6565
buf_reader.read_to_end(&mut buffer).expect("read to the end");

config/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl ClientConfigBuilder {
211211
Err(format!(
212212
"is_tls_enabled option {} does not match scheme in uri {}",
213213
is_tls_enabled,
214-
self.controller_uri.as_ref().unwrap().to_string()
214+
**self.controller_uri.as_ref().unwrap()
215215
))
216216
} else {
217217
Ok(())

integration_test/src/event_reader_tests.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use pravega_client_shared::{
1919
};
2020
use pravega_controller_client::ControllerClient;
2121

22+
use pravega_wire_protocol::commands::NoSuchSegmentCommand;
23+
use pravega_wire_protocol::wire_commands::Replies;
2224
use std::sync::atomic::{AtomicUsize, Ordering};
2325
use std::sync::Arc;
2426
use std::thread;
@@ -399,7 +401,7 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
399401
let stream_name = Stream::from("testReaderStream".to_owned());
400402

401403
const NUM_EVENTS: usize = 10;
402-
const EVNET_SIZE: usize = 10;
404+
const EVENT_SIZE: usize = 10;
403405

404406
let new_stream =
405407
create_scope_stream(client_factory.controller_client(), &scope_name, &stream_name, 4).await;
@@ -411,7 +413,7 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
411413
stream_name.clone(),
412414
client_factory.clone(),
413415
NUM_EVENTS,
414-
EVNET_SIZE,
416+
EVENT_SIZE,
415417
)
416418
.await;
417419
}
@@ -432,7 +434,7 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
432434
loop {
433435
if let Some(event) = slice.next() {
434436
assert_eq!(
435-
vec![1; EVNET_SIZE],
437+
vec![1; EVENT_SIZE],
436438
event.value.as_slice(),
437439
"Corrupted event read"
438440
);
@@ -450,7 +452,20 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
450452
break;
451453
}
452454
}
453-
info!("test event stream reader read api passed");
455+
info!("read all events from the stream");
456+
client_factory
457+
.delete_reader_group(scope_name, "rg-read-api".to_string())
458+
.await
459+
.expect("Deletion of ReaderGroup failed");
460+
// Attempt acquiring a segment of a deleted ReaderGroup.
461+
let acquire_segment_result = reader.acquire_segment().await;
462+
//Verify the operation fails with EventReaderError
463+
assert!(
464+
acquire_segment_result.is_err(),
465+
"After reader group deletion acquire_segment API should return an error"
466+
);
467+
let reader_offline_result = reader.reader_offline().await;
468+
assert!(reader_offline_result.is_ok(), "The reader is already offline");
454469
}
455470

456471
fn test_multiple_readers(client_factory: &ClientFactoryAsync) {

integration_test/src/pravega_service.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,10 @@ impl PravegaService for PravegaStandaloneService {
137137
drop(src); // Close the file early
138138

139139
// Run the replace operation in memory
140-
let new_data: String;
141-
if enable {
142-
new_data = data.replace("INFO", "DEBUG");
140+
let new_data = if enable {
141+
data.replace("INFO", "DEBUG")
143142
} else {
144-
new_data = data.replace("DEBUG", "INFO");
143+
data.replace("DEBUG", "INFO")
145144
};
146145

147146
// Recreate the file and dump the processed contents to it

integration_test/src/transactional_event_writer_tests.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async fn test_multiple_transactions(factory: &ClientFactory, stream: ScopedStrea
191191
for mut transaction in transactions {
192192
let event = vec![1; 100];
193193
transaction.write_event(None, event).await.expect("write event");
194-
let timestamp = Timestamp { 0: 0 };
194+
let timestamp = Timestamp(0);
195195
transaction.commit(timestamp).await.expect("commit");
196196
}
197197

@@ -208,7 +208,7 @@ async fn test_multiple_transactions(factory: &ClientFactory, stream: ScopedStrea
208208
for mut transaction in transactions {
209209
let event = vec![1; 100];
210210
transaction.write_event(None, event).await.expect("write event");
211-
let timestamp = Timestamp { 0: 0 };
211+
let timestamp = Timestamp(0);
212212
transaction.commit(timestamp).await.expect("commit");
213213
}
214214
}

python/src/stream_manager.rs

+26
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,32 @@ impl StreamManager {
549549
Ok(reader_group)
550550
}
551551

552+
///
553+
/// Delete a ReaderGroup.
554+
///
555+
/// ```
556+
/// import pravega_client;
557+
/// manager=pravega_client.StreamManager("tcp://127.0.0.1:9090")
558+
/// // Delete a ReaderGroup against an already created Pravega scope..
559+
/// manager.delete_reader_group_with_config("rg1", "scope", rg_config)
560+
///
561+
/// ```
562+
///
563+
#[pyo3(text_signature = "($self, reader_group_name, scope_name)")]
564+
pub fn delete_reader_group(&self, reader_group_name: &str, scope_name: &str) -> PyResult<()> {
565+
let scope = Scope::from(scope_name.to_string());
566+
567+
let handle = self.cf.runtime_handle();
568+
569+
let delete_result =
570+
handle.block_on(self.cf.delete_reader_group(scope, reader_group_name.to_string()));
571+
info!("Delete ReaderGroup {:?}", delete_result);
572+
match delete_result {
573+
Ok(_) => Ok(()),
574+
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
575+
}
576+
}
577+
552578
///
553579
/// Create a Binary I/O representation of a Pravega Stream. This ByteStream implements the
554580
/// APIs provided by [io.IOBase](https://docs.python.org/3/library/io.html#io.IOBase)

shared/src/naming_utils.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ impl NameUtils {
3939
);
4040
if let Some(transaction_id) = tx_id {
4141
format!(
42-
"{}{}{}{}",
42+
"{}{}{:016x}{:016x}",
4343
segment_name,
4444
TRANSACTION_DELIMITER,
45-
format!("{:016x}", (transaction_id.0 >> 64) as i64),
46-
format!("{:016x}", transaction_id.0 as i64)
45+
(transaction_id.0 >> 64) as i64,
46+
transaction_id.0 as i64,
4747
)
4848
} else {
4949
segment_name

src/client_factory.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::segment::metadata::SegmentMetadataClient;
2222
use crate::segment::raw_client::RawClientImpl;
2323
use crate::segment::reader::AsyncSegmentReaderImpl;
2424
use crate::sync::synchronizer::Synchronizer;
25-
use crate::sync::table::Table;
25+
use crate::sync::table::{Table, TableError};
2626
cfg_if::cfg_if! {
2727
if #[cfg(feature = "integration-test")] {
2828
use crate::test_utils::{RawClientWrapper, SegmentReaderWrapper};
@@ -155,6 +155,23 @@ impl ClientFactory {
155155
.await
156156
}
157157

158+
///
159+
/// Delete a ReaderGroup.
160+
///
161+
pub async fn delete_reader_group(
162+
&self,
163+
scope: Scope,
164+
reader_group_name: String,
165+
) -> Result<(), TableError> {
166+
info!(
167+
"Deleting reader group {:?} under scope {:?}",
168+
reader_group_name, scope
169+
);
170+
self.client_factory_async
171+
.delete_reader_group(scope, reader_group_name)
172+
.await
173+
}
174+
158175
pub async fn create_transactional_event_writer(
159176
&self,
160177
stream: ScopedStream,
@@ -316,6 +333,17 @@ impl ClientFactoryAsync {
316333
ReaderGroup::create(scope, reader_group_name, rg_config, self.clone()).await
317334
}
318335

336+
///
337+
/// Delete a ReaderGroup given for a given scope.
338+
///
339+
pub async fn delete_reader_group(
340+
&self,
341+
scope: Scope,
342+
reader_group_name: String,
343+
) -> Result<(), TableError> {
344+
ReaderGroup::delete(scope, reader_group_name, self.clone()).await
345+
}
346+
319347
pub async fn create_transactional_event_writer(
320348
&self,
321349
stream: ScopedStream,

src/event/reader.rs

+48-11
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ impl EventReader {
154154
.lock()
155155
.await
156156
.compute_segments_to_acquire_or_release(&reader)
157-
.await;
157+
.await
158+
.expect("should compute segments");
158159
// attempt acquiring the desired number of segments.
159160
if new_segments_to_acquire > 0 {
160161
for _ in 0..new_segments_to_acquire {
@@ -267,9 +268,20 @@ impl EventReader {
267268
/// is assumed dead.
268269
pub async fn release_segment(&mut self, mut slice: SegmentSlice) -> Result<(), EventReaderError> {
269270
info!(
270-
"releasing segment slice {} from reader {}",
271+
"releasing segment slice {} from reader {:?}",
271272
slice.meta.scoped_segment, self.id
272273
);
274+
// check if the reader is already offline.
275+
if self.meta.reader_offline {
276+
return Err(EventReaderError::StateError {
277+
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
278+
error_msg: format!("Reader already marked offline {:?}", self.id),
279+
source: SynchronizerError::SyncPreconditionError {
280+
error_msg: String::from("Precondition failure"),
281+
},
282+
},
283+
});
284+
}
273285
//update meta data.
274286
let scoped_segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str());
275287
self.meta.add_slices(slice.meta.clone());
@@ -322,6 +334,16 @@ impl EventReader {
322334
slice.meta.end_offset >= offset,
323335
"the offset where the segment slice is released should be less than the end offset"
324336
);
337+
if self.meta.reader_offline {
338+
return Err(EventReaderError::StateError {
339+
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
340+
error_msg: format!("Reader already marked offline {:?}", self.id),
341+
source: SynchronizerError::SyncPreconditionError {
342+
error_msg: String::from("Precondition failure"),
343+
},
344+
},
345+
});
346+
}
325347
let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
326348
if slice.meta.read_offset != offset {
327349
self.meta.stop_reading(&segment);
@@ -361,7 +383,7 @@ impl EventReader {
361383
/// that another thread removes this reader from the ReaderGroup probably due to the host of this reader
362384
/// is assumed dead.
363385
pub async fn reader_offline(&mut self) -> Result<(), EventReaderError> {
364-
if !self.meta.reader_offline {
386+
if !self.meta.reader_offline && self.rg_state.lock().await.check_online(&self.id).await {
365387
info!("Putting reader {:?} offline", self.id);
366388
// stop reading from all the segments.
367389
self.meta.stop_reading_all();
@@ -411,12 +433,23 @@ impl EventReader {
411433
mut slice: SegmentSlice,
412434
read_offset: i64,
413435
) -> Result<(), EventReaderError> {
436+
if self.meta.reader_offline {
437+
return Err(EventReaderError::StateError {
438+
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
439+
error_msg: format!("Reader already marked offline {:?}", self.id),
440+
source: SynchronizerError::SyncPreconditionError {
441+
error_msg: String::from("Precondition failure"),
442+
},
443+
},
444+
});
445+
}
414446
let new_segments_to_release = self
415447
.rg_state
416448
.lock()
417449
.await
418450
.compute_segments_to_acquire_or_release(&self.id)
419-
.await;
451+
.await
452+
.map_err(|err| EventReaderError::StateError { source: err })?;
420453
let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
421454
// check if segments needs to be released from the reader
422455
if new_segments_to_release < 0 {
@@ -462,7 +495,10 @@ impl EventReader {
462495
if self.meta.reader_offline || !self.rg_state.lock().await.check_online(&self.id).await {
463496
return Err(EventReaderError::StateError {
464497
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
465-
error_msg: format!("Reader already marked offline {:?}", self.id),
498+
error_msg: format!(
499+
"Reader already marked offline {:?} or the ReaderGroup is deleted",
500+
self.id
501+
),
466502
source: SynchronizerError::SyncPreconditionError {
467503
error_msg: String::from("Precondition failure"),
468504
},
@@ -666,7 +702,8 @@ impl EventReader {
666702
.lock()
667703
.await
668704
.compute_segments_to_acquire_or_release(&self.id)
669-
.await;
705+
.await
706+
.expect("should compute segments");
670707
if new_segments_to_acquire <= 0 {
671708
Ok(None)
672709
} else {
@@ -1288,7 +1325,7 @@ mod tests {
12881325
rg_mock.expect_check_online().return_const(true);
12891326
rg_mock
12901327
.expect_compute_segments_to_acquire_or_release()
1291-
.return_const(0 as isize);
1328+
.return_once(move |_| Ok(0 as isize));
12921329
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
12931330
// create a new Event Reader with the segment slice data.
12941331
let mut reader = EventReader::init_event_reader(
@@ -1356,7 +1393,7 @@ mod tests {
13561393
rg_mock
13571394
.expect_compute_segments_to_acquire_or_release()
13581395
.with(predicate::eq(Reader::from("r1".to_string())))
1359-
.return_const(1 as isize);
1396+
.return_once(move |_| Ok(1 as isize));
13601397
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
13611398
rg_mock.expect_check_online().return_const(true);
13621399

@@ -1460,7 +1497,7 @@ mod tests {
14601497
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
14611498
rg_mock
14621499
.expect_compute_segments_to_acquire_or_release()
1463-
.return_const(0 as isize);
1500+
.return_once(move |_| Ok(0 as isize));
14641501
rg_mock.expect_check_online().return_const(true);
14651502
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
14661503
// create a new Event Reader with the segment slice data.
@@ -1535,7 +1572,7 @@ mod tests {
15351572
rg_mock.expect_check_online().return_const(true);
15361573
rg_mock
15371574
.expect_compute_segments_to_acquire_or_release()
1538-
.return_const(0 as isize);
1575+
.return_once(move |_| Ok(0 as isize));
15391576
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
15401577
// create a new Event Reader with the segment slice data.
15411578
let mut reader = EventReader::init_event_reader(
@@ -1619,7 +1656,7 @@ mod tests {
16191656
rg_mock.expect_check_online().return_const(true);
16201657
rg_mock
16211658
.expect_compute_segments_to_acquire_or_release()
1622-
.return_const(0 as isize);
1659+
.return_once(move |_| Ok(0 as isize));
16231660
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
16241661
// create a new Event Reader with the segment slice data.
16251662
let mut reader = EventReader::init_event_reader(

src/event/reader_group.rs

+11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::event::reader_group_state::{Offset, ReaderGroupStateError};
1414

1515
use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream};
1616

17+
use crate::sync::table::TableError;
18+
use crate::sync::Table;
1719
use serde::{Deserialize, Serialize};
1820
use serde_cbor::Error as CborError;
1921
use serde_cbor::{from_slice, to_vec};
@@ -143,6 +145,15 @@ impl ReaderGroup {
143145
}
144146
}
145147

148+
/// Delete a reader group.
149+
pub(crate) async fn delete(
150+
scope: Scope,
151+
name: String,
152+
client_factory: ClientFactoryAsync,
153+
) -> Result<(), TableError> {
154+
Table::delete(scope, name, client_factory).await
155+
}
156+
146157
/// Create a new EventReader under the ReaderGroup. This method panics if the reader is
147158
/// already part of the reader group.
148159
///

0 commit comments

Comments
 (0)