Skip to content

Commit 6d28cf6

Browse files
authored
refactor(streaming): only scan necessary columns in backfill (risingwavelabs#8533)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 42f17a6 commit 6d28cf6

File tree

9 files changed

+115
-16
lines changed

9 files changed

+115
-16
lines changed

dashboard/proto/gen/stream_plan.ts

Lines changed: 19 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/stream_plan.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,12 @@ message ChainNode {
389389
uint32 table_id = 1;
390390
// The schema of input stream, which will be used to build a MergeNode
391391
repeated plan_common.Field upstream_fields = 2;
392-
// Which columns from upstream are used in this Chain node.
392+
// The columns from the upstream table to output.
393+
// TODO: rename this field.
393394
repeated uint32 upstream_column_indices = 3;
395+
// The columns from the upstream table that'll be internally required by this chain node.
396+
// TODO: This is currently only used by backfill table scan. We should also apply it to the upstream dispatcher (#4529).
397+
repeated int32 upstream_column_ids = 8;
394398
// Generally, the barrier needs to be rearranged during the MV creation process, so that data can
395399
// be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
396400
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.

src/common/src/catalog/column.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ impl From<i32> for ColumnId {
6262
Self::new(column_id)
6363
}
6464
}
65+
impl From<&i32> for ColumnId {
66+
fn from(column_id: &i32) -> Self {
67+
Self::new(*column_id)
68+
}
69+
}
6570

6671
impl From<ColumnId> for i32 {
6772
fn from(id: ColumnId) -> i32 {

src/frontend/src/optimizer/plan_node/logical_scan.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ impl LogicalScan {
201201
.collect()
202202
}
203203

204+
/// Get the ids of the output columns and primary key columns.
205+
pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
206+
let mut ids = self.output_column_ids();
207+
for column_order in self.primary_key() {
208+
let id = self.table_desc().columns[column_order.column_index].column_id;
209+
if !ids.contains(&id) {
210+
ids.push(id);
211+
}
212+
}
213+
ids
214+
}
215+
204216
pub fn output_column_indices(&self) -> &[usize] {
205217
&self.core.output_col_idx
206218
}

src/frontend/src/optimizer/plan_node/stream_index_scan.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ impl StreamNode for StreamIndexScan {
125125
}
126126

127127
impl StreamIndexScan {
128+
// TODO: this method is almost the same as `StreamTableScan::adhoc_to_stream_prost`, we should
129+
// avoid duplication.
128130
pub fn adhoc_to_stream_prost(&self) -> ProstStreamPlan {
129131
use risingwave_pb::plan_common::*;
130132
use risingwave_pb::stream_plan::*;
@@ -141,6 +143,14 @@ impl StreamIndexScan {
141143

142144
let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec();
143145

146+
let upstream_column_ids = match self.chain_type {
147+
ChainType::Backfill => self.logical.output_and_pk_column_ids(),
148+
ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => {
149+
self.logical.output_column_ids()
150+
}
151+
ChainType::ChainUnspecified => unreachable!(),
152+
};
153+
144154
ProstStreamPlan {
145155
fields: self.schema().to_prost(),
146156
input: vec![
@@ -195,6 +205,7 @@ impl StreamIndexScan {
195205
.iter()
196206
.map(|&i| i as _)
197207
.collect(),
208+
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
198209
is_singleton: false,
199210
table_desc: Some(self.logical.table_desc().to_protobuf()),
200211
})),

src/frontend/src/optimizer/plan_node/stream_table_scan.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,16 @@ impl StreamTableScan {
171171

172172
let stream_key = self.logical_pk().iter().map(|x| *x as u32).collect_vec();
173173

174+
// The required columns from the table (both scan and upstream).
175+
let upstream_column_ids = match self.chain_type {
176+
ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => {
177+
self.logical.output_column_ids()
178+
}
179+
// For backfill, we additionally need the primary key columns.
180+
ChainType::Backfill => self.logical.output_and_pk_column_ids(),
181+
ChainType::ChainUnspecified => unreachable!(),
182+
};
183+
174184
ProstStreamPlan {
175185
fields: self.schema().to_prost(),
176186
input: vec![
@@ -225,6 +235,7 @@ impl StreamTableScan {
225235
.iter()
226236
.map(|&i| i as _)
227237
.collect(),
238+
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
228239
is_singleton: *self.distribution() == Distribution::Single,
229240
// The table desc used by backfill executor
230241
table_desc: Some(self.logical.table_desc().to_protobuf()),

src/storage/src/table/batch_table/storage_table.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,20 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
287287
pub fn pk_indices(&self) -> &[usize] {
288288
&self.pk_indices
289289
}
290+
291+
pub fn output_indices(&self) -> &[usize] {
292+
&self.output_indices
293+
}
294+
295+
/// Get the indices of the primary key columns in the output columns.
296+
///
297+
/// Returns `None` if any of the primary key columns is not in the output columns.
298+
pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
299+
self.pk_indices
300+
.iter()
301+
.map(|&i| self.output_indices.iter().position(|&j| i == j))
302+
.collect()
303+
}
290304
}
291305

292306
/// Point get

src/stream/src/executor/backfill.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use either::Either;
2020
use futures::stream::select_with_strategy;
2121
use futures::{pin_mut, stream, StreamExt, TryStreamExt};
2222
use futures_async_stream::try_stream;
23+
use itertools::Itertools;
2324
use risingwave_common::array::{Op, StreamChunk};
2425
use risingwave_common::buffer::BitmapBuilder;
2526
use risingwave_common::catalog::Schema;
@@ -104,10 +105,24 @@ where
104105

105106
#[try_stream(ok = Message, error = StreamExecutorError)]
106107
async fn execute_inner(mut self) {
107-
// Table storage primary key.
108-
let table_pk_indices = self.table.pk_indices();
108+
// The primary key columns, in the output columns of the table scan.
109+
let pk_in_output_indices = self.table.pk_in_output_indices().unwrap();
109110
let pk_order = self.table.pk_serializer().get_order_types();
110-
let upstream_indices = self.upstream_indices;
111+
112+
// TODO: unify these two mappings if we make the upstream and table output the same.
113+
// The columns to be forwarded to the downstream, in the upstream columns.
114+
let downstream_in_upstream_indices = self.upstream_indices;
115+
// The columns to be forwarded to the downstream, in the output columns of the table scan.
116+
let downstream_in_output_indices = downstream_in_upstream_indices
117+
.iter()
118+
.map(|&i| {
119+
self.table
120+
.output_indices()
121+
.iter()
122+
.position(|&j| i == j)
123+
.unwrap()
124+
})
125+
.collect_vec();
111126

112127
let mut upstream = self.upstream.execute();
113128

@@ -139,7 +154,9 @@ where
139154
// Forward messages directly to the downstream.
140155
#[for_await]
141156
for message in upstream {
142-
if let Some(message) = Self::mapping_message(message?, &upstream_indices) {
157+
if let Some(message) =
158+
Self::mapping_message(message?, &downstream_in_upstream_indices)
159+
{
143160
yield message;
144161
}
145162
}
@@ -213,10 +230,10 @@ where
213230
Self::mark_chunk(
214231
chunk,
215232
current_pos,
216-
table_pk_indices,
233+
&pk_in_output_indices,
217234
pk_order,
218235
),
219-
&upstream_indices,
236+
&downstream_in_upstream_indices,
220237
));
221238
}
222239
}
@@ -255,7 +272,7 @@ where
255272
processed_rows += chunk.cardinality() as u64;
256273
yield Message::Chunk(Self::mapping_chunk(
257274
chunk,
258-
&upstream_indices,
275+
&downstream_in_upstream_indices,
259276
));
260277
}
261278

@@ -272,11 +289,14 @@ where
272289
.last()
273290
.unwrap()
274291
.1
275-
.project(table_pk_indices)
292+
.project(&pk_in_output_indices)
276293
.into_owned_row(),
277294
);
278295
processed_rows += chunk.cardinality() as u64;
279-
yield Message::Chunk(Self::mapping_chunk(chunk, &upstream_indices));
296+
yield Message::Chunk(Self::mapping_chunk(
297+
chunk,
298+
&downstream_in_output_indices,
299+
));
280300
}
281301
}
282302
}
@@ -293,7 +313,7 @@ where
293313
// Forward messages directly to the downstream.
294314
#[for_await]
295315
for msg in upstream {
296-
if let Some(msg) = Self::mapping_message(msg?, &upstream_indices) {
316+
if let Some(msg) = Self::mapping_message(msg?, &downstream_in_upstream_indices) {
297317
if let Some(barrier) = msg.as_barrier() {
298318
self.progress.finish(barrier.epoch.curr);
299319
}
@@ -360,7 +380,7 @@ where
360380
fn mark_chunk(
361381
chunk: StreamChunk,
362382
current_pos: &OwnedRow,
363-
table_pk_indices: PkIndicesRef<'_>,
383+
pk_in_output_indices: PkIndicesRef<'_>,
364384
pk_order: &[OrderType],
365385
) -> StreamChunk {
366386
let chunk = chunk.compact();
@@ -369,7 +389,7 @@ where
369389
// Use project to avoid allocation.
370390
for v in data.rows().map(|row| {
371391
match row
372-
.project(table_pk_indices)
392+
.project(pk_in_output_indices)
373393
.iter()
374394
.zip_eq_fast(pk_order.iter())
375395
.cmp_by(current_pos.iter(), |(x, order), y| {

src/stream/src/from_proto/chain.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
15+
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
1616
use risingwave_common::util::sort_util::OrderType;
1717
use risingwave_pb::plan_common::StorageTableDesc;
1818
use risingwave_pb::stream_plan::{ChainNode, ChainType};
@@ -98,7 +98,11 @@ impl ExecutorBuilder for ChainExecutorBuilder {
9898
.iter()
9999
.map(ColumnDesc::from)
100100
.collect_vec();
101-
let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec();
101+
let column_ids = node
102+
.upstream_column_ids
103+
.iter()
104+
.map(ColumnId::from)
105+
.collect_vec();
102106

103107
// Use indices based on full table instead of streaming executor output.
104108
let pk_indices = table_desc

0 commit comments

Comments (0)