Skip to content

Commit 3696f42

Browse files
committed
Merge remote-tracking branch 'origin/main' into bz/potential-stuck
Signed-off-by: Bugen Zhao <[email protected]>
2 parents 9e25ef4 + ff623ee commit 3696f42

File tree

26 files changed

+1061
-711
lines changed

26 files changed

+1061
-711
lines changed

Cargo.lock

Lines changed: 48 additions & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker/dashboards/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

grafana/risingwave-dev-dashboard.dashboard.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,6 +1454,16 @@ def section_streaming_errors(outer_panels):
14541454
),
14551455
],
14561456
),
1457+
panels.timeseries_count(
1458+
"Source Reader Errors by Type",
1459+
"",
1460+
[
1461+
panels.target(
1462+
f"sum({metric('user_source_reader_error_count')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
1463+
"{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
1464+
),
1465+
],
1466+
),
14571467
],
14581468
),
14591469
]
@@ -2256,13 +2266,53 @@ def section_hummock_tiered_cache(outer_panels):
22562266
),
22572267
],
22582268
),
2269+
panels.timeseries_count(
2270+
"Refill Queue Length",
2271+
"",
2272+
[
2273+
panels.target(
2274+
f"sum(refill_queue_length) by (instance)",
2275+
"refill queue length @ {{instance}}",
2276+
),
2277+
],
2278+
),
22592279
panels.timeseries_ops(
2260-
"Refill",
2280+
"Refill Ops",
22612281
"",
22622282
[
22632283
panels.target(
2264-
f"sum(rate({metric('compute_refill_data_file_cache_count')}[$__rate_interval])) by (extra, instance)",
2265-
"refill data file cache - {{extra}} @ {{instance}}",
2284+
f"sum(rate({metric('data_refill_duration_count')}[$__rate_interval])) by (op, instance)",
2285+
"data file cache refill - {{op}} @ {{instance}}",
2286+
),
2287+
panels.target(
2288+
f"sum(rate({metric('data_refill_filtered_total')}[$__rate_interval])) by (instance)",
2289+
"data file cache refill - filtered @ {{instance}}",
2290+
),
2291+
panels.target(
2292+
f"sum(rate({metric('meta_refill_duration_count')}[$__rate_interval])) by (op, instance)",
2293+
"meta file cache refill - {{op}} @ {{instance}}",
2294+
),
2295+
],
2296+
),
2297+
panels.timeseries_latency(
2298+
"Refill Latency",
2299+
"",
2300+
[
2301+
*quantile(
2302+
lambda quantile, legend: panels.target(
2303+
f"histogram_quantile({quantile}, sum(rate({metric('data_refill_duration_bucket')}[$__rate_interval])) by (le, op, instance))",
2304+
f"p{legend} - " +
2305+
"data file cache refill - {{op}} @ {{instance}}",
2306+
),
2307+
[50, 90, 99, "max"],
2308+
),
2309+
*quantile(
2310+
lambda quantile, legend: panels.target(
2311+
f"histogram_quantile({quantile}, sum(rate({metric('meta_refill_duration_bucket')}[$__rate_interval])) by (le, instance))",
2312+
f"p{legend} - " +
2313+
"meta cache refill @ {{instance}}",
2314+
),
2315+
[50, 90, 99, "max"],
22662316
),
22672317
],
22682318
),

grafana/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

proto/meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ message GetReschedulePlanRequest {
431431
repeated uint32 include_worker_ids = 1;
432432
repeated uint32 exclude_worker_ids = 2;
433433
optional uint32 target_parallelism = 3;
434+
optional uint32 target_parallelism_per_worker = 4;
434435
}
435436

436437
message StableResizePolicy {

src/common/src/config.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,9 @@ pub struct StorageConfig {
518518
#[serde(default)]
519519
pub meta_file_cache: FileCacheConfig,
520520

521+
#[serde(default)]
522+
pub cache_refill: CacheRefillConfig,
523+
521524
/// Whether to enable streaming upload for sstable.
522525
#[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
523526
pub min_sst_size_for_streaming_upload: u64,
@@ -570,6 +573,21 @@ pub struct StorageConfig {
570573
pub unrecognized: Unrecognized<Self>,
571574
}
572575

576+
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
577+
pub struct CacheRefillConfig {
578+
#[serde(default = "default::cache_refill::data_refill_levels")]
579+
pub data_refill_levels: Vec<u32>,
580+
581+
#[serde(default = "default::cache_refill::timeout_ms")]
582+
pub timeout_ms: u64,
583+
584+
#[serde(default = "default::cache_refill::concurrency")]
585+
pub concurrency: usize,
586+
587+
#[serde(default, flatten)]
588+
pub unrecognized: Unrecognized<Self>,
589+
}
590+
573591
/// The subsection `[storage.data_file_cache]` and `[storage.meta_file_cache]` in `risingwave.toml`.
574592
///
575593
/// It's put at [`StorageConfig::data_file_cache`] and [`StorageConfig::meta_file_cache`].
@@ -617,9 +635,6 @@ pub struct FileCacheConfig {
617635
#[serde(default = "default::file_cache::reclaim_rate_limit_mb")]
618636
pub reclaim_rate_limit_mb: usize,
619637

620-
#[serde(default = "default::file_cache::refill_levels")]
621-
pub refill_levels: Vec<u32>,
622-
623638
#[serde(default, flatten)]
624639
pub unrecognized: Unrecognized<Self>,
625640
}
@@ -1110,10 +1125,20 @@ pub mod default {
11101125
pub fn reclaim_rate_limit_mb() -> usize {
11111126
0
11121127
}
1128+
}
11131129

1114-
pub fn refill_levels() -> Vec<u32> {
1130+
pub mod cache_refill {
1131+
pub fn data_refill_levels() -> Vec<u32> {
11151132
vec![]
11161133
}
1134+
1135+
pub fn timeout_ms() -> u64 {
1136+
6000
1137+
}
1138+
1139+
pub fn concurrency() -> usize {
1140+
100
1141+
}
11171142
}
11181143

11191144
pub mod auto_dump_heap_profile {

src/config/example.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ lfu_tiny_lru_capacity_ratio = 0.01
122122
rated_random_rate_mb = 0
123123
flush_rate_limit_mb = 0
124124
reclaim_rate_limit_mb = 0
125-
refill_levels = []
126125

127126
[storage.meta_file_cache]
128127
dir = ""
@@ -138,7 +137,11 @@ lfu_tiny_lru_capacity_ratio = 0.01
138137
rated_random_rate_mb = 0
139138
flush_rate_limit_mb = 0
140139
reclaim_rate_limit_mb = 0
141-
refill_levels = []
140+
141+
[storage.cache_refill]
142+
data_refill_levels = []
143+
timeout_ms = 6000
144+
concurrency = 100
142145

143146
[system]
144147
barrier_interval_ms = 1000

src/connector/src/parser/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,8 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
517517

518518
Err(error) => {
519519
tracing::warn!(%error, "message parsing failed, skipping");
520-
// This will throw an error for batch
521-
parser.source_ctx().report_user_source_error(error)?;
520+
// Skip for batch
521+
parser.source_ctx().report_user_source_error(error);
522522
continue;
523523
}
524524
}

src/connector/src/source/base.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use itertools::Itertools;
2525
use parking_lot::Mutex;
2626
use risingwave_common::array::StreamChunk;
2727
use risingwave_common::catalog::TableId;
28-
use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
28+
use risingwave_common::error::{ErrorCode, ErrorSuppressor, RwError};
2929
use risingwave_common::types::{JsonbVal, Scalar};
3030
use risingwave_pb::connector_service::PbTableSchema;
3131
use risingwave_pb::source::ConnectorSplit;
@@ -172,10 +172,9 @@ impl SourceContext {
172172
ctx
173173
}
174174

175-
pub(crate) fn report_user_source_error(&self, e: RwError) -> RwResult<()> {
176-
// Repropagate the error if batch
175+
pub(crate) fn report_user_source_error(&self, e: RwError) {
177176
if self.source_info.fragment_id == u32::MAX {
178-
return Err(e);
177+
return;
179178
}
180179
let mut err_str = e.inner().to_string();
181180
if let Some(suppressor) = &self.error_suppressor
@@ -198,7 +197,6 @@ impl SourceContext {
198197
&self.source_info.source_id.table_id.to_string(),
199198
])
200199
.inc();
201-
Ok(())
202200
}
203201
}
204202

src/ctl/src/cmd_impl/scale/resize.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
120120
exclude_workers,
121121
include_workers,
122122
target_parallelism,
123+
target_parallelism_per_worker,
123124
generate,
124125
output,
125126
yes,
@@ -132,8 +133,15 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
132133
let include_worker_ids =
133134
worker_input_to_worker_ids(include_workers.unwrap_or_default(), true);
134135

135-
if let Some(target) = target_parallelism && target == 0 {
136-
fail!("Target parallelism must be greater than 0");
136+
match (target_parallelism, target_parallelism_per_worker) {
137+
(Some(_), Some(_)) => {
138+
fail!("Cannot specify both target parallelism and target parallelism per worker")
139+
}
140+
(_, Some(_)) if include_worker_ids.is_empty() => {
141+
fail!("Cannot specify target parallelism per worker without including any worker")
142+
}
143+
(Some(target), _) if target == 0 => fail!("Target parallelism must be greater than 0"),
144+
_ => {}
137145
}
138146

139147
for worker_id in exclude_worker_ids.iter().chain(include_worker_ids.iter()) {
@@ -161,6 +169,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
161169
include_worker_ids,
162170
exclude_worker_ids,
163171
target_parallelism,
172+
target_parallelism_per_worker,
164173
}
165174
};
166175

src/ctl/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,15 @@ pub struct ScaleResizeCommands {
288288
#[clap(long)]
289289
target_parallelism: Option<u32>,
290290

291+
/// The target parallelism per worker, conflicts with `target_parallelism`, requires
292+
/// `include_workers` to be set.
293+
#[clap(
294+
long,
295+
requires = "include_workers",
296+
conflicts_with = "target_parallelism"
297+
)]
298+
target_parallelism_per_worker: Option<u32>,
299+
291300
/// Will generate a plan supported by the `reschedule` command and save it to the provided path
292301
/// by the `--output`.
293302
#[clap(long, default_value_t = false)]

src/frontend/src/catalog/system_catalog/information_schema/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
pub mod columns;
1616
pub mod tables;
17+
pub mod views;
1718

1819
pub use columns::*;
1920
pub use tables::*;
21+
pub use views::*;
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::LazyLock;
16+
17+
use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
18+
use risingwave_common::types::DataType;
19+
20+
use crate::catalog::system_catalog::BuiltinView;
21+
22+
/// The view `views` contains all views defined in the current database. Only those views
23+
/// are shown that the current user has access to (by way of being the owner or having
24+
/// some privilege).
25+
/// Ref: [`https://www.postgresql.org/docs/current/infoschema-views.html`]
26+
///
27+
/// In RisingWave, `views` contains information about defined views.
28+
pub static INFORMATION_SCHEMA_VIEWS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
29+
name: "views",
30+
schema: INFORMATION_SCHEMA_SCHEMA_NAME,
31+
columns: &[
32+
(DataType::Varchar, "table_catalog"),
33+
(DataType::Varchar, "table_schema"),
34+
(DataType::Varchar, "table_name"),
35+
(DataType::Varchar, "view_definition"),
36+
],
37+
sql: "SELECT CURRENT_DATABASE() AS table_catalog, \
38+
s.name AS table_schema, \
39+
v.name AS table_name, \
40+
v.definition AS view_definition \
41+
FROM rw_catalog.rw_views v \
42+
JOIN rw_catalog.rw_schemas s ON v.schema_id = s.id \
43+
ORDER BY table_schema, table_name"
44+
.to_string(),
45+
});

src/frontend/src/catalog/system_catalog/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ prepare_sys_catalog! {
377377
{ BuiltinCatalog::View(&PG_DEPEND) },
378378
{ BuiltinCatalog::View(&INFORMATION_SCHEMA_COLUMNS) },
379379
{ BuiltinCatalog::View(&INFORMATION_SCHEMA_TABLES) },
380+
{ BuiltinCatalog::View(&INFORMATION_SCHEMA_VIEWS) },
380381
{ BuiltinCatalog::Table(&RW_DATABASES), read_rw_database_info },
381382
{ BuiltinCatalog::Table(&RW_SCHEMAS), read_rw_schema_info },
382383
{ BuiltinCatalog::Table(&RW_USERS), read_rw_user_info },

0 commit comments

Comments (0)