feat: alter the schema registry of table with connector #15025


Merged
merged 16 commits on Mar 8, 2024
74 changes: 74 additions & 0 deletions e2e_test/schema_registry/alter_sr.slt
@@ -46,6 +46,61 @@ ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
statement ok
CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user;

statement ok
CREATE TABLE t_user WITH (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
ALTER TABLE t_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithMoreFields'
);

# It's ok to drop columns which are not referenced by downstream tasks
statement ok
ALTER TABLE t_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

# Reference `city`, whose type will be altered in the next statement
statement ok
CREATE MATERIALIZED VIEW t_user_ds_1 AS SELECT t_user.city FROM t_user;

# It's not ok to alter the type of columns which are referenced by downstream tasks
statement error unable to drop the column due to being referenced by downstream materialized views or sinks
ALTER TABLE t_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithNewType'
);

# It's always ok to add more columns
statement ok
ALTER TABLE t_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithMoreFields'
);

# Reference `age`, a newly added column
statement ok
CREATE MATERIALIZED VIEW t_user_ds_2 AS SELECT t_user.age FROM t_user;

# It's not ok to drop columns which are referenced by downstream tasks
statement error unable to drop the column due to being referenced by downstream materialized views or sinks
ALTER TABLE t_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

@@ -64,6 +119,25 @@ SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields;
----
25 4 0

query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
25 4 0 10

query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user_ds_2;
----
25 4 0 10

statement ok
DROP MATERIALIZED VIEW t_user_ds_2;

statement ok
DROP MATERIALIZED VIEW t_user_ds_1;

statement ok
DROP TABLE t_user;

statement ok
DROP MATERIALIZED VIEW mv_user;

125 changes: 88 additions & 37 deletions src/frontend/src/handler/alter_source_with_sr.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
@@ -29,7 +31,10 @@ use super::create_source::{bind_columns_from_source, validate_compatibility};
use super::util::is_cdc_connector;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::session::SessionImpl;
use crate::{Binder, WithOptions};

fn format_type_to_format(from: FormatType) -> Option<Format> {
@@ -61,7 +66,7 @@ fn encode_type_to_encode(from: EncodeType) -> Option<Encode> {
/// Returns the columns in `columns_a` but not in `columns_b`,
/// where the comparison is done by name and data type,
/// and hidden columns are ignored.
fn columns_diff(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> {
fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> {
columns_a
.iter()
.filter(|col_a| {
@@ -74,12 +79,11 @@ fn columns_diff(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec
.collect()
}

pub async fn handle_alter_source_with_sr(
handler_args: HandlerArgs,
name: ObjectName,
connector_schema: ConnectorSchema,
) -> Result<RwPgResponse> {
let session = handler_args.session;
/// Fetch the source catalog and the `database/schema_id` of the source.
pub fn fetch_source_catalog_with_db_schema_id(
session: &SessionImpl,
name: &ObjectName,
) -> Result<(Arc<SourceCatalog>, DatabaseId, SchemaId)> {
let db_name = session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, name.clone())?;
@@ -88,25 +92,26 @@ pub async fn handle_alter_source_with_sr(

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (database_id, schema_id, mut source) = {
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
let db = reader.get_database_by_name(db_name)?;
let schema = db.get_schema_by_name(schema_name).unwrap();

session.check_privilege_for_drop_alter(schema_name, &**source)?;
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
let db = reader.get_database_by_name(db_name)?;
let schema = db.get_schema_by_name(schema_name).unwrap();

(db.id(), schema.id(), (**source).clone())
};
session.check_privilege_for_drop_alter(schema_name, &**source)?;

if source.associated_table_id.is_some() {
bail_not_implemented!("altering source associated with table is not supported yet");
}
Ok((Arc::clone(source), db.id(), schema.id()))
}

/// Check if the original source is created with `FORMAT .. ENCODE ..` clause,
/// and if the FORMAT and ENCODE are modified.
pub fn check_format_encode(
original_source: &SourceCatalog,
new_connector_schema: &ConnectorSchema,
) -> Result<()> {
let StreamSourceInfo {
format, row_encode, ..
} = source.info;
} = original_source.info;
let (Some(old_format), Some(old_row_encode)) = (
format_type_to_format(FormatType::try_from(format).unwrap()),
encode_type_to_encode(EncodeType::try_from(row_encode).unwrap()),
@@ -119,38 +124,79 @@ pub async fn handle_alter_source_with_sr(
.into());
};

if connector_schema.format != old_format || connector_schema.row_encode != old_row_encode {
if new_connector_schema.format != old_format
|| new_connector_schema.row_encode != old_row_encode
{
bail_not_implemented!(
"the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet",
&old_format,
&old_row_encode,
);
}

if !schema_has_schema_registry(&connector_schema) {
return Err(ErrorCode::NotSupported(
"altering a source without schema registry".to_string(),
"try `ALTER SOURCE .. ADD COLUMNS ...`".to_string(),
)
.into());
}
Ok(())
}

let mut with_properties = source.with_properties.clone().into_iter().collect();
validate_compatibility(&connector_schema, &mut with_properties)?;
/// Refresh the source registry and get the added/dropped columns.
pub async fn refresh_sr_and_get_columns_diff(
original_source: &SourceCatalog,
connector_schema: &ConnectorSchema,
session: &Arc<SessionImpl>,
) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
let mut with_properties = original_source
.with_properties
.clone()
.into_iter()
.collect();
validate_compatibility(connector_schema, &mut with_properties)?;

if is_cdc_connector(&with_properties) {
bail_not_implemented!("altering a cdc source is not supported");
}

let (Some(columns_from_resolve_source), source_info) =
bind_columns_from_source(&session, &connector_schema, &with_properties).await?
bind_columns_from_source(session, connector_schema, &with_properties).await?
else {
// Source without schema registry is rejected.
unreachable!("source without schema registry is rejected")
};

let added_columns = columns_diff(&columns_from_resolve_source, &source.columns);
let dropped_columns = columns_diff(&source.columns, &columns_from_resolve_source);
let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source);

Ok((source_info, added_columns, dropped_columns))
}

pub async fn handle_alter_source_with_sr(
handler_args: HandlerArgs,
name: ObjectName,
connector_schema: ConnectorSchema,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let (source, database_id, schema_id) =
fetch_source_catalog_with_db_schema_id(session.as_ref(), &name)?;
let mut source = source.as_ref().clone();

if source.associated_table_id.is_some() {
return Err(ErrorCode::NotSupported(
"alter table with connector using ALTER SOURCE statement".to_string(),
"try to use ALTER TABLE instead".to_string(),
)
.into());
};

check_format_encode(&source, &connector_schema)?;

if !schema_has_schema_registry(&connector_schema) {
return Err(ErrorCode::NotSupported(
"altering a source without schema registry".to_string(),
"try `ALTER SOURCE .. ADD COLUMN ...` instead".to_string(),
)
.into());
}

let (source_info, added_columns, dropped_columns) =
refresh_sr_and_get_columns_diff(&source, &connector_schema, &session).await?;

if !dropped_columns.is_empty() {
bail_not_implemented!(
@@ -184,9 +230,10 @@ pub async fn handle_alter_source_with_sr(
Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}

/// Apply the new `format_encode_options` to the source/table definition.
pub fn alter_definition_format_encode(
definition: &str,
row_options: Vec<SqlOption>,
format_encode_options: Vec<SqlOption>,
) -> Result<String> {
let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
let mut stmt = ast
@@ -197,10 +244,14 @@ pub fn alter_definition_format_encode(
match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { source_schema, .. },
}
| Statement::CreateTable {
source_schema: Some(source_schema),
..
} => {
match source_schema {
CompatibleSourceSchema::V2(schema) => {
schema.row_options = row_options;
schema.row_options = format_encode_options;
}
// TODO: Confirm the behavior of legacy source schema.
// Legacy source schema should be rejected by the handler and never reaches here.
Expand All @@ -222,7 +273,7 @@ pub mod tests {
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

#[tokio::test]
async fn test_alter_source_column_handler() {
async fn test_alter_source_with_sr_handler() {
let proto_file = create_proto_file(PROTO_FILE_DATA);
let sql = format!(
r#"CREATE SOURCE src
6 changes: 5 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
@@ -74,7 +74,11 @@ pub async fn handle_alter_table_column(

if let Some(source_schema) = &source_schema {
if schema_has_schema_registry(source_schema) {
bail_not_implemented!("Alter table with source having schema registry");
return Err(ErrorCode::NotSupported(
"alter table with schema registry".to_string(),
"try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_string(),
Review discussion on this hunk:

Collaborator: In this case, I guess the user may actually want to refresh the schema, i.e. pull from the schema registry to get the latest schema. Do we have such syntax support?

Contributor (author): Yes, that's exactly what this PR has done: impl ALTER TABLE .. FORMAT .. ENCODE .. (schema.xxx = ...).

Collaborator: I mean, they might not want to change the schema.location or message, but only do a refresh from the schema registry (because they have updated the schema in the schema registry server before).

Contributor (author): Got it. In the current impl, users need to repeat the same config as when they created the table/source to do a refresh. Maybe we can add a shortcut for this purpose:

  1. ALTER ... FORMAT .. ENCODE ..;: empty format_encode_options implies a simple refresh.
  2. ALTER ... REFRESH SCHEMA REGISTRY;: new syntax to avoid confusion with the original FORMAT ENCODE syntax.

Member: Is there a plan to allow users to specify a completely different set of FORMAT and ENCODE when altering a table in the future? If not, I would suggest a syntax like ALTER ... REFRESH SCHEMA REGISTRY for clarity and convenience.

Contributor: 🤔 I think they can get all the fields easily from the system table. Both LGTM.

Contributor author (@Rossil2012, Mar 4, 2024): Do we have a decision here? FYI, we are going to implement alter source src format xxx encode xxx (connector = 'new_connector'). As users may have one source for historical data and another for incremental data, they can continue consumption by altering the connector. So we are going to mix schema refresh and connector reset in the same SQL, and probably more cases in the future. The semantics can be confusing then. I suggest we designate the SQL individually. Let's start a vote here.

Vote option (author): ALTER SOURCE ... REFRESH SCHEMA

Vote option (author): ALTER SOURCE ... FORMAT .. ENCODE ..

Member: Actually I believe these two features could be clearly distinguished. 🤔

Refresh schema (or update any other minor configuration fields)

  • The connector is still the same.
  • The state of the source executors is kept.
  • Users would not like to repeat unchanged properties.

Alter connector

  • The connector is replaced with a completely different one.
  • The state of the source executors is reset.
  • Users would not like to reuse any existing properties.

When it comes to designing syntax, I suppose we should choose different syntaxes based on the specific requirements. For example, we may leave the general syntax of ALTER SOURCE ... FORMAT .. ENCODE .. for the latter one. For the former feature, sugar like REFRESH SCHEMA looks good to me if we don't find any other similar requirements.
)
.into());
}
}
