Skip to content

Commit 1b8fa46

Browse files
authored
Merge branch 'main' into yuhao/handle_join_watermark
2 parents 4dce18a + b63df21 commit 1b8fa46

File tree

14 files changed

+204
-130
lines changed

14 files changed

+204
-130
lines changed

src/compute/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#![feature(type_alias_impl_trait)]
1919
#![feature(let_chains)]
2020
#![feature(result_option_inspect)]
21-
#![feature(allocator_api)]
21+
#![feature(lint_reasons)]
2222
#![cfg_attr(coverage, feature(no_coverage))]
2323

2424
#[macro_use]

src/compute/src/memory_management/memory_manager.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,24 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::atomic::{AtomicU64, Ordering};
15+
use std::sync::atomic::AtomicU64;
1616
use std::sync::Arc;
17+
#[cfg(target_os = "linux")]
1718
use std::time::Duration;
1819

1920
use risingwave_batch::task::BatchManager;
21+
#[cfg(target_os = "linux")]
2022
use risingwave_common::util::epoch::Epoch;
2123
use risingwave_stream::executor::monitor::StreamingMetrics;
2224
use risingwave_stream::task::LocalStreamManager;
2325
#[cfg(target_os = "linux")]
2426
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};
27+
#[cfg(target_os = "linux")]
2528
use tracing;
2629

2730
/// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit
2831
/// the memory usage.
32+
#[cfg_attr(not(target_os = "linux"), expect(dead_code))]
2933
pub struct GlobalMemoryManager {
3034
/// All cached data before the watermark should be evicted.
3135
watermark_epoch: Arc<AtomicU64>,
@@ -39,7 +43,9 @@ pub struct GlobalMemoryManager {
3943
pub type GlobalMemoryManagerRef = Arc<GlobalMemoryManager>;
4044

4145
impl GlobalMemoryManager {
46+
#[cfg(target_os = "linux")]
4247
const EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
48+
#[cfg(target_os = "linux")]
4349
const EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;
4450

4551
pub fn new(
@@ -63,7 +69,10 @@ impl GlobalMemoryManager {
6369
self.watermark_epoch.clone()
6470
}
6571

72+
#[cfg(target_os = "linux")]
6673
fn set_watermark_time_ms(&self, time_ms: u64) {
74+
use std::sync::atomic::Ordering;
75+
6776
let epoch = Epoch::from_physical_time(time_ms).0;
6877
let watermark_epoch = self.watermark_epoch.as_ref();
6978
watermark_epoch.store(epoch, Ordering::Relaxed);

src/expr/src/vector_op/cast.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,40 @@ pub fn bool_out(input: bool, writer: &mut dyn Write) -> Result<()> {
397397
Ok(())
398398
}
399399

400+
/// A lite version of casting from string to target type. Used by frontend to handle types that have
401+
/// to be created by casting.
402+
///
403+
/// For example, the user can input `1` or `true` directly, but they have to use
404+
/// `'2022-01-01'::date`.
405+
pub fn literal_parsing(
406+
t: &DataType,
407+
s: &str,
408+
) -> std::result::Result<ScalarImpl, Option<ExprError>> {
409+
let scalar = match t {
410+
DataType::Boolean => str_to_bool(s)?.into(),
411+
DataType::Int16 => str_parse::<i16>(s)?.into(),
412+
DataType::Int32 => str_parse::<i32>(s)?.into(),
413+
DataType::Int64 => str_parse::<i64>(s)?.into(),
414+
DataType::Decimal => str_parse::<Decimal>(s)?.into(),
415+
DataType::Float32 => str_parse::<OrderedF32>(s)?.into(),
416+
DataType::Float64 => str_parse::<OrderedF64>(s)?.into(),
417+
DataType::Varchar => return Err(None),
418+
DataType::Date => str_to_date(s)?.into(),
419+
DataType::Timestamp => str_to_timestamp(s)?.into(),
420+
// We only handle the case with timezone here, and leave the implicit session timezone case
421+
// for later phase.
422+
DataType::Timestamptz => str_with_time_zone_to_timestamptz(s)?.into(),
423+
DataType::Time => str_to_time(s)?.into(),
424+
DataType::Interval => str_parse::<IntervalUnit>(s)?.into(),
425+
// Not processing list or struct literal right now. Leave it for later phase (normal backend
426+
// evaluation).
427+
DataType::List { .. } => return Err(None),
428+
DataType::Struct(_) => return Err(None),
429+
DataType::Bytea => str_to_bytea(s)?.into(),
430+
};
431+
Ok(scalar)
432+
}
433+
400434
/// It accepts a macro whose input is `{ $input:ident, $cast:ident, $func:expr }` tuples
401435
///
402436
/// * `$input`: input type

src/frontend/planner_test/tests/testdata/basic_query.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
- sql: |
127127
select * from generate_series('2'::INT,'10'::INT,'2'::INT);
128128
batch_plan: |
129-
BatchTableFunction { Generate('2':Varchar::Int32, '10':Varchar::Int32, '2':Varchar::Int32) }
129+
BatchTableFunction { Generate(2:Int32, 10:Int32, 2:Int32) }
130130
- sql: |
131131
select * from unnest(Array[1,2,3]);
132132
batch_plan: |

src/frontend/planner_test/tests/testdata/cast.yaml

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,19 @@
5353
select 1 having 'y';
5454
batch_plan: |
5555
BatchProject { exprs: [1:Int32] }
56-
└─BatchFilter { predicate: 'y':Varchar::Boolean }
57-
└─BatchSimpleAgg { aggs: [] }
58-
└─BatchValues { rows: [[]] }
56+
└─BatchSimpleAgg { aggs: [] }
57+
└─BatchValues { rows: [[]] }
5958
- name: implicit cast boolean (WHERE with literal 'y' of unknown type)
6059
sql: |
6160
select 1 where 'y';
6261
batch_plan: |
6362
BatchProject { exprs: [1:Int32] }
64-
└─BatchFilter { predicate: 'y':Varchar::Boolean }
65-
└─BatchValues { rows: [[]] }
63+
└─BatchValues { rows: [[]] }
6664
- name: implicit cast boolean (CASE with literal 'y' of unknown type)
6765
sql: |
6866
select case when 'y' then 1 end;
6967
batch_plan: |
70-
BatchProject { exprs: [Case('y':Varchar::Boolean, 1:Int32)] }
68+
BatchProject { exprs: [Case(true:Boolean, 1:Int32)] }
7169
└─BatchValues { rows: [[]] }
7270
- name: implicit cast boolean (JOIN ON with literal 'y' of unknown type)
7371
sql: |
@@ -77,23 +75,21 @@
7775
batch_plan: |
7876
BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
7977
├─BatchExchange { order: [], dist: Single }
80-
| └─BatchFilter { predicate: 'y':Varchar::Boolean }
81-
| └─BatchScan { table: t1, columns: [t1.v1], distribution: SomeShard }
78+
| └─BatchScan { table: t1, columns: [t1.v1], distribution: SomeShard }
8279
└─BatchExchange { order: [], dist: Single }
83-
└─BatchFilter { predicate: 'y':Varchar::Boolean }
84-
└─BatchScan { table: t2, columns: [], distribution: SomeShard }
80+
└─BatchScan { table: t2, columns: [], distribution: SomeShard }
8581
- name: current_schemas (CURRENT_SCHEMAS with literal 'y' of unknown type)
8682
sql: |
8783
select current_schemas('y');
88-
binder_error: |-
89-
Feature is not yet implemented: Only boolean literals are supported in `current_schemas`.
90-
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
84+
batch_plan: |
85+
BatchProject { exprs: [ARRAY[pg_catalog, public]:List { datatype: Varchar }] }
86+
└─BatchValues { rows: [[]] }
9187
- name: FILTER (FILTER with literal 'y' of unknown type)
9288
sql: |
9389
create table t(v1 int);
9490
select count(*) FILTER(WHERE 'y') from t;
9591
batch_plan: |
96-
BatchSimpleAgg { aggs: [sum0(count filter('y':Varchar::Boolean))] }
92+
BatchSimpleAgg { aggs: [sum0(count)] }
9793
└─BatchExchange { order: [], dist: Single }
98-
└─BatchSimpleAgg { aggs: [count filter('y':Varchar::Boolean)] }
94+
└─BatchSimpleAgg { aggs: [count] }
9995
└─BatchScan { table: t, columns: [], distribution: SomeShard }

src/frontend/planner_test/tests/testdata/expr.yaml

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
sql: |
44
select int '1';
55
logical_plan: |
6-
LogicalProject { exprs: ['1':Varchar::Int32] }
6+
LogicalProject { exprs: [1:Int32] }
77
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
88
- name: bind typed literal - bool
99
sql: |
1010
SELECT bool 't'
1111
logical_plan: |
12-
LogicalProject { exprs: ['t':Varchar::Boolean] }
12+
LogicalProject { exprs: [true:Boolean] }
1313
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
1414
- sql: |
1515
values(must_be_unimplemented_func(1));
@@ -143,14 +143,17 @@
143143
- sql: |
144144
-- Single quoted literal can be treated as number without error.
145145
values(round('123'));
146-
-- When it is invalid, PostgreSQL reports error during explain, but we have to wait until execution as of now.
147-
-- values(round('abc'));
148146
batch_plan: |
149-
BatchValues { rows: [[Round('123':Varchar::Float64)]] }
147+
BatchValues { rows: [[Round(123:Float64)]] }
148+
- sql: |
149+
-- When it is invalid, PostgreSQL reports error during explain, but we have to wait until execution as of now. #4235
150+
values(round('abc'));
151+
batch_plan: |
152+
BatchValues { rows: [[Round('abc':Varchar::Float64)]] }
150153
- sql: |
151154
values(extract(hour from timestamp '2001-02-16 20:38:40'));
152155
batch_plan: |
153-
BatchValues { rows: [[Extract('HOUR':Varchar, '2001-02-16 20:38:40':Varchar::Timestamp)]] }
156+
BatchValues { rows: [[Extract('HOUR':Varchar, '2001-02-16 20:38:40':Timestamp)]] }
154157
- sql: |
155158
values('Postgres' not like 'Post%');
156159
batch_plan: |

src/frontend/planner_test/tests/testdata/insert.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
batch_plan: |
1515
BatchExchange { order: [], dist: Single }
1616
└─BatchInsert { table: t }
17-
└─BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] }
17+
└─BatchValues { rows: [[22.33:Decimal::Float32, 33:Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] }
1818
- name: insert values on non-assign-castable types
1919
sql: |
2020
create table t (v1 real, v2 int);
@@ -73,7 +73,7 @@
7373
batch_plan: |
7474
BatchExchange { order: [], dist: Single }
7575
└─BatchInsert { table: t }
76-
└─BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] }
76+
└─BatchValues { rows: [['2020-01-01 01:02:03':Timestamp::Time], ['03:04:05':Time]] }
7777
- name: a `VALUES` without insert context may be invalid on its own (compare with above)
7878
sql: |
7979
create table t (v1 time);
@@ -115,7 +115,7 @@
115115
BatchExchange { order: [], dist: Single }
116116
└─BatchInsert { table: t }
117117
└─BatchExchange { order: [], dist: Single }
118-
└─BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] }
118+
└─BatchProject { exprs: ['2020-01-01 01:02:03':Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] }
119119
└─BatchScan { table: t, columns: [], distribution: SomeShard }
120120
- name: insert into select with cast error
121121
sql: |

src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,30 +53,30 @@
5353
AND window_start >= ts + interval '1' day AND window_end > ts + interval '4' day;
5454
logical_plan: |
5555
LogicalProject { exprs: [t.v1, t.v2, t.v3, t.v4, t.ts, window_start, window_end] }
56-
└─LogicalFilter { predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Varchar::Date) AND (window_start >= '1997-07-02':Varchar::Date) AND (window_end >= '1997-07-03':Varchar::Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) }
56+
└─LogicalFilter { predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Date) AND (window_start >= '1997-07-02':Date) AND (window_end >= '1997-07-03':Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) }
5757
└─LogicalShare { id = 4 }
5858
└─LogicalProject { exprs: [t.v1, t.v2, t.v3, t.v4, t.ts, window_start, window_end] }
5959
└─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all }
6060
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] }
6161
optimized_logical_plan: |
62-
LogicalFilter { predicate: (window_start >= '1997-07-02':Varchar::Date) AND (window_end >= '1997-07-03':Varchar::Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) }
62+
LogicalFilter { predicate: (window_start >= '1997-07-02':Date) AND (window_end >= '1997-07-03':Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) }
6363
└─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all }
64-
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts], predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Varchar::Date) }
64+
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts], predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Date) }
6565
- name: filter hop transpose with non-trivial output-indices
6666
sql: |
6767
create table t(v1 int, v2 int, v3 int, v4 int, ts date);
6868
with cte as (select window_end, v4, v2 from hop(t, ts, interval '1' day, interval '3' day))
6969
select * from cte where window_end > date '2022-01-01' AND v4=10 AND v2 > 20
7070
logical_plan: |
7171
LogicalProject { exprs: [window_end, t.v4, t.v2] }
72-
└─LogicalFilter { predicate: (window_end > '2022-01-01':Varchar::Date) AND (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) }
72+
└─LogicalFilter { predicate: (window_end > '2022-01-01':Date) AND (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) }
7373
└─LogicalShare { id = 4 }
7474
└─LogicalProject { exprs: [window_end, t.v4, t.v2] }
7575
└─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all }
7676
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] }
7777
optimized_logical_plan: |
7878
LogicalProject { exprs: [window_end, t.v4, t.v2] }
79-
└─LogicalFilter { predicate: (window_end > '2022-01-01':Varchar::Date) }
79+
└─LogicalFilter { predicate: (window_end > '2022-01-01':Date) }
8080
└─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: [t.v2, t.v4, window_end] }
8181
└─LogicalScan { table: t, columns: [t.v2, t.v4, t.ts], predicate: (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) }
8282
- name: filter union transpose
@@ -257,16 +257,16 @@
257257
select * from t1 join t2 where v1 = v2 and v1 > now() + '1 hr';
258258
optimized_logical_plan: |
259259
LogicalJoin { type: Inner, on: (t1.v1 = t2.v2), output: all }
260-
├─LogicalFilter { predicate: (t1.v1 > (Now + '1 hr':Varchar::Interval)) }
260+
├─LogicalFilter { predicate: (t1.v1 > (Now + '01:00:00':Interval)) }
261261
| └─LogicalScan { table: t1, columns: [t1.v1] }
262262
└─LogicalScan { table: t2, columns: [t2.v2] }
263263
stream_plan: |
264264
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden), t2._row_id(hidden)], pk_columns: [t1._row_id, t2._row_id, v1, v2] }
265265
└─StreamHashJoin { type: Inner, predicate: t1.v1 = t2.v2, output: [t1.v1, t2.v2, t1._row_id, t2._row_id] }
266266
├─StreamExchange { dist: HashShard(t1.v1) }
267-
| └─StreamDynamicFilter { predicate: (t1.v1 > (now + '1 hr':Varchar::Interval)), output: [t1.v1, t1._row_id] }
267+
| └─StreamDynamicFilter { predicate: (t1.v1 > (now + '01:00:00':Interval)), output: [t1.v1, t1._row_id] }
268268
| ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
269-
| └─StreamProject { exprs: [(now + '1 hr':Varchar::Interval)] }
269+
| └─StreamProject { exprs: [(now + '01:00:00':Interval)], watermark_columns: [(now + '01:00:00':Interval)] }
270270
| └─StreamNow { output: [now] }
271271
└─StreamExchange { dist: HashShard(t2.v2) }
272272
└─StreamTableScan { table: t2, columns: [t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
@@ -297,14 +297,14 @@
297297
create table t1(v1 timestamp with time zone, v2 int);
298298
select * from t1 where v1 > now() + '30 min' and v2 > 5;
299299
optimized_logical_plan: |
300-
LogicalFilter { predicate: (t1.v1 > (Now + '30 min':Varchar::Interval)) }
300+
LogicalFilter { predicate: (t1.v1 > (Now + '00:30:00':Interval)) }
301301
└─LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v2 > 5:Int32) }
302302
stream_plan: |
303303
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden)], pk_columns: [t1._row_id] }
304-
└─StreamDynamicFilter { predicate: (t1.v1 > (now + '30 min':Varchar::Interval)), output: [t1.v1, t1.v2, t1._row_id] }
304+
└─StreamDynamicFilter { predicate: (t1.v1 > (now + '00:30:00':Interval)), output: [t1.v1, t1.v2, t1._row_id] }
305305
├─StreamFilter { predicate: (t1.v2 > 5:Int32) }
306306
| └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
307-
└─StreamProject { exprs: [(now + '30 min':Varchar::Interval)] }
307+
└─StreamProject { exprs: [(now + '00:30:00':Interval)], watermark_columns: [(now + '00:30:00':Interval)] }
308308
└─StreamNow { output: [now] }
309309
- name: eq-predicate derived condition other side pushdown in inner join
310310
sql: |

src/frontend/planner_test/tests/testdata/project_set.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
- sql: |
33
select generate_series('2'::INT,'10'::INT,'2'::INT);
44
batch_plan: |
5-
BatchProject { exprs: [Generate('2':Varchar::Int32, '10':Varchar::Int32, '2':Varchar::Int32)] }
6-
└─BatchProjectSet { select_list: [Generate('2':Varchar::Int32, '10':Varchar::Int32, '2':Varchar::Int32)] }
5+
BatchProject { exprs: [Generate(2:Int32, 10:Int32, 2:Int32)] }
6+
└─BatchProjectSet { select_list: [Generate(2:Int32, 10:Int32, 2:Int32)] }
77
└─BatchValues { rows: [[]] }
88
- sql: |
99
select unnest(Array[1,2,3]);

src/frontend/planner_test/tests/testdata/time_window.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@
190190
LogicalProject { exprs: [*VALUES*_0.column_0, TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval), (TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) + '00:00:10':Interval)] }
191191
└─LogicalProject { exprs: [*VALUES*_0.column_0, TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval), (TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) + '00:00:10':Interval)] }
192192
└─LogicalShare { id = 2 }
193-
└─LogicalValues { rows: [['2020-01-01 12:00:00':Varchar::Timestamp]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamp] } }
193+
└─LogicalValues { rows: [['2020-01-01 12:00:00':Timestamp]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamp] } }
194194
batch_plan: |
195195
BatchProject { exprs: [*VALUES*_0.column_0, TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval), (TumbleStart(*VALUES*_0.column_0, '00:00:10':Interval) + '00:00:10':Interval)] }
196-
└─BatchValues { rows: [['2020-01-01 12:00:00':Varchar::Timestamp]] }
196+
└─BatchValues { rows: [['2020-01-01 12:00:00':Timestamp]] }

0 commit comments

Comments
 (0)