Skip to content

feat(catalog): implement pg_settings #15108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions e2e_test/batch/catalog/issue_8791.slt.part
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# UNION and other complex queries should also be in local mode
query I
SELECT name FROM pg_catalog.pg_settings union select 'a';
SELECT amname FROM pg_catalog.pg_am union select 'a';
----
a

query T
SELECT name FROM (SELECT pg_catalog.lower(name) AS name FROM pg_catalog.pg_settings UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(name,1,0)=''
LIMIT 1000
SELECT amname FROM (SELECT pg_catalog.lower(amname) AS amname FROM pg_catalog.pg_am UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(amname,1,0)=''
LIMIT 1000;
----
session authorization
all

query I
with q as ( select name FROM pg_catalog.pg_settings ) select * from q;
with q as ( select amname FROM pg_catalog.pg_am ) select * from q;
----
45 changes: 44 additions & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,49 @@
query TT
SELECT * FROM pg_catalog.pg_settings;
SELECT name FROM pg_catalog.pg_settings order by name;
----
application_name
background_ddl
batch_enable_distributed_dml
batch_parallelism
bytea_output
client_encoding
client_min_messages
create_compaction_group_for_mv
datestyle
extra_float_digits
idle_in_transaction_session_timeout
intervalstyle
lock_timeout
max_split_range_gap
query_epoch
query_mode
row_security
rw_batch_enable_lookup_join
rw_batch_enable_sort_agg
rw_enable_join_ordering
rw_enable_share_plan
rw_enable_two_phase_agg
rw_force_split_distinct_agg
rw_force_two_phase_agg
rw_implicit_flush
rw_streaming_allow_jsonb_in_stream_key
rw_streaming_enable_bushy_join
rw_streaming_enable_delta_join
rw_streaming_over_window_cache_policy
search_path
server_encoding
server_version
server_version_num
sink_decouple
standard_conforming_strings
statement_timeout
streaming_enable_arrangement_backfill
streaming_parallelism
streaming_rate_limit
synchronize_seqscans
timezone
transaction_isolation
visibility_mode

query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use std::sync::{Arc, LazyLock};
use async_trait::async_trait;
use futures::future::BoxFuture;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::acl::AclMode;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, SysCatalogReader, TableDesc, TableId, DEFAULT_SUPER_USER_ID,
NON_RESERVED_SYS_CATALOG_ID,
};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::ConfigMap;
use risingwave_common::types::DataType;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
Expand Down Expand Up @@ -104,7 +106,10 @@ pub struct SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
// Read from meta.
meta_client: Arc<dyn FrontendMetaClient>,
// Read auth context.
auth_context: Arc<AuthContext>,
// Read config.
config: Arc<RwLock<ConfigMap>>,
}

impl SysCatalogReaderImpl {
Expand All @@ -114,13 +119,15 @@ impl SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
meta_client: Arc<dyn FrontendMetaClient>,
auth_context: Arc<AuthContext>,
config: Arc<RwLock<ConfigMap>>,
) -> Self {
Self {
catalog_reader,
user_info_reader,
worker_node_manager,
meta_client,
auth_context,
config,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,28 @@
use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use crate::catalog::system_catalog::SysCatalogReaderImpl;

/// The catalog `pg_settings` stores settings.
/// Ref: [`https://www.postgresql.org/docs/current/view-pg-settings.html`]
#[system_catalog(view, "pg_catalog.pg_settings")]
#[derive(Fields)]
struct PgSetting {
name: String,
setting: String,
short_desc: String,
}

#[system_catalog(table, "pg_catalog.pg_settings")]
fn read_pg_settings(reader: &SysCatalogReaderImpl) -> Vec<PgSetting> {
let config_reader = reader.config.read();
let all_variables = config_reader.show_all();

all_variables
.iter()
.map(|info| PgSetting {
name: info.name.clone(),
setting: info.setting.clone(),
short_desc: info.description.clone(),
})
.collect()
}
8 changes: 2 additions & 6 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult};
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::session::{FrontendEnv, SessionImpl};

pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;

Expand Down Expand Up @@ -94,10 +94,6 @@ impl LocalQueryExecution {
}
}

fn auth_context(&self) -> Arc<AuthContext> {
self.session.auth_context()
}

fn shutdown_rx(&self) -> ShutdownToken {
self.session.reset_cancel_query_flag()
}
Expand All @@ -106,7 +102,7 @@ impl LocalQueryExecution {
pub async fn run_inner(self) {
debug!(%self.query.query_id, self.sql, "Starting to run query");

let context = FrontendBatchTaskContext::new(self.front_env.clone(), self.auth_context());
let context = FrontendBatchTaskContext::new(self.session.clone());

let task_id = TaskId {
query_id: self.query.query_id.id.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ impl ExecutionContext {
}

pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext {
FrontendBatchTaskContext::new(self.session.env().clone(), self.session.auth_context())
FrontendBatchTaskContext::new(self.session.clone())
}
}
28 changes: 14 additions & 14 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@ use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::session::{AuthContext, FrontendEnv};
use crate::session::SessionImpl;

/// Batch task execution context in frontend.
#[derive(Clone)]
pub struct FrontendBatchTaskContext {
env: FrontendEnv,
auth_context: Arc<AuthContext>,
session: Arc<SessionImpl>,
}

impl FrontendBatchTaskContext {
pub fn new(env: FrontendEnv, auth_context: Arc<AuthContext>) -> Self {
Self { env, auth_context }
pub fn new(session: Arc<SessionImpl>) -> Self {
Self { session }
}
}

Expand All @@ -47,16 +46,17 @@ impl BatchTaskContext for FrontendBatchTaskContext {

fn catalog_reader(&self) -> SysCatalogReaderRef {
Arc::new(SysCatalogReaderImpl::new(
self.env.catalog_reader().clone(),
self.env.user_info_reader().clone(),
self.env.worker_node_manager_ref(),
self.env.meta_client_ref(),
self.auth_context.clone(),
self.session.env().catalog_reader().clone(),
self.session.env().user_info_reader().clone(),
self.session.env().worker_node_manager_ref(),
self.session.env().meta_client_ref(),
self.session.auth_context(),
self.session.shared_config(),
))
}

fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
is_local_address(self.env.server_address(), peer_addr)
is_local_address(self.session.env().server_address(), peer_addr)
}

fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
Expand All @@ -68,19 +68,19 @@ impl BatchTaskContext for FrontendBatchTaskContext {
}

fn client_pool(&self) -> ComputeClientPoolRef {
self.env.client_pool()
self.session.env().client_pool()
}

fn get_config(&self) -> &BatchConfig {
self.env.batch_config()
self.session.env().batch_config()
}

fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
unimplemented!("not supported in local mode")
}

fn source_metrics(&self) -> Arc<SourceMetrics> {
self.env.source_metrics()
self.session.env().source_metrics()
}

fn store_mem_usage(&self, _val: usize) {
Expand Down