Skip to content

Commit 3c19064

Browse files
feat(stream): Add recovery feature for nats source (risingwavelabs#12073)
Signed-off-by: tabVersion <[email protected]> Co-authored-by: tabVersion <[email protected]>
1 parent 52192e6 commit 3c19064

File tree

5 files changed

+65
-17
lines changed

5 files changed

+65
-17
lines changed

src/connector/src/common.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashMap;
1717
use std::time::Duration;
1818

1919
use anyhow::Ok;
20+
use async_nats::jetstream::consumer::DeliverPolicy;
2021
use async_nats::jetstream::{self};
2122
use aws_sdk_kinesis::Client as KinesisClient;
2223
use clickhouse::Client;
@@ -386,6 +387,39 @@ impl NatsCommon {
386387
Ok(subscription)
387388
}
388389

390+
pub(crate) async fn build_consumer(
391+
&self,
392+
split_id: i32,
393+
start_sequence: Option<u64>,
394+
) -> anyhow::Result<
395+
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
396+
> {
397+
let context = self.build_context().await?;
398+
let stream = self.build_or_get_stream(context.clone()).await?;
399+
let name = format!("risingwave-consumer-{}-{}", self.subject, split_id);
400+
let mut config = jetstream::consumer::pull::Config {
401+
ack_policy: jetstream::consumer::AckPolicy::None,
402+
..Default::default()
403+
};
404+
match start_sequence {
405+
Some(v) => {
406+
let consumer = stream
407+
.get_or_create_consumer(&name, {
408+
config.deliver_policy = DeliverPolicy::ByStartSequence {
409+
start_sequence: v + 1,
410+
};
411+
config
412+
})
413+
.await?;
414+
Ok(consumer)
415+
}
416+
None => {
417+
let consumer = stream.get_or_create_consumer(&name, config).await?;
418+
Ok(consumer)
419+
}
420+
}
421+
}
422+
389423
pub(crate) async fn build_or_get_stream(
390424
&self,
391425
jetstream: jetstream::Context,

src/connector/src/source/nats/enumerator/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ impl SplitEnumerator for NatsSplitEnumerator {
4444
// TODO: to simplify the logic, return 1 split for first version
4545
let nats_split = NatsSplit {
4646
subject: self.subject.clone(),
47-
split_num: 1,
47+
split_num: 0, // be the same as `from_nats_jetstream_message`
48+
start_sequence: None,
4849
};
4950

5051
Ok(vec![nats_split])

src/connector/src/source/nats/source/message.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,19 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
1715
use async_nats;
1816

1917
use crate::source::base::SourceMessage;
2018
use crate::source::SourceMeta;
2119

2220
impl SourceMessage {
23-
pub fn from_nats_message(message: async_nats::Message) -> Self {
21+
pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self {
2422
SourceMessage {
2523
key: None,
26-
payload: Some(message.payload.to_vec()),
27-
// Nats message doesn't have offset
28-
offset: "".to_string(),
29-
split_id: Arc::from(""),
24+
payload: Some(message.message.payload.to_vec()),
25+
// For nats jetstream, use sequence id as offset
26+
offset: message.info().unwrap().stream_sequence.to_string(),
27+
split_id: "0".into(),
3028
meta: SourceMeta::Empty,
3129
}
3230
}

src/connector/src/source/nats/source/reader.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,21 @@
1313
// limitations under the License.
1414

1515
use anyhow::Result;
16+
use async_nats::jetstream::consumer;
1617
use async_trait::async_trait;
1718
use futures::StreamExt;
1819
use futures_async_stream::try_stream;
1920

2021
use crate::parser::ParserConfig;
2122
use crate::source::common::{into_chunk_stream, CommonSplitReader};
23+
use crate::source::nats::split::NatsSplit;
2224
use crate::source::nats::NatsProperties;
2325
use crate::source::{
2426
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader,
2527
};
2628

2729
pub struct NatsSplitReader {
28-
subscriber: async_nats::Subscriber,
30+
consumer: consumer::Consumer<consumer::pull::Config>,
2931
properties: NatsProperties,
3032
parser_config: ParserConfig,
3133
source_ctx: SourceContextRef,
@@ -44,9 +46,16 @@ impl SplitReader for NatsSplitReader {
4446
) -> Result<Self> {
4547
// TODO: to simplify the logic, return 1 split for first version
4648
assert!(splits.len() == 1);
47-
let subscriber = properties.common.build_subscriber().await?;
49+
let splits = splits
50+
.into_iter()
51+
.map(|split| split.into_nats().unwrap())
52+
.collect::<Vec<NatsSplit>>();
53+
let consumer = properties
54+
.common
55+
.build_consumer(0, splits[0].start_sequence)
56+
.await?;
4857
Ok(Self {
49-
subscriber,
58+
consumer,
5059
properties,
5160
parser_config,
5261
source_ctx,
@@ -64,11 +73,12 @@ impl CommonSplitReader for NatsSplitReader {
6473
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
6574
async fn into_data_stream(self) {
6675
let capacity = self.source_ctx.source_ctrl_opts.chunk_size;
76+
let messages = self.consumer.messages().await?;
6777
#[for_await]
68-
for msgs in self.subscriber.ready_chunks(capacity) {
78+
for msgs in messages.ready_chunks(capacity) {
6979
let mut msg_vec = Vec::with_capacity(capacity);
7080
for msg in msgs {
71-
msg_vec.push(SourceMessage::from_nats_message(msg));
81+
msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?));
7282
}
7383
yield msg_vec;
7484
}

src/connector/src/source/nats/split.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct NatsSplit {
2525
// TODO: to simplify the logic, return 1 split for first version. May use parallelism in
2626
// future.
2727
pub(crate) split_num: i32,
28-
// nats does not provide offset
28+
pub(crate) start_sequence: Option<u64>,
2929
}
3030

3131
impl SplitMetaData for NatsSplit {
@@ -44,11 +44,16 @@ impl SplitMetaData for NatsSplit {
4444
}
4545

4646
impl NatsSplit {
47-
pub fn new(subject: String, split_num: i32) -> Self {
48-
Self { subject, split_num }
47+
pub fn new(subject: String, split_num: i32, start_sequence: Option<u64>) -> Self {
48+
Self {
49+
subject,
50+
split_num,
51+
start_sequence,
52+
}
4953
}
5054

51-
pub fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> {
55+
pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> {
56+
self.start_sequence = Some(start_sequence.as_str().parse::<u64>().unwrap());
5257
Ok(())
5358
}
5459
}

0 commit comments

Comments
 (0)