Skip to content

Commit 554201c

Browse files
committed
fix conflicts
1 parent ab72c64 commit 554201c

File tree

18 files changed

+212
-568
lines changed

18 files changed

+212
-568
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/src/session_config/mod.rs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::util::epoch::Epoch;
3636

3737
// This is a hack, &'static str is not allowed as a const generics argument.
3838
// TODO: refine this using the adt_const_params feature.
39-
const CONFIG_KEYS: [&str; 38] = [
39+
const CONFIG_KEYS: [&str; 37] = [
4040
"RW_IMPLICIT_FLUSH",
4141
"CREATE_COMPACTION_GROUP_FOR_MV",
4242
"QUERY_MODE",
@@ -74,7 +74,6 @@ const CONFIG_KEYS: [&str; 38] = [
7474
"RW_STREAMING_RATE_LIMIT",
7575
"CDC_BACKFILL",
7676
"RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
77-
"STREAMING_ENABLE_ARRANGEMENT_BACKFILL",
7877
];
7978

8079
// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -116,7 +115,6 @@ const STANDARD_CONFORMING_STRINGS: usize = 33;
116115
const RW_STREAMING_RATE_LIMIT: usize = 34;
117116
const CDC_BACKFILL: usize = 35;
118117
const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;
119-
const STREAMING_ENABLE_ARRANGEMENT_BACKFILL: usize = 37;
120118

121119
trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
122120
fn entry_name() -> &'static str;
@@ -322,8 +320,6 @@ type Timezone = ConfigString<TIMEZONE>;
322320
type StreamingParallelism = ConfigU64<STREAMING_PARALLELISM, 0>;
323321
type StreamingEnableDeltaJoin = ConfigBool<STREAMING_ENABLE_DELTA_JOIN, false>;
324322
type StreamingEnableBushyJoin = ConfigBool<STREAMING_ENABLE_BUSHY_JOIN, true>;
325-
// TODO: Revert to false before merging.
326-
type StreamingEnableArrangementBackfill = ConfigBool<STREAMING_ENABLE_ARRANGEMENT_BACKFILL, true>;
327323
type EnableTwoPhaseAgg = ConfigBool<ENABLE_TWO_PHASE_AGG, true>;
328324
type ForceTwoPhaseAgg = ConfigBool<FORCE_TWO_PHASE_AGG, false>;
329325
type EnableSharePlan = ConfigBool<RW_ENABLE_SHARE_PLAN, true>;
@@ -416,9 +412,6 @@ pub struct ConfigMap {
416412
/// Enable bushy join for streaming queries. Defaults to true.
417413
streaming_enable_bushy_join: StreamingEnableBushyJoin,
418414

419-
/// Enable arrangement backfill for streaming queries. Defaults to false.
420-
streaming_enable_arrangement_backfill: StreamingEnableArrangementBackfill,
421-
422415
/// Enable join ordering for streaming and batch queries. Defaults to true.
423416
enable_join_ordering: EnableJoinOrdering,
424417

@@ -546,8 +539,6 @@ impl ConfigMap {
546539
self.streaming_enable_delta_join = val.as_slice().try_into()?;
547540
} else if key.eq_ignore_ascii_case(StreamingEnableBushyJoin::entry_name()) {
548541
self.streaming_enable_bushy_join = val.as_slice().try_into()?;
549-
} else if key.eq_ignore_ascii_case(StreamingEnableArrangementBackfill::entry_name()) {
550-
self.streaming_enable_arrangement_backfill = val.as_slice().try_into()?;
551542
} else if key.eq_ignore_ascii_case(EnableJoinOrdering::entry_name()) {
552543
self.enable_join_ordering = val.as_slice().try_into()?;
553544
} else if key.eq_ignore_ascii_case(EnableTwoPhaseAgg::entry_name()) {
@@ -654,8 +645,6 @@ impl ConfigMap {
654645
Ok(self.streaming_enable_delta_join.to_string())
655646
} else if key.eq_ignore_ascii_case(StreamingEnableBushyJoin::entry_name()) {
656647
Ok(self.streaming_enable_bushy_join.to_string())
657-
} else if key.eq_ignore_ascii_case(StreamingEnableArrangementBackfill::entry_name()) {
658-
Ok(self.streaming_enable_arrangement_backfill.to_string())
659648
} else if key.eq_ignore_ascii_case(EnableJoinOrdering::entry_name()) {
660649
Ok(self.enable_join_ordering.to_string())
661650
} else if key.eq_ignore_ascii_case(EnableTwoPhaseAgg::entry_name()) {
@@ -788,11 +777,6 @@ impl ConfigMap {
788777
setting : self.streaming_enable_bushy_join.to_string(),
789778
description: String::from("Enable bushy join in streaming queries.")
790779
},
791-
VariableInfo{
792-
name : StreamingEnableArrangementBackfill::entry_name().to_lowercase(),
793-
setting : self.streaming_enable_arrangement_backfill.to_string(),
794-
description: String::from("Enable arrangement backfill in streaming queries.")
795-
},
796780
VariableInfo{
797781
name : EnableJoinOrdering::entry_name().to_lowercase(),
798782
setting : self.enable_join_ordering.to_string(),
@@ -975,10 +959,6 @@ impl ConfigMap {
975959
*self.streaming_enable_bushy_join
976960
}
977961

978-
pub fn get_streaming_enable_arrangement_backfill(&self) -> bool {
979-
*self.streaming_enable_arrangement_backfill
980-
}
981-
982962
pub fn get_enable_join_ordering(&self) -> bool {
983963
*self.enable_join_ordering
984964
}

src/storage/src/filter_key_extractor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,6 @@ mod tests {
549549
cardinality: None,
550550
created_at_epoch: None,
551551
cleaned_by_watermark: false,
552-
output_indices: vec![],
553552
}
554553
}
555554

src/storage/src/hummock/state_store.rs

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::future::Future;
1616
use std::ops::Bound;
1717
use std::sync::atomic::Ordering as MemOrdering;
18-
use std::time::Duration;
1918

2019
use bytes::Bytes;
2120
use itertools::Itertools;
@@ -35,6 +34,7 @@ use crate::hummock::event_handler::HummockEvent;
3534
use crate::hummock::store::memtable::ImmutableMemtable;
3635
use crate::hummock::store::state_store::LocalHummockStorage;
3736
use crate::hummock::store::version::read_filter_for_batch;
37+
use crate::hummock::utils::wait_for_epoch;
3838
use crate::hummock::{HummockEpoch, HummockError};
3939
use crate::monitor::StoreLocalStatistic;
4040
use crate::store::*;
@@ -186,42 +186,8 @@ impl StateStore for HummockStorage {
186186
}
187187
_ => return Ok(()),
188188
};
189-
let mut receiver = self.version_update_notifier_tx.subscribe();
190-
// avoid unnecessary check in the loop if the value does not change
191-
let max_committed_epoch = *receiver.borrow_and_update();
192-
if max_committed_epoch >= wait_epoch {
193-
return Ok(());
194-
}
195-
loop {
196-
match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
197-
Err(elapsed) => {
198-
// The reason that we need to retry here is batch scan in
199-
// chain/rearrange_chain is waiting for an
200-
// uncommitted epoch carried by the CreateMV barrier, which
201-
// can take unbounded time to become committed and propagate
202-
// to the CN. We should consider removing the retry as well as wait_epoch
203-
// for chain/rearrange_chain if we enforce
204-
// chain/rearrange_chain to be scheduled on the same
205-
// CN with the same distribution as the upstream MV.
206-
// See #3845 for more details.
207-
tracing::warn!(
208-
"wait_epoch {:?} timeout when waiting for version update elapsed {:?}s",
209-
wait_epoch,
210-
elapsed
211-
);
212-
continue;
213-
}
214-
Ok(Err(_)) => {
215-
return Err(HummockError::wait_epoch("tx dropped").into());
216-
}
217-
Ok(Ok(_)) => {
218-
let max_committed_epoch = *receiver.borrow();
219-
if max_committed_epoch >= wait_epoch {
220-
return Ok(());
221-
}
222-
}
223-
}
224-
}
189+
let receiver = self.version_update_notifier_tx.subscribe();
190+
wait_for_epoch(receiver, wait_epoch).await
225191
}
226192

227193
async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {

src/storage/src/hummock/store/state_store.rs

Lines changed: 5 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::future::Future;
1616
use std::ops::Bound;
1717
use std::sync::Arc;
18-
use std::time::Duration;
1918

2019
use await_tree::InstrumentAwait;
2120
use bytes::Bytes;
@@ -40,10 +39,10 @@ use crate::hummock::shared_buffer::shared_buffer_batch::{
4039
use crate::hummock::store::version::{read_filter_for_local, HummockVersionReader};
4140
use crate::hummock::utils::{
4241
cmp_delete_range_left_bounds, do_delete_sanity_check, do_insert_sanity_check,
43-
do_update_sanity_check, filter_with_delete_range, ENABLE_SANITY_CHECK,
42+
do_update_sanity_check, filter_with_delete_range, wait_for_epoch, ENABLE_SANITY_CHECK,
4443
};
4544
use crate::hummock::write_limiter::WriteLimiterRef;
46-
use crate::hummock::{HummockError, MemoryLimiter, SstableIterator};
45+
use crate::hummock::{MemoryLimiter, SstableIterator};
4746
use crate::mem_table::{merge_stream, KeyOp, MemTable};
4847
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
4948
use crate::storage_value::StorageValue;
@@ -119,46 +118,9 @@ impl LocalHummockStorage {
119118
.await
120119
}
121120

122-
pub async fn wait_for_epoch(&self, epoch: u64) -> StorageResult<()> {
123-
// TODO(kwannoel): Refactor this copy-paste code.
124-
// TODO(kwannoel): Also we should take in read options, because only initial snapshot read
125-
// will require this.
126-
let wait_epoch = epoch;
127-
let mut receiver = self.version_update_notifier_tx.subscribe();
128-
let max_committed_epoch = *receiver.borrow_and_update();
129-
if max_committed_epoch >= wait_epoch {
130-
return Ok(());
131-
}
132-
loop {
133-
match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
134-
Err(elapsed) => {
135-
// The reason that we need to retry here is batch scan in
136-
// chain/rearrange_chain is waiting for an
137-
// uncommitted epoch carried by the CreateMV barrier, which
138-
// can take unbounded time to become committed and propagate
139-
// to the CN. We should consider removing the retry as well as wait_epoch
140-
// for chain/rearrange_chain if we enforce
141-
// chain/rearrange_chain to be scheduled on the same
142-
// CN with the same distribution as the upstream MV.
143-
// See #3845 for more details.
144-
tracing::warn!(
145-
"wait_epoch {:?} timeout when waiting for version update elapsed {:?}s",
146-
wait_epoch,
147-
elapsed
148-
);
149-
continue;
150-
}
151-
Ok(Err(_)) => {
152-
return Err(HummockError::wait_epoch("tx dropped").into());
153-
}
154-
Ok(Ok(_)) => {
155-
let max_committed_epoch = *receiver.borrow();
156-
if max_committed_epoch >= wait_epoch {
157-
return Ok(());
158-
}
159-
}
160-
}
161-
}
121+
pub async fn wait_for_epoch(&self, wait_epoch: u64) -> StorageResult<()> {
122+
let receiver = self.version_update_notifier_tx.subscribe();
123+
wait_for_epoch(receiver, wait_epoch).await
162124
}
163125

164126
pub async fn iter_inner(

src/storage/src/hummock/utils.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ use std::ops::Bound::{Excluded, Included, Unbounded};
1818
use std::ops::{Bound, RangeBounds};
1919
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
2020
use std::sync::Arc;
21+
use std::time::Duration;
2122

2223
use bytes::Bytes;
2324
use risingwave_common::cache::CachePriority;
2425
use risingwave_common::catalog::{TableId, TableOption};
25-
use risingwave_hummock_sdk::can_concat;
2626
use risingwave_hummock_sdk::key::{
2727
bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey,
2828
};
29+
use risingwave_hummock_sdk::{can_concat, HummockEpoch};
2930
use risingwave_pb::hummock::{HummockVersion, SstableInfo};
31+
use tokio::sync::watch::Receiver;
3032
use tokio::sync::Notify;
3133

3234
use super::{HummockError, HummockResult};
@@ -550,6 +552,47 @@ pub(crate) fn filter_with_delete_range<'a>(
550552
})
551553
}
552554

555+
pub(crate) async fn wait_for_epoch(
556+
mut receiver: Receiver<HummockEpoch>,
557+
wait_epoch: u64,
558+
) -> StorageResult<()> {
559+
// avoid unnecessary check in the loop if the value does not change
560+
let max_committed_epoch = *receiver.borrow_and_update();
561+
if max_committed_epoch >= wait_epoch {
562+
return Ok(());
563+
}
564+
loop {
565+
match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
566+
Err(elapsed) => {
567+
// The reason that we need to retry here is batch scan in
568+
// chain/rearrange_chain is waiting for an
569+
// uncommitted epoch carried by the CreateMV barrier, which
570+
// can take unbounded time to become committed and propagate
571+
// to the CN. We should consider removing the retry as well as wait_epoch
572+
// for chain/rearrange_chain if we enforce
573+
// chain/rearrange_chain to be scheduled on the same
574+
// CN with the same distribution as the upstream MV.
575+
// See #3845 for more details.
576+
tracing::warn!(
577+
"wait_epoch {:?} timeout when waiting for version update elapsed {:?}s",
578+
wait_epoch,
579+
elapsed
580+
);
581+
continue;
582+
}
583+
Ok(Err(_)) => {
584+
return Err(HummockError::wait_epoch("tx dropped").into());
585+
}
586+
Ok(Ok(_)) => {
587+
let max_committed_epoch = *receiver.borrow();
588+
if max_committed_epoch >= wait_epoch {
589+
return Ok(());
590+
}
591+
}
592+
}
593+
}
594+
}
595+
553596
#[cfg(test)]
554597
mod tests {
555598
use std::future::{poll_fn, Future};

0 commit comments

Comments
 (0)