Skip to content

revert: remove enable_stream_row_count config #10261 #11328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ def section_streaming(panels):
mv_filter = "executor_identity=~\".*MaterializeExecutor.*\""
table_type_filter = "table_type=~\"MATERIALIZED_VIEW\""
mv_throughput_query = f'sum(rate({metric("stream_executor_row_count", filter=mv_filter)}[$__rate_interval]) * on(actor_id) group_left(materialized_view_id, table_name) (group({metric("table_info", filter=table_type_filter)}) by (actor_id, materialized_view_id, table_name))) by (materialized_view_id, table_name)'
sink_throughput_query = f'sum(rate({metric("stream_executor_row_count", filter=sink_filter)}[$__rate_interval]) * on(actor_id) group_left(sink_name) (group({metric("sink_info")}) by (actor_id, sink_name))) by (sink_name)'
return [
panels.row("Streaming"),
panels.timeseries_rowsps(
Expand Down Expand Up @@ -705,26 +706,22 @@ def section_streaming(panels):
),
panels.timeseries_rowsps(
"Sink Throughput(rows/s)",
"The figure shows the number of rows output by each sink executor actor per second.",
"The figure shows the number of rows output by each sink per second.",
[
panels.target(
f"rate({metric('stream_executor_row_count', filter=sink_filter)}[$__rate_interval])",
"sink={{executor_identity}} {{actor_id}} @ {{instance}}",
sink_throughput_query,
"sink {{sink_name}}",
),
],
),

panels.timeseries_rowsps(
"Materialized View Throughput(rows/s)",
"The figure shows the number of rows written into each materialized executor actor per second.",
"The figure shows the number of rows written into each materialized view per second.",
[
panels.target(
f"rate({metric('stream_executor_row_count', filter=mv_filter)}[$__rate_interval])",
"{{executor_identity}} {{actor_id}} @ {{instance}}",
),
panels.target(
mv_throughput_query,
"materialized view {{table_name}} table_id {{materialized_view_id}}",
"materialized view {{table_name}} table_id {{materialized_view_id}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,12 @@ serde_with::with_prefix!(batch_prefix "batch_");
/// It is put at [`StreamingConfig::developer`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct StreamingDeveloperConfig {
/// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
/// and might affect the prometheus performance. If you only need actor input and output
/// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
#[serde(default = "default::developer::stream_enable_executor_row_count")]
pub enable_executor_row_count: bool,

/// The capacity of the chunks in the channel that connects between `ConnectorSource` and
/// `SourceExecutor`.
#[serde(default = "default::developer::connector_message_buffer_size")]
Expand Down Expand Up @@ -1089,6 +1095,10 @@ pub mod default {
1024
}

pub fn stream_enable_executor_row_count() -> bool {
false
}

pub fn connector_message_buffer_size() -> usize {
16
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async_stack_trace = "ReleaseVerbose"
unique_user_stream_errors = 10

[streaming.developer]
stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
stream_unsafe_extreme_cache_size = 10
stream_chunk_size = 256
Expand Down
25 changes: 25 additions & 0 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use prometheus::{
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::common::WorkerType;
use risingwave_pb::stream_plan::stream_node::NodeBody::Sink;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -163,6 +164,8 @@ pub struct MetaMetrics {
pub actor_info: IntGaugeVec,
/// A dummpy gauge metrics with its label to be the mapping from table id to actor id
pub table_info: IntGaugeVec,
/// A dummy gauge metrics with its label to be the mapping from actor id to sink id
pub sink_info: IntGaugeVec,

/// Write throughput of commit epoch for each stable
pub table_write_throughput: IntCounterVec,
Expand Down Expand Up @@ -527,6 +530,14 @@ impl MetaMetrics {
)
.unwrap();

let sink_info = register_int_gauge_vec_with_registry!(
"sink_info",
"Mapping from actor id to (actor id, sink name)",
&["actor_id", "sink_name",],
registry
)
.unwrap();

let l0_compact_level_count = register_histogram_vec_with_registry!(
"storage_l0_compact_level_count",
"level_count of l0 compact task",
Expand Down Expand Up @@ -653,6 +664,7 @@ impl MetaMetrics {
source_enumerator_metrics,
actor_info,
table_info,
sink_info,
l0_compact_level_count,
compact_task_size,
compact_task_file_count,
Expand Down Expand Up @@ -793,6 +805,19 @@ pub async fn start_fragment_info_monitor<S: MetaStore>(
}
}

if let Some(stream_node) = &actor.nodes {
if let Some(Sink(sink_node)) = &stream_node.node_body {
let sink_name = match &sink_node.sink_desc {
Some(sink_desc) => &sink_desc.name,
_ => "unknown",
};
meta_metrics
.sink_info
.with_label_values(&[&actor_id_str, sink_name])
.set(1);
}
}

// Report a dummy gauge metrics with (table id, actor id, table
// name) as its label

Expand Down
25 changes: 22 additions & 3 deletions src/stream/src/executor/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct WrapperExecutor {
input: BoxedExecutor,

extra: ExtraInfo,

enable_executor_row_count: bool,
}

impl WrapperExecutor {
Expand All @@ -53,6 +55,7 @@ impl WrapperExecutor {
actor_id: ActorId,
executor_id: u64,
metrics: Arc<StreamingMetrics>,
enable_executor_row_count: bool,
) -> Self {
Self {
input,
Expand All @@ -62,11 +65,13 @@ impl WrapperExecutor {
executor_id,
metrics,
},
enable_executor_row_count,
}
}

#[allow(clippy::let_and_return)]
fn wrap_debug(
_enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
_extra: ExtraInfo,
stream: impl MessageStream + 'static,
Expand All @@ -78,6 +83,7 @@ impl WrapperExecutor {
}

fn wrap(
enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
extra: ExtraInfo,
stream: impl MessageStream + 'static,
Expand All @@ -98,6 +104,7 @@ impl WrapperExecutor {

// Trace
let stream = trace::trace(
enable_executor_row_count,
info.clone(),
extra.input_pos,
extra.actor_id,
Expand All @@ -107,7 +114,7 @@ impl WrapperExecutor {
);

if cfg!(debug_assertions) {
Self::wrap_debug(info, extra, stream).boxed()
Self::wrap_debug(enable_executor_row_count, info, extra, stream).boxed()
} else {
stream.boxed()
}
Expand All @@ -117,12 +124,24 @@ impl WrapperExecutor {
impl Executor for WrapperExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
let info = Arc::new(self.input.info());
Self::wrap(info, self.extra, self.input.execute()).boxed()
Self::wrap(
self.enable_executor_row_count,
info,
self.extra,
self.input.execute(),
)
.boxed()
}

fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
let info = Arc::new(self.input.info());
Self::wrap(info, self.extra, self.input.execute_with_epoch(epoch)).boxed()
Self::wrap(
self.enable_executor_row_count,
info,
self.extra,
self.input.execute_with_epoch(epoch),
)
.boxed()
}

fn schema(&self) -> &Schema {
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/wrapper/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::task::ActorId;
/// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn trace(
enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
_input_pos: usize,
actor_id: ActorId,
Expand All @@ -38,14 +39,16 @@ pub async fn trace(

let span_name = pretty_identity(&info.identity, actor_id, executor_id);

let is_sink_or_mv = info.identity.contains("Materialize") || info.identity.contains("Sink");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fragile but I'm unsure if there's a better way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me open an issue to track this, a quick fix for Kaito

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. It seems a little bit hacky here.


let new_span = || tracing::info_span!("executor", "otel.name" = span_name, actor_id);
let mut span = new_span();

pin_mut!(input);

while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
if let Message::Chunk(chunk) = &message {
if chunk.cardinality() > 0 {
if chunk.cardinality() > 0 && (enable_executor_row_count || is_sink_or_mv) {
metrics
.executor_row_count
.with_label_values(&[&actor_id_string, &span_name])
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ impl LocalStreamManagerCore {
actor_context.id,
executor_id,
self.streaming_metrics.clone(),
self.config.developer.enable_executor_row_count,
)
.boxed();

Expand Down