Skip to content

Commit df6b157

Browse files
authored
feat(parser): support emit_mode (#7783)
The parser part Approved-By: BugenZhao Approved-By: kwannoel
1 parent 2d79894 commit df6b157

File tree

6 files changed

+121
-3
lines changed

6 files changed

+121
-3
lines changed

src/frontend/src/handler/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ pub async fn handle(
316316

317317
with_options: _, // It is put in OptimizerContext
318318
or_replace, // not supported
319+
emit_mode,
319320
} => {
320321
if or_replace {
321322
return Err(ErrorCode::NotImplemented(
@@ -324,6 +325,13 @@ pub async fn handle(
324325
)
325326
.into());
326327
}
328+
if emit_mode == Some(EmitMode::OnWindowClose) {
329+
return Err(ErrorCode::NotImplemented(
330+
"CREATE MATERIALIZED VIEW EMIT ON WINDOW CLOSE".to_string(),
331+
None.into(),
332+
)
333+
.into());
334+
}
327335
if materialized {
328336
create_mv::handle_create_mv(handler_args, name, *query, columns).await
329337
} else {

src/sqlparser/src/ast/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,7 @@ pub enum Statement {
906906
name: ObjectName,
907907
columns: Vec<Ident>,
908908
query: Box<Query>,
909+
emit_mode: Option<EmitMode>,
909910
with_options: Vec<SqlOption>,
910911
},
911912
/// CREATE TABLE
@@ -1236,6 +1237,7 @@ impl fmt::Display for Statement {
12361237
query,
12371238
materialized,
12381239
with_options,
1240+
emit_mode,
12391241
} => {
12401242
write!(
12411243
f,
@@ -1244,6 +1246,9 @@ impl fmt::Display for Statement {
12441246
materialized = if *materialized { "MATERIALIZED " } else { "" },
12451247
name = name
12461248
)?;
1249+
if let Some(emit_mode) = emit_mode {
1250+
write!(f, " EMIT {}", emit_mode)?;
1251+
}
12471252
if !with_options.is_empty() {
12481253
write!(f, " WITH ({})", display_comma_separated(with_options))?;
12491254
}
@@ -1908,6 +1913,22 @@ impl fmt::Display for SqlOption {
19081913
}
19091914
}
19101915

1916+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1917+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1918+
pub enum EmitMode {
1919+
Immediately,
1920+
OnWindowClose,
1921+
}
1922+
1923+
impl fmt::Display for EmitMode {
1924+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1925+
f.write_str(match self {
1926+
EmitMode::Immediately => "IMMEDIATELY",
1927+
EmitMode::OnWindowClose => "ON WINDOW CLOSE",
1928+
})
1929+
}
1930+
}
1931+
19111932
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19121933
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
19131934
pub enum TransactionMode {

src/sqlparser/src/keywords.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ define_keywords!(
198198
EACH,
199199
ELEMENT,
200200
ELSE,
201+
EMIT,
201202
ENCRYPTED,
202203
END,
203204
END_EXEC = "END-EXEC",
@@ -250,6 +251,7 @@ define_keywords!(
250251
IF,
251252
IGNORE,
252253
ILIKE,
254+
IMMEDIATELY,
253255
IMMUTABLE,
254256
IN,
255257
INCLUDE,

src/sqlparser/src/parser.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,11 @@ impl Parser {
16571657
// ANSI SQL and Postgres support RECURSIVE here, but we don't support it either.
16581658
let name = self.parse_object_name()?;
16591659
let columns = self.parse_parenthesized_column_list(Optional)?;
1660+
let emit_mode = if materialized {
1661+
self.parse_emit_mode()?
1662+
} else {
1663+
None
1664+
};
16601665
let with_options = self.parse_options(Keyword::WITH)?;
16611666
self.expect_keyword(Keyword::AS)?;
16621667
let query = Box::new(self.parse_query()?);
@@ -1668,6 +1673,7 @@ impl Parser {
16681673
materialized,
16691674
or_replace,
16701675
with_options,
1676+
emit_mode,
16711677
})
16721678
}
16731679

@@ -2204,6 +2210,25 @@ impl Parser {
22042210
Ok(SqlOption { name, value })
22052211
}
22062212

2213+
pub fn parse_emit_mode(&mut self) -> Result<Option<EmitMode>, ParserError> {
2214+
if self.parse_keyword(Keyword::EMIT) {
2215+
match self.parse_one_of_keywords(&[Keyword::IMMEDIATELY, Keyword::ON]) {
2216+
Some(Keyword::IMMEDIATELY) => Ok(Some(EmitMode::Immediately)),
2217+
Some(Keyword::ON) => {
2218+
self.expect_keywords(&[Keyword::WINDOW, Keyword::CLOSE])?;
2219+
Ok(Some(EmitMode::OnWindowClose))
2220+
}
2221+
Some(_) => unreachable!(),
2222+
None => self.expected(
2223+
"IMMEDIATELY or ON WINDOW CLOSE after EMIT",
2224+
self.peek_token(),
2225+
),
2226+
}
2227+
} else {
2228+
Ok(None)
2229+
}
2230+
}
2231+
22072232
pub fn parse_alter(&mut self) -> Result<Statement, ParserError> {
22082233
if self.parse_keyword(Keyword::TABLE) {
22092234
self.parse_alter_table()

src/sqlparser/tests/sqlparser_common.rs

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2920,13 +2920,15 @@ fn parse_create_view() {
29202920
or_replace,
29212921
materialized,
29222922
with_options,
2923+
emit_mode,
29232924
} => {
29242925
assert_eq!("myschema.myview", name.to_string());
29252926
assert_eq!(Vec::<Ident>::new(), columns);
29262927
assert_eq!("SELECT foo FROM bar", query.to_string());
29272928
assert!(!materialized);
29282929
assert!(!or_replace);
29292930
assert_eq!(with_options, vec![]);
2931+
assert_eq!(emit_mode, None);
29302932
}
29312933
_ => unreachable!(),
29322934
}
@@ -2966,13 +2968,15 @@ fn parse_create_view_with_columns() {
29662968
with_options,
29672969
query,
29682970
materialized,
2971+
emit_mode,
29692972
} => {
29702973
assert_eq!("v", name.to_string());
29712974
assert_eq!(columns, vec![Ident::new("has"), Ident::new("cols")]);
29722975
assert_eq!(with_options, vec![]);
29732976
assert_eq!("SELECT 1, 2", query.to_string());
29742977
assert!(!materialized);
2975-
assert!(!or_replace)
2978+
assert!(!or_replace);
2979+
assert_eq!(emit_mode, None);
29762980
}
29772981
_ => unreachable!(),
29782982
}
@@ -2988,13 +2992,15 @@ fn parse_create_or_replace_view() {
29882992
with_options,
29892993
query,
29902994
materialized,
2995+
emit_mode,
29912996
} => {
29922997
assert_eq!("v", name.to_string());
29932998
assert_eq!(columns, vec![]);
29942999
assert_eq!(with_options, vec![]);
29953000
assert_eq!("SELECT 1", query.to_string());
29963001
assert!(!materialized);
2997-
assert!(or_replace)
3002+
assert!(or_replace);
3003+
assert_eq!(emit_mode, None);
29983004
}
29993005
_ => unreachable!(),
30003006
}
@@ -3015,13 +3021,15 @@ fn parse_create_or_replace_materialized_view() {
30153021
with_options,
30163022
query,
30173023
materialized,
3024+
emit_mode,
30183025
} => {
30193026
assert_eq!("v", name.to_string());
30203027
assert_eq!(columns, vec![]);
30213028
assert_eq!(with_options, vec![]);
30223029
assert_eq!("SELECT 1", query.to_string());
30233030
assert!(materialized);
3024-
assert!(or_replace)
3031+
assert!(or_replace);
3032+
assert_eq!(emit_mode, None);
30253033
}
30263034
_ => unreachable!(),
30273035
}
@@ -3038,13 +3046,66 @@ fn parse_create_materialized_view() {
30383046
query,
30393047
materialized,
30403048
with_options,
3049+
emit_mode,
3050+
} => {
3051+
assert_eq!("myschema.myview", name.to_string());
3052+
assert_eq!(Vec::<Ident>::new(), columns);
3053+
assert_eq!("SELECT foo FROM bar", query.to_string());
3054+
assert!(materialized);
3055+
assert_eq!(with_options, vec![]);
3056+
assert!(!or_replace);
3057+
assert_eq!(emit_mode, None);
3058+
}
3059+
_ => unreachable!(),
3060+
}
3061+
}
3062+
3063+
#[test]
3064+
fn parse_create_materialized_view_emit_immediately() {
3065+
let sql = "CREATE MATERIALIZED VIEW myschema.myview EMIT IMMEDIATELY AS SELECT foo FROM bar";
3066+
match verified_stmt(sql) {
3067+
Statement::CreateView {
3068+
name,
3069+
or_replace,
3070+
columns,
3071+
query,
3072+
materialized,
3073+
with_options,
3074+
emit_mode,
3075+
} => {
3076+
assert_eq!("myschema.myview", name.to_string());
3077+
assert_eq!(Vec::<Ident>::new(), columns);
3078+
assert_eq!("SELECT foo FROM bar", query.to_string());
3079+
assert!(materialized);
3080+
assert_eq!(with_options, vec![]);
3081+
assert!(!or_replace);
3082+
assert_eq!(emit_mode, Some(EmitMode::Immediately));
3083+
}
3084+
_ => unreachable!(),
3085+
}
3086+
}
3087+
3088+
#[test]
3089+
fn parse_create_materialized_view_emit_on_window_close() {
3090+
let sql =
3091+
"CREATE MATERIALIZED VIEW myschema.myview EMIT ON WINDOW CLOSE AS SELECT foo FROM bar";
3092+
match verified_stmt(sql) {
3093+
Statement::CreateView {
3094+
name,
3095+
or_replace,
3096+
columns,
3097+
query,
3098+
materialized,
3099+
with_options,
3100+
emit_mode,
30413101
} => {
30423102
assert_eq!("myschema.myview", name.to_string());
30433103
assert_eq!(Vec::<Ident>::new(), columns);
30443104
assert_eq!("SELECT foo FROM bar", query.to_string());
30453105
assert!(materialized);
30463106
assert_eq!(with_options, vec![]);
30473107
assert!(!or_replace);
3108+
assert_eq!(emit_mode, Some(EmitMode::OnWindowClose));
30483109
}
30493110
_ => unreachable!(),
30503111
}

src/tests/sqlsmith/src/sql_gen/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> {
203203
columns: vec![],
204204
query,
205205
with_options: vec![],
206+
emit_mode: None,
206207
};
207208
(mview, table)
208209
}

0 commit comments

Comments
 (0)