Skip to content

feat(frontend): add settings keyword for runtime parameter #21356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions e2e_test/batch/basic/runtime_parameter.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
statement ok
CREATE SCHEMA schema_2;

statement ok
CREATE TABLE schema_2.t (c int);

query error
EXPLAIN SELECT * FROM t;
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Catalog error
2: table or source not found: t


query error
SELECT * FROM t;
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Catalog error
2: table or source not found: t


statement ok
EXPLAIN SELECT * FROM t SETTINGS search_path=schema_2 ,query_mode=auto;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we follow the syntax of WITH clause for better consistency? Kindly ask @xiangjinwu for insights.


statement ok
SELECT * FROM t SETTINGS search_path=schema_2, query_mode=local;

statement ok
DROP TABLE schema_2.t;

statement ok
DROP SCHEMA schema_2;
2 changes: 2 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ pub struct SessionConfig {
streaming_sync_log_store_buffer_size: usize,
}

pub type RuntimeParameters = SessionConfig;

fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
if val.is_empty() {
return Ok(());
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::catalog::FunctionId;
use risingwave_common::session_config::{SearchPath, SessionConfig};
use risingwave_common::session_config::{RuntimeParameters, SearchPath, SessionConfig};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_sqlparser::ast::{Expr as AstExpr, SelectItem, SetExpr, Statement};
Expand Down Expand Up @@ -343,7 +343,7 @@ impl Binder {
next_values_id: 0,
next_share_id: 0,
session_config: session.shared_config(),
search_path: session.config().search_path(),
search_path: session.running_sql_runtime_parameters(RuntimeParameters::search_path),
bind_for,
shared_views: HashMap::new(),
included_relations: HashSet::new(),
Expand Down Expand Up @@ -893,6 +893,7 @@ mod tests {
limit: None,
offset: None,
fetch: None,
settings: None,
},
)"#]];
parse_expected.assert_eq(&format!("{:#?}", stmt));
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl Binder {
limit,
offset,
fetch,
settings: _,
}: Query,
) -> Result<BoundQuery> {
let mut with_ties = false;
Expand Down Expand Up @@ -431,6 +432,7 @@ impl Binder {
limit,
offset,
fetch,
settings: _,
} = query;

/// the input clause should not be supported.
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use pgwire::pg_response::StatementType;
use risingwave_common::acl::AclMode;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::user::grant_privilege;
use risingwave_sqlparser::ast::{Ident, ObjectName};
Expand Down Expand Up @@ -61,7 +62,7 @@ pub async fn handle_alter_owner(
let db_name = &session.database();
let (schema_name, real_obj_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use pgwire::pg_response::StatementType;
use risingwave_common::bail;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::meta::table_parallelism::{
AdaptiveParallelism, FixedParallelism, PbParallelism,
};
Expand All @@ -40,7 +41,7 @@ pub async fn handle_alter_parallelism(
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
13 changes: 7 additions & 6 deletions src/frontend/src/handler/alter_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::acl::AclMode;
use risingwave_common::catalog::is_system_schema;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::ddl_service::alter_name_request;
use risingwave_pb::user::grant_privilege;
use risingwave_sqlparser::ast::ObjectName;
Expand All @@ -36,7 +37,7 @@ pub async fn handle_rename_table(
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let new_table_name = Binder::resolve_table_name(new_table_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down Expand Up @@ -83,7 +84,7 @@ pub async fn handle_rename_index(
let (schema_name, real_index_name) =
Binder::resolve_schema_qualified_name(db_name, index_name.clone())?;
let new_index_name = Binder::resolve_index_name(new_index_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down Expand Up @@ -117,7 +118,7 @@ pub async fn handle_rename_view(
let (schema_name, real_view_name) =
Binder::resolve_schema_qualified_name(db_name, view_name.clone())?;
let new_view_name = Binder::resolve_view_name(new_view_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down Expand Up @@ -147,7 +148,7 @@ pub async fn handle_rename_sink(
let (schema_name, real_sink_name) =
Binder::resolve_schema_qualified_name(db_name, sink_name.clone())?;
let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down Expand Up @@ -180,7 +181,7 @@ pub async fn handle_rename_subscription(
let (schema_name, real_subscription_name) =
Binder::resolve_schema_qualified_name(db_name, subscription_name.clone())?;
let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down Expand Up @@ -214,7 +215,7 @@ pub async fn handle_rename_source(
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
let new_source_name = Binder::resolve_source_name(new_source_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_resource_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use pgwire::pg_response::StatementType;
use risingwave_common::bail;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};

Expand All @@ -34,7 +35,7 @@ pub async fn handle_alter_resource_group(
let db_name = session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(&db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.auth_context().user_name;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_set_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use pgwire::pg_response::StatementType;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::ddl_service::alter_set_schema_request::Object;
use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg};

Expand All @@ -38,7 +39,7 @@ pub async fn handle_alter_set_schema(
let db_name = &session.database();
let (schema_name, real_obj_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::max_column_id;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::source::{SourceEncode, SourceStruct, extract_source_struct};
use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};
Expand All @@ -39,7 +40,7 @@ pub async fn handle_alter_source_column(
let db_name = &session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{ColumnCatalog, max_column_id};
use risingwave_common::session_config::RuntimeParameters;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
Expand Down Expand Up @@ -103,7 +104,7 @@ pub fn fetch_source_catalog_with_db_schema_id(
let db_name = &session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;

Expand All @@ -34,7 +35,7 @@ pub async fn handle_alter_streaming_rate_limit(
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_swap_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::ddl_service::alter_swap_rename_request;
use risingwave_pb::ddl_service::alter_swap_rename_request::ObjectNameSwapPair;
use risingwave_sqlparser::ast::ObjectName;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub async fn handle_swap_rename(
let db_name = &session.database();
let (src_schema_name, src_obj_name) =
Binder::resolve_schema_qualified_name(db_name, source_object)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();
let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
let (target_schema_name, target_obj_name) =
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::hash::VnodeCount;
use risingwave_common::session_config::RuntimeParameters;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -441,7 +442,7 @@ pub fn fetch_table_catalog_for_alter(
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_table_drop_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

use risingwave_common::session_config::RuntimeParameters;
use risingwave_connector::parser::additional_columns::gen_default_addition_col_name;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_pb::ddl_service::TableJobType;
Expand Down Expand Up @@ -44,7 +45,7 @@ fn fetch_schema_info(
let db_name = session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name.as_str(), table_name.clone())?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.auth_context().user_name;

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{IndexId, TableDesc, TableId};
use risingwave_common::session_config::RuntimeParameters;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus, PbTable};
use risingwave_sqlparser::ast;
Expand Down Expand Up @@ -50,7 +51,7 @@ pub(crate) fn resolve_index_schema(
) -> Result<(String, Arc<TableCatalog>, String)> {
let db_name = &session.database();
let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
let search_path = session.config().search_path();
let search_path = session.running_sql_runtime_parameters(RuntimeParameters::search_path);
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use either::Either;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
use risingwave_common::session_config::RuntimeParameters;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::serverless_backfill_controller::{
ProvisionRequest, node_group_controller_service_client,
Expand Down Expand Up @@ -103,7 +104,7 @@ pub fn gen_create_mv_plan_bound(
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, PbTable)> {
if session.config().create_compaction_group_for_mv() {
if session.running_sql_runtime_parameters(RuntimeParameters::create_compaction_group_for_mv) {
context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
}

Expand Down Expand Up @@ -303,10 +304,9 @@ It only indicates the physical clustering of the data, which may improve the per
}

if resource_group.is_some()
&& !context
.session_ctx()
.config()
.streaming_use_arrangement_backfill()
&& !context.session_ctx().running_sql_runtime_parameters(
RuntimeParameters::streaming_use_arrangement_backfill,
)
{
return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
}
Expand Down
9 changes: 6 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SC
mod additional_column;
use additional_column::check_and_add_timestamp_column;
pub use additional_column::handle_addition_columns;
use risingwave_common::session_config::RuntimeParameters;

fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
columns
Expand Down Expand Up @@ -185,7 +186,8 @@ impl CreateSourceType {
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source()
&& session
.running_sql_runtime_parameters(RuntimeParameters::streaming_use_shared_source)
{
CreateSourceType::SharedNonCdc
} else {
Expand Down Expand Up @@ -758,8 +760,9 @@ pub fn bind_connector_props(
CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
handler_args
.session
.config()
.cdc_source_wait_streaming_start_timeout()
.running_sql_runtime_parameters(
RuntimeParameters::cdc_source_wait_streaming_start_timeout,
)
.to_string(),
);
}
Expand Down
Loading
Loading