Skip to content

Commit bafc9e3

Browse files
committed
refactor: we don't need to check_iceberg_source since the schema is loaded
since #15415 Signed-off-by: xxchan <[email protected]>
1 parent f7b7c13 commit bafc9e3

File tree

3 files changed

+4
-61
lines changed

3 files changed

+4
-61
lines changed

src/common/src/catalog/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &
9898
/// [this rfc](https://github.com/risingwavelabs/rfcs/pull/20).
9999
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
100100

101+
/// RisingWave iceberg table engine will create the column `_risingwave_iceberg_row_id` in the iceberg table..
101102
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
102103
pub fn is_system_schema(schema_name: &str) -> bool {
103104
SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name)

src/frontend/src/handler/create_source.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::sync::LazyLock;
1919
use anyhow::{anyhow, Context};
2020
use either::Either;
2121
use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22-
use external_schema::iceberg::check_iceberg_source;
2322
use external_schema::nexmark::check_nexmark_schema;
2423
use itertools::Itertools;
2524
use maplit::{convert_args, hashmap, hashset};
@@ -28,8 +27,7 @@ use rand::Rng;
2827
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
2928
use risingwave_common::bail_not_implemented;
3029
use risingwave_common::catalog::{
31-
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
32-
ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
30+
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, TableId,
3331
INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, ROWID_PREFIX,
3432
};
3533
use risingwave_common::license::Feature;
@@ -576,7 +574,7 @@ pub(super) fn bind_source_watermark(
576574
///
577575
/// One should only call this function after all properties of all columns are resolved, like
578576
/// generated column descriptors.
579-
pub(super) async fn check_format_encode(
577+
pub(super) fn check_format_encode(
580578
props: &WithOptionsSecResolved,
581579
row_id_index: Option<usize>,
582580
columns: &[ColumnCatalog],
@@ -587,10 +585,6 @@ pub(super) async fn check_format_encode(
587585

588586
if connector == NEXMARK_CONNECTOR {
589587
check_nexmark_schema(props, row_id_index, columns)
590-
} else if connector == ICEBERG_CONNECTOR {
591-
Ok(check_iceberg_source(props, columns)
592-
.await
593-
.map_err(|err| ProtocolError(err.to_report_string()))?)
594588
} else {
595589
Ok(())
596590
}
@@ -839,7 +833,7 @@ pub async fn bind_create_source_or_table_with_connector(
839833
sql_columns_defs.to_vec(),
840834
&pk_col_ids,
841835
)?;
842-
check_format_encode(&with_properties, row_id_index, &columns).await?;
836+
check_format_encode(&with_properties, row_id_index, &columns)?;
843837

844838
let definition = handler_args.normalized_sql.clone();
845839

src/frontend/src/handler/create_source/external_schema/iceberg.rs

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -53,55 +53,3 @@ pub async fn extract_iceberg_columns(
5353
)))
5454
}
5555
}
56-
57-
pub async fn check_iceberg_source(
58-
props: &WithOptionsSecResolved,
59-
columns: &[ColumnCatalog],
60-
) -> anyhow::Result<()> {
61-
let props = ConnectorProperties::extract(props.clone(), true)?;
62-
let ConnectorProperties::Iceberg(properties) = props else {
63-
return Err(anyhow!(format!(
64-
"Invalid properties for iceberg source: {:?}",
65-
props
66-
)));
67-
};
68-
69-
let schema = Schema {
70-
fields: columns
71-
.iter()
72-
.filter(|&c| {
73-
c.column_desc.name != ICEBERG_SEQUENCE_NUM_COLUMN_NAME
74-
&& c.column_desc.name != ICEBERG_FILE_PATH_COLUMN_NAME
75-
&& c.column_desc.name != ICEBERG_FILE_POS_COLUMN_NAME
76-
})
77-
.cloned()
78-
.map(|c| c.column_desc.into())
79-
.collect(),
80-
};
81-
82-
let table = properties.load_table().await?;
83-
84-
let iceberg_schema =
85-
::iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
86-
87-
for f1 in schema.fields() {
88-
if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) {
89-
return Err(anyhow::anyhow!(format!(
90-
"Column {} not found in iceberg table",
91-
f1.name
92-
)));
93-
}
94-
}
95-
96-
let new_iceberg_field = iceberg_schema
97-
.fields
98-
.iter()
99-
.filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name))
100-
.cloned()
101-
.collect::<Vec<_>>();
102-
let new_iceberg_schema = arrow_schema_iceberg::Schema::new(new_iceberg_field);
103-
104-
risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
105-
106-
Ok(())
107-
}

0 commit comments

Comments
 (0)