Skip to content

Commit e4b9a08

Browse files
authored
refactor(streaming): better traces for development (risingwavelabs#12024)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 65fc959 commit e4b9a08

File tree

12 files changed

+105
-37
lines changed

12 files changed

+105
-37
lines changed

docs/developer-guide.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,10 @@ The Rust components use `tokio-tracing` to handle both logging and tracing. The
272272
* Third-party libraries: warn
273273
* Other libraries: debug
274274
275-
If you need to adjust log levels, change the logging filters in `src/utils/runtime/src/lib.rs`.
275+
If you need to override the default log levels, launch RisingWave with the environment variable `RUST_LOG` set as described [here](https://docs.rs/tracing-subscriber/0.3/tracing_subscriber/filter/struct.EnvFilter.html).
276+
277+
There are also some logs designated for debugging purposes with target names starting with `events::`.
278+
For example, by setting `RUST_LOG=events::stream::message::chunk=trace`, all chunk messages will be logged as they pass through the executors in the streaming engine. Search the codebase to find more of them.
276279
277280
278281
## Test your code changes

src/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ tracing-opentelemetry = "0.21"
9999
tracing-subscriber = "0.3.17"
100100
twox-hash = "1"
101101
url = "2"
102-
uuid = "1.4.1"
102+
uuid = { version = "1", features = ["v4"] }
103103

104104
[target.'cfg(not(madsim))'.dependencies]
105105
workspace-hack = { path = "../workspace-hack" }

src/common/src/array/data_chunk.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt::Display;
1516
use std::hash::BuildHasher;
1617
use std::sync::Arc;
1718
use std::{fmt, usize};
1819

1920
use bytes::Bytes;
21+
use either::Either;
2022
use itertools::Itertools;
2123
use rand::rngs::SmallRng;
2224
use rand::{Rng, SeedableRng};
@@ -65,6 +67,8 @@ pub struct DataChunk {
6567
}
6668

6769
impl DataChunk {
70+
pub(crate) const PRETTY_TABLE_PRESET: &str = "||--+-++| ++++++";
71+
6872
/// Create a `DataChunk` with `columns` and visibility. The visibility can either be a `Bitmap`
6973
/// or a simple cardinality number.
7074
pub fn new<V: Into<Vis>>(columns: Vec<ArrayRef>, vis: V) -> Self {
@@ -392,24 +396,31 @@ impl DataChunk {
392396
RowRef::new(self, pos)
393397
}
394398

395-
/// `to_pretty_string` returns a table-like text representation of the `DataChunk`.
396-
pub fn to_pretty_string(&self) -> String {
399+
/// Returns a table-like text representation of the `DataChunk`.
400+
pub fn to_pretty(&self) -> impl Display {
397401
use comfy_table::Table;
402+
403+
if self.cardinality() == 0 {
404+
return Either::Left("(empty)");
405+
}
406+
398407
let mut table = Table::new();
399-
table.load_preset("||--+-++| ++++++\n");
408+
table.load_preset(Self::PRETTY_TABLE_PRESET);
409+
400410
for row in self.rows() {
401411
let cells: Vec<_> = row
402412
.iter()
403413
.map(|v| {
404414
match v {
405-
None => "".to_owned(), // null
415+
None => "".to_owned(), // NULL
406416
Some(scalar) => scalar.to_text(),
407417
}
408418
})
409419
.collect();
410420
table.add_row(cells);
411421
}
412-
table.to_string()
422+
423+
Either::Right(table)
413424
}
414425

415426
/// Keep the specified columns and set the rest elements to null.
@@ -626,7 +637,7 @@ impl fmt::Debug for DataChunk {
626637
"DataChunk {{ cardinality = {}, capacity = {}, data = \n{} }}",
627638
self.cardinality(),
628639
self.capacity(),
629-
self.to_pretty_string()
640+
self.to_pretty()
630641
)
631642
}
632643
}
@@ -1007,7 +1018,7 @@ mod tests {
10071018
4,
10081019
);
10091020
assert_eq!(
1010-
chunk.to_pretty_string(),
1021+
chunk.to_pretty().to_string(),
10111022
"\
10121023
+---+---+
10131024
| 1 | 6 |

src/common/src/array/stream_chunk.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt::Display;
1516
use std::mem::size_of;
1617
use std::ops::{Deref, DerefMut};
1718
use std::sync::Arc;
1819
use std::{fmt, mem};
1920

21+
use either::Either;
2022
use itertools::Itertools;
2123
use rand::prelude::SmallRng;
2224
use rand::{Rng, SeedableRng};
@@ -26,6 +28,7 @@ use super::vis::VisMut;
2628
use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt};
2729
use crate::array::{DataChunk, Vis};
2830
use crate::buffer::Bitmap;
31+
use crate::catalog::Schema;
2932
use crate::estimate_size::EstimateSize;
3033
use crate::field_generator::VarcharProperty;
3134
use crate::row::Row;
@@ -199,16 +202,34 @@ impl StreamChunk {
199202
&self.ops
200203
}
201204

202-
/// `to_pretty_string` returns a table-like text representation of the `StreamChunk`.
203-
pub fn to_pretty_string(&self) -> String {
205+
/// Returns a table-like text representation of the `StreamChunk`.
206+
pub fn to_pretty(&self) -> impl Display {
207+
self.to_pretty_inner(None)
208+
}
209+
210+
/// Returns a table-like text representation of the `StreamChunk` with a header of column names
211+
/// from the given `schema`.
212+
pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display {
213+
self.to_pretty_inner(Some(schema))
214+
}
215+
216+
fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display {
204217
use comfy_table::{Cell, CellAlignment, Table};
205218

206219
if self.cardinality() == 0 {
207-
return "(empty)".to_owned();
220+
return Either::Left("(empty)");
208221
}
209222

210223
let mut table = Table::new();
211-
table.load_preset("||--+-++| ++++++");
224+
table.load_preset(DataChunk::PRETTY_TABLE_PRESET);
225+
226+
if let Some(schema) = schema {
227+
assert_eq!(self.dimension(), schema.len());
228+
let cells = std::iter::once(String::new())
229+
.chain(schema.fields().iter().map(|f| f.name.clone()));
230+
table.set_header(cells);
231+
}
232+
212233
for (op, row_ref) in self.rows() {
213234
let mut cells = Vec::with_capacity(row_ref.len() + 1);
214235
cells.push(
@@ -229,7 +250,8 @@ impl StreamChunk {
229250
}
230251
table.add_row(cells);
231252
}
232-
table.to_string()
253+
254+
Either::Right(table)
233255
}
234256

235257
/// Reorder (and possibly remove) columns.
@@ -290,7 +312,7 @@ impl fmt::Debug for StreamChunk {
290312
"StreamChunk {{ cardinality: {}, capacity: {}, data: \n{}\n }}",
291313
self.cardinality(),
292314
self.capacity(),
293-
self.to_pretty_string()
315+
self.to_pretty()
294316
)
295317
} else {
296318
f.debug_struct("StreamChunk")
@@ -636,7 +658,7 @@ mod tests {
636658
U+ 4 .",
637659
);
638660
assert_eq!(
639-
chunk.to_pretty_string(),
661+
chunk.to_pretty().to_string(),
640662
"\
641663
+----+---+---+
642664
| + | 1 | 6 |

src/stream/src/executor/dispatch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ impl Dispatcher for HashDataDispatcher {
624624
// get hash value of every line by its key
625625
let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys);
626626

627-
tracing::trace!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty_string(), self.keys, vnodes);
627+
tracing::trace!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
628628

629629
let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
630630
.take(num_outputs)

src/stream/src/executor/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl MergeExecutor {
138138
}
139139
Message::Barrier(barrier) => {
140140
tracing::trace!(
141-
target: "events::barrier::path",
141+
target: "events::stream::barrier::path",
142142
actor_id = actor_id,
143143
"receiver receives barrier from path: {:?}",
144144
barrier.passed_actors

src/stream/src/executor/receiver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl Executor for ReceiverExecutor {
140140
}
141141
Message::Barrier(barrier) => {
142142
tracing::trace!(
143-
target: "events::barrier::path",
143+
target: "events::stream::barrier::path",
144144
actor_id = actor_id,
145145
"receiver receives barrier from path: {:?}",
146146
barrier.passed_actors

src/stream/src/executor/wrapper/trace.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,45 @@ pub async fn trace(
4747
pin_mut!(input);
4848

4949
while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
50-
if let Message::Chunk(chunk) = &message {
51-
if chunk.cardinality() > 0 && (enable_executor_row_count || is_sink_or_mv) {
52-
metrics
53-
.executor_row_count
54-
.with_label_values(&[&actor_id_string, &span_name])
55-
.inc_by(chunk.cardinality() as u64);
56-
tracing::trace!(?chunk, "chunk");
50+
// Trace the message in the span's scope.
51+
span.in_scope(|| match &message {
52+
Message::Chunk(chunk) => {
53+
if chunk.cardinality() > 0 {
54+
if enable_executor_row_count || is_sink_or_mv {
55+
metrics
56+
.executor_row_count
57+
.with_label_values(&[&actor_id_string, &span_name])
58+
.inc_by(chunk.cardinality() as u64);
59+
}
60+
tracing::trace!(
61+
target: "events::stream::message::chunk",
62+
cardinality = chunk.cardinality(),
63+
capacity = chunk.capacity(),
64+
"\n{}\n", chunk.to_pretty_with_schema(&info.schema),
65+
);
66+
}
5767
}
58-
}
68+
Message::Watermark(watermark) => {
69+
tracing::trace!(
70+
target: "events::stream::message::watermark",
71+
value = ?watermark.val,
72+
col_idx = watermark.col_idx,
73+
);
74+
}
75+
Message::Barrier(barrier) => {
76+
tracing::trace!(
77+
target: "events::stream::message::barrier",
78+
prev_epoch = barrier.epoch.prev,
79+
curr_epoch = barrier.epoch.curr,
80+
kind = ?barrier.kind,
81+
);
82+
}
83+
});
5984

85+
// Yield the message and update the span.
6086
match &message {
6187
Message::Chunk(_) | Message::Watermark(_) => yield message,
62-
63-
Message::Barrier(_barrier) => {
88+
Message::Barrier(_) => {
6489
// Drop the span as the inner executor has finished processing the barrier (then all
6590
// data from the previous epoch).
6691
let _ = std::mem::replace(&mut span, Span::none());

src/stream/src/task/barrier_manager.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ impl LocalBarrierManager {
101101

102102
/// Register sender for source actors, used to send barriers.
103103
pub fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
104-
tracing::trace!(actor_id = actor_id, "register sender");
104+
tracing::trace!(
105+
target: "events::stream::barrier::manager",
106+
actor_id = actor_id,
107+
"register sender"
108+
);
105109
self.senders.entry(actor_id).or_default().push(sender);
106110
}
107111

@@ -129,6 +133,7 @@ impl LocalBarrierManager {
129133
};
130134
let to_collect: HashSet<ActorId> = actor_ids_to_collect.into_iter().collect();
131135
trace!(
136+
target: "events::stream::barrier::manager::send",
132137
"send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
133138
barrier,
134139
to_send,
@@ -167,7 +172,11 @@ impl LocalBarrierManager {
167172

168173
// Actors to stop should still accept this barrier, but won't be sent any subsequent barriers.
169174
if let Some(actors) = barrier.all_stop_actors() {
170-
trace!("remove actors {:?} from senders", actors);
175+
trace!(
176+
target: "events::stream::barrier::manager",
177+
"remove actors {:?} from senders",
178+
actors
179+
);
171180
for actor in actors {
172181
self.senders.remove(actor);
173182
}

src/stream/src/task/barrier_manager/managed_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ impl ManagedBarrierState {
205205
/// Collect a `barrier` from the actor with `actor_id`.
206206
pub(super) fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
207207
tracing::trace!(
208-
target: "events::stream::barrier::collect_barrier",
208+
target: "events::stream::barrier::manager::collect",
209209
"collect_barrier: epoch = {}, actor_id = {}, state = {:#?}",
210210
barrier.epoch.curr,
211211
actor_id,

src/stream/tests/integration_tests/snapshot.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ where
154154
}
155155
SnapshotEvent::Chunk(chunk_str) => {
156156
let chunk = StreamChunk::from_pretty(chunk_str);
157-
*chunk_str = chunk.to_pretty_string();
157+
*chunk_str = chunk.to_pretty().to_string();
158158
tx.push_chunk(chunk);
159159
}
160160
SnapshotEvent::Watermark { col_idx, val } => tx.push_watermark(
@@ -191,10 +191,10 @@ fn run_until_pending(
191191
if options.sort_chunk {
192192
chunk = chunk.sort_rows();
193193
}
194-
let mut output = chunk.to_pretty_string();
194+
let mut output = chunk.to_pretty().to_string();
195195
if options.include_applied_result {
196196
let applied = store.apply_chunk(&chunk);
197-
output += &format!("\napplied result:\n{}", applied.to_pretty_string());
197+
output += &format!("\napplied result:\n{}", applied.to_pretty());
198198
}
199199
SnapshotEvent::Chunk(output)
200200
}

src/utils/runtime/src/logger.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets
4848
.with_target("foyer_memory", Level::WARN)
4949
.with_target("foyer_storage", Level::WARN)
5050
// disable events that are too verbose
51-
// if you want to enable any of them, find the target name and set it to `TRACE`
52-
// .with_target("events::stream::mview::scan", Level::TRACE)
5351
.with_target("events", Level::ERROR)
5452
}
5553

0 commit comments

Comments
 (0)