-
Notifications
You must be signed in to change notification settings - Fork 636
feat: implement ALTER SOURCE xx FORMAT xx ENCODE xx (...)
#14057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
de82ece
80f8204
61d54c0
eda2ae6
31de53d
24da94c
303537a
a083afb
54b3c75
3afd6a0
9ee5266
a69aacd
b3dacbf
927824a
deac2a4
e0a83bd
0888fba
bb0df04
f2d0e48
bdd194d
27a054b
e10900e
5e153f0
5b0647f
d9ad50c
cd3aa6d
b61005a
71a1dfd
d853491
fcd5c79
b8d5d05
f4909e2
7ca30f0
986ca0d
160da64
313120f
be48d8b
24fdbda
1acea84
e838c70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
# Before running this test, seed data into kafka: | ||
# python3 e2e_test/schema_registry/pb.py <brokerlist> <schema-registry-url> <topic> <num-records> | ||
|
||
statement ok | ||
CREATE SOURCE src_user WITH ( | ||
connector = 'kafka', | ||
topic = 'sr_pb_test', | ||
properties.bootstrap.server = 'message_queue:29092', | ||
scan.startup.mode = 'earliest' | ||
) | ||
FORMAT PLAIN ENCODE PROTOBUF( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.User' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, the Conflulent Schema Registry supports versioning of schema, that is, one schema (topic) may have multiple proto definition with different versions. For example: https://docs.confluent.io/platform/current/schema-registry/develop/using.html#fetch-version-1-of-the-schema-registered-under-subject-kafka-value. Shall we support specifying an optional There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this feature is not hard to implement. I will try it in the next pr. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can always stick to the latest schema version in source. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense to me. We may defer it until being requested by some users. |
||
); | ||
|
||
statement ok | ||
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; | ||
|
||
# Changing type is not allowed | ||
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(city: character varying\) | ||
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.UserWithNewType' | ||
); | ||
|
||
# Changing format/encode is not allowed | ||
statement error Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Protobuf, and altering them is not supported yet | ||
ALTER SOURCE src_user FORMAT NATIVE ENCODE PROTOBUF( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.User' | ||
); | ||
|
||
statement ok | ||
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.UserWithMoreFields' | ||
); | ||
|
||
# Dropping columns is not allowed | ||
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(age: integer\) | ||
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.User' | ||
); | ||
|
||
statement ok | ||
CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user; | ||
|
||
system ok | ||
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields | ||
|
||
sleep 10s | ||
|
||
query I | ||
SELECT COUNT(*) FROM mv_user; | ||
---- | ||
25 | ||
|
||
statement error | ||
SELECT SUM(age) FROM mv_user; | ||
|
||
query III | ||
SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields; | ||
---- | ||
25 4 0 | ||
|
||
statement ok | ||
DROP MATERIALIZED VIEW mv_user; | ||
|
||
statement ok | ||
DROP MATERIALIZED VIEW mv_more_fields; | ||
|
||
statement ok | ||
DROP SOURCE src_user; |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -525,19 +525,24 @@ fn protobuf_type_mapping( | |
Ok(t) | ||
} | ||
|
||
/// Reference: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format> | ||
/// Wire format for Confluent pb header is: | ||
/// | 0 | 1-4 | 5-x | x+1-end | ||
/// | magic-byte | schema-id | message-indexes | protobuf-payload | ||
pub(crate) fn resolve_pb_header(payload: &[u8]) -> Result<&[u8]> { | ||
// there's a message index array at the front of payload | ||
// if it is the first message in proto def, the array is just and `0` | ||
// TODO: support parsing more complex index array | ||
let (_, remained) = extract_schema_id(payload)?; | ||
// The message indexes are encoded as int using variable-length zig-zag encoding, | ||
// prefixed by the length of the array. | ||
// Note that if the first byte is 0, it is equivalent to (1, 0) as an optimization. | ||
match remained.first() { | ||
Some(0) => Ok(&remained[1..]), | ||
Some(i) => { | ||
Err(RwError::from(ProtocolError(format!("The payload message must be the first message in protobuf schema def, but the message index is {}", i)))) | ||
} | ||
None => { | ||
Err(RwError::from(ProtocolError("The proto payload is empty".to_owned()))) | ||
} | ||
Some(i) => Ok(&remained[(*i as usize)..]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is skipping
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, so the correct implementation is
|
||
None => Err(RwError::from(ProtocolError( | ||
"The proto payload is empty".to_owned(), | ||
))), | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The quotes here are Chinese quotes.. Does it really work???
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, it seems this function is unused