Skip to content

Commit 5ba1874

Browse files
authored
refactor(frontend): derive create_type when generating the table catalog (#16827)
1 parent 8b5a7bc commit 5ba1874

File tree

5 files changed

+45
-31
lines changed

5 files changed

+45
-31
lines changed

src/frontend/src/handler/create_mv.rs

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ use either::Either;
1616
use itertools::Itertools;
1717
use pgwire::pg_response::{PgResponse, StatementType};
1818
use risingwave_common::acl::AclMode;
19-
use risingwave_pb::catalog::{CreateType, PbTable};
19+
use risingwave_pb::catalog::PbTable;
2020
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
21-
use risingwave_pb::stream_plan::StreamScanType;
2221
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
2322

2423
use super::privilege::resolve_relation_privileges;
@@ -167,7 +166,7 @@ pub async fn handle_create_mv(
167166
return Ok(resp);
168167
}
169168

170-
let (mut table, graph, can_run_in_background) = {
169+
let (table, graph) = {
171170
let context = OptimizerContext::from_handler_args(handler_args);
172171
if !context.with_options().is_empty() {
173172
// get other useful fields by `remove`, the logic here is to reject unknown options.
@@ -186,21 +185,7 @@ It only indicates the physical clustering of the data, which may improve the per
186185

187186
let (plan, table) =
188187
gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?;
189-
// All leaf nodes must be stream table scan, no other scan operators support recovery.
190-
fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
191-
if plan.inputs().is_empty() {
192-
if let Some(scan) = plan.as_stream_table_scan() {
193-
scan.stream_scan_type() == StreamScanType::Backfill
194-
|| scan.stream_scan_type() == StreamScanType::ArrangementBackfill
195-
} else {
196-
false
197-
}
198-
} else {
199-
assert!(!plan.inputs().is_empty());
200-
plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
201-
}
202-
}
203-
let can_run_in_background = plan_has_backfill_leaf_nodes(&plan);
188+
204189
let context = plan.plan_base().ctx().clone();
205190
let mut graph = build_graph(plan)?;
206191
graph.parallelism =
@@ -214,7 +199,7 @@ It only indicates the physical clustering of the data, which may improve the per
214199
let ctx = graph.ctx.as_mut().unwrap();
215200
ctx.timezone = context.get_session_timezone();
216201

217-
(table, graph, can_run_in_background)
202+
(table, graph)
218203
};
219204

220205
// Ensure writes to `StreamJobTracker` are atomic.
@@ -229,14 +214,6 @@ It only indicates the physical clustering of the data, which may improve the per
229214
table.name.clone(),
230215
));
231216

232-
let run_in_background = session.config().background_ddl();
233-
let create_type = if run_in_background && can_run_in_background {
234-
CreateType::Background
235-
} else {
236-
CreateType::Foreground
237-
};
238-
table.create_type = create_type.into();
239-
240217
let session = session.clone();
241218
let catalog_writer = session.catalog_writer()?;
242219
catalog_writer

src/frontend/src/optimizer/plan_node/stream_materialize.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
3333
use crate::error::Result;
3434
use crate::optimizer::plan_node::derive::derive_pk;
3535
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36+
use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
3637
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
3738
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
3839
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -84,6 +85,14 @@ impl StreamMaterialize {
8485
let input = reorganize_elements_id(input);
8586
let columns = derive_columns(input.schema(), out_names, &user_cols)?;
8687

88+
let create_type = if matches!(table_type, TableType::MaterializedView)
89+
&& input.ctx().session_ctx().config().background_ddl()
90+
&& plan_has_backfill_leaf_nodes(&input)
91+
{
92+
CreateType::Background
93+
} else {
94+
CreateType::Foreground
95+
};
8796
let table = Self::derive_table_catalog(
8897
input.clone(),
8998
name,
@@ -98,6 +107,7 @@ impl StreamMaterialize {
98107
None,
99108
cardinality,
100109
retention_seconds,
110+
create_type,
101111
)?;
102112

103113
Ok(Self::new(input, table))
@@ -139,6 +149,7 @@ impl StreamMaterialize {
139149
version,
140150
Cardinality::unknown(), // unknown cardinality for tables
141151
retention_seconds,
152+
CreateType::Foreground,
142153
)?;
143154

144155
Ok(Self::new(input, table))
@@ -210,6 +221,7 @@ impl StreamMaterialize {
210221
version: Option<TableVersion>,
211222
cardinality: Cardinality,
212223
retention_seconds: Option<NonZeroU32>,
224+
create_type: CreateType,
213225
) -> Result<TableCatalog> {
214226
let input = rewritten_input;
215227

@@ -259,7 +271,7 @@ impl StreamMaterialize {
259271
created_at_epoch: None,
260272
initialized_at_epoch: None,
261273
cleaned_by_watermark: false,
262-
create_type: CreateType::Foreground, // Will be updated in the handler itself.
274+
create_type,
263275
stream_job_status: StreamJobStatus::Creating,
264276
description: None,
265277
incoming_sinks: vec![],

src/frontend/src/optimizer/plan_node/stream_sink.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec
4545
use crate::error::{ErrorCode, Result};
4646
use crate::expr::{ExprImpl, FunctionCall, InputRef};
4747
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
48+
use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
4849
use crate::optimizer::plan_node::PlanTreeNodeUnary;
4950
use crate::optimizer::property::{Distribution, Order, RequiredDist};
5051
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -373,7 +374,9 @@ impl StreamSink {
373374
};
374375
let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
375376
let distribution_key = input.distribution().dist_column_indices().to_vec();
376-
let create_type = if input.ctx().session_ctx().config().background_ddl() {
377+
let create_type = if input.ctx().session_ctx().config().background_ddl()
378+
&& plan_has_backfill_leaf_nodes(&input)
379+
{
377380
CreateType::Background
378381
} else {
379382
CreateType::Foreground

src/frontend/src/optimizer/plan_node/utils.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
3131
use crate::catalog::table_catalog::TableType;
3232
use crate::catalog::{ColumnId, TableCatalog, TableId};
3333
use crate::optimizer::property::{Cardinality, Order, RequiredDist};
34+
use crate::optimizer::StreamScanType;
3435
use crate::utils::{Condition, IndexSet};
3536

3637
#[derive(Default)]
@@ -370,3 +371,19 @@ pub fn infer_kv_log_store_table_catalog_inner(
370371

371372
table_catalog_builder.build(dist_key, read_prefix_len_hint)
372373
}
374+
375+
/// Check that all leaf nodes must be stream table scan,
376+
/// since that plan node maps to `backfill` executor, which supports recovery.
377+
pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
378+
if plan.inputs().is_empty() {
379+
if let Some(scan) = plan.as_stream_table_scan() {
380+
scan.stream_scan_type() == StreamScanType::Backfill
381+
|| scan.stream_scan_type() == StreamScanType::ArrangementBackfill
382+
} else {
383+
false
384+
}
385+
} else {
386+
assert!(!plan.inputs().is_empty());
387+
plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
388+
}
389+
}

src/meta/src/rpc/ddl_controller.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ use crate::manager::{
6969
CatalogManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory,
7070
IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, MetadataManagerV1,
7171
NotificationVersion, RelationIdEnum, SchemaId, SinkId, SourceId, StreamingClusterInfo,
72-
StreamingJob, SubscriptionId, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION,
72+
StreamingJob, StreamingJobDiscriminants, SubscriptionId, TableId, UserId, ViewId,
73+
IGNORED_NOTIFICATION_VERSION,
7374
};
7475
use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism};
7576
use crate::rpc::cloud_provider::AwsEc2Client;
@@ -909,7 +910,7 @@ impl DdlController {
909910
)
910911
.await
911912
}
912-
(CreateType::Background, _) => {
913+
(CreateType::Background, &StreamingJob::MaterializedView(_)) => {
913914
let ctrl = self.clone();
914915
let mgr = mgr.clone();
915916
let stream_job_id = stream_job.id();
@@ -935,6 +936,10 @@ impl DdlController {
935936
tokio::spawn(fut);
936937
Ok(IGNORED_NOTIFICATION_VERSION)
937938
}
939+
(CreateType::Background, _) => {
940+
let d: StreamingJobDiscriminants = stream_job.into();
941+
bail!("background_ddl not supported for: {:?}", d)
942+
}
938943
}
939944
}
940945

0 commit comments

Comments
 (0)