Skip to content

Commit 1449439

Browse files
authored
refactor(fragmenter): remove is_singleton workarounds on Chain (risingwavelabs#8536)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 9f68cef commit 1449439

File tree

10 files changed

+42
-59
lines changed

10 files changed

+42
-59
lines changed

dashboard/proto/gen/stream_plan.ts

Lines changed: 10 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/stream_plan.proto

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,6 @@ enum ChainType {
383383
// 1. MergeNode (as a placeholder) for streaming read.
384384
// 2. BatchPlanNode for snapshot read.
385385
message ChainNode {
386-
reserved 5;
387-
reserved "same_worker_node";
388-
389386
uint32 table_id = 1;
390387
// The schema of input stream, which will be used to build a MergeNode
391388
repeated plan_common.Field upstream_fields = 2;
@@ -400,10 +397,6 @@ message ChainNode {
400397
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
401398
// ChainType is used to decide which implementation for the ChainNode.
402399
ChainType chain_type = 4;
403-
// Whether the upstream materialize is and this chain should be a singleton.
404-
// FIXME: This is a workaround for fragmenter since the distribution info will be lost if there's only one
405-
// fragment in the downstream mview. Remove this when we refactor the fragmenter.
406-
bool is_singleton = 6;
407400

408401
// The upstream materialized view info used by backfill.
409402
plan_common.StorageTableDesc table_desc = 7;
@@ -653,8 +646,10 @@ message StreamFragmentGraph {
653646
StreamNode node = 2;
654647
// Bitwise-OR of FragmentTypeFlags
655648
uint32 fragment_type_mask = 3;
656-
// mark whether this fragment should only have one actor.
657-
bool is_singleton = 4;
649+
// Mark whether this fragment requires exactly one actor.
650+
// Note: if this is `false`, the fragment may still be a singleton according to the scheduler.
651+
// One should check `meta.Fragment.distribution_type` for the final result.
652+
bool requires_singleton = 4;
658653
// Number of table ids (stateful states) for this fragment.
659654
uint32 table_ids_cnt = 5;
660655
// Mark the upstream table ids of this fragment, Used for fragments with `Chain`s.

src/common/src/hash/consistent_hash/mapping.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,11 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
217217

218218
/// Transform this vnode mapping to another type of vnode mapping, with the given mapping from
219219
/// items of this mapping to items of the other mapping.
220-
pub fn transform<T2: VnodeMappingItem>(
221-
&self,
222-
to_map: &HashMap<T::Item, T2::Item>,
223-
) -> VnodeMapping<T2> {
220+
pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
221+
where
222+
T2: VnodeMappingItem,
223+
M: for<'a> Index<&'a T::Item, Output = T2::Item>,
224+
{
224225
VnodeMapping {
225226
original_indices: self.original_indices.clone(),
226227
data: self.data.iter().map(|item| to_map[item]).collect(),

src/frontend/src/optimizer/plan_node/stream_index_scan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ impl StreamIndexScan {
206206
.map(|&i| i as _)
207207
.collect(),
208208
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
209-
is_singleton: false,
210209
table_desc: Some(self.logical.table_desc().to_protobuf()),
211210
})),
212211
stream_key,

src/frontend/src/optimizer/plan_node/stream_table_scan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ impl StreamTableScan {
236236
.map(|&i| i as _)
237237
.collect(),
238238
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
239-
is_singleton: *self.distribution() == Distribution::Single,
240239
// The table desc used by backfill executor
241240
table_desc: Some(self.logical.table_desc().to_protobuf()),
242241
})),

src/frontend/src/stream_fragmenter/graph/fragment_graph.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ pub struct StreamFragment {
3737
/// Bitwise-OR of type Flags of this fragment.
3838
pub fragment_type_mask: u32,
3939

40-
/// mark whether this fragment should only have one actor.
41-
pub is_singleton: bool,
40+
/// Mark whether this fragment requires exactly one actor.
41+
pub requires_singleton: bool,
4242

4343
/// Number of table ids (stateful states) for this fragment.
4444
pub table_ids_cnt: u32,
@@ -64,8 +64,7 @@ impl StreamFragment {
6464
Self {
6565
fragment_id,
6666
fragment_type_mask: FragmentTypeFlag::FragmentUnspecified as u32,
67-
// FIXME: is it okay to use `false` as default value?
68-
is_singleton: false,
67+
requires_singleton: false,
6968
node: None,
7069
table_ids_cnt: 0,
7170
upstream_table_ids: vec![],
@@ -77,7 +76,7 @@ impl StreamFragment {
7776
fragment_id: self.fragment_id,
7877
node: self.node.clone().map(|n| *n),
7978
fragment_type_mask: self.fragment_type_mask,
80-
is_singleton: self.is_singleton,
79+
requires_singleton: self.requires_singleton,
8180
table_ids_cnt: self.table_ids_cnt,
8281
upstream_table_ids: self.upstream_table_ids.clone(),
8382
}

src/frontend/src/stream_fragmenter/mod.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,7 @@ pub(self) fn build_and_add_fragment(
223223
}
224224

225225
/// Build new fragment and link dependencies by visiting children recursively, update
226-
/// `is_singleton` and `fragment_type` properties for current fragment. While traversing the
227-
/// tree, count how many table ids should be allocated in this fragment.
228-
// TODO: Should we store the concurrency in StreamFragment directly?
226+
/// `requires_singleton` and `fragment_type` properties for current fragment.
229227
fn build_fragment(
230228
state: &mut BuildFragmentGraphState,
231229
current_fragment: &mut StreamFragment,
@@ -248,8 +246,7 @@ fn build_fragment(
248246

249247
NodeBody::Sink(_) => current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32,
250248

251-
// TODO: Force singleton for TopN as a workaround. We should implement two phase TopN.
252-
NodeBody::TopN(_) => current_fragment.is_singleton = true,
249+
NodeBody::TopN(_) => current_fragment.requires_singleton = true,
253250

254251
// FIXME: workaround for single-fragment mview on singleton upstream mview.
255252
NodeBody::Chain(node) => {
@@ -259,12 +256,11 @@ fn build_fragment(
259256
.dependent_relation_ids
260257
.insert(TableId::new(node.table_id));
261258
current_fragment.upstream_table_ids.push(node.table_id);
262-
current_fragment.is_singleton = node.is_singleton;
263259
}
264260

265261
NodeBody::Now(_) => {
266262
current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
267-
current_fragment.is_singleton = true;
263+
current_fragment.requires_singleton = true;
268264
}
269265

270266
_ => {}
@@ -293,8 +289,6 @@ fn build_fragment(
293289
// Exchange node indicates a new child fragment.
294290
NodeBody::Exchange(exchange_node) => {
295291
let exchange_node_strategy = exchange_node.get_strategy()?.clone();
296-
let is_simple_dispatcher =
297-
exchange_node_strategy.get_type()? == DispatcherType::Simple;
298292

299293
// Exchange node should have only one input.
300294
let [input]: [_; 1] = std::mem::take(&mut child_node.input).try_into().unwrap();
@@ -308,9 +302,6 @@ fn build_fragment(
308302
},
309303
);
310304

311-
if is_simple_dispatcher {
312-
current_fragment.is_singleton = true;
313-
}
314305
Ok(child_node)
315306
}
316307

src/meta/src/stream/stream_graph/fragment.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use itertools::Itertools;
2323
use risingwave_common::bail;
2424
use risingwave_common::catalog::{generate_internal_table_name_with_type, TableId};
2525
use risingwave_pb::catalog::Table;
26-
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
2726
use risingwave_pb::meta::table_fragments::Fragment;
2827
use risingwave_pb::stream_plan::stream_fragment_graph::{
2928
Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
@@ -604,11 +603,7 @@ impl CompleteStreamFragmentGraph {
604603
table_id,
605604
} = self.get_fragment(id).into_building().unwrap();
606605

607-
let distribution_type = if inner.is_singleton {
608-
FragmentDistributionType::Single
609-
} else {
610-
FragmentDistributionType::Hash
611-
} as i32;
606+
let distribution_type = distribution.to_distribution_type() as i32;
612607

613608
let state_table_ids = internal_tables
614609
.iter()

src/meta/src/stream/stream_graph/schedule.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use rand::thread_rng;
2929
use risingwave_common::bail;
3030
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
3131
use risingwave_pb::common::{ActorInfo, ParallelUnit};
32-
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
32+
use risingwave_pb::meta::table_fragments::fragment::{
33+
FragmentDistributionType, PbFragmentDistributionType,
34+
};
3335
use risingwave_pb::stream_plan::DispatcherType::{self, *};
3436

3537
use crate::manager::{WorkerId, WorkerLocations};
@@ -170,6 +172,14 @@ impl Distribution {
170172
FragmentDistributionType::Hash => Distribution::Hash(mapping),
171173
}
172174
}
175+
176+
/// Convert the distribution to [`PbFragmentDistributionType`].
177+
pub fn to_distribution_type(&self) -> PbFragmentDistributionType {
178+
match self {
179+
Distribution::Singleton(_) => PbFragmentDistributionType::Single,
180+
Distribution::Hash(_) => PbFragmentDistributionType::Hash,
181+
}
182+
}
173183
}
174184

175185
/// [`Scheduler`] schedules the distribution of fragments in a stream graph.
@@ -272,7 +282,7 @@ impl Scheduler {
272282
// Building fragments and Singletons
273283
for (&id, fragment) in graph.building_fragments() {
274284
facts.push(Fact::Fragment(id));
275-
if fragment.is_singleton {
285+
if fragment.requires_singleton {
276286
facts.push(Fact::SingletonReq(id));
277287
}
278288
}

src/meta/src/stream/test_fragmenter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
209209
fragment_id: 2,
210210
node: Some(source_node),
211211
fragment_type_mask: FragmentTypeFlag::Source as u32,
212-
is_singleton: false,
212+
requires_singleton: false,
213213
table_ids_cnt: 0,
214214
upstream_table_ids: vec![],
215215
});
@@ -280,7 +280,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
280280
fragment_id: 1,
281281
node: Some(simple_agg_node),
282282
fragment_type_mask: FragmentTypeFlag::FragmentUnspecified as u32,
283-
is_singleton: false,
283+
requires_singleton: false,
284284
table_ids_cnt: 0,
285285
upstream_table_ids: vec![],
286286
});
@@ -368,7 +368,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
368368
fragment_id: 0,
369369
node: Some(mview_node),
370370
fragment_type_mask: FragmentTypeFlag::Mview as u32,
371-
is_singleton: true,
371+
requires_singleton: true,
372372
table_ids_cnt: 0,
373373
upstream_table_ids: vec![],
374374
});

0 commit comments

Comments
 (0)