Skip to content

Commit 08fc246

Browse files
authored
feat(source): store source split state as jsonb (risingwavelabs#8602)
1 parent cfc0349 commit 08fc246

File tree

19 files changed

+174
-104
lines changed

19 files changed

+174
-104
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/src/array/jsonb_array.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,16 @@ impl JsonbVal {
164164
let v = Value::from_sql(&Type::JSONB, buf).ok()?;
165165
Some(Self(v.into()))
166166
}
167+
168+
/// Consumes this `JsonbVal` and returns the result of calling `take()` on
/// the wrapped value.
pub fn take(self) -> Value {
    // Rebind mutably: the inner `take()` needs mutable access to the field.
    let mut owned = self;
    owned.0.take()
}
171+
}
172+
173+
impl From<Value> for JsonbVal {
174+
fn from(v: Value) -> Self {
175+
Self(v.into())
176+
}
167177
}
168178

169179
impl JsonbRef<'_> {

src/connector/src/macros.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,18 @@ macro_rules! impl_split {
7676
}
7777
}
7878

79-
fn encode_to_bytes(&self) -> Bytes {
80-
Bytes::from(ConnectorSplit::from(self).encode_to_vec())
79+
fn encode_to_json(&self) -> JsonbVal {
80+
use serde_json::json;
81+
let inner = self.encode_to_json_inner().take();
82+
json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
8183
}
8284

83-
fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
84-
SplitImpl::try_from(&ConnectorSplit::decode(bytes)?)
85+
fn restore_from_json(value: JsonbVal) -> Result<Self> {
86+
let mut value = value.take();
87+
let json_obj = value.as_object_mut().unwrap();
88+
let split_type = json_obj.remove(SPLIT_TYPE_FIELD).unwrap().as_str().unwrap().to_string();
89+
let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
90+
Self::restore_from_json_inner(&split_type, inner_value.into())
8591
}
8692
}
8793

@@ -98,6 +104,21 @@ macro_rules! impl_split {
98104
$( Self::$variant_name(inner) => Self::$variant_name(inner.copy_with_offset(start_offset)), )*
99105
}
100106
}
107+
108+
pub fn encode_to_json_inner(&self) -> JsonbVal {
109+
match self {
110+
$( Self::$variant_name(inner) => inner.encode_to_json(), )*
111+
}
112+
}
113+
114+
fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
115+
match split_type.to_lowercase().as_str() {
116+
$( $connector_name => <$split>::restore_from_json(value).map(SplitImpl::$variant_name), )*
117+
other => {
118+
Err(anyhow!("connector '{}' is not supported", other))
119+
}
120+
}
121+
}
101122
}
102123
}
103124
}

src/connector/src/source/base.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ use enum_as_inner::EnumAsInner;
2222
use futures::stream::BoxStream;
2323
use itertools::Itertools;
2424
use parking_lot::Mutex;
25-
use prost::Message;
26-
use risingwave_common::array::StreamChunk;
25+
use risingwave_common::array::{JsonbVal, StreamChunk};
2726
use risingwave_common::catalog::TableId;
2827
use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
28+
use risingwave_common::types::Scalar;
2929
use risingwave_pb::connector_service::TableSchema;
3030
use risingwave_pb::source::ConnectorSplit;
3131
use serde::{Deserialize, Serialize};
@@ -66,6 +66,9 @@ use crate::source::pulsar::{
6666
};
6767
use crate::{impl_connector_properties, impl_split, impl_split_enumerator, impl_split_reader};
6868

69+
const SPLIT_TYPE_FIELD: &str = "split_type";
70+
const SPLIT_INFO_FIELD: &str = "split_info";
71+
6972
/// [`SplitEnumerator`] fetches the split metadata from the external source service.
7073
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
7174
#[async_trait]
@@ -403,8 +406,18 @@ impl Eq for SourceMessage {}
403406
/// The metadata of a split.
404407
pub trait SplitMetaData: Sized {
405408
fn id(&self) -> SplitId;
406-
fn encode_to_bytes(&self) -> Bytes;
407-
fn restore_from_bytes(bytes: &[u8]) -> Result<Self>;
409+
/// Serializes the split to bytes: encodes it with `encode_to_json`, then
/// runs `value_serialize` on the scalar reference of that value.
fn encode_to_bytes(&self) -> Bytes {
    let json = self.encode_to_json();
    let serialized = json.as_scalar_ref().value_serialize();
    serialized.into()
}
415+
fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
416+
Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
417+
}
418+
419+
fn encode_to_json(&self) -> JsonbVal;
420+
fn restore_from_json(value: JsonbVal) -> Result<Self>;
408421
}
409422

410423
/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
@@ -427,6 +440,7 @@ mod tests {
427440
let get_value = split_impl.into_kafka().unwrap();
428441
println!("{:?}", get_value);
429442
assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
443+
assert_eq!(split.encode_to_json(), get_value.encode_to_json());
430444

431445
Ok(())
432446
}
@@ -441,6 +455,21 @@ mod tests {
441455
split_impl.encode_to_bytes(),
442456
restored_split_impl.encode_to_bytes()
443457
);
458+
assert_eq!(
459+
split_impl.encode_to_json(),
460+
restored_split_impl.encode_to_json()
461+
);
462+
463+
let encoded_split = split_impl.encode_to_json();
464+
let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
465+
assert_eq!(
466+
split_impl.encode_to_bytes(),
467+
restored_split_impl.encode_to_bytes()
468+
);
469+
assert_eq!(
470+
split_impl.encode_to_json(),
471+
restored_split_impl.encode_to_json()
472+
);
444473
Ok(())
445474
}
446475

src/connector/src/source/cdc/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::{SplitId, SplitMetaData};
@@ -31,12 +31,12 @@ impl SplitMetaData for CdcSplit {
3131
format!("{}", self.source_id).into()
3232
}
3333

34-
fn encode_to_bytes(&self) -> Bytes {
35-
Bytes::from(serde_json::to_string(self).unwrap())
34+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
35+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
3636
}
3737

38-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
39-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
38+
fn encode_to_json(&self) -> JsonbVal {
39+
serde_json::to_value(self.clone()).unwrap().into()
4040
}
4141
}
4242

src/connector/src/source/datagen/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::base::SplitMetaData;
@@ -32,12 +32,12 @@ impl SplitMetaData for DatagenSplit {
3232
format!("{}-{}", self.split_num, self.split_index).into()
3333
}
3434

35-
fn encode_to_bytes(&self) -> Bytes {
36-
Bytes::from(serde_json::to_string(self).unwrap())
35+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
36+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
3737
}
3838

39-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
40-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
39+
fn encode_to_json(&self) -> JsonbVal {
40+
serde_json::to_value(self.clone()).unwrap().into()
4141
}
4242
}
4343

src/connector/src/source/filesystem/file_common.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
use anyhow::anyhow;
15+
use risingwave_common::array::JsonbVal;
1516
use serde::{Deserialize, Serialize};
1617

1718
use crate::source::{SplitId, SplitMetaData};
@@ -30,12 +31,12 @@ impl SplitMetaData for FsSplit {
3031
self.name.as_str().into()
3132
}
3233

33-
fn encode_to_bytes(&self) -> bytes::Bytes {
34-
bytes::Bytes::from(serde_json::to_string(self).unwrap())
34+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
35+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
3536
}
3637

37-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
38-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
38+
fn encode_to_json(&self) -> JsonbVal {
39+
serde_json::to_value(self.clone()).unwrap().into()
3940
}
4041
}
4142

src/connector/src/source/google_pubsub/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::{SplitId, SplitMetaData};
@@ -47,12 +47,12 @@ impl PubsubSplit {
4747
}
4848

4949
impl SplitMetaData for PubsubSplit {
50-
fn encode_to_bytes(&self) -> Bytes {
51-
Bytes::from(serde_json::to_string(self).unwrap())
50+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
51+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
5252
}
5353

54-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
55-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
54+
fn encode_to_json(&self) -> JsonbVal {
55+
serde_json::to_value(self.clone()).unwrap().into()
5656
}
5757

5858
fn id(&self) -> SplitId {

src/connector/src/source/kafka/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::{SplitId, SplitMetaData};
@@ -32,12 +32,12 @@ impl SplitMetaData for KafkaSplit {
3232
format!("{}", self.partition).into()
3333
}
3434

35-
fn encode_to_bytes(&self) -> Bytes {
36-
Bytes::from(serde_json::to_string(self).unwrap())
35+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
36+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
3737
}
3838

39-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
40-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
39+
fn encode_to_json(&self) -> JsonbVal {
40+
serde_json::to_value(self.clone()).unwrap().into()
4141
}
4242
}
4343

src/connector/src/source/kinesis/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::{SplitId, SplitMetaData};
@@ -39,12 +39,12 @@ impl SplitMetaData for KinesisSplit {
3939
self.shard_id.clone()
4040
}
4141

42-
fn encode_to_bytes(&self) -> Bytes {
43-
Bytes::from(serde_json::to_string(self).unwrap())
42+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
43+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
4444
}
4545

46-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
47-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
46+
fn encode_to_json(&self) -> JsonbVal {
47+
serde_json::to_value(self.clone()).unwrap().into()
4848
}
4949
}
5050

src/connector/src/source/nexmark/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::{SplitId, SplitMetaData};
@@ -31,12 +31,12 @@ impl SplitMetaData for NexmarkSplit {
3131
format!("{}-{}", self.split_num, self.split_index).into()
3232
}
3333

34-
fn encode_to_bytes(&self) -> Bytes {
35-
Bytes::from(serde_json::to_string(self).unwrap())
34+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
35+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
3636
}
3737

38-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
39-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
38+
fn encode_to_json(&self) -> JsonbVal {
39+
serde_json::to_value(self.clone()).unwrap().into()
4040
}
4141
}
4242

src/connector/src/source/pulsar/split.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use bytes::Bytes;
16+
use risingwave_common::array::JsonbVal;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::source::pulsar::topic::Topic;
@@ -46,11 +46,11 @@ impl SplitMetaData for PulsarSplit {
4646
self.topic.to_string().into()
4747
}
4848

49-
fn encode_to_bytes(&self) -> Bytes {
50-
Bytes::from(serde_json::to_string(self).unwrap())
49+
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
50+
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
5151
}
5252

53-
fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
54-
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
53+
fn encode_to_json(&self) -> JsonbVal {
54+
serde_json::to_value(self.clone()).unwrap().into()
5555
}
5656
}

0 commit comments

Comments
 (0)