Skip to content

Commit 25a9127

Browse files
authored
feat(frontend): support extended query protocol handle (risingwavelabs#8565)
1 parent 018fe9e commit 25a9127

File tree

3 files changed

+337
-9
lines changed

3 files changed

+337
-9
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use bytes::Bytes;
18+
use pgwire::types::Format;
19+
use risingwave_common::error::{ErrorCode, Result};
20+
use risingwave_common::types::DataType;
21+
use risingwave_sqlparser::ast::Statement;
22+
23+
use super::{query, HandlerArgs, RwPgResponse};
24+
use crate::binder::BoundStatement;
25+
use crate::session::SessionImpl;
26+
27+
pub struct PrepareStatement {
28+
pub statement: Statement,
29+
pub bound_statement: BoundStatement,
30+
pub param_types: Vec<DataType>,
31+
}
32+
33+
pub struct Portal {
34+
pub statement: Statement,
35+
pub bound_statement: BoundStatement,
36+
pub result_formats: Vec<Format>,
37+
}
38+
39+
pub fn handle_parse(
40+
session: Arc<SessionImpl>,
41+
stmt: Statement,
42+
specific_param_types: Vec<DataType>,
43+
) -> Result<PrepareStatement> {
44+
session.clear_cancel_query_flag();
45+
let str_sql = stmt.to_string();
46+
let handler_args = HandlerArgs::new(session, &stmt, &str_sql)?;
47+
match stmt {
48+
Statement::Query(_)
49+
| Statement::Insert { .. }
50+
| Statement::Delete { .. }
51+
| Statement::Update { .. } => query::handle_parse(handler_args, stmt, specific_param_types),
52+
_ => Err(ErrorCode::NotSupported(
53+
format!("Can't support {} in extended query mode now", str_sql,),
54+
"".to_string(),
55+
)
56+
.into()),
57+
}
58+
}
59+
60+
pub fn handle_bind(
61+
prepare_statement: PrepareStatement,
62+
params: Vec<Bytes>,
63+
param_formats: Vec<Format>,
64+
result_formats: Vec<Format>,
65+
) -> Result<Portal> {
66+
let PrepareStatement {
67+
statement,
68+
bound_statement,
69+
..
70+
} = prepare_statement;
71+
let bound_statement = bound_statement.bind_parameter(params, param_formats)?;
72+
Ok(Portal {
73+
statement,
74+
bound_statement,
75+
result_formats,
76+
})
77+
}
78+
79+
pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result<RwPgResponse> {
80+
session.clear_cancel_query_flag();
81+
let str_sql = portal.statement.to_string();
82+
let handler_args = HandlerArgs::new(session, &portal.statement, &str_sql)?;
83+
match &portal.statement {
84+
Statement::Query(_)
85+
| Statement::Insert { .. }
86+
| Statement::Delete { .. }
87+
| Statement::Update { .. } => query::handle_execute(handler_args, portal).await,
88+
_ => Err(ErrorCode::NotSupported(
89+
format!("Can't support {} in extended query mode now", str_sql,),
90+
"".to_string(),
91+
)
92+
.into()),
93+
}
94+
}

src/frontend/src/handler/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub mod drop_table;
5959
pub mod drop_user;
6060
mod drop_view;
6161
pub mod explain;
62+
pub mod extended_handle;
6263
mod flush;
6364
pub mod handle_privilege;
6465
pub mod privilege;

src/frontend/src/handler/query.rs

Lines changed: 242 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ use postgres_types::FromSql;
2424
use risingwave_common::catalog::Schema;
2525
use risingwave_common::error::{ErrorCode, Result, RwError};
2626
use risingwave_common::session_config::QueryMode;
27+
use risingwave_common::types::DataType;
2728
use risingwave_sqlparser::ast::{SetExpr, Statement};
2829

30+
use super::extended_handle::{Portal, PrepareStatement};
2931
use super::{PgResponseStream, RwPgResponse};
3032
use crate::binder::{Binder, BoundSetExpr, BoundStatement};
3133
use crate::handler::flush::do_flush;
@@ -74,6 +76,20 @@ fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
7476
) | is_insert_using_select(stmt))
7577
}
7678

79+
fn must_run_in_local_mode(bound: &BoundStatement) -> bool {
80+
let mut must_local = false;
81+
82+
if let BoundStatement::Query(query) = &bound {
83+
if let BoundSetExpr::Select(select) = &query.body
84+
&& let Some(relation) = &select.from
85+
&& relation.contains_sys_table() {
86+
must_local = true;
87+
}
88+
}
89+
90+
must_local
91+
}
92+
7793
pub fn gen_batch_query_plan(
7894
session: &SessionImpl,
7995
context: OptimizerContextRef,
@@ -89,16 +105,9 @@ pub fn gen_batch_query_plan(
89105
let check_items = resolve_privileges(&bound);
90106
session.check_privileges(&check_items)?;
91107

92-
let mut planner = Planner::new(context);
108+
let must_local = must_run_in_local_mode(&bound);
93109

94-
let mut must_local = false;
95-
if let BoundStatement::Query(query) = &bound {
96-
if let BoundSetExpr::Select(select) = &query.body
97-
&& let Some(relation) = &select.from
98-
&& relation.contains_sys_table() {
99-
must_local = true;
100-
}
101-
}
110+
let mut planner = Planner::new(context);
102111

103112
let mut logical = planner.plan(bound)?;
104113
let schema = logical.schema();
@@ -339,3 +348,227 @@ pub async fn local_execute(
339348

340349
Ok(execution.stream_rows())
341350
}
351+
352+
pub fn handle_parse(
353+
handler_args: HandlerArgs,
354+
statement: Statement,
355+
specific_param_types: Vec<DataType>,
356+
) -> Result<PrepareStatement> {
357+
let session = handler_args.session;
358+
let mut binder = Binder::new_with_param_types(&session, specific_param_types);
359+
let bound_statement = binder.bind(statement.clone())?;
360+
361+
let check_items = resolve_privileges(&bound_statement);
362+
session.check_privileges(&check_items)?;
363+
364+
let param_types = binder.export_param_types()?;
365+
366+
Ok(PrepareStatement {
367+
statement,
368+
bound_statement,
369+
param_types,
370+
})
371+
}
372+
373+
pub async fn handle_execute(handler_args: HandlerArgs, portal: Portal) -> Result<RwPgResponse> {
374+
let Portal {
375+
statement,
376+
bound_statement,
377+
result_formats,
378+
} = portal;
379+
380+
let stmt_type = StatementType::infer_from_statement(&statement)
381+
.map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
382+
let session = handler_args.session.clone();
383+
let query_start_time = Instant::now();
384+
let only_checkpoint_visible = handler_args.session.config().only_checkpoint_visible();
385+
let mut notice = String::new();
386+
387+
// Subblock to make sure PlanRef (an Rc) is dropped before `await` below.
388+
let (plan_fragmenter, query_mode, output_schema) = {
389+
let context = OptimizerContext::from_handler_args(handler_args);
390+
391+
let must_dist = must_run_in_distributed_mode(&statement)?;
392+
let must_local = must_run_in_local_mode(&bound_statement);
393+
394+
let mut planner = Planner::new(context.into());
395+
396+
let mut logical = planner.plan(bound_statement)?;
397+
let schema = logical.schema();
398+
let batch_plan = logical.gen_batch_plan()?;
399+
400+
let query_mode = match (must_dist, must_local) {
401+
(true, true) => {
402+
return Err(ErrorCode::InternalError(
403+
"the query is forced to both local and distributed mode by optimizer"
404+
.to_owned(),
405+
)
406+
.into())
407+
}
408+
(true, false) => QueryMode::Distributed,
409+
(false, true) => QueryMode::Local,
410+
(false, false) => match session.config().get_query_mode() {
411+
QueryMode::Auto => determine_query_mode(batch_plan.clone()),
412+
QueryMode::Local => QueryMode::Local,
413+
QueryMode::Distributed => QueryMode::Distributed,
414+
},
415+
};
416+
417+
let physical = match query_mode {
418+
QueryMode::Auto => unreachable!(),
419+
QueryMode::Local => logical.gen_batch_local_plan(batch_plan)?,
420+
QueryMode::Distributed => logical.gen_batch_distributed_plan(batch_plan)?,
421+
};
422+
423+
let context = physical.plan_base().ctx.clone();
424+
tracing::trace!(
425+
"Generated query plan: {:?}, query_mode:{:?}",
426+
physical.explain_to_string()?,
427+
query_mode
428+
);
429+
let plan_fragmenter = BatchPlanFragmenter::new(
430+
session.env().worker_node_manager_ref(),
431+
session.env().catalog_reader().clone(),
432+
session.config().get_batch_parallelism(),
433+
physical,
434+
)?;
435+
context.append_notice(&mut notice);
436+
(plan_fragmenter, query_mode, schema)
437+
};
438+
let query = plan_fragmenter.generate_complete_query().await?;
439+
tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
440+
441+
let pg_descs = output_schema
442+
.fields()
443+
.iter()
444+
.map(to_pg_field)
445+
.collect::<Vec<PgFieldDescriptor>>();
446+
let column_types = output_schema
447+
.fields()
448+
.iter()
449+
.map(|f| f.data_type())
450+
.collect_vec();
451+
452+
// Used in counting row count.
453+
let first_field_format = result_formats.first().copied().unwrap_or(Format::Text);
454+
455+
let mut row_stream = {
456+
let query_epoch = session.config().get_query_epoch();
457+
let query_snapshot = if let Some(query_epoch) = query_epoch {
458+
PinnedHummockSnapshot::Other(query_epoch)
459+
} else {
460+
// Acquire hummock snapshot for execution.
461+
// TODO: if there's no table scan, we don't need to acquire snapshot.
462+
let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
463+
let query_id = query.query_id().clone();
464+
let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
465+
PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, only_checkpoint_visible)
466+
};
467+
match query_mode {
468+
QueryMode::Auto => unreachable!(),
469+
QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
470+
local_execute(session.clone(), query, query_snapshot).await?,
471+
column_types,
472+
result_formats,
473+
session.clone(),
474+
)),
475+
// Local mode do not support cancel tasks.
476+
QueryMode::Distributed => {
477+
PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
478+
distribute_execute(session.clone(), query, query_snapshot).await?,
479+
column_types,
480+
result_formats,
481+
session.clone(),
482+
))
483+
}
484+
}
485+
};
486+
487+
let rows_count: Option<i32> = match stmt_type {
488+
StatementType::SELECT
489+
| StatementType::INSERT_RETURNING
490+
| StatementType::DELETE_RETURNING
491+
| StatementType::UPDATE_RETURNING => None,
492+
493+
StatementType::INSERT | StatementType::DELETE | StatementType::UPDATE => {
494+
let first_row_set = row_stream.next().await;
495+
let first_row_set = match first_row_set {
496+
None => {
497+
return Err(RwError::from(ErrorCode::InternalError(
498+
"no affected rows in output".to_string(),
499+
)))
500+
}
501+
Some(row) => {
502+
row.map_err(|err| RwError::from(ErrorCode::InternalError(format!("{}", err))))?
503+
}
504+
};
505+
let affected_rows_str = first_row_set[0].values()[0]
506+
.as_ref()
507+
.expect("compute node should return affected rows in output");
508+
if let Format::Binary = first_field_format {
509+
Some(
510+
i64::from_sql(&postgres_types::Type::INT8, affected_rows_str)
511+
.unwrap()
512+
.try_into()
513+
.expect("affected rows count large than i32"),
514+
)
515+
} else {
516+
Some(
517+
String::from_utf8(affected_rows_str.to_vec())
518+
.unwrap()
519+
.parse()
520+
.unwrap_or_default(),
521+
)
522+
}
523+
}
524+
_ => unreachable!(),
525+
};
526+
527+
// We need to do some post work after the query is finished and before the `Complete` response
528+
// it sent. This is achieved by the `callback` in `PgResponse`.
529+
let callback = async move {
530+
// Implicitly flush the writes.
531+
if session.config().get_implicit_flush() && stmt_type.is_dml() {
532+
do_flush(&session).await?;
533+
}
534+
535+
// update some metrics
536+
match query_mode {
537+
QueryMode::Auto => unreachable!(),
538+
QueryMode::Local => {
539+
session
540+
.env()
541+
.frontend_metrics
542+
.latency_local_execution
543+
.observe(query_start_time.elapsed().as_secs_f64());
544+
545+
session
546+
.env()
547+
.frontend_metrics
548+
.query_counter_local_execution
549+
.inc();
550+
}
551+
QueryMode::Distributed => {
552+
session
553+
.env()
554+
.query_manager()
555+
.query_metrics
556+
.query_latency
557+
.observe(query_start_time.elapsed().as_secs_f64());
558+
559+
session
560+
.env()
561+
.query_manager()
562+
.query_metrics
563+
.completed_query_counter
564+
.inc();
565+
}
566+
}
567+
568+
Ok(())
569+
};
570+
571+
Ok(PgResponse::new_for_stream_extra(
572+
stmt_type, rows_count, row_stream, pg_descs, notice, callback,
573+
))
574+
}

0 commit comments

Comments
 (0)