Skip to content

Commit 0c329e9

Browse files
authored
feat(frontend): support create iceberg source (#14971)
1 parent 86df42b commit 0c329e9

File tree

15 files changed

+343
-29
lines changed

15 files changed

+343
-29
lines changed

proto/plan_common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ enum FormatType {
136136
FORMAT_TYPE_CANAL = 5;
137137
FORMAT_TYPE_UPSERT = 6;
138138
FORMAT_TYPE_PLAIN = 7;
139+
FORMAT_TYPE_NONE = 8;
139140
}
140141

141142
enum EncodeType {
@@ -147,6 +148,7 @@ enum EncodeType {
147148
ENCODE_TYPE_JSON = 5;
148149
ENCODE_TYPE_BYTES = 6;
149150
ENCODE_TYPE_TEMPLATE = 7;
151+
ENCODE_TYPE_NONE = 8;
150152
}
151153

152154
enum RowFormatType {

src/connector/src/macros.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ macro_rules! for_all_classified_sources {
3636
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
3737
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
3838
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
39-
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
39+
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
40+
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
4041
}
4142
$(
4243
,$extra_args

src/connector/src/sink/catalog/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,12 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
205205
F::Plain => SinkFormat::AppendOnly,
206206
F::Upsert => SinkFormat::Upsert,
207207
F::Debezium => SinkFormat::Debezium,
208-
f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => {
208+
f @ (F::Unspecified
209+
| F::Native
210+
| F::DebeziumMongo
211+
| F::Maxwell
212+
| F::Canal
213+
| F::None) => {
209214
return Err(SinkError::Config(anyhow!(
210215
"sink format unsupported: {}",
211216
f.as_str_name()
@@ -217,7 +222,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
217222
E::Protobuf => SinkEncode::Protobuf,
218223
E::Template => SinkEncode::Template,
219224
E::Avro => SinkEncode::Avro,
220-
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
225+
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None) => {
221226
return Err(SinkError::Config(anyhow!(
222227
"sink encode unsupported: {}",
223228
e.as_str_name()

src/connector/src/sink/iceberg/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -927,7 +927,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
927927
}
928928

929929
/// Try to match our schema with iceberg schema.
930-
fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
930+
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
931931
if rw_schema.fields.len() != arrow_schema.fields().len() {
932932
return Err(SinkError::Iceberg(anyhow!(
933933
"Schema length not match, ours is {}, and iceberg is {}",

src/connector/src/source/base.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ pub struct SourceEnumeratorContext {
150150
pub connector_client: Option<ConnectorClient>,
151151
}
152152

153-
#[derive(Clone, Copy, Debug, Default)]
153+
#[derive(Clone, Debug, Default)]
154154
pub struct SourceEnumeratorInfo {
155155
pub source_id: u32,
156156
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2024 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
17+
use async_trait::async_trait;
18+
use risingwave_common::types::JsonbVal;
19+
use serde::{Deserialize, Serialize};
20+
21+
use crate::parser::ParserConfig;
22+
use crate::source::{
23+
BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
24+
SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
25+
};
26+
27+
pub const ICEBERG_CONNECTOR: &str = "iceberg";
28+
29+
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
30+
pub struct IcebergProperties {
31+
#[serde(rename = "catalog.type")]
32+
pub catalog_type: String,
33+
#[serde(rename = "s3.region")]
34+
pub region_name: String,
35+
#[serde(rename = "s3.endpoint", default)]
36+
pub endpoint: String,
37+
#[serde(rename = "s3.access.key", default)]
38+
pub s3_access: String,
39+
#[serde(rename = "s3.secret.key", default)]
40+
pub s3_secret: String,
41+
#[serde(rename = "warehouse.path")]
42+
pub warehouse_path: String,
43+
#[serde(rename = "database.name")]
44+
pub database_name: String,
45+
#[serde(rename = "table.name")]
46+
pub table_name: String,
47+
48+
#[serde(flatten)]
49+
pub unknown_fields: HashMap<String, String>,
50+
}
51+
52+
impl SourceProperties for IcebergProperties {
53+
type Split = IcebergSplit;
54+
type SplitEnumerator = IcebergSplitEnumerator;
55+
type SplitReader = IcebergFileReader;
56+
57+
const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
58+
}
59+
60+
impl UnknownFields for IcebergProperties {
61+
fn unknown_fields(&self) -> HashMap<String, String> {
62+
self.unknown_fields.clone()
63+
}
64+
}
65+
66+
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
67+
pub struct IcebergSplit {}
68+
69+
impl SplitMetaData for IcebergSplit {
70+
fn id(&self) -> SplitId {
71+
unimplemented!()
72+
}
73+
74+
fn restore_from_json(_value: JsonbVal) -> anyhow::Result<Self> {
75+
unimplemented!()
76+
}
77+
78+
fn encode_to_json(&self) -> JsonbVal {
79+
unimplemented!()
80+
}
81+
82+
fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> {
83+
unimplemented!()
84+
}
85+
}
86+
87+
#[derive(Debug, Clone)]
88+
pub struct IcebergSplitEnumerator {}
89+
90+
#[async_trait]
91+
impl SplitEnumerator for IcebergSplitEnumerator {
92+
type Properties = IcebergProperties;
93+
type Split = IcebergSplit;
94+
95+
async fn new(
96+
_properties: Self::Properties,
97+
_context: SourceEnumeratorContextRef,
98+
) -> anyhow::Result<Self> {
99+
Ok(Self {})
100+
}
101+
102+
async fn list_splits(&mut self) -> anyhow::Result<Vec<Self::Split>> {
103+
Ok(vec![])
104+
}
105+
}
106+
107+
#[derive(Debug)]
108+
pub struct IcebergFileReader {}
109+
110+
#[async_trait]
111+
impl SplitReader for IcebergFileReader {
112+
type Properties = IcebergProperties;
113+
type Split = IcebergSplit;
114+
115+
async fn new(
116+
_props: IcebergProperties,
117+
_splits: Vec<IcebergSplit>,
118+
_parser_config: ParserConfig,
119+
_source_ctx: SourceContextRef,
120+
_columns: Option<Vec<Column>>,
121+
) -> anyhow::Result<Self> {
122+
unimplemented!()
123+
}
124+
125+
fn into_stream(self) -> BoxChunkSourceStream {
126+
unimplemented!()
127+
}
128+
}

src/connector/src/source/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub use kafka::KAFKA_CONNECTOR;
3131
pub use kinesis::KINESIS_CONNECTOR;
3232
pub use nats::NATS_CONNECTOR;
3333
mod common;
34+
pub mod iceberg;
3435
mod manager;
3536
pub mod reader;
3637
pub mod test_source;

src/connector/with_options_source.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,35 @@ GcsProperties:
3333
field_type: String
3434
required: false
3535
default: Default::default
36+
IcebergProperties:
37+
fields:
38+
- name: catalog.type
39+
field_type: String
40+
required: true
41+
- name: s3.region
42+
field_type: String
43+
required: true
44+
- name: s3.endpoint
45+
field_type: String
46+
required: false
47+
default: Default::default
48+
- name: s3.access.key
49+
field_type: String
50+
required: false
51+
default: Default::default
52+
- name: s3.secret.key
53+
field_type: String
54+
required: false
55+
default: Default::default
56+
- name: warehouse.path
57+
field_type: String
58+
required: true
59+
- name: database.name
60+
field_type: String
61+
required: true
62+
- name: table.name
63+
field_type: String
64+
required: true
3665
KafkaProperties:
3766
fields:
3867
- name: bytes.per.second

src/frontend/src/handler/alter_source_with_sr.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ fn format_type_to_format(from: FormatType) -> Option<Format> {
4242
FormatType::Canal => Format::Canal,
4343
FormatType::Upsert => Format::Upsert,
4444
FormatType::Plain => Format::Plain,
45+
FormatType::None => Format::None,
4546
})
4647
}
4748

@@ -55,6 +56,7 @@ fn encode_type_to_encode(from: EncodeType) -> Option<Encode> {
5556
EncodeType::Json => Encode::Json,
5657
EncodeType::Bytes => Encode::Bytes,
5758
EncodeType::Template => Encode::Template,
59+
EncodeType::None => Encode::None,
5860
})
5961
}
6062

src/frontend/src/handler/create_sink.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
701701
F::Plain => SinkFormat::AppendOnly,
702702
F::Upsert => SinkFormat::Upsert,
703703
F::Debezium => SinkFormat::Debezium,
704-
f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => {
704+
f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
705705
return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
706706
}
707707
};
@@ -710,7 +710,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
710710
E::Protobuf => SinkEncode::Protobuf,
711711
E::Avro => SinkEncode::Avro,
712712
E::Template => SinkEncode::Template,
713-
e @ (E::Native | E::Csv | E::Bytes) => {
713+
e @ (E::Native | E::Csv | E::Bytes | E::None) => {
714714
return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
715715
}
716716
};

0 commit comments

Comments
 (0)