Skip to content

feat(catalog): implement pg_settings #15108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions e2e_test/batch/catalog/issue_8791.slt.part
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# UNION and other complex queries should also be in local mode
query I
SELECT name FROM pg_catalog.pg_settings union select 'a';
SELECT amname FROM pg_catalog.pg_am union select 'a';
----
a

query T
SELECT name FROM (SELECT pg_catalog.lower(name) AS name FROM pg_catalog.pg_settings UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(name,1,0)=''
LIMIT 1000
SELECT amname FROM (SELECT pg_catalog.lower(amname) AS amname FROM pg_catalog.pg_am UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(amname,1,0)=''
LIMIT 1000;
----
session authorization
all

query I
with q as ( select name FROM pg_catalog.pg_settings ) select * from q;
with q as ( select amname FROM pg_catalog.pg_am ) select * from q;
----
45 changes: 44 additions & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,49 @@
query TT
SELECT * FROM pg_catalog.pg_settings;
SELECT name, setting FROM pg_catalog.pg_settings order by name;
----
application_name psql
background_ddl false
batch_enable_distributed_dml false
batch_parallelism 0
bytea_output hex
client_encoding UTF8
client_min_messages notice
create_compaction_group_for_mv false
datestyle
extra_float_digits 1
idle_in_transaction_session_timeout 60000
intervalstyle
lock_timeout 0
max_split_range_gap 8
query_epoch 0
query_mode auto
row_security true
rw_batch_enable_lookup_join true
rw_batch_enable_sort_agg true
rw_enable_join_ordering true
rw_enable_share_plan true
rw_enable_two_phase_agg true
rw_force_split_distinct_agg false
rw_force_two_phase_agg false
rw_implicit_flush false
rw_streaming_allow_jsonb_in_stream_key false
rw_streaming_enable_bushy_join true
rw_streaming_enable_delta_join false
rw_streaming_over_window_cache_policy full
search_path "$user", public
server_encoding UTF8
server_version 9.5.0
server_version_num 90500
sink_decouple default
standard_conforming_strings on
statement_timeout 0
streaming_enable_arrangement_backfill false
streaming_parallelism 0
streaming_rate_limit 0
synchronize_seqscans false
timezone UTC
transaction_isolation read committed
visibility_mode default

query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use std::sync::{Arc, LazyLock};
use async_trait::async_trait;
use futures::future::BoxFuture;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::acl::AclMode;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, SysCatalogReader, TableDesc, TableId, DEFAULT_SUPER_USER_ID,
NON_RESERVED_SYS_CATALOG_ID,
};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::ConfigMap;
use risingwave_common::types::DataType;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
Expand Down Expand Up @@ -104,7 +106,10 @@ pub struct SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
// Read from meta.
meta_client: Arc<dyn FrontendMetaClient>,
// Read auth context.
auth_context: Arc<AuthContext>,
// Read config.
config: Arc<RwLock<ConfigMap>>,
}

impl SysCatalogReaderImpl {
Expand All @@ -114,13 +119,15 @@ impl SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
meta_client: Arc<dyn FrontendMetaClient>,
auth_context: Arc<AuthContext>,
config: Arc<RwLock<ConfigMap>>,
) -> Self {
Self {
catalog_reader,
user_info_reader,
worker_node_manager,
meta_client,
auth_context,
config,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,28 @@
use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use crate::catalog::system_catalog::SysCatalogReaderImpl;

/// The catalog `pg_settings` stores settings.
/// Ref: [`https://www.postgresql.org/docs/current/view-pg-settings.html`]
#[system_catalog(view, "pg_catalog.pg_settings")]
#[derive(Fields)]
struct PgSetting {
name: String,
setting: String,
short_desc: String,
}

#[system_catalog(table, "pg_catalog.pg_settings")]
fn read_pg_settings(reader: &SysCatalogReaderImpl) -> Vec<PgSetting> {
let config_reader = reader.config.read();
let all_variables = config_reader.show_all();

all_variables
.iter()
.map(|info| PgSetting {
name: info.name.clone(),
setting: info.setting.clone(),
short_desc: info.description.clone(),
})
.collect()
}
8 changes: 2 additions & 6 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult};
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::session::{FrontendEnv, SessionImpl};

pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;

Expand Down Expand Up @@ -94,10 +94,6 @@ impl LocalQueryExecution {
}
}

fn auth_context(&self) -> Arc<AuthContext> {
self.session.auth_context()
}

fn shutdown_rx(&self) -> ShutdownToken {
self.session.reset_cancel_query_flag()
}
Expand All @@ -106,7 +102,7 @@ impl LocalQueryExecution {
pub async fn run_inner(self) {
debug!(%self.query.query_id, self.sql, "Starting to run query");

let context = FrontendBatchTaskContext::new(self.front_env.clone(), self.auth_context());
let context = FrontendBatchTaskContext::new(self.session.clone());

let task_id = TaskId {
query_id: self.query.query_id.id.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ impl ExecutionContext {
}

pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext {
FrontendBatchTaskContext::new(self.session.env().clone(), self.session.auth_context())
FrontendBatchTaskContext::new(self.session.clone())
}
}
28 changes: 14 additions & 14 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@ use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::session::{AuthContext, FrontendEnv};
use crate::session::SessionImpl;

/// Batch task execution context in frontend.
#[derive(Clone)]
pub struct FrontendBatchTaskContext {
env: FrontendEnv,
auth_context: Arc<AuthContext>,
session: Arc<SessionImpl>,
}

impl FrontendBatchTaskContext {
pub fn new(env: FrontendEnv, auth_context: Arc<AuthContext>) -> Self {
Self { env, auth_context }
pub fn new(session: Arc<SessionImpl>) -> Self {
Self { session }
}
}

Expand All @@ -47,16 +46,17 @@ impl BatchTaskContext for FrontendBatchTaskContext {

fn catalog_reader(&self) -> SysCatalogReaderRef {
Arc::new(SysCatalogReaderImpl::new(
self.env.catalog_reader().clone(),
self.env.user_info_reader().clone(),
self.env.worker_node_manager_ref(),
self.env.meta_client_ref(),
self.auth_context.clone(),
self.session.env().catalog_reader().clone(),
self.session.env().user_info_reader().clone(),
self.session.env().worker_node_manager_ref(),
self.session.env().meta_client_ref(),
self.session.auth_context(),
self.session.shared_config(),
))
}

fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
is_local_address(self.env.server_address(), peer_addr)
is_local_address(self.session.env().server_address(), peer_addr)
}

fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
Expand All @@ -68,19 +68,19 @@ impl BatchTaskContext for FrontendBatchTaskContext {
}

fn client_pool(&self) -> ComputeClientPoolRef {
self.env.client_pool()
self.session.env().client_pool()
}

fn get_config(&self) -> &BatchConfig {
self.env.batch_config()
self.session.env().batch_config()
}

fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
unimplemented!("not supported in local mode")
}

fn source_metrics(&self) -> Arc<SourceMetrics> {
self.env.source_metrics()
self.session.env().source_metrics()
}

fn store_mem_usage(&self, _val: usize) {
Expand Down