diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt index dc05f81fc1362..0ba9ae9da5541 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/schema_registry/alter_sr.slt @@ -16,59 +16,68 @@ FORMAT PLAIN ENCODE PROTOBUF( statement ok CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; -# Changing type is not allowed -statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(city: character varying\) -ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://message_queue:8081', - message = 'test.UserWithNewType' -); - -# Changing format/encode is not allowed -statement error Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Protobuf, and altering them is not supported yet -ALTER SOURCE src_user FORMAT NATIVE ENCODE PROTOBUF( +statement ok +CREATE TABLE t_user WITH ( + connector = 'kafka', + topic = 'sr_pb_test', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( schema.registry = 'http://message_queue:8081', message = 'test.User' ); -statement ok -ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://message_queue:8081', - message = 'test.UserWithMoreFields' -); +statement error +SELECT age FROM mv_user; -# Dropping columns is not allowed -statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(age: integer\) +statement error +SELECT age FROM t_user; + +# Push more events with extended fields +system ok +python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields + +sleep 5s + +# Refresh source schema +statement ok ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF( schema.registry = 'http://message_queue:8081', message = 'test.User' ); statement ok -CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user; +CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; + +# Refresh table schema +statement ok +ALTER TABLE t_user REFRESH SCHEMA; +query IIII +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; +---- +25 4 0 10 + +# Push more events with extended fields system ok python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields -sleep 10s +sleep 5s -query I -SELECT COUNT(*) FROM mv_user; +query IIII +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; ---- -25 - -statement error -SELECT SUM(age) FROM mv_user; +30 4 0 10 -query III -SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields; ----- -25 4 0 +statement ok +DROP MATERIALIZED VIEW mv_user_more; statement ok -DROP MATERIALIZED VIEW mv_user; +DROP TABLE t_user; statement ok -DROP MATERIALIZED VIEW mv_more_fields; +DROP MATERIALIZED VIEW mv_user; statement ok DROP SOURCE src_user; diff --git a/e2e_test/schema_registry/pb.py b/e2e_test/schema_registry/pb.py index 7ca15222e149d..fd6e0dc478b51 100644 --- a/e2e_test/schema_registry/pb.py +++ b/e2e_test/schema_registry/pb.py @@ -1,6 +1,6 @@ -from protobuf import user_pb2 -from google.protobuf.source_context_pb2 import SourceContext import sys +import importlib +from google.protobuf.source_context_pb2 import SourceContext from confluent_kafka import Producer from confluent_kafka.serialization import ( SerializationContext, @@ -26,7 +26,7 @@ def get_user(i): ) def get_user_with_more_fields(i): - 
return user_pb2.UserWithMoreFields( + return user_pb2.User( id=i, name="User_{}".format(i), address="Address_{}".format(i), @@ -36,16 +36,6 @@ def get_user_with_more_fields(i): age=i, ) -def get_user_with_new_type(i): - return user_pb2.UserWithNewType( - id=i, - name="User_{}".format(i), - address="Address_{}".format(i), - city=i, - gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE, - sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), - ) - def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message): schema_registry_client = SchemaRegistryClient(schema_registry_conf) serializer = ProtobufSerializer( @@ -69,7 +59,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u if __name__ == "__main__": - if len(sys.argv) < 5: + if len(sys.argv) < 6: print("pb.py <broker_list> <schema_registry_url> <topic> <num_records> <pb_message>") exit(1) @@ -79,10 +69,11 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u num_records = int(sys.argv[4]) pb_message = sys.argv[5] + user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2') + all_pb_messages = { - 'user': (get_user, user_pb2.User), - 'user_with_more_fields': (get_user_with_more_fields, user_pb2.UserWithMoreFields), - 'user_with_new_type': (get_user_with_new_type, user_pb2.UserWithNewType), + 'user': get_user, + 'user_with_more_fields': get_user_with_more_fields, } assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}' @@ -91,7 +82,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u producer_conf = {"bootstrap.servers": broker_list} try: - send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, *all_pb_messages[pb_message]) + send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User) except Exception as e: print("Send Protobuf data to schema registry and kafka failed {}", e) exit(1) diff --git a/e2e_test/schema_registry/protobuf/user.proto b/e2e_test/schema_registry/protobuf/user.proto index bbff4b97bac9c..e6c5f109bbd76 100644 --- a/e2e_test/schema_registry/protobuf/user.proto +++ b/e2e_test/schema_registry/protobuf/user.proto @@ -17,22 +17,3 @@ enum Gender { MALE = 0; FEMALE = 1; } - -message UserWithMoreFields { - int32 id = 1; - string name = 2; - string address = 3; - string city = 4; - Gender gender = 5; - google.protobuf.SourceContext sc = 6; - int32 age = 7; // new field here -} - -message UserWithNewType { - int32 id = 1; - string name = 2; - string address = 3; - int32 city = 4; // change the type from string to int32 - Gender gender = 5; - google.protobuf.SourceContext sc = 6; -} diff --git a/e2e_test/schema_registry/protobuf/user_pb2.py b/e2e_test/schema_registry/protobuf/user_pb2.py index bd7b61e646fb1..a2efcde3f899b 100644 --- a/e2e_test/schema_registry/protobuf/user_pb2.py +++ b/e2e_test/schema_registry/protobuf/user_pb2.py @@ -15,19 +15,15 @@ from google.protobuf import source_context_pb2 as google_dot_protobuf_dot_source__context__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nuser.proto\x12\x04test\x1a$google/protobuf/source_context.proto\"\x89\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\t\x12\x0c\n\x04\x63ity\x18\x04 \x01(\t\x12\x1c\n\x06gender\x18\x05 \x01(\x0e\x32\x0c.test.Gender\x12*\n\x02sc\x18\x06 \x01(\x0b\x32\x1e.google.protobuf.SourceContext\"\xa4\x01\n\x12UserWithMoreFields\x12\n\n\x02id\x18\x01 
\x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\t\x12\x0c\n\x04\x63ity\x18\x04 \x01(\t\x12\x1c\n\x06gender\x18\x05 \x01(\x0e\x32\x0c.test.Gender\x12*\n\x02sc\x18\x06 \x01(\x0b\x32\x1e.google.protobuf.SourceContext\x12\x0b\n\x03\x61ge\x18\x07 \x01(\x05\"\x94\x01\n\x0fUserWithNewType\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\t\x12\x0c\n\x04\x63ity\x18\x04 \x01(\x05\x12\x1c\n\x06gender\x18\x05 \x01(\x0e\x32\x0c.test.Gender\x12*\n\x02sc\x18\x06 \x01(\x0b\x32\x1e.google.protobuf.SourceContext*\x1e\n\x06Gender\x12\x08\n\x04MALE\x10\x00\x12\n\n\x06\x46\x45MALE\x10\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nuser.proto\x12\x04test\x1a$google/protobuf/source_context.proto\"\x89\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\t\x12\x0c\n\x04\x63ity\x18\x04 \x01(\t\x12\x1c\n\x06gender\x18\x05 \x01(\x0e\x32\x0c.test.Gender\x12*\n\x02sc\x18\x06 \x01(\x0b\x32\x1e.google.protobuf.SourceContext*\x1e\n\x06Gender\x12\x08\n\x04MALE\x10\x00\x12\n\n\x06\x46\x45MALE\x10\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_GENDER']._serialized_start=516 - _globals['_GENDER']._serialized_end=546 + _globals['_GENDER']._serialized_start=198 + _globals['_GENDER']._serialized_end=228 _globals['_USER']._serialized_start=59 _globals['_USER']._serialized_end=196 - _globals['_USERWITHMOREFIELDS']._serialized_start=199 - _globals['_USERWITHMOREFIELDS']._serialized_end=363 - _globals['_USERWITHNEWTYPE']._serialized_start=366 - _globals['_USERWITHNEWTYPE']._serialized_end=514 # @@protoc_insertion_point(module_scope) diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields.proto b/e2e_test/schema_registry/protobuf/user_with_more_fields.proto new file mode 100644 index 0000000000000..69700bb640ccc --- /dev/null +++ b/e2e_test/schema_registry/protobuf/user_with_more_fields.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package test; + +import "google/protobuf/source_context.proto"; + +message User { + int32 id = 1; + string name = 2; + string address = 3; + string city = 4; + Gender gender = 5; + google.protobuf.SourceContext sc = 6; + int32 age = 7; // new field here +} + +enum Gender { + MALE = 0; + FEMALE = 1; +} diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py b/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py new file mode 100644 index 0000000000000..6dda7777112c4 --- /dev/null +++ b/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: user_with_more_fields.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import source_context_pb2 as google_dot_protobuf_dot_source__context__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1buser_with_more_fields.proto\x12\x04test\x1a$google/protobuf/source_context.proto\"\x96\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\t\x12\x0c\n\x04\x63ity\x18\x04 \x01(\t\x12\x1c\n\x06gender\x18\x05 \x01(\x0e\x32\x0c.test.Gender\x12*\n\x02sc\x18\x06 \x01(\x0b\x32\x1e.google.protobuf.SourceContext\x12\x0b\n\x03\x61ge\x18\x07 \x01(\x05*\x1e\n\x06Gender\x12\x08\n\x04MALE\x10\x00\x12\n\n\x06\x46\x45MALE\x10\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_with_more_fields_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_GENDER']._serialized_start=228 + _globals['_GENDER']._serialized_end=258 + _globals['_USER']._serialized_start=76 + _globals['_USER']._serialized_end=226 +# @@protoc_insertion_point(module_scope) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 06bb2d0387479..fc35552270a2e 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use itertools::Itertools; use pgwire::pg_response::StatementType; use risingwave_common::bail_not_implemented; @@ -29,7 +31,10 @@ use super::create_source::{bind_columns_from_source, validate_compatibility}; use super::util::is_cdc_connector; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::source_catalog::SourceCatalog; +use crate::catalog::{DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result}; +use crate::session::SessionImpl; use crate::{Binder, WithOptions}; fn format_type_to_format(from: FormatType) -> Option<Format> { @@ -63,7 +68,7 @@ fn encode_type_to_encode(from: EncodeType) -> Option<Encode> { /// Returns the columns in `columns_a` but not in `columns_b`, /// where the comparison is done by name and data type, /// and hidden columns are ignored. -fn columns_diff(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> { +fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> { columns_a .iter() .filter(|col_a| { @@ -76,12 +81,11 @@ fn columns_diff(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> .collect() } -pub async fn handle_alter_source_with_sr( - handler_args: HandlerArgs, - name: ObjectName, - connector_schema: ConnectorSchema, -) -> Result<RwPgResponse> { - let session = handler_args.session; +/// Fetch the source catalog and the `database/schema_id` of the source. 
+pub fn fetch_source_catalog_with_db_schema_id( + session: &SessionImpl, + name: &ObjectName, +) -> Result<(Arc<SourceCatalog>, DatabaseId, SchemaId)> { let db_name = session.database(); let (schema_name, real_source_name) = Binder::resolve_schema_qualified_name(db_name, name.clone())?; @@ -90,25 +94,26 @@ pub async fn handle_alter_source_with_sr( let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); - let (database_id, schema_id, mut source) = { - let reader = session.env().catalog_reader().read_guard(); - let (source, schema_name) = - reader.get_source_by_name(db_name, schema_path, &real_source_name)?; - let db = reader.get_database_by_name(db_name)?; - let schema = db.get_schema_by_name(schema_name).unwrap(); - - session.check_privilege_for_drop_alter(schema_name, &**source)?; + let reader = session.env().catalog_reader().read_guard(); + let (source, schema_name) = + reader.get_source_by_name(db_name, schema_path, &real_source_name)?; + let db = reader.get_database_by_name(db_name)?; + let schema = db.get_schema_by_name(schema_name).unwrap(); - (db.id(), schema.id(), (**source).clone()) - }; + session.check_privilege_for_drop_alter(schema_name, &**source)?; - if source.associated_table_id.is_some() { - bail_not_implemented!("altering source associated with table is not supported yet"); - } + Ok((Arc::clone(source), db.id(), schema.id())) +} +/// Check if the original source is created with `FORMAT .. ENCODE ..` clause, +/// and whether the FORMAT and ENCODE are modified. +pub fn check_format_encode( + original_source: &SourceCatalog, + new_connector_schema: &ConnectorSchema, +) -> Result<()> { let StreamSourceInfo { format, row_encode, .. - } = source.info; + } = original_source.info; let (Some(old_format), Some(old_row_encode)) = ( format_type_to_format(FormatType::try_from(format).unwrap()), encode_type_to_encode(EncodeType::try_from(row_encode).unwrap()), @@ -121,7 +126,9 @@ pub async fn handle_alter_source_with_sr( .into()); }; - if connector_schema.format != old_format || connector_schema.row_encode != old_row_encode { + if new_connector_schema.format != old_format + || new_connector_schema.row_encode != old_row_encode + { bail_not_implemented!( "the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet", &old_format, @@ -129,30 +136,69 @@ pub async fn handle_alter_source_with_sr( ); } - if !schema_has_schema_registry(&connector_schema) { - return Err(ErrorCode::NotSupported( - "altering a source without schema registry".to_string(), - "try `ALTER SOURCE .. ADD COLUMNS ...`".to_string(), - ) - .into()); - } + Ok(()) +} - let mut with_properties = source.with_properties.clone().into_iter().collect(); - validate_compatibility(&connector_schema, &mut with_properties)?; +/// Refresh the schema registry and get the added/dropped columns. +pub async fn refresh_sr_and_get_columns_diff( + original_source: &SourceCatalog, + connector_schema: &ConnectorSchema, + session: &Arc<SessionImpl>, +) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> { + let mut with_properties = original_source + .with_properties + .clone() + .into_iter() + .collect(); + validate_compatibility(connector_schema, &mut with_properties)?; if is_cdc_connector(&with_properties) { bail_not_implemented!("altering a cdc source is not supported"); } let (Some(columns_from_resolve_source), source_info) = - bind_columns_from_source(&session, &connector_schema, &with_properties).await? + bind_columns_from_source(session, connector_schema, &with_properties).await? 
else { // Source without schema registry is rejected. unreachable!("source without schema registry is rejected") }; - let added_columns = columns_diff(&columns_from_resolve_source, &source.columns); - let dropped_columns = columns_diff(&source.columns, &columns_from_resolve_source); + let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); + + Ok((source_info, added_columns, dropped_columns)) +} + +pub async fn handle_alter_source_with_sr( + handler_args: HandlerArgs, + name: ObjectName, + connector_schema: ConnectorSchema, +) -> Result<RwPgResponse> { + let session = handler_args.session; + let (source, database_id, schema_id) = + fetch_source_catalog_with_db_schema_id(session.as_ref(), &name)?; + let mut source = source.as_ref().clone(); + + if source.associated_table_id.is_some() { + return Err(ErrorCode::NotSupported( + "alter table with connector using ALTER SOURCE statement".to_string(), + "try to use ALTER TABLE instead".to_string(), + ) + .into()); + }; + + check_format_encode(&source, &connector_schema)?; + + if !schema_has_schema_registry(&connector_schema) { + return Err(ErrorCode::NotSupported( + "altering a source without schema registry".to_string(), + "try `ALTER SOURCE .. ADD COLUMN ...` instead".to_string(), + ) + .into()); + } + + let (source_info, added_columns, dropped_columns) = + refresh_sr_and_get_columns_diff(&source, &connector_schema, &session).await?; if !dropped_columns.is_empty() { bail_not_implemented!( @@ -186,9 +232,10 @@ pub async fn handle_alter_source_with_sr( Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE)) } +/// Apply the new `format_encode_options` to the source/table definition. pub fn alter_definition_format_encode( definition: &str, - row_options: Vec<SqlOption>, + format_encode_options: Vec<SqlOption>, ) -> Result<String> { let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); let mut stmt = ast @@ -199,10 +246,14 @@ pub fn alter_definition_format_encode( match &mut stmt { Statement::CreateSource { stmt: CreateSourceStatement { source_schema, .. }, + } + | Statement::CreateTable { + source_schema: Some(source_schema), + .. } => { match source_schema { CompatibleSourceSchema::V2(schema) => { - schema.row_options = row_options; + schema.row_options = format_encode_options; } // TODO: Confirm the behavior of legacy source schema. // Legacy source schema should be rejected by the handler and never reaches here. @@ -224,7 +275,7 @@ pub mod tests { use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA}; #[tokio::test] - async fn test_alter_source_column_handler() { + async fn test_alter_source_with_sr_handler() { let proto_file = create_proto_file(PROTO_FILE_DATA); let sql = format!( r#"CREATE SOURCE src
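Note for reviewers: the helpers factored out of `handle_alter_source_with_sr` above keep that handler readable and let the new `ALTER TABLE .. REFRESH SCHEMA` path below reuse `alter_definition_format_encode`. The following is a minimal sketch of how the source-side helpers compose, using only the signatures introduced in this diff; the wrapper function itself is illustrative and not part of the patch:

async fn refresh_source_schema_sketch(
    session: Arc<SessionImpl>,
    name: ObjectName,
    new_schema: ConnectorSchema,
) -> Result<()> {
    // Resolve the source catalog entry and check ALTER privileges.
    let (source, _database_id, _schema_id) =
        fetch_source_catalog_with_db_schema_id(session.as_ref(), &name)?;
    // FORMAT/ENCODE themselves must stay unchanged; only the columns may evolve.
    check_format_encode(&source, &new_schema)?;
    // Re-bind columns from the schema registry and diff them against the catalog.
    // `columns_minus(a, b)` keeps the columns of `a` absent from `b`, so `added`
    // holds newly resolved columns and `dropped` holds catalog columns that the
    // registry no longer resolves.
    let (_source_info, added, dropped) =
        refresh_sr_and_get_columns_diff(&source, &new_schema, &session).await?;
    // The real handler bails out on a non-empty `dropped` and applies `added`.
    let _ = (added, dropped);
    Ok(())
}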
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 8446f3ffeb83d..dc100bf4c2a59 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -34,6 +34,65 @@ use crate::error::{ErrorCode, Result, RwError}; use crate::session::SessionImpl; use crate::{Binder, TableCatalog, WithOptions}; +pub async fn replace_table_with_definition( + session: &Arc<SessionImpl>, + table_name: ObjectName, + definition: Statement, + original_catalog: &Arc<TableCatalog>, + source_schema: Option<ConnectorSchema>, +) -> Result<()> { + // Create handler args as if we're creating a new table with the altered definition. + let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; + let col_id_gen = ColumnIdGenerator::new_alter(original_catalog); + let Statement::CreateTable { + columns, + constraints, + source_watermarks, + append_only, + wildcard_idx, + .. + } = definition + else { + panic!("unexpected statement type: {:?}", definition); + }; + + let (graph, table, source) = generate_stream_graph_for_table( + session, + table_name, + original_catalog, + source_schema, + handler_args, + col_id_gen, + columns, + wildcard_idx, + constraints, + source_watermarks, + append_only, + ) + .await?; + + // Calculate the mapping from the original columns to the new columns. + let col_index_mapping = ColIndexMapping::new( + original_catalog + .columns() + .iter() + .map(|old_c| { + table.columns.iter().position(|new_c| { + new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id() + }) + }) + .collect(), + table.columns.len(), + ); + + let catalog_writer = session.catalog_writer()?; + + catalog_writer + .replace_table(source, table, graph, col_index_mapping) + .await?; + Ok(()) +} + /// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or /// `DropColumn`. pub async fn handle_alter_table_column( @@ -74,7 +133,11 @@ pub async fn handle_alter_table_column( if let Some(source_schema) = &source_schema { if schema_has_schema_registry(source_schema) { - bail_not_implemented!("Alter table with source having schema registry"); + return Err(ErrorCode::NotSupported( + "alter table with schema registry".to_string(), + "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_string(), + ) + .into()); } } @@ -145,56 +208,15 @@ pub async fn handle_alter_table_column( _ => unreachable!(), } - // Create handler args as if we're creating a new table with the altered definition. - let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; - let col_id_gen = ColumnIdGenerator::new_alter(&original_catalog); - let Statement::CreateTable { - columns, - constraints, - source_watermarks, - append_only, - wildcard_idx, - .. - } = definition - else { - panic!("unexpected statement type: {:?}", definition); - }; - - let (graph, table, source) = generate_stream_graph_for_table( + replace_table_with_definition( &session, table_name, + definition, &original_catalog, source_schema, - handler_args, - col_id_gen, - columns, - wildcard_idx, - constraints, - source_watermarks, - append_only, ) .await?; - // Calculate the mapping from the original columns to the new columns. - let col_index_mapping = ColIndexMapping::new( - original_catalog - .columns() - .iter() - .map(|old_c| { - table.columns.iter().position(|new_c| { - new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id() - }) - }) - .collect(), - table.columns.len(), - ); - - let catalog_writer = session.catalog_writer()?; - - catalog_writer - .replace_table(source, table, graph, col_index_mapping) - .await?; - Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs new file mode 100644 index 0000000000000..1b7b09e4cb8ed --- /dev/null +++ b/src/frontend/src/handler/alter_table_with_sr.rs @@ -0,0 +1,84 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use pgwire::pg_response::StatementType; +use risingwave_common::bail_not_implemented; +use risingwave_sqlparser::ast::{ConnectorSchema, ObjectName, Statement}; +use risingwave_sqlparser::parser::Parser; + +use super::alter_source_with_sr::alter_definition_format_encode; +use super::alter_table_column::{fetch_table_catalog_for_alter, replace_table_with_definition}; +use super::util::SourceSchemaCompatExt; +use super::{HandlerArgs, RwPgResponse}; +use crate::error::{ErrorCode, Result, RwError}; +use crate::TableCatalog; + +fn get_connector_schema_from_table(table: &TableCatalog) -> Result<Option<ConnectorSchema>> { + let [stmt]: [_; 1] = Parser::parse_sql(&table.definition) + .context("unable to parse original table definition")? + .try_into() + .unwrap(); + let Statement::CreateTable { source_schema, .. } = stmt else { + unreachable!() + }; + Ok(source_schema.map(|schema| schema.into_v2_with_warning())) } + +pub async fn handle_refresh_schema( + handler_args: HandlerArgs, + table_name: ObjectName, +) -> Result<RwPgResponse> { + let session = handler_args.session; + let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; + + if !original_table.incoming_sinks.is_empty() { + bail_not_implemented!("alter table with incoming sinks"); + } + + // TODO(yuhao): alter table with generated columns. + if original_table.has_generated_column() { + return Err(RwError::from(ErrorCode::BindError( + "Altering a table with generated columns has not been implemented.".to_string(), + ))); + } + + let connector_schema = + get_connector_schema_from_table(&original_table)?.ok_or(ErrorCode::NotSupported( + "Tables without schema registry cannot be refreshed".to_string(), + "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_string(), + ))?; + + let definition = alter_definition_format_encode( + &original_table.definition, + connector_schema.row_options.clone(), + )?; + + let [definition]: [_; 1] = Parser::parse_sql(&definition) + .context("unable to parse original table definition")? 
+ .try_into() + .unwrap(); + + let source_schema = get_connector_schema_from_table(&original_table)?; + replace_table_with_definition( + &session, + table_name, + definition, + &original_table, + source_schema, + ) + .await?; + + Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE)) +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index a8cb4c03e1bd2..18c2b2d480f9b 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -47,6 +47,7 @@ mod alter_source_column; mod alter_source_with_sr; mod alter_system; mod alter_table_column; +mod alter_table_with_sr; pub mod alter_user; pub mod cancel_job; mod comment; @@ -598,6 +599,10 @@ pub async fn handle( ) .await } + Statement::AlterTable { + name, + operation: AlterTableOperation::RefreshSchema, + } => alter_table_with_sr::handle_refresh_schema(handler_args, name).await, Statement::AlterIndex { name, operation: AlterIndexOperation::RenameIndex { index_name }, diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 59f74a3ad3acb..11074ad4aaab1 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -49,9 +49,13 @@ pub enum AlterTableOperation { /// `ADD <table_constraint>` AddConstraint(TableConstraint), /// `ADD [ COLUMN ] <column_def>` - AddColumn { column_def: ColumnDef }, + AddColumn { + column_def: ColumnDef, + }, /// TODO: implement `DROP CONSTRAINT <name>` - DropConstraint { name: Ident }, + DropConstraint { + name: Ident, + }, /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]` DropColumn { column_name: Ident, @@ -64,7 +68,9 @@ pub enum AlterTableOperation { new_column_name: Ident, }, /// `RENAME TO <table_name>` - RenameTable { table_name: ObjectName }, + RenameTable { + table_name: ObjectName, + }, // CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ] ChangeColumn { old_name: Ident, @@ -75,21 +81,29 @@ pub enum AlterTableOperation { /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>` /// /// Note: this is a PostgreSQL-specific operation. - RenameConstraint { old_name: Ident, new_name: Ident }, + RenameConstraint { + old_name: Ident, + new_name: Ident, + }, /// `ALTER [ COLUMN ]` AlterColumn { column_name: Ident, op: AlterColumnOperation, }, /// `OWNER TO <new_owner>` - ChangeOwner { new_owner_name: Ident }, + ChangeOwner { + new_owner_name: Ident, + }, /// `SET SCHEMA <schema_name>` - SetSchema { new_schema_name: ObjectName }, + SetSchema { + new_schema_name: ObjectName, + }, /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, deferred: bool, }, + RefreshSchema, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -263,6 +277,9 @@ impl fmt::Display for AlterTableOperation { if *deferred { " DEFERRED" } else { "" } ) } + AlterTableOperation::RefreshSchema => { + write!(f, "REFRESH SCHEMA") + } } } } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 1b73edc1150da..dac019fe5e126 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -276,7 +276,7 @@ impl Parser { } } - /// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SOURCE` and `CREATE SINK`. + /// Parse `FORMAT ... ENCODE ... (...)`. 
pub fn parse_schema(&mut self) -> Result<Option<ConnectorSchema>, ParserError> { if !self.parse_keyword(Keyword::FORMAT) { return Ok(None); diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index a3cc9013a21ef..d674f32582fe0 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -404,6 +404,7 @@ define_keywords!( REF, REFERENCES, REFERENCING, + REFRESH, REGCLASS, REGISTRY, REGPROC, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 22f0350024141..cb5385b1e90b8 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3069,6 +3069,8 @@ impl Parser { ); }; AlterTableOperation::AlterColumn { column_name, op } + } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { + AlterTableOperation::RefreshSchema } else { return self.expected( "ADD or RENAME or OWNER TO or SET or DROP after ALTER TABLE",
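Note for reviewers: end-to-end, the parser additions above make the new statement round-trip through the AST. A minimal sketch, assuming only the `risingwave_sqlparser` crate as modified in this diff (the exact `Display` output is an assumption based on the new arm in ddl.rs):

use risingwave_sqlparser::ast::{AlterTableOperation, Statement};
use risingwave_sqlparser::parser::Parser;

fn main() {
    let stmts = Parser::parse_sql("ALTER TABLE t_user REFRESH SCHEMA").unwrap();
    let Statement::AlterTable { name, operation } = &stmts[0] else {
        panic!("expected ALTER TABLE");
    };
    // The new REFRESH keyword plus SCHEMA parses into the new AST variant.
    assert_eq!(name.to_string(), "t_user");
    assert!(matches!(operation, AlterTableOperation::RefreshSchema));
    // Round-trip relies on the new `Display` arm for `RefreshSchema` in ddl.rs.
    assert_eq!(stmts[0].to_string(), "ALTER TABLE t_user REFRESH SCHEMA");
}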