12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: collections:: { BTreeMap , HashMap } ;
15
+ use std:: collections:: { BTreeMap , HashMap , HashSet } ;
16
16
use std:: ffi:: CString ;
17
17
use std:: fs;
18
18
use std:: path:: Path ;
@@ -293,16 +293,45 @@ impl MonitorService for MonitorServiceImpl {
293
293
request : Request < GetProfileStatsRequest > ,
294
294
) -> Result < Response < GetProfileStatsResponse > , Status > {
295
295
let metrics = global_streaming_metrics ( MetricLevel :: Info ) ;
296
- let executor_ids = & request. into_inner ( ) . executor_ids ;
296
+ let inner = request. into_inner ( ) ;
297
+ let executor_ids = & inner. executor_ids ;
298
+ let fragment_ids = HashSet :: from_iter ( inner. dispatcher_fragment_ids . into_iter ( ) ) ;
297
299
let stream_node_output_row_count = metrics
298
300
. mem_stream_node_output_row_count
299
301
. collect ( executor_ids) ;
300
302
let stream_node_output_blocking_duration_ms = metrics
301
303
. mem_stream_node_output_blocking_duration_ms
302
304
. collect ( executor_ids) ;
305
+
306
+ // Collect count metrics by fragment_ids
307
+ fn collect_by_fragment_ids < T : Collector > (
308
+ m : & T ,
309
+ fragment_ids : & HashSet < u32 > ,
310
+ ) -> HashMap < u32 , u64 > {
311
+ let mut metrics = HashMap :: new ( ) ;
312
+ for mut metric_family in m. collect ( ) {
313
+ for metric in metric_family. take_metric ( ) {
314
+ let fragment_id = get_label_infallible ( & metric, "fragment_id" ) ;
315
+ if fragment_ids. contains ( & fragment_id) {
316
+ let entry = metrics. entry ( fragment_id) . or_insert ( 0 ) ;
317
+ * entry += metric. get_counter ( ) . get_value ( ) as u64 ;
318
+ }
319
+ }
320
+ }
321
+ metrics
322
+ }
323
+
324
+ let dispatch_fragment_output_row_count =
325
+ collect_by_fragment_ids ( & metrics. actor_out_record_cnt , & fragment_ids) ;
326
+ let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids (
327
+ & metrics. actor_output_buffer_blocking_duration_ns ,
328
+ & fragment_ids,
329
+ ) ;
303
330
Ok ( Response :: new ( GetProfileStatsResponse {
304
331
stream_node_output_row_count,
305
332
stream_node_output_blocking_duration_ms,
333
+ dispatch_fragment_output_row_count,
334
+ dispatch_fragment_output_blocking_duration_ns,
306
335
} ) )
307
336
}
308
337
@@ -322,27 +351,14 @@ impl MonitorService for MonitorServiceImpl {
322
351
. into_vec ( )
323
352
}
324
353
325
- // Must ensure the label exists and can be parsed into `T`
326
- fn get_label < T : std:: str:: FromStr > ( metric : & Metric , label : & str ) -> T {
327
- metric
328
- . get_label ( )
329
- . iter ( )
330
- . find ( |lp| lp. get_name ( ) == label)
331
- . unwrap ( )
332
- . get_value ( )
333
- . parse :: < T > ( )
334
- . ok ( )
335
- . unwrap ( )
336
- }
337
-
338
354
let actor_output_buffer_blocking_duration_ns =
339
355
collect ( & metrics. actor_output_buffer_blocking_duration_ns ) ;
340
356
let actor_count = collect ( & metrics. actor_count ) ;
341
357
342
358
let actor_count: HashMap < _ , _ > = actor_count
343
359
. iter ( )
344
360
. map ( |m| {
345
- let fragment_id: u32 = get_label ( m, "fragment_id" ) ;
361
+ let fragment_id: u32 = get_label_infallible ( m, "fragment_id" ) ;
346
362
let count = m. get_gauge ( ) . get_value ( ) as u32 ;
347
363
( fragment_id, count)
348
364
} )
@@ -361,7 +377,7 @@ impl MonitorService for MonitorServiceImpl {
361
377
362
378
let actor_current_epoch = collect ( & metrics. actor_current_epoch ) ;
363
379
for m in & actor_current_epoch {
364
- let fragment_id: u32 = get_label ( m, "fragment_id" ) ;
380
+ let fragment_id: u32 = get_label_infallible ( m, "fragment_id" ) ;
365
381
let epoch = m. get_gauge ( ) . get_value ( ) as u64 ;
366
382
if let Some ( s) = fragment_stats. get_mut ( & fragment_id) {
367
383
s. current_epoch = if s. current_epoch == 0 {
@@ -380,7 +396,7 @@ impl MonitorService for MonitorServiceImpl {
380
396
let mut relation_stats: HashMap < u32 , RelationStats > = HashMap :: new ( ) ;
381
397
let mview_current_epoch = collect ( & metrics. materialize_current_epoch ) ;
382
398
for m in & mview_current_epoch {
383
- let table_id: u32 = get_label ( m, "table_id" ) ;
399
+ let table_id: u32 = get_label_infallible ( m, "table_id" ) ;
384
400
let epoch = m. get_gauge ( ) . get_value ( ) as u64 ;
385
401
if let Some ( s) = relation_stats. get_mut ( & table_id) {
386
402
s. current_epoch = if s. current_epoch == 0 {
@@ -403,8 +419,9 @@ impl MonitorService for MonitorServiceImpl {
403
419
let mut channel_stats: BTreeMap < String , ChannelStats > = BTreeMap :: new ( ) ;
404
420
405
421
for metric in actor_output_buffer_blocking_duration_ns {
406
- let fragment_id: u32 = get_label ( & metric, "fragment_id" ) ;
407
- let downstream_fragment_id: u32 = get_label ( & metric, "downstream_fragment_id" ) ;
422
+ let fragment_id: u32 = get_label_infallible ( & metric, "fragment_id" ) ;
423
+ let downstream_fragment_id: u32 =
424
+ get_label_infallible ( & metric, "downstream_fragment_id" ) ;
408
425
409
426
let key = format ! ( "{}_{}" , fragment_id, downstream_fragment_id) ;
410
427
let channel_stat = channel_stats. entry ( key) . or_insert_with ( || ChannelStats {
@@ -416,17 +433,18 @@ impl MonitorService for MonitorServiceImpl {
416
433
417
434
// When metrics level is Debug, `actor_id` will be removed to reduce metrics.
418
435
// See `src/common/metrics/src/relabeled_metric.rs`
419
- channel_stat. actor_count += if get_label :: < String > ( & metric, "actor_id" ) . is_empty ( ) {
420
- actor_count[ & fragment_id]
421
- } else {
422
- 1
423
- } ;
436
+ channel_stat. actor_count +=
437
+ if get_label_infallible :: < String > ( & metric, "actor_id" ) . is_empty ( ) {
438
+ actor_count[ & fragment_id]
439
+ } else {
440
+ 1
441
+ } ;
424
442
channel_stat. output_blocking_duration += metric. get_counter ( ) . get_value ( ) ;
425
443
}
426
444
427
445
let actor_output_row_count = collect ( & metrics. actor_out_record_cnt ) ;
428
446
for metric in actor_output_row_count {
429
- let fragment_id: u32 = get_label ( & metric, "fragment_id" ) ;
447
+ let fragment_id: u32 = get_label_infallible ( & metric, "fragment_id" ) ;
430
448
431
449
// Find out and write to all downstream channels
432
450
let key_prefix = format ! ( "{}_" , fragment_id) ;
@@ -438,8 +456,8 @@ impl MonitorService for MonitorServiceImpl {
438
456
439
457
let actor_input_row_count = collect ( & metrics. actor_in_record_cnt ) ;
440
458
for metric in actor_input_row_count {
441
- let upstream_fragment_id: u32 = get_label ( & metric, "upstream_fragment_id" ) ;
442
- let fragment_id: u32 = get_label ( & metric, "fragment_id" ) ;
459
+ let upstream_fragment_id: u32 = get_label_infallible ( & metric, "upstream_fragment_id" ) ;
460
+ let fragment_id: u32 = get_label_infallible ( & metric, "fragment_id" ) ;
443
461
444
462
let key = format ! ( "{}_{}" , upstream_fragment_id, fragment_id) ;
445
463
if let Some ( s) = channel_stats. get_mut ( & key) {
@@ -529,6 +547,7 @@ impl MonitorService for MonitorServiceImpl {
529
547
}
530
548
531
549
pub use grpc_middleware:: * ;
550
+ use risingwave_common:: metrics:: get_label_infallible;
532
551
533
552
pub mod grpc_middleware {
534
553
use std:: sync:: Arc ;
0 commit comments