
Commit f263d2d

feat(frontend): derive columns from iceberg source automatically (#15415)
1 parent 44ccb7e commit f263d2d
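
With this change, an Iceberg source declared with FORMAT NONE ENCODE NONE (for example, CREATE SOURCE ... WITH (connector = 'iceberg', ...) FORMAT NONE ENCODE NONE) no longer requires an explicit column list: the frontend loads the table's current schema, converts it to an Arrow schema, and derives the column catalog automatically via the new extract_iceberg_columns helper.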

File tree

3 files changed: +61, -7 lines changed


src/common/src/array/arrow/arrow_impl.rs

Lines changed: 1 addition & 0 deletions

@@ -222,6 +222,7 @@ impl From<&arrow_schema::DataType> for DataType {
             LargeUtf8 => Self::Jsonb,
             Struct(fields) => Self::Struct(fields.into()),
             List(field) => Self::List(Box::new(field.data_type().into())),
+            Decimal128(_, _) => Self::Decimal,
             _ => todo!("Unsupported arrow data type: {value:?}"),
         }
     }
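
The new arm folds every Arrow Decimal128, whatever its precision and scale, into the single internal Decimal type. Below is a minimal, self-contained sketch of that wildcard mapping, assuming only the arrow-schema crate; MyType is a hypothetical stand-in for RisingWave's DataType enum, not the real thing.

// Sketch: map arrow types to a simplified internal type enum.
use arrow_schema::DataType as ArrowType;

// Hypothetical stand-in for the real DataType enum.
#[derive(Debug)]
enum MyType {
    Decimal,
    List(Box<MyType>),
    Other,
}

impl From<&ArrowType> for MyType {
    fn from(value: &ArrowType) -> Self {
        match value {
            // Precision and scale are deliberately ignored: every
            // Decimal128 column maps to the same internal Decimal type.
            ArrowType::Decimal128(_, _) => MyType::Decimal,
            ArrowType::List(field) => MyType::List(Box::new(field.data_type().into())),
            _ => MyType::Other,
        }
    }
}

fn main() {
    // Both become MyType::Decimal despite differing precision/scale.
    println!("{:?}", MyType::from(&ArrowType::Decimal128(38, 10)));
    println!("{:?}", MyType::from(&ArrowType::Decimal128(10, 2)));
}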

src/frontend/src/handler/create_source.rs

Lines changed: 56 additions & 5 deletions
@@ -23,8 +23,8 @@ use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::{
-    is_column_ids_dedup, ColumnCatalog, ColumnDesc, Schema, TableId, INITIAL_SOURCE_VERSION_ID,
-    KAFKA_TIMESTAMP_COLUMN_NAME,
+    is_column_ids_dedup, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
+    INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
 use risingwave_common::types::DataType;
 use risingwave_connector::parser::additional_columns::{
@@ -37,6 +37,7 @@ use risingwave_connector::parser::{
 use risingwave_connector::schema::schema_registry::{
     name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
 };
+use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::cdc::external::CdcTableType;
 use risingwave_connector::source::cdc::{
     CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
@@ -67,7 +68,6 @@ use thiserror_ext::AsReport;
 use super::RwPgResponse;
 use crate::binder::Binder;
 use crate::catalog::source_catalog::SourceCatalog;
-use crate::catalog::ColumnId;
 use crate::error::ErrorCode::{self, InvalidInputSyntax, NotSupported, ProtocolError};
 use crate::error::{Result, RwError};
 use crate::expr::Expr;
@@ -76,7 +76,8 @@ use crate::handler::create_table::{
     ensure_table_constraints_supported, ColumnIdGenerator,
 };
 use crate::handler::util::{
-    connector_need_pk, get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt,
+    connector_need_pk, get_connector, is_cdc_connector, is_iceberg_connector, is_kafka_connector,
+    SourceSchemaCompatExt,
 };
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
@@ -333,7 +334,6 @@ pub(crate) async fn bind_columns_from_source(
 
     let columns = match (&source_schema.format, &source_schema.row_encode) {
         (Format::Native, Encode::Native)
-        | (Format::None, Encode::None)
         | (Format::Plain, Encode::Bytes)
         | (Format::DebeziumMongo, Encode::Json) => None,
         (Format::Plain, Encode::Protobuf) => {
@@ -444,6 +444,17 @@
             )
             .await?
         }
+        (Format::None, Encode::None) => {
+            if is_iceberg_connector(with_properties) {
+                Some(
+                    extract_iceberg_columns(with_properties)
+                        .await
+                        .map_err(|err| ProtocolError(err.to_report_string()))?,
+                )
+            } else {
+                None
+            }
+        }
         (format, encoding) => {
             return Err(RwError::from(ProtocolError(format!(
                 "Unknown combination {:?} {:?}",
@@ -1158,6 +1169,46 @@ pub(super) fn check_nexmark_schema(
     Ok(())
 }
 
+pub async fn extract_iceberg_columns(
+    with_properties: &HashMap<String, String>,
+) -> anyhow::Result<Vec<ColumnCatalog>> {
+    let props = ConnectorProperties::extract(with_properties.clone(), true)?;
+    if let ConnectorProperties::Iceberg(properties) = props {
+        let iceberg_config: IcebergConfig = properties.to_iceberg_config();
+        let table = iceberg_config.load_table().await?;
+        let iceberg_schema: arrow_schema::Schema = table
+            .current_table_metadata()
+            .current_schema()?
+            .clone()
+            .try_into()?;
+
+        let columns = iceberg_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, field)| {
+                let data_type = field.data_type().clone();
+                let column_desc = ColumnDesc::named(
+                    field.name(),
+                    ColumnId::new((i as u32).try_into().unwrap()),
+                    data_type.into(),
+                );
+                ColumnCatalog {
+                    column_desc,
+                    is_hidden: false,
+                }
+            })
+            .collect();
+
+        Ok(columns)
+    } else {
+        Err(anyhow!(format!(
+            "Invalid properties for iceberg source: {:?}",
+            props
+        )))
+    }
+}
+
 pub async fn check_iceberg_source(
     props: &HashMap<String, String>,
     columns: &[ColumnCatalog],
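
extract_iceberg_columns enumerates the fields of the table's current schema (after converting it to Arrow) and assigns column ids positionally. The following runnable sketch reproduces that enumeration against the arrow-schema crate; ColumnDesc and ColumnCatalog here are hypothetical stand-ins, not RisingWave's catalog types.

// Sketch: derive a column catalog from an Arrow schema by field position.
use arrow_schema::{DataType, Field, Schema};

#[derive(Debug)]
struct ColumnDesc {
    id: i32,
    name: String,
    data_type: DataType,
}

#[derive(Debug)]
struct ColumnCatalog {
    column_desc: ColumnDesc,
    is_hidden: bool,
}

fn columns_from_arrow(schema: &Schema) -> Vec<ColumnCatalog> {
    schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| ColumnCatalog {
            column_desc: ColumnDesc {
                // Column ids are assigned positionally, as in the commit.
                id: i as i32,
                name: field.name().clone(),
                data_type: field.data_type().clone(),
            },
            // Derived columns are user-visible.
            is_hidden: false,
        })
        .collect()
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("price", DataType::Decimal128(38, 10), true),
    ]);
    for col in columns_from_arrow(&schema) {
        println!("{:?}", col);
    }
}

Note that ids follow field order, so the id assigned to a column depends on its position in the Iceberg table's schema.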

src/sqlparser/src/ast/statement.rs

Lines changed: 4 additions & 2 deletions
@@ -133,9 +133,10 @@ impl Format {
             "PLAIN" => Format::Plain,
             "UPSERT" => Format::Upsert,
             "NATIVE" => Format::Native, // used internally for schema change
+            "NONE" => Format::None, // used by iceberg
             _ => {
                 return Err(ParserError::ParserError(
-                    "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE after FORMAT"
+                    "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
                         .to_string(),
                 ))
             }
@@ -186,8 +187,9 @@ impl Encode {
             "JSON" => Encode::Json,
             "TEMPLATE" => Encode::Template,
             "NATIVE" => Encode::Native, // used internally for schema change
+            "NONE" => Encode::None, // used by iceberg
             _ => return Err(ParserError::ParserError(
-                "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE after Encode"
+                "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | NONE after Encode"
                     .to_string(),
             )),
         })
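
Both parsers gain a NONE arm, and each error message now lists NONE among the accepted keywords. A minimal sketch of the same keyword-to-enum pattern, with a hypothetical trimmed-down Format enum in place of the sqlparser types:

// Sketch: case-insensitive keyword parsing with NONE accepted.
#[derive(Debug, PartialEq)]
enum Format { Plain, Upsert, Native, None }

fn format_from_keyword(s: &str) -> Result<Format, String> {
    match s.to_ascii_uppercase().as_str() {
        "PLAIN" => Ok(Format::Plain),
        "UPSERT" => Ok(Format::Upsert),
        "NATIVE" => Ok(Format::Native), // used internally for schema change
        "NONE" => Ok(Format::None),     // used by iceberg
        other => Err(format!(
            "expected PLAIN | UPSERT | NATIVE | NONE after FORMAT, found {other}"
        )),
    }
}

fn main() {
    assert_eq!(format_from_keyword("none"), Ok(Format::None));
    assert!(format_from_keyword("AVRO").is_err());
}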
