-
Notifications
You must be signed in to change notification settings - Fork 636
feat: implement ALTER SOURCE xx FORMAT xx ENCODE xx (...)
#14057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 7 commits
Commits
Show all changes
40 commits
Select commit
Hold shift + click to select a range
de82ece
sql parser
Rossil2012 80f8204
refresh sr without dropping columns
Rossil2012 61d54c0
Merge branch 'main' into kanzhen/alter-source-format-encode
Rossil2012 eda2ae6
fix datatype matching
Rossil2012 31de53d
e2e test
Rossil2012 24da94c
fmt
Rossil2012 303537a
fix test
Rossil2012 a083afb
update version
Rossil2012 54b3c75
rename to alter_source_with_sr
Rossil2012 3afd6a0
fix comments
Rossil2012 9ee5266
fix comments
Rossil2012 a69aacd
fix test
Rossil2012 b3dacbf
Merge branch 'main' into kanzhen/alter-source-format-encode
Rossil2012 927824a
fix copyright
Rossil2012 deac2a4
fix
Rossil2012 e0a83bd
fix test
Rossil2012 0888fba
build proto
Rossil2012 bb0df04
drop source after test
Rossil2012 f2d0e48
more tests
Rossil2012 bdd194d
test for ingesting data
Rossil2012 27a054b
try fix
Rossil2012 e10900e
try fix
Rossil2012 5e153f0
fix format
Rossil2012 5b0647f
fix case
Rossil2012 d9ad50c
fix sink syntax
Rossil2012 cd3aa6d
force_append_only
Rossil2012 b61005a
add pb schema
Rossil2012 71a1dfd
enum as int
Rossil2012 d853491
default gender field
Rossil2012 fcd5c79
support to parse not only the first msg in proto
Rossil2012 b8d5d05
fix test
Rossil2012 f4909e2
fix test
Rossil2012 7ca30f0
fix test
Rossil2012 986ca0d
fix test
Rossil2012 160da64
register sr with curl
Rossil2012 313120f
test mv before alter
Rossil2012 be48d8b
fix test
Rossil2012 24fdbda
Merge branch 'main' into kanzhen/alter-source-format-encode
Rossil2012 1acea84
Update src/frontend/src/handler/alter_source_with_sr.rs
Rossil2012 e838c70
Update src/frontend/src/handler/alter_source_with_sr.rs
Rossil2012 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# Before running this test, seed data into kafka: | ||
# python3 e2e_test/schema_registry/pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_type> | ||
|
||
statement ok | ||
CREATE SOURCE src_user WITH ( | ||
connector = 'kafka', | ||
topic = 'sr_pb_test', | ||
properties.bootstrap.server = 'message_queue:29092', | ||
scan.startup.mode = 'earliest' | ||
) | ||
FORMAT plain ENCODE protobuf( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.User' | ||
); | ||
|
||
# Changing type is not allowed | ||
statement error | ||
ALTER SOURCE src_user FORMAT plain ENCODE protobuf( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.UserNewType' | ||
); | ||
|
||
statement ok | ||
ALTER SOURCE src_user FORMAT plain ENCODE protobuf( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.UserMore' | ||
); | ||
|
||
# Dropping columns is not allowed | ||
statement error | ||
ALTER SOURCE src_user FORMAT plain ENCODE protobuf( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.User' | ||
); | ||
|
||
# Changing format/encode is not allowed | ||
statement error | ||
ALTER SOURCE src_user FORMAT json ENCODE protobuf( | ||
schema.registry = 'http://message_queue:8081', | ||
message = 'test.UserMore' | ||
); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the Conflulent Schema Registry supports versioning of schema, that is, one schema (topic) may have multiple proto definition with different versions. For example: https://docs.confluent.io/platform/current/schema-registry/develop/using.html#fetch-version-1-of-the-schema-registered-under-subject-kafka-value. Shall we support specifying an optional
version
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this feature is not hard to implement. I will try it in the next pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can always stick to the latest schema version in source.
Here we assume the upstream workers always use the latest schema version (writer schema). It is not a strong assumption because if no producer uses a new schema, the schema version on the Schema Registry will not be updated.
Schema registry guarantee backward compatibility, RW can get all it needs if
reader schema version <= writer schema version
, ie. there must be some producers generate data with equal or higher version of schema.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense to me. We may defer it until being requested by some users.