Skip to content

Commit c12996f

Browse files
authored
feat(dml): add not null support (#20611)
1 parent a5e6243 commit c12996f

File tree

20 files changed

+325
-31
lines changed

20 files changed

+325
-31
lines changed

e2e_test/ddl/not_null.slt

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
### Test batch input
2+
3+
statement ok
4+
CREATE TABLE test_struct (
5+
id INT PRIMARY KEY,
6+
info STRUCT <first_name VARCHAR, last_name VARCHAR> NOT NULL
7+
);
8+
9+
statement ok
10+
INSERT INTO test_struct (id, info) VALUES (1, ('Alice', 'Smith'));
11+
12+
statement error null value in column "info" of relation "test_struct" violates not-null constraint
13+
INSERT INTO test_struct (id, info) VALUES (2, NULL);
14+
15+
statement ok
16+
FLUSH;
17+
18+
statement error null value in column "info" of relation "test_struct" violates not-null constraint
19+
UPDATE test_struct SET info = NULL WHERE id = 1;
20+
21+
statement ok
22+
INSERT INTO test_struct (id, info)
23+
SELECT 3, ROW('John', 'Doe')::STRUCT <first_name VARCHAR, last_name VARCHAR>;
24+
25+
statement error null value in column "info" of relation "test_struct" violates not-null constraint
26+
INSERT INTO test_struct (id, info)
27+
SELECT 4, NULL::STRUCT <first_name VARCHAR, last_name VARCHAR>;
28+
29+
statement ok
30+
UPDATE test_struct SET info = (SELECT ROW('Jane', 'Smith')::STRUCT <first_name VARCHAR, last_name VARCHAR>) WHERE id = 1;
31+
32+
statement error null value in column "info" of relation "test_struct" violates not-null constraint
33+
UPDATE test_struct SET info = (SELECT NULL::STRUCT <first_name VARCHAR, last_name VARCHAR>) WHERE id = 1;
34+
35+
statement ok
36+
FLUSH;
37+
38+
query TT
39+
SELECT COUNT(*) FROM test_struct;
40+
----
41+
2
42+
43+
statement ok
44+
DROP TABLE test_struct;
45+
46+
statement ok
47+
CREATE TABLE test_array (
48+
id INT PRIMARY KEY,
49+
numbers INT[] NOT NULL
50+
);
51+
52+
statement ok
53+
INSERT INTO test_array (id, numbers) VALUES (1, ARRAY[1, 2, 3]);
54+
55+
statement error null value in column "numbers" of relation "test_array" violates not-null constraint
56+
INSERT INTO test_array (id, numbers) VALUES (2, NULL);
57+
58+
statement ok
59+
FLUSH;
60+
61+
statement error null value in column "numbers" of relation "test_array" violates not-null constraint
62+
UPDATE test_array SET numbers = NULL WHERE id = 1;
63+
64+
statement ok
65+
UPDATE test_array SET numbers = ARRAY[]::INT[] WHERE id = 1;
66+
67+
statement ok
68+
INSERT INTO test_array (id, numbers)
69+
SELECT 3, ARRAY[4, 5, 6];
70+
71+
statement error null value in column "numbers" of relation "test_array" violates not-null constraint
72+
INSERT INTO test_array (id, numbers)
73+
SELECT 4, NULL::INT[];
74+
75+
statement ok
76+
UPDATE test_array SET numbers = (SELECT ARRAY[7,8,9]) WHERE id = 1;
77+
78+
statement error null value in column "numbers" of relation "test_array" violates not-null constraint
79+
UPDATE test_array SET numbers = (SELECT NULL::INT[]) WHERE id = 1;
80+
81+
statement ok
82+
FLUSH;
83+
84+
query TT
85+
SELECT COUNT(*) FROM test_array;
86+
----
87+
2
88+
89+
statement ok
90+
DROP TABLE test_array;
91+
92+
### Test stream input
93+
94+
statement ok
95+
CREATE TABLE test_person (id INT PRIMARY KEY NOT NULL, name VARCHAR NULL, age INT NOT NULL, height FLOAT NOT NULL);
96+
97+
statement ok
98+
INSERT INTO test_person (id, name, age, height) VALUES (1, 'Alice', 30, 5.5);
99+
100+
statement ok
101+
INSERT INTO test_person (id, name, age, height) VALUES (2, NULL, 25, 6.0);
102+
103+
statement error null value in column "id" of relation "test_person" violates not-null constraint
104+
INSERT INTO test_person (id, name, age, height) VALUES (NULL, 'Bob', 40, 5.8);
105+
106+
statement error null value in column "age" of relation "test_person" violates not-null constraint
107+
INSERT INTO test_person (id, name, age, height) VALUES (3, 'Charlie', NULL, 6.1);
108+
109+
statement error null value in column "height" of relation "test_person" violates not-null constraint
110+
INSERT INTO test_person (id, name, age, height) VALUES (4, 'Diana', 35, NULL);
111+
112+
statement ok
113+
FLUSH;
114+
115+
query TT
116+
SELECT COUNT(*) FROM test_person;
117+
----
118+
2
119+
120+
statement ok
121+
CREATE TABLE test_person_source (id INT PRIMARY KEY, name VARCHAR NULL, age INT, height FLOAT);
122+
123+
statement ok
124+
INSERT INTO test_person_source (id, name, age, height) VALUES
125+
(101, 'John', 28, 5.9),
126+
(102, 'Sarah', NULL, 5.5),
127+
(103, NULL, 40, 6.1),
128+
(104, 'Michael', 35, NULL),
129+
(105, 'Emma', 29, 5.4);
130+
131+
statement ok
132+
CREATE SINK test_person_sink INTO test_person AS SELECT * FROM test_person_source;
133+
134+
statement ok
135+
FLUSH;
136+
137+
query TT
138+
SELECT * FROM test_person ORDER BY id;
139+
----
140+
1 Alice 30 5.5
141+
2 NULL 25 6
142+
101 John 28 5.9
143+
102 Sarah NULL 5.5
144+
103 NULL 40 6.1
145+
104 Michael 35 NULL
146+
105 Emma 29 5.4
147+
148+
statement ok
149+
DROP SINK test_person_sink;
150+
151+
statement ok
152+
DROP TABLE test_person_source;
153+
154+
statement ok
155+
DROP TABLE test_person;

e2e_test/ddl/table/table.slt.part

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,12 @@ drop table "T2"
126126
statement error
127127
create table C1 (c1 varchar(5));
128128

129-
statement error
129+
statement ok
130130
create table t (v1 int not null);
131131

132+
statement ok
133+
drop table t;
134+
132135
statement error
133136
create table t (v1 varchar collate "en_US");
134137

proto/expr.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ message ExprNode {
201201
HMAC = 332;
202202
SECURE_COMPARE = 333;
203203

204+
// Constraints Check
205+
CHECK_NOT_NULL = 350;
206+
204207
// Unary operators
205208
NEG = 401;
206209
// Nested selection operators

proto/plan_common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ message ColumnDesc {
6060
ColumnDescVersion version = 10;
6161

6262
AdditionalColumn additional_column = 11;
63+
64+
bool nullable = 12;
6365
}
6466

6567
message ColumnCatalog {

src/batch/executors/src/executor/insert.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ use itertools::Itertools;
2121
use risingwave_common::array::{
2222
ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, SerialArray, StreamChunk,
2323
};
24-
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
24+
use risingwave_common::catalog::{Schema, TableId, TableVersionId};
2525
use risingwave_common::transaction::transaction_id::TxnId;
26-
use risingwave_common::types::DataType;
2726
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
2827
use risingwave_dml::dml_manager::DmlManagerRef;
2928
use risingwave_expr::expr::{BoxedExpression, build_from_prost};
@@ -78,13 +77,7 @@ impl InsertExecutor {
7877
dml_manager,
7978
child,
8079
chunk_size,
81-
schema: if returning {
82-
table_schema
83-
} else {
84-
Schema {
85-
fields: vec![Field::unnamed(DataType::Serial)],
86-
}
87-
},
80+
schema: table_schema,
8881
identity,
8982
column_indices,
9083
sorted_default_columns,
@@ -137,6 +130,7 @@ impl InsertExecutor {
137130
.enumerate()
138131
.map(|(i, idx)| (*idx, columns[i].clone()))
139132
.collect_vec();
133+
140134
ordered_columns.reserve(ordered_columns.len() + self.sorted_default_columns.len());
141135

142136
for (idx, expr) in &self.sorted_default_columns {
@@ -269,10 +263,10 @@ mod tests {
269263
use futures::StreamExt;
270264
use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray};
271265
use risingwave_common::catalog::{
272-
ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
266+
ColumnDesc, ColumnId, Field, INITIAL_TABLE_VERSION_ID, schema_test_utils,
273267
};
274268
use risingwave_common::transaction::transaction_message::TxnMsg;
275-
use risingwave_common::types::StructType;
269+
use risingwave_common::types::{DataType, StructType};
276270
use risingwave_dml::dml_manager::DmlManager;
277271
use risingwave_storage::hummock::CachePolicy;
278272
use risingwave_storage::hummock::test_utils::*;

src/common/src/catalog/column.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ pub struct ColumnDesc {
123123
/// Currently the system column is used for `_rw_timestamp` only and is generated at runtime,
124124
/// so this field is not persisted.
125125
pub system_column: Option<SystemColumn>,
126+
/// Whether the column is nullable. Only applies to BatchInsert/BatchUpdate operations into tables.
127+
pub nullable: bool,
126128
}
127129

128130
impl ColumnDesc {
@@ -140,6 +142,7 @@ impl ColumnDesc {
140142
additional_column: AdditionalColumn { column_type: None },
141143
version: ColumnDescVersion::LATEST,
142144
system_column: None,
145+
nullable: true,
143146
}
144147
}
145148

@@ -199,6 +202,7 @@ impl ColumnDesc {
199202
additional_column_type: 0, // deprecated
200203
additional_column: Some(self.additional_column.clone()),
201204
version: self.version as i32,
205+
nullable: self.nullable,
202206
}
203207
}
204208

@@ -235,6 +239,7 @@ impl From<PbColumnDesc> for ColumnDesc {
235239
additional_column,
236240
version,
237241
system_column: None,
242+
nullable: prost.nullable,
238243
}
239244
}
240245
}
@@ -313,6 +318,11 @@ impl ColumnCatalog {
313318
&self.column_desc.data_type
314319
}
315320

321+
/// Get nullable info of the column.
322+
pub fn nullable(&self) -> bool {
323+
self.column_desc.nullable
324+
}
325+
316326
/// Get the column desc's column id.
317327
pub fn column_id(&self) -> ColumnId {
318328
self.column_desc.column_id

src/common/src/catalog/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ impl ColumnDescTestExt for ColumnDesc {
3030
name: name.to_owned(),
3131
additional_column: Some(AdditionalColumn { column_type: None }),
3232
version: ColumnDescVersion::LATEST as _,
33+
nullable: true,
3334
..Default::default()
3435
}
3536
}

src/connector/src/parser/plain_parser.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ mod tests {
513513
},
514514
version: Pr13707,
515515
system_column: None,
516+
nullable: true,
516517
},
517518
is_hidden: false,
518519
},
@@ -528,6 +529,7 @@ mod tests {
528529
},
529530
version: Pr13707,
530531
system_column: None,
532+
nullable: true,
531533
},
532534
is_hidden: false,
533535
},
@@ -543,6 +545,7 @@ mod tests {
543545
},
544546
version: Pr13707,
545547
system_column: None,
548+
nullable: true,
546549
},
547550
is_hidden: false,
548551
},

src/connector/src/source/manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ impl From<&ColumnDesc> for SourceColumnDesc {
130130
description: _,
131131
version: _,
132132
system_column: _,
133+
nullable: _,
133134
}: &ColumnDesc,
134135
) -> Self {
135136
if let Some(option) = generated_or_default_column {
@@ -182,6 +183,7 @@ impl From<&SourceColumnDesc> for ColumnDesc {
182183
description: None,
183184
version: ColumnDescVersion::LATEST,
184185
system_column: None,
186+
nullable: true,
185187
}
186188
}
187189
}

src/expr/core/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ pub enum ExprError {
108108
#[error("too few arguments for format()")]
109109
TooFewArguments,
110110

111+
#[error(
112+
"null value in column \"{col_name}\" of relation \"{table_name}\" violates not-null constraint"
113+
)]
114+
NotNullViolation {
115+
col_name: Box<str>,
116+
table_name: Box<str>,
117+
},
118+
111119
#[error("invalid state: {0}")]
112120
InvalidState(String),
113121

src/expr/impl/src/scalar/cmp.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use risingwave_common::array::{Array, BoolArray};
1919
use risingwave_common::bitmap::Bitmap;
2020
use risingwave_common::row::Row;
2121
use risingwave_common::types::{Scalar, ScalarRef, ScalarRefImpl};
22-
use risingwave_expr::function;
22+
use risingwave_expr::{ExprError, Result, function};
2323

2424
#[function("equal(boolean, boolean) -> boolean", batch_fn = "boolarray_eq")]
2525
#[function("equal(*int, *int) -> boolean")]
@@ -431,6 +431,21 @@ pub fn secure_compare(left: &str, right: &str) -> bool {
431431
constant_time_eq(left.as_bytes(), right.as_bytes())
432432
}
433433

434+
#[function("check_not_null(any, varchar, varchar) -> any")]
435+
fn check_not_null<'a>(
436+
v: Option<ScalarRefImpl<'a>>,
437+
col_name: &str,
438+
relation_name: &str,
439+
) -> Result<Option<ScalarRefImpl<'a>>> {
440+
if v.is_none() {
441+
return Err(ExprError::NotNullViolation {
442+
col_name: col_name.into(),
443+
table_name: relation_name.into(),
444+
});
445+
}
446+
Ok(v)
447+
}
448+
434449
#[cfg(test)]
435450
mod tests {
436451
use std::str::FromStr;

0 commit comments

Comments
 (0)