feat: implement ALTER SOURCE xx FORMAT xx ENCODE xx (...) #14057

Merged: 40 commits, Jan 17, 2024.

Commits
All commits authored by Rossil2012.

de82ece  sql parser (Dec 19, 2023)
80f8204  refresh sr without dropping columns (Dec 22, 2023)
61d54c0  Merge branch 'main' into kanzhen/alter-source-format-encode (Dec 22, 2023)
eda2ae6  fix datatype matching (Dec 27, 2023)
31de53d  e2e test (Jan 5, 2024)
24da94c  fmt (Jan 5, 2024)
303537a  fix test (Jan 5, 2024)
a083afb  update version (Jan 5, 2024)
54b3c75  rename to alter_source_with_sr (Jan 5, 2024)
3afd6a0  fix comments (Jan 5, 2024)
9ee5266  fix comments (Jan 5, 2024)
a69aacd  fix test (Jan 5, 2024)
b3dacbf  Merge branch 'main' into kanzhen/alter-source-format-encode (Jan 5, 2024)
927824a  fix copyright (Jan 5, 2024)
deac2a4  fix (Jan 5, 2024)
e0a83bd  fix test (Jan 5, 2024)
0888fba  build proto (Jan 5, 2024)
bb0df04  drop source after test (Jan 5, 2024)
f2d0e48  more tests (Jan 8, 2024)
bdd194d  test for ingesting data (Jan 8, 2024)
27a054b  try fix (Jan 8, 2024)
e10900e  try fix (Jan 8, 2024)
5e153f0  fix format (Jan 8, 2024)
5b0647f  fix case (Jan 8, 2024)
d9ad50c  fix sink syntax (Jan 8, 2024)
cd3aa6d  force_append_only (Jan 8, 2024)
b61005a  add pb schema (Jan 8, 2024)
71a1dfd  enum as int (Jan 8, 2024)
d853491  default gender field (Jan 8, 2024)
fcd5c79  support to parse not only the first msg in proto (Jan 9, 2024)
b8d5d05  fix test (Jan 9, 2024)
f4909e2  fix test (Jan 9, 2024)
7ca30f0  fix test (Jan 9, 2024)
986ca0d  fix test (Jan 9, 2024)
160da64  register sr with curl (Jan 10, 2024)
313120f  test mv before alter (Jan 10, 2024)
be48d8b  fix test (Jan 11, 2024)
24fdbda  Merge branch 'main' into kanzhen/alter-source-format-encode (Jan 11, 2024)
1acea84  Update src/frontend/src/handler/alter_source_with_sr.rs (Jan 17, 2024)
e838c70  Update src/frontend/src/handler/alter_source_with_sr.rs (Jan 17, 2024)
12 changes: 11 additions & 1 deletion ci/scripts/e2e-source-test.sh
@@ -5,6 +5,15 @@ set -euo pipefail

source ci/scripts/common.sh

# Arguments:
# $1: subject name
# $2: schema file path
function register_schema_registry() {
curl -X POST http://message_queue:8081/subjects/$1/versions \
-H ‘Content-Type: application/vnd.schemaregistry.v1+json’ \
--data-binary @<(jq -n --arg schema “$(cat $2)” ‘{schemaType: “PROTOBUF”, schema: $schema}’)
}
Comment on lines +11 to +15

Member: The quotes here are Chinese quotes.. Does it really work???

Member: Oh, it seems this function is unused

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

@@ -115,12 +124,13 @@ export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-1cn-1fe
python3 -m pip install requests protobuf confluent-kafka
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 20
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 20 user
echo "make sure google/protobuf/source_context.proto is NOT in schema registry"
curl --silent 'http://message_queue:8081/subjects'; echo
# curl --silent --head -X GET 'http://message_queue:8081/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://message_queue:8081/subjects' | grep -v 'google/protobuf/source_context.proto'
sqllogictest -p 4566 -d dev './e2e_test/schema_registry/pb.slt'
sqllogictest -p 4566 -d dev './e2e_test/schema_registry/alter_sr.slt'

echo "--- Kill cluster"
cargo make ci-kill
74 changes: 74 additions & 0 deletions e2e_test/schema_registry/alter_sr.slt
@@ -0,0 +1,74 @@
# Before running this test, seed data into kafka:
# python3 e2e_test/schema_registry/pb.py <brokerlist> <schema-registry-url> <topic> <num-records>

statement ok
CREATE SOURCE src_user WITH (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
@fuyufjh (Collaborator, Jan 16, 2024): IIUC, the Confluent Schema Registry supports versioning of schemas, that is, one schema (topic) may have multiple proto definitions with different versions. For example: https://docs.confluent.io/platform/current/schema-registry/develop/using.html#fetch-version-1-of-the-schema-registered-under-subject-kafka-value. Shall we support specifying an optional version here?

Contributor (Author): I think this feature is not hard to implement. I will try it in the next pr.

Contributor: I think we can always stick to the latest schema version in the source. Here we assume the upstream producers always use the latest schema version (the writer schema). That is not a strong assumption, because if no producer uses a new schema, the schema version on the Schema Registry will not be updated. The Schema Registry guarantees backward compatibility, so RW can get all it needs as long as the reader schema version <= the writer schema version, i.e. some producers generate data with an equal or higher schema version.

@fuyufjh (Collaborator): That makes sense to me. We may defer it until it is requested by some users.
);
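The review thread above discusses pinning a specific schema version; in the Confluent Schema Registry REST API that is a plain GET on the subject's versions endpoint. A small sketch (the subject name and function name are illustrative):

```python
def schema_version_url(subject, version="latest", registry_url="http://message_queue:8081"):
    """Endpoint for one registered version of a subject; `version` is an
    integer or the string 'latest' (Confluent Schema Registry REST API)."""
    return f"{registry_url}/subjects/{subject}/versions/{version}"

# e.g. requests.get(schema_version_url("sr_pb_test-value", 1)).json()["schema"]
```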

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

# Changing type is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(city: character varying\)
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithNewType'
);

# Changing format/encode is not allowed
statement error Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Protobuf, and altering them is not supported yet
ALTER SOURCE src_user FORMAT NATIVE ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithMoreFields'
);

# Dropping columns is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(age: integer\)
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user;

system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 10s

query I
SELECT COUNT(*) FROM mv_user;
----
25

statement error
SELECT SUM(age) FROM mv_user;

query III
SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields;
----
25 4 0

statement ok
DROP MATERIALIZED VIEW mv_user;

statement ok
DROP MATERIALIZED VIEW mv_more_fields;

statement ok
DROP SOURCE src_user;
41 changes: 35 additions & 6 deletions e2e_test/schema_registry/pb.py
@@ -25,18 +25,38 @@ def get_user(i):
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
)

def get_user_with_more_fields(i):
return user_pb2.UserWithMoreFields(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
city="City_{}".format(i),
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
age=i,
)

def get_user_with_new_type(i):
return user_pb2.UserWithNewType(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
city=i,
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
)

def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records):
def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message):
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
serializer = ProtobufSerializer(
user_pb2.User,
pb_message,
schema_registry_client,
{"use.deprecated.format": False, 'skip.known.types': True},
)

producer = Producer(producer_conf)
for i in range(num_records):
user = get_user(i)
user = get_user_fn(i)

producer.produce(
topic=topic,
@@ -49,20 +69,29 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records):


if __name__ == "__main__":
if len(sys.argv) < 4:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records>")
if len(sys.argv) < 5:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>")
exit(1)

broker_list = sys.argv[1]
schema_registry_url = sys.argv[2]
topic = sys.argv[3]
num_records = int(sys.argv[4])
pb_message = sys.argv[5]

all_pb_messages = {
'user': (get_user, user_pb2.User),
'user_with_more_fields': (get_user_with_more_fields, user_pb2.UserWithMoreFields),
'user_with_new_type': (get_user_with_new_type, user_pb2.UserWithNewType),
}

assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}'

schema_registry_conf = {"url": schema_registry_url}
producer_conf = {"bootstrap.servers": broker_list}

try:
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records)
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, *all_pb_messages[pb_message])
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed: {}".format(e))
exit(1)
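The `all_pb_messages` dict above maps each CLI name to a (generator function, protobuf class) pair, which is then splatted into `send_to_kafka`'s two trailing parameters. A self-contained sketch of that dispatch pattern (the dict values here are illustrative stand-ins, not the real generated protobuf classes):

```python
# Illustrative stand-ins for pb.py's generator functions.
def get_user(i):
    return {"id": i, "name": "User_{}".format(i)}

def get_user_with_more_fields(i):
    return dict(get_user(i), age=i)

ALL_PB_MESSAGES = {
    "user": (get_user, "test.User"),
    "user_with_more_fields": (get_user_with_more_fields, "test.UserWithMoreFields"),
}

def send(topic, num_records, get_user_fn, pb_message):
    """Same shape as send_to_kafka: the dict entry unpacks into the
    last two positional parameters."""
    return [(topic, pb_message, get_user_fn(i)) for i in range(num_records)]

records = send("sr_pb_test", 2, *ALL_PB_MESSAGES["user_with_more_fields"])
```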
2 changes: 1 addition & 1 deletion e2e_test/schema_registry/pb.slt
@@ -1,5 +1,5 @@
# Before running this test, seed data into kafka:
# python3 e2e_test/schema_registry/pb.py
# python3 e2e_test/schema_registry/pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>

# Create a table.
statement ok
19 changes: 19 additions & 0 deletions e2e_test/schema_registry/protobuf/user.proto
@@ -17,3 +17,22 @@ enum Gender {
MALE = 0;
FEMALE = 1;
}

message UserWithMoreFields {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7; // new field here
}

message UserWithNewType {
int32 id = 1;
string name = 2;
string address = 3;
int32 city = 4; // change the type from string to int32
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
}
12 changes: 8 additions & 4 deletions e2e_test/schema_registry/protobuf/user_pb2.py

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions e2e_test/source/basic/kafka.slt
@@ -680,10 +680,10 @@ select count(*) from s16
----
0

statement error Feature is not yet implemented: Alter source with schema registry
statement error Not supported: alter source with schema registry
alter source s18 add column v10 int;

statement error Feature is not yet implemented: Alter source with schema registry
statement error Not supported: alter source with schema registry
alter source s17 add column v10 int;

query III rowsort
17 changes: 11 additions & 6 deletions src/connector/src/parser/protobuf/parser.rs
@@ -525,19 +525,24 @@ fn protobuf_type_mapping(
Ok(t)
}

/// Reference: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
/// Wire format for Confluent pb header is:
/// | 0 | 1-4 | 5-x | x+1-end
/// | magic-byte | schema-id | message-indexes | protobuf-payload
pub(crate) fn resolve_pb_header(payload: &[u8]) -> Result<&[u8]> {
// there's a message index array at the front of payload
// if it is the first message in proto def, the array is just and `0`
// TODO: support parsing more complex index array
let (_, remained) = extract_schema_id(payload)?;
// The message indexes are encoded as int using variable-length zig-zag encoding,
// prefixed by the length of the array.
// Note that if the first byte is 0, it is equivalent to (1, 0) as an optimization.
match remained.first() {
Some(0) => Ok(&remained[1..]),
Some(i) => {
Err(RwError::from(ProtocolError(format!("The payload message must be the first message in protobuf schema def, but the message index is {}", i))))
}
None => {
Err(RwError::from(ProtocolError("The proto payload is empty".to_owned())))
}
Some(i) => Ok(&remained[(*i as usize)..]),
@xiangjinwu (Contributor, Jan 24, 2024): This is skipping i bytes rather than skipping i variable-length zig-zag encoded integers. For i within 1..=63, the encoding outputs a single byte, so this happens to work.

> prefixed by the length of the array (which is also variable-length, zig-zag encoded)

i itself is also encoded, so the decoded length shall be i / 2.

Contributor (Author): Got it, so the correct implementation is

        Some(i) => Ok(&remained[(1 + *i / 2) as usize..]),
None => Err(RwError::from(ProtocolError(
"The proto payload is empty".to_owned(),
))),
}
}
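Following the review thread above, here is a Python sketch of the full Confluent wire-format header parsing, decoding the message-index array as zig-zag varints rather than skipping raw bytes. This is a sketch of the wire format itself, not RisingWave's Rust code:

```python
def read_uvarint(buf, pos):
    """Read one unsigned LEB128 varint starting at pos; return (value, new_pos)."""
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result, pos
        shift += 7

def zigzag_decode(n):
    """Map the unsigned varint back to a signed integer."""
    return (n >> 1) ^ -(n & 1)

def strip_confluent_header(payload):
    """Skip the magic byte, the 4-byte schema id, and the zig-zag varint
    message-index array; return the raw protobuf payload."""
    if payload[0] != 0:
        raise ValueError("bad magic byte")
    pos = 5  # 1 magic byte + 4-byte big-endian schema id
    raw, pos = read_uvarint(payload, pos)
    count = zigzag_decode(raw)  # a single 0 byte is shorthand for the array [0]
    for _ in range(count):
        _, pos = read_uvarint(payload, pos)  # skip each message index
    return payload[pos:]
```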

7 changes: 7 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
@@ -176,6 +176,8 @@ pub trait CatalogWriter: Send + Sync {

async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>;

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>;

async fn alter_parallelism(&self, table_id: u32, parallelism: PbTableParallelism)
-> Result<()>;

@@ -495,6 +497,11 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.alter_source_with_sr(source).await?;
self.wait_version(version).await
}

async fn alter_parallelism(
&self,
table_id: u32,
17 changes: 12 additions & 5 deletions src/frontend/src/handler/alter_source_column.rs
@@ -14,7 +14,6 @@

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
@@ -63,15 +62,23 @@ pub async fn handle_alter_source_column(
let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
match encode {
SourceEncode::Avro | SourceEncode::Protobuf => {
bail_not_implemented!("Alter source with schema registry")
return Err(ErrorCode::NotSupported(
"alter source with schema registry".to_string(),
"try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_string(),
)
.into());
}
SourceEncode::Json if catalog.info.use_schema_registry => {
bail_not_implemented!("Alter source with schema registry")
return Err(ErrorCode::NotSupported(
"alter source with schema registry".to_string(),
"try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_string(),
)
.into());
}
SourceEncode::Invalid | SourceEncode::Native => {
return Err(RwError::from(ErrorCode::NotSupported(
format!("Alter source with encode {:?}", encode),
"Alter source with encode JSON | BYTES | CSV".into(),
format!("alter source with encode {:?}", encode),
"alter source with encode JSON | BYTES | CSV".into(),
)));
}
_ => {}